;; ipc.lisp -- IPC with SYS V message queues.
;; Written by Helmut Eller 2004
;; This package implements some Erlang-style concurrency primitives
;; with Unix processes and SYS V message queues.  The following
;; primitives are available:
;;  spawn: forks a new process and calls a function in the new process.
;;  snd: sends a message to another process. The message must be
;;       PRINTable and READable.
;;  rcv: receives a message. A test predicate can be used to
;;       emulate `selective receive'.  This operation blocks if there's no
;;       message in the mailbox, but a timeout value can be specified.
;; Some simple usage examples are at the end of this file.
;; Caution: Erlangish link/unlink is not supported, so it is quite
;; difficult to handle errors in other processes. Currently a child
;; process prints a backtrace and exits instead of entering a
;; debugger. There's also no way to discover or kill other processes.
;; You'll have to use ps etc. for this.  The SYS V message queues are
;; not reliably removed.  Again you'll have to use ipcs and ipcrm to
;; do it manually.
;; This code is written for CMUCL on x86 (it relies on an x86-specific VOP
;; and other x86 runtime internals).

;;; The IPC package: Erlang-style SPAWN / SND / RCV on top of Unix
;;; processes and SYS V message queues.
(defpackage #:ipc
  (:use #:common-lisp #:ext #:unix #:alien #:c-call)
  (:export #:spawn #:snd #:rcv #:self))

(in-package #:ipc)

(defvar *ipc-debug* t
  "When true, DBG writes trace output to *DEBUG-IO*.")

(defun dbg (fstring &rest args)
  "If *IPC-DEBUG* is true, FORMAT FSTRING with ARGS to *DEBUG-IO* and
flush the stream.  Returns NIL."
  (if *ipc-debug*
      (let ((out *debug-io*))
        (apply #'format out fstring args)
        (finish-output out))
      nil))

(defmacro ccall ((name rettype &rest argtypes) &rest args)
  "Call the C function NAME with ARGS.
RETTYPE and ARGTYPES are alien type specifiers that give the C
prototype.  NAME is passed unevaluated to ALIEN:EXTERN-ALIEN, e.g.
  (ccall (msgget int uint uint) key flags)"
  `(alien:alien-funcall
    (alien:extern-alien ,name (function ,rettype ,@argtypes))
    ,@args))

(defmacro let-errno ((var errno value) &body body)
  "Execute VALUE; bind the result to VAR, bind ERRNO, and execute BODY.
ERRNO is bound to the C errno read immediately after VALUE. VALUE should
therefore be the foreign call whose error code is wanted."
  `(let ((,var ,value))
     (let ((,errno
             ;; unix-errno is suddenly a function in recent cmucls (it
             ;; used to be a variable); choose at macroexpansion time.
             ,(if (fboundp 'unix:unix-errno)
                  '(unix:unix-errno)
                  'unix:unix-errno)))
       ,@body)))

;; Short alien type names used in the CCALL prototypes of this file.
(def-alien-type sap system-area-pointer)
(def-alien-type uint unsigned)

(defun unix-error (errno)
  "Signal an error whose message is the system's text for ERRNO."
  (let ((message (get-unix-error-msg errno)))
    (error "~A" message)))

;; Flag and command values for msgget(2), msgsnd(2) and msgctl(2).
;; NOTE(review): these are hard-coded rather than taken from <sys/ipc.h>.
;; They appear to be the Linux values (IPC_NOWAIT 04000, IPC_CREAT 01000,
;; IPC_EXCL 02000, IPC_SET 1, IPC_RMID 0); confirm on other systems.
(defconstant +ipc_nowait+ #x800)
(defconstant +ipc_creat+ #x200)
(defconstant +ipc_excl+ #x400)
(defconstant +ipc_set+ #x1)
(defconstant +ipc_rmid+ #x0)

(defun %msgget (key flags)
  "Raw msgget(2) call. Return the queue identifier for KEY, or -1 (with
errno set) on failure. FLAGS is the permission bits, possibly ORed with
+IPC_CREAT+ / +IPC_EXCL+."
  (ccall (msgget int uint uint) key flags))

(defun make-mq ()
  "Create a new SYS V message queue.
Return the queue identifier and the public key that other processes can
pass to GET-MQ.  Keys are chosen at random. If a key is already taken
(EEXIST), another one is tried. Any other failure signals an error."
  (loop
    ;; Key 0 is IPC_PRIVATE: msgget would always create a fresh queue that
    ;; no other process can open by key. So draw from 1..#xffff only.
    (let ((key (1+ (random #xffff))))
      (let-errno (mq errno (%msgget key (logior #o600 +ipc_creat+ +ipc_excl+)))
        (cond ((/= mq -1)            (return (values mq key)))
              ((= errno unix:eexist))  ; key in use: retry with a new key
              (t                     (unix-error errno)))))))

(defun get-mq (key)
  "Look up the already existing message queue whose key is KEY and return
its identifier.  Signals an error if msgget(2) fails."
  (let-errno (id err (%msgget key #o600))
    (if (= id -1)
        (unix-error err)
        id)))

(defun delete-mq (mq)
  "Remove message queue MQ with msgctl(2) IPC_RMID.
Return T on success. Signal an error otherwise."
  (dbg "; Closing: ~A (pid ~A)~%" mq (unix-getpid))
  (let-errno (status err (ccall (msgctl int int int sap)
                                mq +ipc_rmid+ (sys:int-sap 0)))
    (if (zerop status)
        t
        (unix-error err))))

(defun %msgsnd (mq string)
  "Send STRING to queue MQ without blocking (IPC_NOWAIT).
Return 0, or -1 with errno set.

msgsnd(2) treats the buffer as a `struct msgbuf': the first sizeof(long)
bytes are the message type, followed by (length STRING) bytes of text.
The first characters of STRING therefore double as the message type, and
the kernel reads past the end of STRING.  STRING is copied into a buffer
padded with 8 NUL characters so that read stays inside a Lisp object.
MSGRCV recovers STRING as the first (length STRING) characters of its
buffer.
NOTE(review): assumes 8-bit characters and a positive message type
(i.e. the first characters of STRING are ASCII); confirm."
  (let ((buffer (concatenate 'string string
                             (make-string 8 :initial-element (code-char 0)))))
    (ccall (msgsnd int int sap uint uint)
           mq (sys:vector-sap buffer) (length string) +ipc_nowait+)))

