[nio-cvs] r40 - in branches/home/psmith/restructure: . src/compat src/io src/protocol/yarpc
psmith at common-lisp.net
psmith at common-lisp.net
Wed Jan 17 05:06:15 UTC 2007
Author: psmith
Date: Wed Jan 17 00:06:13 2007
New Revision: 40
Added:
branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp
- copied, changed from r37, branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp
Modified:
branches/home/psmith/restructure/run-yarpc-client.lisp
branches/home/psmith/restructure/run-yarpc.lisp
branches/home/psmith/restructure/src/compat/concurrent-queue.lisp
branches/home/psmith/restructure/src/io/async-fd.lisp
branches/home/psmith/restructure/src/io/async-socket.lisp
branches/home/psmith/restructure/src/io/nio-server.lisp
branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp
branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc.asd
branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp
Log:
yarpc work, saving...
Modified: branches/home/psmith/restructure/run-yarpc-client.lisp
==============================================================================
--- branches/home/psmith/restructure/run-yarpc-client.lisp (original)
+++ branches/home/psmith/restructure/run-yarpc-client.lisp Wed Jan 17 00:06:13 2007
@@ -2,10 +2,9 @@
(require :asdf)
(require :nio-yarpc)
-(sb-thread:make-thread #'(lambda()(nio:start-server 'identity 'identity 'nio-yarpc:yarpc-state-machine :host "127.0.0.1" :port 9897)) :name "nio-server")
+;;shouldn't be listening on the client hence nil for accept SM to start-server
+(sb-thread:make-thread #'(lambda()(nio:start-server 'identity 'identity nil :host "127.0.0.1" :port 9897)) :name "nio-server")
(sleep 4)
-(let ((sm (nio:add-connection "127.0.0.1" 16323 'nio-yarpc:yarpc-state-machine)))
+(let ((sm (nio:add-connection "127.0.0.1" 16323 'nio-yarpc:yarpc-client-state-machine)))
(format t "toplevel adding conn ~A~%" sm)
(format t "Result of remote-execute ~A~%" (nio-yarpc:remote-execute sm "(nio-yarpc:test-rpc-list)")))
-
-
Modified: branches/home/psmith/restructure/run-yarpc.lisp
==============================================================================
--- branches/home/psmith/restructure/run-yarpc.lisp (original)
+++ branches/home/psmith/restructure/run-yarpc.lisp Wed Jan 17 00:06:13 2007
@@ -2,4 +2,11 @@
(require :asdf)
(require :nio-yarpc)
-(nio:start-server 'identity 'identity 'nio-yarpc:yarpc-state-machine :host "127.0.0.1")
+(let ((jobq (nio-compat:concurrent-queue)))
+ (sb-thread:make-thread #'(lambda()(nio:start-server 'identity 'identity #'(lambda()(nio-yarpc:yarpc-state-machine jobq)) :host "127.0.0.1")) :name "nio-server")
+ (format t "server toplevel waiting for job~%" )
+ (loop
+ ;;block waiting for jobs
+ (multiple-value-bind (job result-queue) (nio-compat:take jobq)
+ (format t "Server received job ~A~%" job)
+ (nio-compat:add result-queue (nio-yarpc:execute-call job)))))
Modified: branches/home/psmith/restructure/src/compat/concurrent-queue.lisp
==============================================================================
--- branches/home/psmith/restructure/src/compat/concurrent-queue.lisp (original)
+++ branches/home/psmith/restructure/src/compat/concurrent-queue.lisp Wed Jan 17 00:06:13 2007
@@ -51,18 +51,18 @@
head)
nil))
-
-(defmethod take ((queue concurrent-queue))
+;Do an (optionally blocking) remove of the element at the head of this queue
+(defmethod take ((queue concurrent-queue) &key (blocking-call t))
(sb-thread:with-mutex ((buffer-lock queue))
;if its there, pop it
(let ((ret (pop-elt (buffer queue) "1sttry")))
- (if ret
+ (if (or ret (not blocking-call))
ret
(progn
(sb-thread:condition-wait (buffer-queue queue) (buffer-lock queue))
(pop-elt (buffer queue) "2ndtry"))))))
-
+;Append the element to the tail of this queue
(defmethod add ((queue concurrent-queue) elt)
(sb-thread:with-mutex ((buffer-lock queue))
(setf (buffer queue) (append (buffer queue) (list elt)) )
Modified: branches/home/psmith/restructure/src/io/async-fd.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/async-fd.lisp (original)
+++ branches/home/psmith/restructure/src/io/async-fd.lisp Wed Jan 17 00:06:13 2007
@@ -84,10 +84,6 @@
;;Implement this in concrete SM for read
(defgeneric process-write (async-fd))
-
-
-;Loop over state machines calling process-outgoing-packets via state-machine::process-write
-
;;SM factory
(defun create-state-machine(sm-type read-fd write-fd socket)
(let ((sm (make-instance sm-type :read-fd read-fd :write-fd write-fd :socket socket)))
Modified: branches/home/psmith/restructure/src/io/async-socket.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/async-socket.lisp (original)
+++ branches/home/psmith/restructure/src/io/async-socket.lisp Wed Jan 17 00:06:13 2007
@@ -178,7 +178,7 @@
-(defun socket-accept (socket-fd connection-type)
+(defun socket-accept (socket-fd connection-factory)
"Accept connection from SOCKET-FD. Allocates and returns socket structure denoting the connection."
(flet ((parse-inet6-addr (addr)
@@ -202,7 +202,7 @@
;; accept connection
(let* ((res (%accept socket-fd addr len))
;; (async-socket-fd (make-instance 'async-socket-fd :read-fd res :write-fd res)))
- (async-socket-fd (create-state-machine connection-type res res (make-instance 'async-socket-fd))))
+ (async-socket-fd (create-state-machine connection-factory res res (make-instance 'async-socket-fd))))
(unless (< res 0)
(let ((len-value (mem-ref len :unsigned-int)))
Modified: branches/home/psmith/restructure/src/io/nio-server.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/nio-server.lisp (original)
+++ branches/home/psmith/restructure/src/io/nio-server.lisp Wed Jan 17 00:06:13 2007
@@ -55,7 +55,7 @@
-(defun start-server (connection-handler accept-filter connection-type
+(defun start-server (connection-handler accept-filter connection-factory
&key
(protocol :inet)
(port (+ (random 60000) 1024))
@@ -99,7 +99,7 @@
(cond
;; new connection
((= fd sock)
- (let ((async-fd (socket-accept fd connection-type)))
+ (let ((async-fd (socket-accept fd connection-factory)))
#+nio-debug (format t "start-server - New conn: ~A~%" async-fd)
(cond
((null async-fd)
Modified: branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp (original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp Wed Jan 17 00:06:13 2007
@@ -28,6 +28,14 @@
(:export
+ ;;base
+ yarpc-state-machine-factory get-packet-factory
+
;; yarpc-state-machine
- yarpc-state-machine yarpc-state-machine-factory test-rpc test-rpc-list test-rpc-string get-packet-factory remote-execute
+ yarpc-state-machine
+ ;to be moved
+ test-rpc test-rpc-list test-rpc-string execute-call
+
+ ;;yarpc-client-state-machine
+ yarpc-client-state-machine remote-execute
))
Modified: branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc.asd
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc.asd (original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc.asd Wed Jan 17 00:06:13 2007
@@ -7,6 +7,7 @@
:components ((:file "nio-yarpc-package")
(:file "yarpc-packet-factory" :depends-on ("nio-yarpc-package"))
(:file "yarpc-state-machine" :depends-on ("yarpc-packet-factory"))
+ (:file "yarpc-client-state-machine" :depends-on ("yarpc-packet-factory"))
)
- :depends-on (:nio :nio-sm))
\ No newline at end of file
+ :depends-on (:nio :nio-sm :nio-compat))
\ No newline at end of file
Copied: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp (from r37, branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp)
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp (original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp Wed Jan 17 00:06:13 2007
@@ -28,108 +28,53 @@
(declaim (optimize (debug 3) (speed 3) (space 0)))
-;; YetAnotherRPC state machine
+;; YetAnotherRPC Client state machine
;;
-;; A server that processes remote procedure calls and returns results
+;; A client that accepts jobs to be run via a threadsafe queue and then submits them to the remote end for execution
;;
-;; Test with:
-;; > telnet 127.0.0.1 16323
-;; Trying 127.0.0.1...
-;; Connected to 127.0.0.1.
-;; Escape character is '^]'.
-;; (test-rpc "who" 2 's)
-;; response - who 2 'S
-;;
-(defclass yarpc-state-machine (state-machine)
- ((outgoing-packet :initarg :outgoing-packet
- :accessor outgoing-packet
- :initform nil)))
+(defclass yarpc-client-state-machine (state-machine)
+ ((job-queue :initform (nio-compat:concurrent-queue)
+ :accessor job-queue
+ :documentation "The queue used to hand off work from an external thread to the io thread")
+ (result-queue :initform (nio-compat:concurrent-queue)
+ :accessor result-queue
+ :documentation "The queue used to hand off results from the io thread back to the external thread")))
-(defun yarpc-state-machine ()
- (make-instance 'yarpc-state-machine))
+(defun yarpc-client-state-machine ()
+ (make-instance 'yarpc-client-state-machine))
(defparameter yarpc-pf (yarpc-packet-factory))
-(defmethod get-packet-factory((sm yarpc-state-machine))
+(defmethod get-packet-factory((sm yarpc-client-state-machine))
yarpc-pf)
-;;TODO move somewhere suitable
-
-(defparameter *remote-fns* nil)
-
-(defun register-remote-fn(name)
- (push name *remote-fns*))
-
-(defmacro defremote (name args &rest body)
- `(progn
- (defun ,name (,@args) ,@body)
- (register-remote-fn #',name)))
-
-(defremote test-rpc-list()
- (list 3 "as" 's (code-char #x2211)))
-
-(defremote test-rpc-string(a b c)
- (format nil "response - ~A ~A ~A ~A~%" a b c (code-char #x2211)))
-
-;;end move TODO
-
-
-;;;Utils
-
-(defun print-hashtable (table &optional (stream t))
- (maphash #'(lambda (k v) (format stream "~a -> ~a~%" k v)) table))
-;;;
-
-(defmethod print-object ((sm yarpc-state-machine) stream)
- (format stream "#<YARPC-STATE-MACHINE ~A >" (call-next-method sm nil)))
+(defmethod print-object ((sm yarpc-client-state-machine) stream)
+ (format stream "#<YARPC-CLIENT-STATE-MACHINE ~A >" (call-next-method sm nil)))
(defconstant STATE-INITIALISED 0)
-(defconstant STATE-SEND-RESPONSE 1)
+(defconstant STATE-SENT-REQUEST 1)
(defparameter state STATE-INITIALISED)
-(define-condition authorization-error (error) ())
-
-
-(defmethod process-outgoing-packet((sm yarpc-state-machine))
- (format t "process-outgoing-packet called~%")
- (let ((packet (outgoing-packet sm)))
- (setf (outgoing-packet sm) nil)
- packet))
-
-;TODO queue and thread stuf
-(defmethod queue-outgoing-packet((sm yarpc-state-machine) packet)
- (setf (outgoing-packet sm) packet))
-
-;Process a call method packet, returns
-(defmethod process-incoming-packet ((sm yarpc-state-machine) (call call-method-packet))
- (assert (eql state STATE-INITIALISED))
- (format t "yarpc-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm call)
- (handler-case
- (let ((result (execute-call (call-string call))))
- (when result
- (let ((response-packet (progn
- (setf state STATE-SEND-RESPONSE)
- (queue-outgoing-packet sm (method-response-packet result)))))
- t)))
- (reader-error (re) (format t "No such function ~A~%" (call-string call)))
- (authorization-error (ae) (format t "Function not declared with defremote ~A~%" (call-string call)))))
-
-(defmethod process-incoming-packet ((sm yarpc-state-machine) (response method-response-packet))
- (assert (eql state STATE-INITIALISED))
- (format t "yarpc-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm response))
+(defmethod process-outgoing-packet((sm yarpc-client-state-machine))
+ (format t "process-outgoing-packet called, polling the job-queue ~%")
+ (let ((packet (nio-compat:take (job-queue sm) :blocking-call nil)))
+ (when packet
+ (format t "process-outgoing-packet got job ~A ~%" packet)
+ (setf state STATE-SENT-REQUEST))
+ packet))
+
+(defmethod process-incoming-packet ((sm yarpc-client-state-machine) (response method-response-packet))
+ (assert (eql state STATE-SENT-REQUEST))
+ (format t "yarpc-client-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm response)
+ (nio-compat:add (result-queue sm) response)
+ (setf state STATE-INITIALISED))
-
-(defun execute-call (call-string)
- (let* ((rpc-call-list (read-from-string call-string ))
- (fn (member (symbol-function (first rpc-call-list)) *remote-fns* )))
- (format t "fn - ~A authorised? : ~A~%" (symbol-function (first rpc-call-list)) fn)
- (if fn
- (apply (first rpc-call-list) (rest rpc-call-list))
- (error 'authorization-error))))
-
-
-(defmethod remote-execute ((sm yarpc-state-machine) call-string)
- (queue-outgoing-packet sm (make-instance 'call-method-packet :call-string call-string)))
-
\ No newline at end of file
+;Called from an external thread i.e. *not* the nio thread
+;Blocks calling thread on the remote m/c's response
+(defmethod remote-execute ((sm yarpc-client-state-machine) call-string)
+; (queue-outgoing-packet
+ (assert (eql state STATE-INITIALISED))
+ (nio-compat:add (job-queue sm) (make-instance 'call-method-packet :call-string call-string))
+ (nio-compat:take (result-queue sm)))
\ No newline at end of file
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp (original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp Wed Jan 17 00:06:13 2007
@@ -32,27 +32,57 @@
;;
;; A server that processes remote procedure calls and returns results
;;
-;; Test with:
-;; > telnet 127.0.0.1 16323
-;; Trying 127.0.0.1...
-;; Connected to 127.0.0.1.
-;; Escape character is '^]'.
-;; (test-rpc "who" 2 's)
-;; response - who 2 'S
-;;
(defclass yarpc-state-machine (state-machine)
- ((outgoing-packet :initarg :outgoing-packet
- :accessor outgoing-packet
- :initform nil)))
-
-(defun yarpc-state-machine ()
- (make-instance 'yarpc-state-machine))
+ ((job-queue :initarg :job-queue
+ :initform (error "Must supply a job queue to write work to.")
+ :accessor job-queue
+ :documentation "The queue used to hand off work from the NIO thread to an external thread for execution")
+ (result-queue :initform (nio-compat:concurrent-queue)
+ :accessor result-queue
+ :documentation "The queue used to return results from an external thread to the nio thread")))
+
+(defun yarpc-state-machine (read-fd write-fd socket job-queue)
+ (let ((sm (make-instance 'yarpc-state-machine :read-fd read-fd :write-fd write-fd :socket socket :job-queue job-queue)))
+ (nio-buffer:clear (foreign-read-buffer sm))
+ (nio-buffer:clear (foreign-write-buffer sm))
+ (format t "yarpc-state-machine - Created ~S~%" sm)
+ sm))
(defparameter yarpc-pf (yarpc-packet-factory))
(defmethod get-packet-factory((sm yarpc-state-machine))
yarpc-pf)
+(defmethod print-object ((sm yarpc-state-machine) stream)
+ (format stream "#<YARPC-STATE-MACHINE ~A >" (call-next-method sm nil)))
+
+(defconstant STATE-INITIALISED 0)
+(defconstant STATE-SEND-RESPONSE 1)
+
+(defparameter state STATE-INITIALISED)
+
+
+(defmethod process-outgoing-packet((sm yarpc-state-machine))
+ (format t "yarpc-state-machine: process-outgoing-packet called, polling the results-queue ~%")
+ (let ((packet (nio-compat:take (result-queue sm) :blocking-call nil)))
+ (format t "yarpc-state-machine: process-outgoing-packet got result ~A ~%" packet)
+ packet))
+
+
+;Process a call method packet, returns
+(defmethod process-incoming-packet ((sm yarpc-state-machine) (call call-method-packet))
+ (assert (eql state STATE-INITIALISED))
+ (format t "yarpc-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm call)
+ (nio-compat:add (job-queue sm) (cons (call-string call) (result-queue sm))))
+
+
+;Called from an external thread i.e. *not* the nio thread
+;Blocks waiting for a job (call-string,result-queue) to process and return the result into the result queue
+;(defmethod get-job ((sm yarpc-state-machine))
+; (values (nio-compat:take (job-queue sm)) (result-queue sm)))
+
+
+
;;TODO move somewhere suitable
(defparameter *remote-fns* nil)
@@ -71,56 +101,8 @@
(defremote test-rpc-string(a b c)
(format nil "response - ~A ~A ~A ~A~%" a b c (code-char #x2211)))
-;;end move TODO
-
-
-;;;Utils
-
-(defun print-hashtable (table &optional (stream t))
- (maphash #'(lambda (k v) (format stream "~a -> ~a~%" k v)) table))
-;;;
-
-
-(defmethod print-object ((sm yarpc-state-machine) stream)
- (format stream "#<YARPC-STATE-MACHINE ~A >" (call-next-method sm nil)))
-
-(defconstant STATE-INITIALISED 0)
-(defconstant STATE-SEND-RESPONSE 1)
-
-(defparameter state STATE-INITIALISED)
-
(define-condition authorization-error (error) ())
-
-(defmethod process-outgoing-packet((sm yarpc-state-machine))
- (format t "process-outgoing-packet called~%")
- (let ((packet (outgoing-packet sm)))
- (setf (outgoing-packet sm) nil)
- packet))
-
-;TODO queue and thread stuf
-(defmethod queue-outgoing-packet((sm yarpc-state-machine) packet)
- (setf (outgoing-packet sm) packet))
-
-;Process a call method packet, returns
-(defmethod process-incoming-packet ((sm yarpc-state-machine) (call call-method-packet))
- (assert (eql state STATE-INITIALISED))
- (format t "yarpc-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm call)
- (handler-case
- (let ((result (execute-call (call-string call))))
- (when result
- (let ((response-packet (progn
- (setf state STATE-SEND-RESPONSE)
- (queue-outgoing-packet sm (method-response-packet result)))))
- t)))
- (reader-error (re) (format t "No such function ~A~%" (call-string call)))
- (authorization-error (ae) (format t "Function not declared with defremote ~A~%" (call-string call)))))
-
-(defmethod process-incoming-packet ((sm yarpc-state-machine) (response method-response-packet))
- (assert (eql state STATE-INITIALISED))
- (format t "yarpc-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm response))
-
-
(defun execute-call (call-string)
(let* ((rpc-call-list (read-from-string call-string ))
(fn (member (symbol-function (first rpc-call-list)) *remote-fns* )))
@@ -129,7 +111,18 @@
(apply (first rpc-call-list) (rest rpc-call-list))
(error 'authorization-error))))
+;;end move TODO
+
+
+
+
-(defmethod remote-execute ((sm yarpc-state-machine) call-string)
- (queue-outgoing-packet sm (make-instance 'call-method-packet :call-string call-string)))
-
\ No newline at end of file
+; (handler-case
+; (let ((result (execute-call (call-string call))))
+; (when result
+; (let ((response-packet (progn
+; (setf state STATE-SEND-RESPONSE)
+; (queue-outgoing-packet sm (method-response-packet result)))))
+; t)))
+; (reader-error (re) (format t "No such function ~A~%" (call-string call)))
+; (authorization-error (ae) (format t "Function not declared with defremote ~A~%" (call-string call))))
\ No newline at end of file
More information about the Nio-cvs
mailing list