[nio-cvs] r94 - in branches/home/psmith/restructure/src: compat io protocol/yarpc utils

psmith at common-lisp.net psmith at common-lisp.net
Thu Feb 22 22:50:57 UTC 2007


Author: psmith
Date: Thu Feb 22 17:50:56 2007
New Revision: 94

Added:
   branches/home/psmith/restructure/src/utils/concurrent-queue.lisp
      - copied, changed from r93, branches/home/psmith/restructure/src/compat/concurrent-queue.lisp
Removed:
   branches/home/psmith/restructure/src/compat/concurrent-queue.lisp
Modified:
   branches/home/psmith/restructure/src/compat/nio-compat-package.lisp
   branches/home/psmith/restructure/src/compat/nio-compat.asd
   branches/home/psmith/restructure/src/io/nio-server.lisp
   branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc.asd
   branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp
   branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp
   branches/home/psmith/restructure/src/utils/nio-utils-package.lisp
   branches/home/psmith/restructure/src/utils/nio-utils.asd
   branches/home/psmith/restructure/src/utils/utils.lisp
Log:
moved threadsafe queue and added more tests.

Modified: branches/home/psmith/restructure/src/compat/nio-compat-package.lisp
==============================================================================
--- branches/home/psmith/restructure/src/compat/nio-compat-package.lisp	(original)
+++ branches/home/psmith/restructure/src/compat/nio-compat-package.lisp	Thu Feb 22 17:50:56 2007
@@ -30,7 +30,6 @@
 
 	     ;; errno.lisp
 	     get-errno +ERRNO_EAGAIN+ perror
-	     
-	     ;;concurrent-queue
-	     concurrent-queue add take
+             ;;threading
+             with-mutex make-mutex
 	     ))

Modified: branches/home/psmith/restructure/src/compat/nio-compat.asd
==============================================================================
--- branches/home/psmith/restructure/src/compat/nio-compat.asd	(original)
+++ branches/home/psmith/restructure/src/compat/nio-compat.asd	Thu Feb 22 17:50:56 2007
@@ -6,7 +6,7 @@
 
     :components ((:file "nio-compat-package")
 		 (:file "errno" :depends-on ("nio-compat-package"))
-		 (:file "concurrent-queue" :depends-on ("nio-compat-package"))
+		 (:file "threading" :depends-on ("nio-compat-package"))
 		 )
 
     :depends-on (:cffi))

Modified: branches/home/psmith/restructure/src/io/nio-server.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/nio-server.lisp	(original)
+++ branches/home/psmith/restructure/src/io/nio-server.lisp	Thu Feb 22 17:50:56 2007
@@ -34,7 +34,7 @@
   t)
 
 ;TODO thread safety
-(defparameter +connected-sockets-queue+ (nio-compat:concurrent-queue)
+(defparameter +connected-sockets-queue+ (nio-utils:concurrent-queue)
   "List of node objects that are to be connected to")
 
 ;loop over hashtable 
@@ -155,7 +155,7 @@
 
 ;add outgoing sockets to event queue
 #+nio-debug2     (format-log t "nio-server:start-server - Processing new connections queue ~A~%" +connected-sockets-queue+)
-             (loop for node = (nio-compat:take +connected-sockets-queue+ :blocking-call nil) until (null node) do
+             (loop for node = (nio-utils:take +connected-sockets-queue+ :blocking-call nil) until (null node) do
 #+nio-debug	  (format-log t "nio-server:start-server - adding node to nodes-list ~A~%" node)
 		  (push node *nodes-list*))
 
@@ -192,4 +192,4 @@
   	(format t "Connect failed!!~A ~%" (get-errno)))))
 
 (defun add-connection(node)
-  (nio-compat:add +connected-sockets-queue+ node))
\ No newline at end of file
+  (nio-utils:add +connected-sockets-queue+ node))
\ No newline at end of file

Modified: branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc.asd
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc.asd	(original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc.asd	Thu Feb 22 17:50:56 2007
@@ -10,4 +10,4 @@
 		 (:file "yarpc-client-state-machine" :depends-on ("yarpc-packet-factory"))
 		 )
 
-    :depends-on (:nio :nio-sm :nio-compat))
\ No newline at end of file
+    :depends-on (:nio :nio-sm :nio-utils))
\ No newline at end of file

Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp	(original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp	Thu Feb 22 17:50:56 2007
@@ -33,7 +33,7 @@
 ;; A client that accepts jobs to be run via a threadsafe queue and then submits them to the remote end for execution
 ;;
 (defclass yarpc-client-state-machine (state-machine)
-  ((job-queue :initform (nio-compat:concurrent-queue)
+  ((job-queue :initform (nio-utils:concurrent-queue)
 	      :accessor job-queue
 	      :documentation "The queue used to hand off work from an external thread to the io thread")
    (request-map :initform (make-hash-table)
@@ -74,7 +74,7 @@
 
 (defmethod process-outgoing-packet((sm yarpc-client-state-machine))
 #+nio-debug  (format-log t "yarpc-client-state-machine:process-outgoing-packet called, polling the job-queue ~%")
-  (let ((ttd (nio-compat:take (job-queue sm) :blocking-call nil)))
+  (let ((ttd (nio-utils:take (job-queue sm) :blocking-call nil)))
     (when ttd
 #+nio-debug      (format-log t "yarpc-client-state-machine:process-outgoing-packet got job ~A ~%" ttd)
       (destructuring-bind (job call-string) ttd
@@ -94,4 +94,4 @@
 ;Execute the call-string on the remote node and call callback with the result
 (defmethod remote-execute ((sm yarpc-client-state-machine) call-string callback)
 #+nio-debug  (format-log t "yarpc-client-state-machine:remote-execute called :sm ~A :call-string ~A :callback ~A~%" sm call-string callback)
-  (nio-compat:add (job-queue sm) (list (remote-job callback) call-string)))
+  (nio-utils:add (job-queue sm) (list (remote-job callback) call-string)))

Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp	(original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp	Thu Feb 22 17:50:56 2007
@@ -34,11 +34,11 @@
 ;;
 (defclass yarpc-state-machine (state-machine)
   (
-   (result-queue :initform (nio-compat:concurrent-queue)
+   (result-queue :initform (nio-utils:concurrent-queue)
 		 :accessor result-queue
 		 :documentation "The queue used to return results from an external thread to the nio thread")))
 
-(defparameter job-queue (nio-compat:concurrent-queue)
+(defparameter job-queue (nio-utils:concurrent-queue)
   "The queue used to hand off work from the NIO thread to an external thread for execution")
 
 (defparameter yarpc-pf (yarpc-packet-factory))
@@ -60,16 +60,16 @@
 
 (defun run-job(&key (blocking t))
 #+nio-debug  (format-log t "yarpc-state-machine:run-job - Server toplevel waiting for job~%")
-  (let ((server-job (nio-compat:take nio-yarpc:job-queue :blocking-call blocking)))
+  (let ((server-job (nio-utils:take nio-yarpc:job-queue :blocking-call blocking)))
     (when server-job 
       (destructuring-bind (job request-id result-queue) server-job
 #+nio-debug	(format-log t "yarpc-state-machine:run-job - Server received job ~A~%" job)
-	(nio-compat:add result-queue (list request-id (nio-yarpc:execute-call job)))))))
+	(nio-utils:add result-queue (list request-id (nio-yarpc:execute-call job)))))))
 
 
 (defmethod process-outgoing-packet((sm yarpc-state-machine))
 #+nio-debug2  (format-log t "yarpc-state-machine:process-outgoing-packet - called, polling the results-queue ~%" )
-  (let ((server-job (nio-compat:take (result-queue sm) :blocking-call nil)))
+  (let ((server-job (nio-utils:take (result-queue sm) :blocking-call nil)))
     (when server-job
       (destructuring-bind (request-id result) server-job
 #+nio-debug	(format-log t "yarpc-state-machine:process-outgoing-packet - got :request-id ~A result ~A ~%" request-id result)
@@ -78,7 +78,7 @@
 ;Process a call method packet by placing it in the job-queue
 (defmethod process-incoming-packet ((sm yarpc-state-machine) (call call-method-packet))
 #+nio-debug  (format-log t "yarpc-state-machine:process-incoming-packet - called :sm ~A :packet ~A~%" sm call)
-  (nio-compat:add job-queue (list (call-string call) (request-id call) (result-queue sm)))
+  (nio-utils:add job-queue (list (call-string call) (request-id call) (result-queue sm)))
   (when +process-jobs-inline+ (run-job :blocking nil)))
 
 

Copied: branches/home/psmith/restructure/src/utils/concurrent-queue.lisp (from r93, branches/home/psmith/restructure/src/compat/concurrent-queue.lisp)
==============================================================================
--- branches/home/psmith/restructure/src/compat/concurrent-queue.lisp	(original)
+++ branches/home/psmith/restructure/src/utils/concurrent-queue.lisp	Thu Feb 22 17:50:56 2007
@@ -25,16 +25,19 @@
 THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 |#
 
-(in-package :nio-compat)
+(in-package :nio-utils)
 
 (declaim (optimize (debug 3) (speed 3) (space 0)))
 
 ;Implements a threadsafe queue where readers wait for elements of a FIFO queue to appear using a waitqueue
 ;Modified from sbcl manual example
 
+
 (defclass concurrent-queue()
   ((buffer-queue :initform (sb-thread:make-waitqueue)
 		 :reader buffer-queue)
+;   (buffer-queue-mutex :initform (sb-thread:make-mutex :name "buffer queue mutex")
+;		       :reader buffer-queue-mutex)
    (buffer-lock :initform (sb-thread:make-mutex :name "buffer lock")
 		:reader buffer-lock)
    (buffer :initform nil
@@ -43,47 +46,80 @@
 (defun concurrent-queue()
   (make-instance 'concurrent-queue))
 
+
+
 (defmacro pop-elt(a-buffer loc)
   `(if ,a-buffer 
        (let ((head (car ,a-buffer)))
 	 (setf ,a-buffer (cdr ,a-buffer))
-#+nio-debug (format t "concurent-queue:take - (~A) read ~A at ~A~%" sb-thread:*current-thread* head ,loc)
+#+nio-debug (format-threadsafe t "concurent-queue:take - (~A,~A) read ~A at ~A~%" sb-thread:*current-thread* (length (buffer queue)) head ,loc)
 	 head)
        nil))
 
+
 ;Do an (optionally blocking) remove of the element at the head of this queue
 (defmethod take ((queue concurrent-queue) &key (blocking-call t))
+#+nio-debug (format-threadsafe t "concurent-queue:take - (~A) attempting to obtain mutex ~A~%" sb-thread:*current-thread* (buffer-lock queue))
   (sb-thread:with-mutex ((buffer-lock queue))
+#+nio-debug (format-threadsafe t "concurent-queue:take - (~A) aquired mutex mutex ~A~%" sb-thread:*current-thread* (buffer-lock queue))
     ;if its there, pop it
     (let ((ret (pop-elt (buffer queue) "1sttry")))
       (if (or ret (not blocking-call))
 	  ret
 	  (progn
+#+nio-debug (format-threadsafe t "concurent-queue:take - (~A) about to wait on queue~%" sb-thread:*current-thread*)
 	    (sb-thread:condition-wait (buffer-queue queue) (buffer-lock queue))
+#+nio-debug (format-threadsafe t "concurent-queue:take - (~A) notified on queue~%" sb-thread:*current-thread*)
 	    (pop-elt (buffer queue) "2ndtry"))))))
 
 ;Append the element to the tail of this queue
 (defmethod add ((queue concurrent-queue) elt)
-#+nio-debug (format t "concurent-queue:add - (~A) adding ~A~%" sb-thread:*current-thread* elt)
+#+nio-debug (format-threadsafe t "concurent-queue:add - (~A) adding ~A~%" sb-thread:*current-thread* elt)
   (sb-thread:with-mutex ((buffer-lock queue))
     (setf (buffer queue) (append (buffer queue) (list elt)) )
-    (sb-thread:condition-notify (buffer-queue queue))))
-
+    (sb-thread:condition-broadcast (buffer-queue queue))))
 
 
 (defun test-writer(queue)
-  (loop for i from 0 to 999 do
-       (sleep 0.1)
+  (loop for i from 0 to 100 do
+;       (sleep (random 0.1))
+       (format-threadsafe t "Adding ~A~%" i)
        (add queue i)))
        
-(defun test-reader(queue)
+(defun test-reader(queue results)
+  (format-threadsafe t "Started reader ~A~%" sb-thread:*current-thread*)
   (loop
-       (format t "reader on ~A got elt ~A~%" 
-	       sb-thread:*current-thread* (take queue))))
+     (let ((elt (take queue)))
+       (push elt results)
+       (format-threadsafe t "reader on ~A got elt ~A~%" 
+	       sb-thread:*current-thread* 
+	       results))))
+
+(defparameter *results1* (list 999999))
+(defparameter *results2* (list 888888))
 
 (defun test-queue()
   (let ((queue (make-instance 'concurrent-queue)))
     (sb-thread:make-thread #'(lambda()(test-writer queue)))
-    (sleep 10)
+;    (sleep 10)
+    (let ((t1 (sb-thread:make-thread #'(lambda()(test-reader queue *results1*)))))
+	  ;(t2 (sb-thread:make-thread #'(lambda()(test-reader queue *results2*)))))
+      (sleep 5) ;;wait for it to probably complete
+      (format-threadsafe t "t1 got: ~A~%" *results1*)
+      (format-threadsafe t "t2 got: ~A~%" *results2*)
+      (sb-thread:destroy-thread t1)
+;      (sb-thread:destroy-thread t2)
+)
+    (sb-thread:with-mutex ((buffer-lock queue))
+      (assert (eql (length (buffer queue)) 0)))))
+
+(defun test-queue2()
+  (let ((queue (make-instance 'concurrent-queue)))
+    (sb-thread:make-thread #'(lambda()(test-reader queue)))
+    (sb-thread:make-thread #'(lambda()(test-writer queue)))
     (sb-thread:make-thread #'(lambda()(test-reader queue)))
-    (sb-thread:make-thread #'(lambda()(test-reader queue)))))
+    (sb-thread:make-thread #'(lambda()(test-writer queue)))
+    (sleep 10)
+    (format-threadsafe t "running asserts")
+    (sb-thread:with-mutex ((buffer-lock queue))
+      (assert (eql (length (buffer queue)) 0)))))

Modified: branches/home/psmith/restructure/src/utils/nio-utils-package.lisp
==============================================================================
--- branches/home/psmith/restructure/src/utils/nio-utils-package.lisp	(original)
+++ branches/home/psmith/restructure/src/utils/nio-utils-package.lisp	Thu Feb 22 17:50:56 2007
@@ -30,4 +30,9 @@
 
 	     ;;utils
 	     format-log get-universal-high-res get-readable-time
+	     
+	     ;;concurrent-queue
+	     concurrent-queue add take
+
+	     
 	     ))

Modified: branches/home/psmith/restructure/src/utils/nio-utils.asd
==============================================================================
--- branches/home/psmith/restructure/src/utils/nio-utils.asd	(original)
+++ branches/home/psmith/restructure/src/utils/nio-utils.asd	Thu Feb 22 17:50:56 2007
@@ -6,7 +6,8 @@
 
     :components ((:file "nio-utils-package")
 		 (:file "utils" :depends-on ("nio-utils-package"))
+		 (:file "concurrent-queue" :depends-on ("utils"))
 		 )
 
-    :depends-on ())
+    :depends-on (:nio-compat))
 

Modified: branches/home/psmith/restructure/src/utils/utils.lisp
==============================================================================
--- branches/home/psmith/restructure/src/utils/utils.lisp	(original)
+++ branches/home/psmith/restructure/src/utils/utils.lisp	Thu Feb 22 17:50:56 2007
@@ -59,6 +59,9 @@
   
   )
 
+(defparameter *format-mutex* (nio-compat:make-mutex "format lock"))
+
 ;Format the message to destination but prepend a high res time to the message, useful for logging
 (defmacro format-log (destination control-string &rest format-arguments)
-  `(format ,destination (concatenate 'string "~A - " ,control-string) (get-readable-high-res-time) , at format-arguments))
+  `(nio-compat:with-mutex (*format-mutex*)
+     (format ,destination (concatenate 'string "~A - " ,control-string) (get-readable-high-res-time) , at format-arguments)))



More information about the Nio-cvs mailing list