(defun msgsnd (mq string)
  "Write STRING to message queue MQ and return NIL.
Signals an error if msgsnd(2) fails. %MSGSND does not block, so
presumably a full queue is reported here as an error too."
  (let-errno (result err (%msgsnd mq string))
    (if (= result -1)
        (unix-error err)
        nil)))

(defun %msgrcv (mq buffer)
  "Blocking msgrcv(2) of the first message in MQ into the string BUFFER.
Return the number of text bytes received, or -1 with errno set.

The kernel stores a `struct msgbuf' into BUFFER: the message type
(sizeof(long) bytes) followed by up to msgsz bytes of text.  msgsz is
therefore 8 less than the buffer length, so the kernel never writes past
the end of BUFFER on 32- or 64-bit systems."
  (ccall (msgrcv int int sap uint uint uint)
         mq (sys:vector-sap buffer) (max 0 (- (length buffer) 8)) 0 0))

(defun msgrcv (mq)
  "Return the first message in MQ as a string, waiting until one arrives.
Retries when the call is interrupted by a signal (EINTR)."
  ;; Room for 8192 bytes of text plus the leading message-type field.
  ;; %MSGSND sends the whole message string starting at the type field,
  ;; so the message is the first CODE characters of the buffer.
  (let ((buffer (make-string (+ 8192 8))))
    (loop
      (let-errno (code errno (%msgrcv mq buffer))
        (cond ((/= code -1)         (return (subseq buffer 0 code)))
              ((= errno unix:eintr))  ; interrupted: try again
              (t                    (unix-error errno)))))))

(defun decode (string)
  "Read a Lisp object from STRING with standard I/O syntax; the inverse of
ENCODE.  Returns the values of READ-FROM-STRING."
  ;; NOTE(review): WITH-STANDARD-IO-SYNTAX binds *READ-EVAL* to T, so a
  ;; message containing #. is evaluated when it is decoded.  Queues are
  ;; created with mode #o600 (MAKE-MQ), which presumably limits senders to
  ;; the same user.  Consider binding *READ-EVAL* to NIL if messages can
  ;; come from less trusted sources.
  (with-standard-io-syntax (read-from-string string)))

(defun encode (message)
  "Print MESSAGE with standard I/O syntax and return the resulting string.
WITH-STANDARD-IO-SYNTAX binds *PRINT-READABLY* to T, so objects that
cannot be printed readably signal an error."
  (with-standard-io-syntax (prin1-to-string message)))

;;; Process structure (the counterpart of an Erlang PID)

;; A PROCESS names another IPC process.  It is plain data, so it can itself
;; be sent in messages (e.g. (snd (self) p) in TEST-COUNTER).
(defstruct (process (:conc-name process.)
		    (:predicate process?))
  ;; Unix PID (also the key of the *QUEUES* cache)
  (pid (required-argument) :type fixnum :read-only t) 
  ;; The key of the SYSV message queue, as returned by MAKE-MQ;
  ;; GET-MQ turns it into a queue identifier.
  ;; NOTE(review): REQUIRED-ARGUMENT presumably signals an error when the
  ;; slot is not supplied to MAKE-PROCESS.
  (qid (required-argument) :type fixnum :read-only t))

(defvar *queues* (make-hash-table)
  "Cache from Unix pids to SYSV IPC message queue identifiers.")

(defun process-queue (process)
  "Return the message queue of PROCESS.  The queue is looked up via the
process's key the first time and then cached in *QUEUES*."
  (let* ((pid (process.pid process))
         (cached (gethash pid *queues*)))
    (if cached
        cached
        (setf (gethash pid *queues*)
              (get-mq (process.qid process))))))

(defun (setf process-queue) (queue process)
  "Record QUEUE as the message queue of PROCESS and return QUEUE."
  (let ((pid (process.pid process)))
    (setf (gethash pid *queues*) queue)))

;; The PROCESS object for this Unix process; created lazily by SELF and
;; replaced in a forked child by INITIALIZE-CHILD.
(defvar *self* nil)
;; The PROCESS that spawned this one; NIL in the initial process.
(defvar *parent* nil)
;; NOTE(review): the next two are not referenced by the code visible here.
;; They look like placeholders for Erlang-style link / trap_exit, which
;; the file header says is not supported.
(defvar *linked-processes* '())
(defvar *trap-exit* nil)

(defun init-self ()
  "Create a message queue for this Unix process, wrap it in a fresh
PROCESS object, store that object in *SELF* and return it."
  (multiple-value-bind (queue key) (make-mq)
    (let ((me (make-process :pid (unix-getpid) :qid key)))
      (setf (process-queue me) queue
            *self* me))))

(defun self ()
  "Return the PROCESS for this Unix process, creating it on first use."
  (if *self* *self* (init-self)))

(defun parent ()
  "Return the PROCESS that spawned this one, or NIL in the initial process."
  (values *parent*))

(defun self? (process)
  "True when PROCESS denotes this Unix process (slot-wise EQUALP with SELF)."
  (let ((me (self)))
    (equalp process me)))

(defun alive? (process)
  "Probe whether the Unix process behind PROCESS still exists by calling
UNIX-KILL with :CHECK (presumably the null signal 0; confirm).  Returns
whatever UNIX-KILL returns."
  (let ((pid (process.pid process)))
    (unix-kill pid :check)))

;; Decoded messages not yet consumed by RCV, oldest first: messages sent to
;; ourselves by SND and queue messages skipped by a selective RCV.
(defvar *mailbox* (list))

(defun snd (message process)
  "Send MESSAGE to PROCESS.
A message to ourselves is appended directly to *MAILBOX*.  For any other
process it is encoded with ENCODE and written to that process's SYS V
queue."
  (cond ((self? process)
         (setf *mailbox* (nconc *mailbox* (list message))))
        (t
         (msgsnd (process-queue process) (encode message)))))

;;; SYS V message queues have no support for receiving with a timeout, so
;;; we use the real-time interval timer (SIGALRM) to implement timeouts.

(defun assert-timer-unused ()
  "Signal an error if the real-time interval timer is already running."
  (multiple-value-bind (ok interval-sec interval-usec value-sec value-usec)
      (unix-getitimer :real)
    (declare (ignore ok interval-sec interval-usec))
    ;; A non-zero current value means the timer is armed by someone else.
    (assert (not (or (/= value-sec 0) (/= value-usec 0)))
            ()
            "itimer already used")))

(defun decode-timeout (timeout)
  "Return the timeout TIMEOUT as two integer values SECS and USECS.
TIMEOUT is a positive real number of seconds.  The result is always a
valid, non-zero setitimer(2) value: USECS is below 1000000.  A tiny
positive TIMEOUT gives 0 / 1 instead of 0 / 0, which would disarm the
timer.  Signals an error if TIMEOUT is not positive."
  (assert (plusp timeout))
  (multiple-value-bind (secs rem) (truncate timeout)
    (let ((usecs (round (* rem 1000000))))
      (cond ((>= usecs 1000000)
             ;; The fraction rounded up to a whole second: carry it.
             (values (1+ secs) 0))
            ((and (zerop secs) (zerop usecs))
             ;; Below half a microsecond: use the smallest armed value.
             (values 0 1))
            (t
             (values secs usecs))))))

(defun with-timeout (timeout fn timeout-fn)
  "Call FN. If FN completes within TIMEOUT seconds, return its result.
Otherwise abort it, unwind the stack and call TIMEOUT-FN instead.
TIMEOUT is in seconds (can be a float).  Return either the result of
executing FN or the result of calling TIMEOUT-FN.
A TIMEOUT that is zero or negative, i.e. a deadline that has already
passed, calls TIMEOUT-FN at once without running FN."
  (if (plusp timeout)
      (%with-timeout timeout fn timeout-fn)
      ;; The timer cannot express a non-positive delay: setting it to
      ;; zero disarms it instead of firing immediately.
      (funcall timeout-fn)))

(defun %with-timeout (timeout fn timeout-fn)
  "Helper for WITH-TIMEOUT (TIMEOUT must be positive).  Arm the real-time
itimer for TIMEOUT seconds and call FN, returning its values.  If SIGALRM
arrives first, the handler unwinds out of FN and TIMEOUT-FN is called
instead."
  ;; NOTE(review): the handler stays installed until the unwind-protect
  ;; cleanup has disarmed the timer.  If the alarm fires just after FN
  ;; returns, FN's result is discarded and TIMEOUT-FN is called anyway.
  (block abort
    (return-from %with-timeout
      (labels ((handler (signal code scp)
		 (declare (ignore signal code scp))
		 ;; Non-local exit out of the signal handler (and FN).
		 (return-from abort nil)))
	(multiple-value-bind (secs usecs) (decode-timeout timeout)
	  (sys:with-enabled-interrupts ((unix:sigalrm #'handler))
	    ;; Presumably (interval-secs interval-usecs value-secs
	    ;; value-usecs): a zero interval makes it a one-shot timer.
	    (unix-setitimer :real 0 0 secs usecs)
	    ;; Disarm the timer however FN is left.
	    (unwind-protect (funcall fn)
	      (unix-setitimer :real 0 0 0 0)))))))
  ;; Only reached through (return-from abort ...), i.e. on timeout.
  (funcall timeout-fn))

(defun endtime (timeout)
  "Return the internal real time (in GET-INTERNAL-REAL-TIME units) at
which a timeout of TIMEOUT seconds, starting now, expires."
  (+ (get-internal-real-time)
     (* timeout internal-time-units-per-second)))

(defun seconds-until (endtime)
  "Return the number of seconds from now until the internal real time
ENDTIME, as a double-float.  The result is negative once ENDTIME has
passed."
  (/ (- endtime (get-internal-real-time))
     (coerce internal-time-units-per-second 'double-float)))

(defun rcv-from-mq (mq test endtime)
  "Return the first message from MQ satisfying the predicate TEST.

Messages that fail TEST are appended to *MAILBOX* for later RCVs.
ENDTIME is NIL (wait indefinitely) or an internal real time as computed
by ENDTIME.  The second return value is non-nil when timeout ENDTIME
expires."
  (loop
    (let ((msg (if endtime
                   (with-timeout (seconds-until endtime)
                     (lambda () (decode (msgrcv mq)))
                     (lambda () (return-from rcv-from-mq (values nil t))))
                   (decode (msgrcv mq)))))
      ;; NOTE(review): if the alarm fires after MSGRCV has dequeued a
      ;; message but before WITH-TIMEOUT returns, that message is lost.
      (if (funcall test msg)
          (return (values msg nil))
          ;; Not wanted now: keep it for a later RCV.
          (setf *mailbox* (nconc *mailbox* (list msg)))))))

(defun rcv (&key (test (constantly t)) timeout)
  "Receive the next message which satisfies TEST.

Messages already in *MAILBOX* are searched first, then this process's
queue.  TIMEOUT, if given, is in seconds.  Return the message and a flag
indicating whether the timeout expired."
  (let ((tail (member-if test *mailbox*)))
    (cond (tail
           ;; Splice the matching message out of the mailbox.
           (setf *mailbox* (nconc (ldiff *mailbox* tail) (cdr tail)))
           (values (car tail) nil))
          (t
           (rcv-from-mq (process-queue (self))
                        test (if timeout (endtime timeout)))))))

(in-package :vm)

;;; x86 only: %RESET-CSP abandons the current control stack and calls a
;;; function on an empty one (used by IPC::CALL-WITH-EMPTY-STACKS).
(eval-when (:compile-toplevel :load-toplevel :execute)

  (defknown %reset-csp (function) nil)

  ;; Reset the control stack and call FN with the empty stack.
  ;; Unwind-protect blocks are reset and NOT executed.
  (define-vop (reset-csp)
      (:policy :fast-safe)
    (:args (fn :scs (descriptor-reg control-stack)))
    (:save-p t)
    (:translate %reset-csp)
    ;; The instructions must live inside a :GENERATOR clause; the cost is
    ;; irrelevant because this is the only VOP translating %RESET-CSP.
    (:generator 1
      ;; FN goes to EAX, presumably where TAIL-CALL-VARIABLE expects it.
      (move eax-tn fn)
      ;; ESP := value of the C variable control_stack_end.
      (load-foreign-data-symbol esp-tn "control_stack_end")
      (inst mov esp-tn (make-ea :dword :base esp-tn))
      ;; Forget all catch and unwind-protect blocks.
      (store-symbol-value 0 lisp::*current-catch-block*)
      (store-symbol-value 0 lisp::*current-unwind-protect-block*)
      (inst push 0)
      ;; EBP and ESI both point at the new stack top; NOTE(review):
      ;; presumably this means FN is called with no arguments -- confirm.
      (inst mov ebp-tn esp-tn)
      (inst mov esi-tn esp-tn)
      (inst jmp (make-fixup 'tail-call-variable :assembly-routine)))))

(in-package :ipc)

(defun call-with-empty-stacks (fn)
  "Reset the alien stack, the eval stack top and the binding stack
pointer, then call FN on an empty control stack via VM::%RESET-CSP.
Unwind-protect blocks of the abandoned frames are NOT executed."
  ;; NOTE(review): nothing is left to return to, so FN presumably must
  ;; terminate the process itself rather than return -- confirm.
  (setf x86::*alien-stack* (kernel:make-lisp-obj mp::*alien-stack-top*))
  (setf kernel:*eval-stack-top* 0) 
  ;; Point the binding stack pointer back at the start of the C-side
  ;; binding_stack area.
  (setf (kernel:binding-stack-pointer-sap) 
	(alien:extern-alien "binding_stack" system-area-pointer))
  (vm::%reset-csp fn))

(defun child-debugger-hook (condition hook)
  "Debugger hook for child processes: print CONDITION and a backtrace to
*DEBUG-IO*, then throw to %EXIT-CHILD instead of entering the debugger."
  (declare (ignore hook))
  ;; Reporting must not itself land us in the debugger.
  (ignore-errors
    (format *debug-io* "~2&~A~%   [Condition of type ~S]~2&"
            (debug::safe-condition-message condition)
            (type-of condition)))
  (ignore-errors (debug:backtrace))
  (throw '%exit-child nil))

(defun start-child (stream mq fn args)
  "Body of a freshly forked child.
Apply FN to ARGS with all standard streams bound to STREAM.  Then remove
the child's queue MQ and terminate the Unix process.  An error in FN is
reported by CHILD-DEBUGGER-HOOK, which throws to %EXIT-CHILD."
  (let ((*standard-output* stream)
        (*debug-io* stream)
        (*error-output* stream)
        (*query-io* stream)
        (*standard-input* stream))
    (catch '%exit-child
      (let ((*debugger-hook* (lambda (c h) (child-debugger-hook c h))))
        (apply fn args)))
    (delete-mq mq)
    (dbg "; Child exited: ~S~%" (self))
    ;; Never return: the child presumably runs on a control stack reset by
    ;; CALL-WITH-EMPTY-STACKS, so there is no caller to return to.  QUIT
    ;; with RECKLESSLY-P exits directly instead of unwinding to a toplevel
    ;; that no longer exists.
    (ext:quit t)))

(defun initialize-child (parent mq qid)
  "Reset the per-process globals in a freshly forked child.
Remember PARENT, install a new *SELF* for the child's pid whose queue is
MQ (public key QID), and start with an empty mailbox."
  (let ((child (make-process :pid (unix-getpid) :qid qid)))
    (setf *parent* parent
          *self* child
          (process-queue child) mq
          *mailbox* '())))

(defun %spawn (fn args)
  "Fork a child process that applies FN to the list ARGS.
In the parent, return the child's PROCESS.  In the child, control never
comes back here: START-CHILD runs FN on fresh stacks and then exits.
If fork fails, the queue made for the child is removed and an error is
signalled."
  (multiple-value-bind (mq qid) (make-mq)
    (let ((parent (self)))
      ;; Flush buffered output before forking (presumably so it is not
      ;; written twice, once by each process).
      (mapc #'force-output (list *standard-output* *debug-io* *query-io*))
      ;; CMUCL's SIGCHLD handler
      (sys:enable-interrupt unix:sigchld #'ext::sigchld-handler)
      (multiple-value-bind (pid errno) (unix-fork)
        (cond ((and pid (zerop pid))
               ;; Child.
               (initialize-child parent mq qid)
               (let ((io *debug-io*))
                 (call-with-empty-stacks
                  (lambda () (start-child io mq fn args)))))
              (pid
               ;; Parent.
               (dbg "; Forked: ~D ~D (parent ~D)~%" pid qid (unix-getpid))
               (let ((child (make-process :pid pid :qid qid)))
                 (setf (process-queue child) mq)
                 child))
              (t
               ;; Fetch the message before DELETE-MQ can change errno.
               (let ((message (get-unix-error-msg errno)))
                 (delete-mq mq)
                 (error "fork: ~A" message))))))))

(defun spawn (fn &rest args)
  "Fork a new Unix process in which FN is applied to ARGS, and return a
PROCESS designating it (usable with SND)."
  (let ((arguments args))
    (%spawn fn arguments)))


;; The counter process maintains the current value. Operations on the
;; counter are invoked via messages. The messages are as follows:
;;   :up          increments the counter
;;   :down        decrements the counter
;;   :quit        terminates the counter process
;;   <a process>  sends the current value to that process.
(defun counter (value)
  "Run the counter server starting at VALUE until it receives :QUIT.
See the message protocol described above.  Returns NIL."
  ;; Iterate rather than recurse so a long-lived counter does not depend
  ;; on tail-call merging to keep its stack bounded.
  (loop
    (dbg "value: ~S~%" value)
    (let ((msg (rcv)))
      (dbg "msg: ~S~%" msg)
      (cond ((eq msg :up)     (incf value))
            ((eq msg :down)   (decf value))
            ((eq msg :quit)   (return nil))
            ((process? msg)   (snd value msg))
            ;; Any other message also ends the counter, as before.
            (t                (return nil))))))

(defun test-counter ()
  "Spawn a counter, increment it twice and print its value after each
increment, then tell it to quit."
  (let ((proc (spawn (lambda () (counter 0)))))
    (loop repeat 2
          do (snd :up proc)
             (snd (self) proc)
             (format t "~&value: ~A~&" (rcv)))
    (snd :quit proc)))
;; This example counts the nodes in a cons tree by spawning a new
;; process for each subtree.
(defun count-nodes-aux (tree parent)
  "Count the number of nodes in TREE and send it to PARENT.
An atom counts as one node.  A cons counts as one plus the nodes of its
CAR and CDR, each of which is counted by a newly spawned process."
  (cond ((atom tree)
         (snd 1 parent))
        (t
         (spawn #'count-nodes-aux (car tree) (self))
         (spawn #'count-nodes-aux (cdr tree) (self))
         ;; Both children report their counts back to us.
         (snd (+ 1 (rcv) (rcv)) parent))))

(defun count-nodes (tree)
  "Return the number of nodes in TREE, as counted by COUNT-NODES-AUX."
  ;; COUNT-NODES-AUX delivers its result as a message to us; collect it.
  (count-nodes-aux tree (self))
  (rcv))

(defun test-count-nodes ()
  "Time COUNT-NODES on a small sample tree."
  (let ((sample '(a ((b c)) d e)))
    (time (count-nodes sample))))


;; Local Variables:
;; eval: (put 'let-errno 'common-lisp-indent-function 1)
;; End:

