[Small-cl-src] ipc.lisp
Helmut Eller
e9626484 at stud3.tuwien.ac.at
Sat Aug 28 10:30:01 UTC 2004
;; ipc.lisp -- IPC with SYS V message queues.
;;
;; Written by Helmut Eller 2004
;;
;; This package implements some Erlang-style concurrency primitives
;; with Unix processes and SYS V message queues. The following
;; primitives are available:
;;
;; spawn: forks a new process and calls a function in the new process.
;;
;; snd: sends a message to another process. The message must be
;; PRINTable and READable.
;;
;; rcv: receives a message. A test predicate can be used to
;; emulate `selective receive'. This operation blocks if there's no
;; message in the mailbox, but a timeout value can be specified.
;;
;; Some simple usage examples are at the end of this file.
;;
;; Caution: Erlangish link/unlink is not supported, so it is quite
;; difficult to handle errors in other processes. Currently a child
;; process prints a backtrace and exits instead of entering a
;; debugger. There's also no way to discover or kill other processes.
;; You'll have to use ps etc. for this. The SYS V message queues are
;; not reliably removed. Again you'll have to use ipcs and ipcrm to
;; do it manually.
;;
;; This code is written for CMUCL.
;; Package definition.  Besides CL the package uses the EXT, UNIX, ALIEN and
;; C-CALL packages (the file header states that the code is written for
;; CMUCL).  Only SPAWN, SND, RCV and SELF are exported; everything else in
;; this file is internal.
(defpackage :ipc
(:use :cl :ext :unix :alien :c-call)
(:export :spawn :snd :rcv :self))
(in-package :ipc)
(defvar *ipc-debug* t
  "When non-NIL, DBG writes its trace output to *DEBUG-IO*.")
(defun dbg (fstring &rest args)
  "Write a trace message to *DEBUG-IO* when *IPC-DEBUG* is non-NIL.
FSTRING and ARGS are handed to FORMAT; the stream is flushed afterwards.
Nothing is written when *IPC-DEBUG* is NIL."
  (if *ipc-debug*
      (progn
        (apply #'format *debug-io* fstring args)
        (finish-output *debug-io*))
      nil))
(defmacro ccall ((name rettype &rest argtypes) &rest args)
  "Call the C function NAME with ARGS.
NAME, RETTYPE and ARGTYPES are not evaluated; they are spliced into an
ALIEN:EXTERN-ALIEN form that describes the foreign function as
(FUNCTION RETTYPE . ARGTYPES).  ARGS are the argument forms handed to
ALIEN:ALIEN-FUNCALL.  The expansion returns whatever the foreign call
returns."
  ;; ARGTYPES and ARGS are lists and therefore need the splicing unquote.
  `(alien:alien-funcall
    (alien:extern-alien ,name (function ,rettype ,@argtypes))
    ,@args))
(defmacro let-errno ((var errno value) &body body)
  "Execute VALUE; bind the result to VAR, bind ERRNO, and execute BODY.
ERRNO is read immediately after VALUE has been evaluated, before any form
of BODY runs.  The expansion returns the values of the last form in BODY."
  `(let ((,var ,value))
     (let ((,errno
            ;; unix-errno is suddenly a function in recent cmucls.
            ;; The choice between the two access styles is made once, at
            ;; macroexpansion time, by checking whether UNIX:UNIX-ERRNO is
            ;; fbound.
            ,(cond ((fboundp 'unix:unix-errno)
                    '(unix:unix-errno))
                   (t `(progn
                         (unix::unix-get-errno)
                         unix:unix-errno)))))
       ;; BODY is a list of forms and must be spliced in.
       ,@body)))
;; Short alien type names used in the CCALL signatures of this file:
;; SAP abbreviates SYSTEM-AREA-POINTER and UINT abbreviates UNSIGNED.
(def-alien-type sap system-area-pointer)
(def-alien-type uint unsigned)
(defun unix-error (errno)
  "Signal a Lisp error whose message is the text GET-UNIX-ERROR-MSG
returns for the error number ERRNO."
  (let ((message (get-unix-error-msg errno)))
    (error "~A" message)))
;; Numeric flag and command values passed to msgget, msgsnd and msgctl below.
;; NOTE(review): the numbers are hard-coded; they presumably mirror the
;; IPC_NOWAIT, IPC_CREAT, IPC_EXCL, IPC_SET and IPC_RMID definitions of the
;; platform the author used -- confirm against the target system's headers.
;; +IPC_SET+ is not referenced by any code visible in this file.
(defconstant +ipc_nowait+ #x800)
(defconstant +ipc_creat+ #x200)
(defconstant +ipc_excl+ #x400)
(defconstant +ipc_set+ #x1)
(defconstant +ipc_rmid+ #x0)
;; Raw binding for the C function msgget: KEY and FLAGS are passed as
;; unsigned ints and the C int result is returned unchanged.  The callers in
;; this file (MAKE-MQ, GET-MQ) treat -1 as failure and inspect errno
;; themselves.
(defun %msgget (key flags)
(ccall (msgget int uint uint) key flags))
(defun make-mq ()
  "Create a new SYS V message queue.
Return the (per process) queue handle and the public key.
Keys are drawn at random from the range 1..#xfffe.  When the chosen key is
already taken (EEXIST) another key is tried; any other failure signals an
error via UNIX-ERROR."
  (loop
    ;; Key 0 is IPC_PRIVATE: msgget would create a private queue that no
    ;; other process can look up by key (see GET-MQ), so it is never used.
    (let ((key (1+ (random #xfffe))))
      (let-errno (mq errno (%msgget key (logior #o600 +ipc_creat+ +ipc_excl+)))
        (cond ((/= mq -1)
               (return (values mq key)))
              ;; EEXIST: fall through and retry with a new key.
              ((/= errno unix:eexist)
               (unix-error errno)))))))
(defun get-mq (key)
  "Look up the already existing message queue registered under KEY.
Return its queue handle; signal an error when the lookup fails."
  (let-errno (handle errno (%msgget key #o600))
    (if (= handle -1)
        (unix-error errno)
        handle)))
(defun delete-mq (mq)
  "Remove the message queue with handle MQ by issuing the +IPC_RMID+
command through msgctl.  Logs the removal via DBG first.  Return T on
success; signal an error when msgctl reports a failure."
  (dbg "; Closing: ~A (pid ~A)~%" mq (unix-getpid))
  (let-errno (status errno (ccall (msgctl int int int sap)
                                  mq +ipc_rmid+ (sys:int-sap 0)))
    (or (zerop status)
        (unix-error errno))))
;; Raw binding for the C function msgsnd.  The address of STRING's data is
;; passed as the message pointer, (LENGTH STRING) as the size and
;; +IPC_NOWAIT+ as the flags; the C int result is returned unchanged.
;; NOTE(review): msgsnd(2) describes the message pointer as a structure that
;; begins with a `long mtype' field, with the size counting only the text
;; after it.  Here the string itself is handed over, so its leading bytes
;; would be taken as mtype -- confirm this is intended and that reading past
;; the end of STRING is harmless.
(defun %msgsnd (mq string)
(ccall (msgsnd int int sap uint uint)
mq (sys:vector-sap string) (length string) +ipc_nowait+))
(defun msgsnd (mq string)
  "Put STRING on the message queue MQ.
Return NIL on success; signal an error when the underlying call fails."
  (let-errno (status errno (%msgsnd mq string))
    (when (= status -1)
      (unix-error errno))))
;; Raw binding for the C function msgrcv.  The address of BUFFER's data is
;; passed as the destination, (LENGTH BUFFER) as the size, and 0 for each of
;; the two remaining arguments; the C int result is returned unchanged.
;; NOTE(review): per msgrcv(2) the two zeros would be the message type
;; selector and the flags (i.e. "any message, blocking") -- confirm.  The
;; same buffer-layout question as for %MSGSND applies: the size presumably
;; excludes a leading `long mtype' field that is also written into BUFFER.
(defun %msgrcv (mq buffer)
(ccall (msgrcv int int sap uint uint uint)
mq (sys:vector-sap buffer) (length buffer) 0 0))
(defun msgrcv (mq)
  "Fetch the first message in MQ and return it as a fresh string.
The call is repeated when it is interrupted (EINTR); any other failure
signals an error."
  (loop
    ;; A new 8192-character buffer is allocated for every attempt.
    (let ((buffer (make-string 8192)))
      (let-errno (nbytes errno (%msgrcv mq buffer))
        (cond ((/= nbytes -1)
               (return (subseq buffer 0 nbytes)))
              ;; EINTR: go round the loop again.
              ((/= errno unix:eintr)
               (unix-error errno)))))))
;; Parse STRING back into a Lisp object with READ-FROM-STRING under
;; WITH-STANDARD-IO-SYNTAX; this is the counterpart of ENCODE.
;; NOTE(review): standard I/O syntax binds *READ-EVAL* to T, so a message
;; containing #. is evaluated by the reader.  The strings decoded here arrive
;; over a message queue; consider binding *READ-EVAL* to NIL if senders are
;; not fully trusted.
(defun decode (string)
(with-standard-io-syntax (read-from-string string)))
;; Turn MESSAGE into its printed representation with PRIN1-TO-STRING under
;; WITH-STANDARD-IO-SYNTAX, so that DECODE can read it back.  As the file
;; header notes, MESSAGE must be PRINTable and READable.
(defun encode (message)
(with-standard-io-syntax (prin1-to-string message)))
;;; Process structure (counterpart to Erlang's PID)
;; A PROCESS names a peer by its Unix pid and by the key of its message
;; queue.  Both slots are read-only fixnums.  The accessors are PROCESS.PID
;; and PROCESS.QID, the predicate is PROCESS?, the constructor MAKE-PROCESS.
;; The slot default (REQUIRED-ARGUMENT) presumably signals an error when a
;; slot is omitted from MAKE-PROCESS -- confirm against the EXT package.
(defstruct (process (:conc-name process.)
(:predicate process?))
;; Unix PID
(pid (required-argument) :type fixnum :read-only t)
;; The key of the SYSV message queue
(qid (required-argument) :type fixnum :read-only t))
;; Cache of queue handles.  Keys are PROCESS.PID values; the values stored
;; are the handles obtained from MAKE-MQ or GET-MQ (see PROCESS-QUEUE and its
;; SETF function).
(defvar *queues* (make-hash-table)
"Maps pids to SYSV IPC message queues.")
(defun process-queue (process)
  "Return the message queue handle for PROCESS.
The handle cached in *QUEUES* under the process's pid is used when there
is one; otherwise the queue is looked up by the process's key with GET-MQ,
cached, and returned."
  (let ((cached (gethash (process.pid process) *queues*)))
    (if cached
        cached
        (setf (process-queue process)
              (get-mq (process.qid process))))))
;; SETF function for PROCESS-QUEUE: records QUEUE in *QUEUES* under the pid
;; of PROCESS and returns QUEUE.
(defun (setf process-queue) (queue process)
(setf (gethash (process.pid process) *queues*) queue))
;; Per-process state.
;; *SELF*   -- the PROCESS describing this Unix process; NIL until INIT-SELF
;;             (called lazily by SELF) or INITIALIZE-CHILD sets it.
;; *PARENT* -- the PROCESS that spawned this one; set by INITIALIZE-CHILD
;;             and NIL otherwise.
;; *LINKED-PROCESSES* and *TRAP-EXIT* are not referenced by any code visible
;; in this file (the header states that link/unlink is not supported).
(defvar *self* nil)
(defvar *parent* nil)
(defvar *linked-processes* '())
(defvar *trap-exit* nil)
(defun init-self ()
  "Create a fresh message queue for this Unix process, build the PROCESS
structure that describes it, cache the queue handle and store the
structure in *SELF*.  Return the new PROCESS."
  (multiple-value-bind (handle key) (make-mq)
    (let ((me (make-process :pid (unix-getpid) :qid key)))
      (setf (process-queue me) handle
            *self* me))))
;; Return the PROCESS describing this Unix process.  The first call in a
;; process that has no *SELF* yet creates the message queue via INIT-SELF.
(defun self () (or *self* (init-self)))
;; Return the value of *PARENT*: the PROCESS recorded by INITIALIZE-CHILD in
;; a spawned child, or NIL when it was never set.
(defun parent () *parent*)
(defun self? (process)
  "Return true when PROCESS denotes this very unix process, i.e. when it
is EQUALP to the value of (SELF)."
  (let ((me (self)))
    (equalp process me)))
;; Probe the Unix process behind PROCESS by calling UNIX-KILL on its pid with
;; :CHECK, and return whatever UNIX-KILL returns.
;; NOTE(review): :CHECK is presumably the "signal 0" existence probe, so a
;; true result would mean the pid exists -- confirm against the UNIX package.
(defun alive? (process)
(unix-kill (process.pid process) :check))
(defvar *mailbox* (list)
  "Messages kept locally, oldest first: messages sent to ourselves and
messages received from the queue that a selective receive skipped.")
(defun snd (message process)
  "Deliver MESSAGE to PROCESS.
A message addressed to this process is appended to *MAILBOX* directly;
any other message is encoded and written to the target's message queue."
  (if (self? process)
      (setf *mailbox* (nconc *mailbox* (list message)))
      (msgsnd (process-queue process) (encode message))))
;;; SYS V message queues have no support for receive with timeouts.
;;; We use the itimer to implement timeouts.
(defun assert-timer-unused ()
  "Signal an error if the real-time interval timer is already armed,
i.e. if its current value as reported by UNIX-GETITIMER is not zero."
  (multiple-value-bind (ok isec iusec vsec vusec) (unix-getitimer :real)
    (declare (ignore ok isec iusec))
    (assert (= 0 vsec vusec) () "itimer already used")))
(defun decode-timeout (timeout)
  "Split the time value TIMEOUT into two integer values, SECS and USECS.
TIMEOUT is a positive number of seconds (integer, ratio or float).
USECS is always in the range 0..999999.  A positive TIMEOUT that rounds
to zero microseconds is raised to 1 microsecond.  A zero or negative
TIMEOUT fails the assertion."
  (multiple-value-bind (secs rem) (truncate timeout)
    (let ((usecs (round (* rem 1000000))))
      ;; Rounding the fraction can yield a full second (e.g. 0.9999999 s);
      ;; carry it so that USECS stays a valid microsecond count.
      (when (>= usecs 1000000)
        (incf secs)
        (decf usecs 1000000))
      ;; A tiny positive timeout must not collapse to 0/0, which the
      ;; assertion below would reject.
      (when (and (plusp timeout) (zerop secs) (zerop usecs))
        (setf usecs 1))
      (assert (or (plusp secs) (and (zerop secs) (plusp usecs))))
      (values secs usecs))))
(defun with-timeout (timeout fn timeout-fn)
  "Call FN. If executing FN completes within TIMEOUT seconds return
the result. Otherwise abort, unwind the stack and call TIMEOUT-FN
instead. TIMEOUT is in seconds (can be a float). A zero or negative
TIMEOUT counts as already expired: TIMEOUT-FN is called and FN is not run.
Return either the result of executing FN or the result of calling
TIMEOUT-FN."
  ;; Zero has to be handled here as well as negative values: the timer
  ;; set-up in %WITH-TIMEOUT asserts a strictly positive time, and
  ;; RCV-FROM-MQ passes the time remaining until a deadline, which can be
  ;; exactly zero.
  (if (plusp timeout)
      (%with-timeout timeout fn timeout-fn)
      (funcall timeout-fn)))
;; Worker for WITH-TIMEOUT; TIMEOUT must be acceptable to DECODE-TIMEOUT.
;; Control flow: the normal path leaves through RETURN-FROM %WITH-TIMEOUT
;; carrying FN's result.  The SIGALRM handler instead performs
;; RETURN-FROM ABORT, after which execution falls through to the last form
;; and TIMEOUT-FN is called.
(defun %with-timeout (timeout fn timeout-fn)
(block abort
(return-from %with-timeout
(labels ((handler (signal code scp)
(declare (ignore signal code scp))
;; Non-local exit out of whatever FN was doing when the alarm fired.
(return-from abort nil)))
(multiple-value-bind (secs usecs) (decode-timeout timeout)
;; Refuse to clobber a real-time itimer that is already armed.
(assert-timer-unused)
(sys:with-enabled-interrupts ((unix:sigalrm #'handler))
;; Arm the timer.  NOTE(review): the argument order is presumably
;; interval-secs interval-usecs value-secs value-usecs (matching the values
;; UNIX-GETITIMER returns in ASSERT-TIMER-UNUSED), i.e. a one-shot timer.
(unix-setitimer :real 0 0 secs usecs)
;; The cleanup form sets the timer back to all zeros whether FN returns
;; normally or is unwound.
(unwind-protect (funcall fn)
(unix-setitimer :real 0 0 0 0)))))))
(funcall timeout-fn))
(defun endtime (timeout)
  "Return the absolute deadline that lies TIMEOUT seconds in the future.
The result is expressed in internal time units, the unit of
GET-INTERNAL-REAL-TIME, and is meant to be passed to SECONDS-UNTIL."
  ;; Seconds -> internal time units is a multiplication.  The factor is a
  ;; double-float so that a single-float TIMEOUT does not drag the (large)
  ;; clock value down to single-float precision.
  (+ (get-internal-real-time)
     (* timeout (coerce internal-time-units-per-second 'double-float))))
(defun seconds-until (endtime)
  "Return the number of seconds, as a double-float, left until ENDTIME.
ENDTIME is a deadline in internal time units as produced by the function
ENDTIME.  The result is zero or negative once the deadline has passed."
  ;; Internal time units -> seconds is a division.
  (/ (- endtime (get-internal-real-time))
     (coerce internal-time-units-per-second 'double-float)))
(defun rcv-from-mq (mq test endtime)
  "Read messages from MQ until one satisfies the predicate TEST and return
it together with NIL.  Messages that fail TEST are appended to *MAILBOX*
in arrival order.  When ENDTIME is non-NIL and that deadline passes first,
return NIL and T instead."
  (loop
    (let ((msg (if endtime
                   (with-timeout (seconds-until endtime)
                     (lambda () (decode (msgrcv mq)))
                     (lambda () (return-from rcv-from-mq (values nil t))))
                   (decode (msgrcv mq)))))
      (when (funcall test msg)
        (return (values msg nil)))
      ;; Not the message we are waiting for: keep it for a later RCV.
      (setf *mailbox* (nconc *mailbox* (list msg))))))
(defun rcv (&key (test (constantly t)) timeout)
  "Receive the next message that satisfies TEST.
*MAILBOX* is searched first; the first matching message is removed from it
and returned.  Otherwise messages are read from this process's queue, with
TIMEOUT (in seconds) turned into a deadline when it is given.
Return the message and a flag indicating whether the timeout expired."
  (let ((hit (member-if test *mailbox*)))
    (if hit
        (let ((before (ldiff *mailbox* hit)))
          (setf *mailbox* (nconc before (cdr hit)))
          (values (car hit) nil))
        (rcv-from-mq (process-queue (self))
                     test
                     (if timeout (endtime timeout))))))
;; The VOP below is defined in the VM package; the file switches back to the
;; IPC package right after it.  The register names (EAX, ESP, EBP, ESI) show
;; that this part is x86-specific.
(in-package :vm)
(eval-when (:compile-toplevel :load-toplevel :execute)
;; Declare %RESET-CSP to the compiler as taking one function argument.
;; NOTE(review): the NIL result type presumably marks it as never returning
;; -- confirm against DEFKNOWN's documentation.
(defknown %reset-csp (function) nil)
;; Reset the control stack and call FN with the empty stack.
;; Unwind-protect blocks are reset and NOT executed.
(define-vop (reset-csp)
(:policy :fast-safe)
(:args (fn :scs (descriptor-reg control-stack)))
(:save-p t)
(:translate %reset-csp)
(:generator
0
;; Keep FN in EAX for the jump at the end.
(move eax-tn fn)
;; Load the address of the foreign variable control_stack_end into ESP and
;; then replace it with the value stored there.
(load-foreign-data-symbol esp-tn "control_stack_end")
(inst mov esp-tn (make-ea :dword :base esp-tn))
;; Forget all active catch and unwind-protect blocks.
(store-symbol-value 0 lisp::*current-catch-block*)
(store-symbol-value 0 lisp::*current-unwind-protect-block*)
;; Push a zero word and make EBP and ESI point at the new stack top.
;; NOTE(review): presumably this fakes the frame layout that the
;; TAIL-CALL-VARIABLE assembly routine expects -- confirm.
(inst push 0)
(inst mov ebp-tn esp-tn)
(inst mov esi-tn esp-tn)
;; Transfer control via the TAIL-CALL-VARIABLE assembly routine.
(inst jmp (make-fixup 'tail-call-variable :assembly-routine)))))
(in-package :ipc)
;; Discard this process's Lisp stacks and then call FN through
;; VM::%RESET-CSP.  Steps, in the order written below: unbind the binding
;; stack, reset the alien stack pointer to MP::*ALIEN-STACK-TOP*, zero the
;; eval stack top, point the binding stack pointer at the foreign symbol
;; binding_stack, and finally reset the control stack and call FN.
;; Used by %SPAWN in the forked child.
;; NOTE(review): since the control stack is reset, this presumably never
;; returns to its caller -- confirm.
(defun call-with-empty-stacks (fn)
(mp::unbind-binding-stack)
(setf x86::*alien-stack* (kernel:make-lisp-obj mp::*alien-stack-top*))
(setf kernel:*eval-stack-top* 0)
(setf (kernel:binding-stack-pointer-sap)
(alien:extern-alien "binding_stack" system-area-pointer))
(vm::%reset-csp fn))
;; Debugger hook installed by START-CHILD in spawned processes.  Instead of
;; entering the debugger it reports CONDITION on *DEBUG-IO*, prints a
;; backtrace and throws to the %EXIT-CHILD catch tag that START-CHILD
;; establishes.  HOOK is ignored.
(defun child-debugger-hook (condition hook)
"Print a backtrace and exit."
(declare (ignore hook))
;; Each reporting step is wrapped in IGNORE-ERRORS so that an error raised
;; while printing does not prevent the THROW below.
(ignore-errors
(format *debug-io* "~2&~A~% [Condition of type ~S]~2&"
(debug::safe-condition-message condition)
(type-of condition)))
(ignore-errors (debug:backtrace))
;; Unwinding to the catch runs START-CHILD's cleanup form, which ends the
;; process.
(throw '%exit-child nil))
;; Body of a freshly forked child; %SPAWN calls it through
;; CALL-WITH-EMPTY-STACKS.  It rebinds the standard streams to STREAM,
;; applies FN to ARGS with CHILD-DEBUGGER-HOOK installed as *DEBUGGER-HOOK*,
;; and then -- whether FN returned or was unwound -- deletes the queue MQ,
;; flushes output and calls UNIX:UNIX-EXIT.
(defun start-child (stream mq fn args)
(let ((*standard-output* stream)
(*debug-io* stream)
(*error-output* stream)
(*query-io* stream)
(*standard-input* stream))
;; CHILD-DEBUGGER-HOOK throws to this tag after printing its report.
(catch '%exit-child
(unwind-protect
(let ((*debugger-hook* (lambda (c h) (child-debugger-hook c h))))
(apply fn args))
;; Cleanup forms.  The first group is best effort: errors are swallowed so
;; that UNIX-EXIT below is still reached.
(ignore-errors
(delete-mq mq)
(dbg "; Child exited: ~S~%" (self))
(lisp::finish-standard-output-streams))
(unix:unix-exit)))))
(defun initialize-child (parent mq qid)
  "Set up the per-process state of a freshly forked child.
PARENT becomes *PARENT*, a new PROCESS built from the child's own pid and
the queue key QID becomes *SELF*, the queue handle MQ is cached for it, and
*MAILBOX* is reset to an empty list."
  (setf *parent* parent
        *self* (make-process :pid (unix-getpid) :qid qid)
        (process-queue *self*) mq
        *mailbox* (list)))
(defun %spawn (fn args)
  "Fork a child process that applies FN to ARGS; worker for SPAWN.
A message queue for the child is created before the fork.  In the parent
the PROCESS structure describing the child is returned.  In the child the
per-process state is initialised and FN is run on emptied stacks via
START-CHILD.  When the fork fails, the queue created for the child is
removed again and an error is signalled."
  (multiple-value-bind (mq qid) (make-mq)
    (let ((parent (self)))
      ;; Flush pending output before forking.
      (mapc #'force-output (list *standard-output* *debug-io* *query-io*))
      ;; CMUCL's SIGCHLD handler
      (sys:enable-interrupt unix:sigchld #'ext::sigchld-handler)
      (multiple-value-bind (pid errno) (unix-fork)
        (cond ((and pid (zerop pid))
               ;; Child.
               (initialize-child parent mq qid)
               ;; Capture the stream before the stacks (and with them the
               ;; dynamic bindings) are reset.
               (let ((io *debug-io*))
                 (call-with-empty-stacks
                  (lambda () (start-child io mq fn args)))))
              (pid
               ;; Parent.
               (dbg "; Forked: ~D ~D (parent ~D)~%" pid qid (unix-getpid))
               (let ((child (make-process :pid pid :qid qid)))
                 (setf (process-queue child) mq)
                 child))
              (t
               ;; Fork failed: nobody will ever use the queue, so do not
               ;; leave it behind.  Removal is best effort; the fork error
               ;; is the one reported.
               (ignore-errors (delete-mq mq))
               (error "fork: ~A" (get-unix-error-msg errno))))))))
;; Public entry point: collects the trailing arguments into a list and hands
;; them to %SPAWN.  In the calling process the value is the PROCESS structure
;; that %SPAWN returns for the new child.
(defun spawn (fn &rest args)
"Create a new process and apply FN to ARGS in the new process."
(%spawn fn args))
#|
;; The counter process maintains the current value. Operations on the
;; counter are invoked via messages. The messages are as follows:
;; :up increments the counter
;; :down decrements the counter
;; :quit terminates the counter process
;; <a process> sends the current value to that process.
(defun counter (value)
(dbg "value: ~S~%" value)
(let ((msg (rcv)))
(dbg "msg: ~S~%" msg)
(cond ((eq msg :up)
(counter (1+ value)))
((eq msg :down)
(counter (1- value)))
((eq msg :quit)
;; return
)
((process? msg)
(snd value msg)
(counter value)))))
(defun test-counter ()
(let ((p (spawn (lambda () (counter 0)))))
(snd :up p)
(snd (self) p)
(format t "~&value: ~A~&" (rcv))
(snd :up p)
(snd (self) p)
(format t "~&value: ~A~&" (rcv))
(snd :quit p)))
;; This example counts the nodes in a cons tree by spawning a new
;; process for each subtree.
(defun count-nodes-aux (tree parent)
"Count the number of nodes in TREE an send it to PARENT."
(cond ((atom tree)
(snd 1 parent))
(t
(spawn #'count-nodes-aux (car tree) (self))
(spawn #'count-nodes-aux (cdr tree) (self))
(snd (+ 1 (rcv) (rcv)) parent))))
(defun count-nodes (tree)
(count-nodes-aux tree (self))
(rcv))
(defun test-count-nodes ()
(time (count-nodes '(a ((b c)) d e ))))
|#
;; Local Variables:
;; eval: (put 'let-errno 'common-lisp-indent-function 1)
;; End:
More information about the Small-cl-src
mailing list