[nio-cvs] r81 - in branches/home/psmith/restructure/src: io nio-logger protocol/yarpc
psmith at common-lisp.net
psmith at common-lisp.net
Sat Feb 10 20:36:44 UTC 2007
Author: psmith
Date: Sat Feb 10 15:36:43 2007
New Revision: 81
Modified:
branches/home/psmith/restructure/src/io/nio-package.lisp
branches/home/psmith/restructure/src/io/nio-server.lisp
branches/home/psmith/restructure/src/io/nio.asd
branches/home/psmith/restructure/src/io/nodes.lisp
branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp
branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp
branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp
branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp
Log:
First stab at rpc multiplexing
Modified: branches/home/psmith/restructure/src/io/nio-package.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/nio-package.lisp (original)
+++ branches/home/psmith/restructure/src/io/nio-package.lisp Sat Feb 10 15:36:43 2007
@@ -42,4 +42,7 @@
;;ip-authorisation
check-ip load-ips
+
+ ;;nodes
+ node with-connected-nodes active-conn
))
Modified: branches/home/psmith/restructure/src/io/nio-server.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/nio-server.lisp (original)
+++ branches/home/psmith/restructure/src/io/nio-server.lisp Sat Feb 10 15:36:43 2007
@@ -35,7 +35,7 @@
;TODO thread safety
(defparameter +connected-sockets-queue+ (nio-compat:concurrent-queue)
- "List of sockets that have been connected and are awaiting addition to the event-notification system")
+ "List of node objects that are to be connected to")
;loop over hashtable
(defun process-async-fds (client-hash)
@@ -150,12 +150,18 @@
(when (write-event-p event) (setf (write-ready async-fd) t)))))))))
;add outgoing sockets to event queue
-#+nio-debug2 (format-log t "nio-server:start-server - Processing client add ~A~%" +connected-sockets-queue+)
-
- (loop for new-fd = (nio-compat:take +connected-sockets-queue+ :blocking-call nil) until (null new-fd) do
+#+nio-debug2 (format-log t "nio-server:start-server - Processing new connections queue ~A~%" +connected-sockets-queue+)
+ (loop for node = (nio-compat:take +connected-sockets-queue+ :blocking-call nil) until (null node) do
+#+nio-debug (format-log t "nio-server:start-server - adding node to nodes-list ~A~%" node)
+ (push node *nodes-list*))
+ (with-connect-ready-nodes (a-node)
+#+nio-debug (format-log t "nio-server:start-server - attempting connection to node ~A~%" a-node)
+ (let ((new-fd (connect (host a-node) (port a-node) connection-type)))
+ (update-last-connect-attempt a-node)
+ (when new-fd
#+nio-debug (format-log t "nio-server:start-server - adding connection to nio thread ~A~%" new-fd)
- (setf (gethash (async-fd-read-fd new-fd) client-hash) new-fd)
- (add-async-fd event-queue new-fd :read-write))
+ (setf (gethash (async-fd-read-fd new-fd) client-hash) new-fd)
+ (add-async-fd event-queue new-fd :read-write))))
;loop over async-fd's processing where necessary
(process-async-fds client-hash)
@@ -164,10 +170,10 @@
(close-fd sock))))
-(defun add-connection (host port connection-type
- &key
- (protocol :inet))
- (format-log t "nio-server:add-connection - Called with: ~A:~A:~A ~%" protocol host port)
+(defun connect(host port connection-type
+ &key
+ (protocol :inet))
+ (format-log t "nio-server:connect - Called with: ~A:~A:~A ~%" protocol host port)
(let ((sock nil))
(setq sock (ecase protocol
(:inet (make-inet-socket))
@@ -175,8 +181,10 @@
(if (connect-inet-socket sock host port)
(let ((sm (create-state-machine connection-type sock sock sock)))
- (nio-compat:add +connected-sockets-queue+ sm)
- (format-log t "nio-server:add-connection - Socket enqueued: ~A~%" +connected-sockets-queue+)
- (return-from add-connection sm))
+; (nio-compat:add +connected-sockets-queue+ sm)
+; (format-log t "nio-server:connect - Socket enqueued: ~A~%" +connected-sockets-queue+)
+ (return-from connect sm))
(format t "Connect failed!!~A ~%" (get-errno)))))
-
\ No newline at end of file
+
+(defun add-connection(node)
+ (nio-compat:add +connected-sockets-queue+ node))
\ No newline at end of file
Modified: branches/home/psmith/restructure/src/io/nio.asd
==============================================================================
--- branches/home/psmith/restructure/src/io/nio.asd (original)
+++ branches/home/psmith/restructure/src/io/nio.asd Sat Feb 10 15:36:43 2007
@@ -9,9 +9,9 @@
(:file "packet" :depends-on ("nio-package"))
(:file "async-fd" :depends-on ("fd-helper"))
(:file "async-socket" :depends-on ("async-fd"))
- (:file "nio-server" :depends-on ("async-socket"))
+ (:file "nodes" :depends-on ("nio-package"))
+ (:file "nio-server" :depends-on ("async-socket" "nodes"))
(:file "ip-authorisation" :depends-on ("nio-package"))
- (:file "nodes" :depends-on ("nio-package"))
)
:depends-on (:cffi :event-notification :nio-buffer :nio-compat :nio-utils))
Modified: branches/home/psmith/restructure/src/io/nodes.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/nodes.lisp (original)
+++ branches/home/psmith/restructure/src/io/nodes.lisp Sat Feb 10 15:36:43 2007
@@ -75,6 +75,21 @@
(get-universal-high-res)
(+ (last-connect-attempt node) (retry-delay node))))
+(defun allowed-to-connect(node)
+ (if (null (last-connect-attempt node))
+ t
+ (and (not (active-conn node)) (< (+ (last-connect-attempt node) (retry-delay node)) (get-universal-high-res)))))
+
(defun update-last-connect-attempt(node)
(setf (last-connect-attempt node) (get-universal-high-res)))
+;;iterates over the nodes list looking for nodes that are ready to be connected to
+;;i.e. the SM is null and the next-allowed-connect time has expired
+(defmacro with-connect-ready-nodes ((node) &rest body)
+ `(dolist (,node *nodes-list*)
+ (when (allowed-to-connect ,node) ,@body)))
+
+
+(defmacro with-connected-nodes ((node) &rest body)
+ `(dolist (,node *nodes-list*)
+ (when (active-conn ,node) ,@body)))
Modified: branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp
==============================================================================
--- branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp (original)
+++ branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp Sat Feb 10 15:36:43 2007
@@ -41,16 +41,20 @@
(sleep ,delay))))))
+(defun callback(result)
+ (nio-utils:format-log t "Result of remote-log ~A~%" result))
+
+
;;Tail the given log and write to remote logger
;;e.g. (tail-log "/var/log/httpd/access_log" "192.168.1.1")
(defun tail-log(filename ip-address)
(sleep 4)
- (let ((sm (nio:add-connection ip-address 16323 'nio-yarpc:yarpc-client-state-machine)))
- (nio-utils:format-log t "toplevel adding conn ~A to ~A~%" sm ip-address)
- (with-line-from-tailed-file (text filename 1)
- (let ((rpc (format nil "(nio-logger:remote-log \"~A\")" (cl-base64:string-to-base64-string text))))
- (nio-utils:format-log t "Toplevel Submitting job~A~%" rpc)
- (nio-utils:format-log t "Result of remote-log ~A~%" (nio-yarpc:remote-execute sm rpc))))))
+ (nio:add-connection (nio:node ip-address 16323))
+ (with-line-from-tailed-file (text filename 1)
+ (let ((rpc (format nil "(nio-logger:remote-log \"~A\")" (cl-base64:string-to-base64-string text))))
+ (nio-utils:format-log t "Toplevel Submitting job~A~%" rpc)
+ (nio:with-connected-nodes (node)
+ (nio-yarpc:remote-execute (nio:active-conn node) rpc #'callback)))))
;Runs a multithreaded system with an IO thread dealing with IO only and a 'job' thread taking and executing jobs
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp (original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp Sat Feb 10 15:36:43 2007
@@ -36,9 +36,23 @@
((job-queue :initform (nio-compat:concurrent-queue)
:accessor job-queue
:documentation "The queue used to hand off work from an external thread to the io thread")
- (result-queue :initform (nio-compat:concurrent-queue)
- :accessor result-queue
- :documentation "The queue used to return results from the io thread to an external thread")))
+ (request-map :initform (make-hash-table)
+ :reader request-map
+ :documentation "A map from request-id (a unique id for this request) to remote-job")))
+
+(defclass remote-job()
+ ((callback :accessor callback
+ :documentation "A function accepting one argument to call with the result of the remote operation")
+ (start-time :initform (get-universal-high-res)
+ :reader start-time
+ :documentation "The (floating point) start time")
+ (timeout :initarg :timeout
+ :initform 1.5
+ :documentation "The time in seconds before a timeout should occur, abviously we dont guarantee that this will be honored, it depends on other processing but should be close.")))
+
+(defun remote-job(callback)
+ (make-instance 'remote-job :callback callback))
+
(defun yarpc-client-state-machine ()
(make-instance 'yarpc-client-state-machine))
@@ -55,25 +69,23 @@
(defconstant STATE-INITIALISED 0)
(defconstant STATE-SENT-REQUEST 1)
+(defparameter +request-id+ 0)
+
(defmethod process-outgoing-packet((sm yarpc-client-state-machine))
#+nio-debug2 (format-log t "yarpc-client-state-machine:process-outgoing-packet called, polling the job-queue ~%")
- (let ((packet (nio-compat:take (job-queue sm) :blocking-call nil)))
- (when packet
- (format-log t "yarpc-client-state-machine:process-outgoing-packet got job ~A ~%" packet)
- (setf (state sm) STATE-SENT-REQUEST))
- packet))
+ (let ((ttd (nio-compat:take (job-queue sm) :blocking-call nil)))
+ (when ttd
+ (format-log t "yarpc-client-state-machine:process-outgoing-packet got job ~A ~%" ttd)
+ (destructuring-bind (job call-string) ttd
+ (setf (gethash (1+ +request-id+) (request-map sm)) job)
+ (make-instance 'call-method-packet :call-string call-string :request-id +request-id+)))))
(defmethod process-incoming-packet ((sm yarpc-client-state-machine) (response method-response-packet))
- (assert (eql (state sm) STATE-SENT-REQUEST))
(format-log t "yarpc-client-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm response)
(let* ((*package* (find-package :nio-yarpc))
(result (read-from-string (response response))))
- (setf (state sm) STATE-INITIALISED)
(nio-compat:add (result-queue sm) result)))
-
-;Called from an external thread i.e. *not* the nio thread
-;Blocks calling thread on the remote m/c's response
-(defmethod remote-execute ((sm yarpc-client-state-machine) call-string)
- (assert (eql (state sm) STATE-INITIALISED))
- (nio-compat:add (job-queue sm) (make-instance 'call-method-packet :call-string call-string))
- (nio-compat:take (result-queue sm)))
\ No newline at end of file
+
+;Execute the call-string on the remote node and call callback with the result
+(defmethod remote-execute ((sm yarpc-client-state-machine) call-string callback)
+ (nio-compat:add (job-queue sm) '((remote-job callback) call-string)))
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp (original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp Sat Feb 10 15:36:43 2007
@@ -40,19 +40,21 @@
(defconstant +PACKET-ID-SIZE+ 1)
(defconstant +PACKET-LENGTH-SIZE+ 4)
+;(defconstant +PACKET-REQUEST-ID+ 4)
(defconstant +yarpc-packet-header-size+
(+ +PACKET-ID-SIZE+ +PACKET-LENGTH-SIZE+))
(defmethod get-packet ((pf yarpc-packet-factory) buf)
(flip buf)
- (if (>= (remaining buf) +yarpc-packet-header-size+) ;; First byte denotes packet ID ;;bytes 2,3,4,5 denote packet size
+ (if (>= (remaining buf) +yarpc-packet-header-size+) ;; First byte denotes packet ID ;;bytes 2,3,4,5 denote packet size ;; 6,7,8,9 request-id
(let ((packet-id (bytebuffer-read-8 buf))
(packet-length (bytebuffer-read-32 buf)))
(if (<= (- packet-length +yarpc-packet-header-size+) (remaining buf)) ;is the whole packet available in the buffer?
- (let ((ret-packet (ecase packet-id
- (0 (progn (format-log t "yarpc-packet-factory:get-packet - got CALL-METHOD-PACKET-ID~%") (call-method-packet (bytebuffer-read-string buf (- packet-length +yarpc-packet-header-size+)))))
- (1 (progn (format-log t "yarpc-packet-factory:get-packet - got METHOD-RESPONSE-PACKET-ID~%") (method-response-packet (bytebuffer-read-string buf (- packet-length +yarpc-packet-header-size+))))))))
+ (let* ((packet-request-id (bytebuffer-read-32 buf))
+ (ret-packet (ecase packet-id
+ (0 (progn (format-log t "yarpc-packet-factory:get-packet - got CALL-METHOD-PACKET-ID~%") (call-method-packet (bytebuffer-read-string buf (- packet-length +yarpc-packet-header-size+)) :request-id packet-request-id)))
+ (1 (progn (format-log t "yarpc-packet-factory:get-packet - got METHOD-RESPONSE-PACKET-ID~%") (method-response-packet (bytebuffer-read-string buf (- packet-length +yarpc-packet-header-size+)) :request-id packet-request-id))))))
(compact buf)
#+nio-debug (format-log t "yarpc-packet-factory:get-packet - after compact ~%~A~%" buf)
#+nio-debug (format-log t "yarpc-packet-factory:get-packet - retuirning packet ~A~%" ret-packet)
@@ -64,7 +66,11 @@
-(defclass call-method-packet (packet)((call-string :initarg :call-string
+(defclass yarpc-packet(packet)
+ ((request-id :initarg :request-id
+ :reader request-id)))
+
+(defclass call-method-packet (yarpc-packet)((call-string :initarg :call-string
:accessor call-string)))
(defun call-method-packet (call-string)
(make-instance 'call-method-packet :call-string call-string))
@@ -79,6 +85,7 @@
(progn
(nio-buffer:bytebuffer-write-8 buf +CALL-METHOD-PACKET-ID+)
(nio-buffer:bytebuffer-write-32 buf 0) ; come back and write length later
+ (nio-buffer:bytebuffer-write-32 buf (request-id packet))
(nio-buffer:bytebuffer-write-string buf (call-string packet))
(nio-buffer:bytebuffer-insert-32 buf (buffer-position buf) 1)
#+nio-debug (format-log t "yarpc-packet-factory:write-bytes(call-method-packet) - written ~%~A ~%" buf)
@@ -92,7 +99,7 @@
(+ +yarpc-packet-header-size+
(length (sb-ext:string-to-octets (write-to-string (call-string packet))))))
-(defclass method-response-packet (packet)
+(defclass method-response-packet (yarpc-packet)
((response :initarg :response
:accessor response)))
@@ -109,6 +116,7 @@
(progn
(nio-buffer:bytebuffer-write-8 buf +METHOD-RESPONSE-PACKET-ID+)
(nio-buffer:bytebuffer-write-32 buf 0) ; come back and write length later
+ (nio-buffer:bytebuffer-write-32 buf (request-id packet))
(nio-buffer:bytebuffer-write-string buf (write-to-string (response packet)))
(nio-buffer:bytebuffer-insert-32 buf (buffer-position buf) 1)
#+nio-debug (format-log t "yarpc-packet-factory:write-bytes - written ~A~%" buf)
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp (original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp Sat Feb 10 15:36:43 2007
@@ -58,26 +58,25 @@
-(defun run-job(&key (wait-on-job-pdw t))
+(defun run-job(&key (blocking t))
(format-log t "yarpc-state-machine:run-job - Server toplevel waiting for job~%")
- (destructuring-bind (job result-queue) (nio-compat:take nio-yarpc:job-queue :blocking-call wait-on-job-pdw)
+ (destructuring-bind (job request-id result-queue) (nio-compat:take nio-yarpc:job-queue :blocking-call blocking)
(format-log t "yarpc-state-machine:run-job - Server received job ~A~%" job)
- (nio-compat:add result-queue (nio-yarpc:execute-call job))))
+ (nio-compat:add result-queue (list request-id (nio-yarpc:execute-call job)))))
(defmethod process-outgoing-packet((sm yarpc-state-machine))
(format-log t "yarpc-state-machine:process-outgoing-packet - called, polling the results-queue ~%" )
- (let ((result (nio-compat:take (result-queue sm) :blocking-call nil)))
- (format-log t "yarpc-state-machine:process-outgoing-packet - got result ~A ~%" result)
+ (destructuring-bind (request-id result) (nio-compat:take (result-queue sm) :blocking-call nil)
+ (format-log t "yarpc-state-machine:process-outgoing-packet - got :request-id ~A result ~A ~%" request-id result)
(when result
- (method-response-packet result))))
+ (method-response-packet result :request-id request-id))))
;Process a call method packet by placing it in the job-queue
(defmethod process-incoming-packet ((sm yarpc-state-machine) (call call-method-packet))
- (assert (eql (state sm) STATE-INITIALISED))
(format-log t "yarpc-state-machine:process-incoming-packet - called :sm ~A :packet ~A~%" sm call)
- (nio-compat:add job-queue (list (call-string call) (result-queue sm)))
- (when +process-jobs-inline+ (run-job :wait-on-job-pdw nil)))
+ (nio-compat:add job-queue (list (call-string call) (request-id call) (result-queue sm)))
+ (when +process-jobs-inline+ (run-job :blocking nil)))
More information about the Nio-cvs
mailing list