[nio-cvs] r82 - in branches/home/psmith/restructure/src: io nio-logger protocol/yarpc
psmith at common-lisp.net
psmith at common-lisp.net
Sat Feb 10 23:52:33 UTC 2007
Author: psmith
Date: Sat Feb 10 18:52:32 2007
New Revision: 82
Modified:
branches/home/psmith/restructure/src/io/nio-server.lisp
branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp
branches/home/psmith/restructure/src/nio-logger/run-logging-client.lisp
branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp
branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp
branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp
Log:
getting there...
Modified: branches/home/psmith/restructure/src/io/nio-server.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/nio-server.lisp (original)
+++ branches/home/psmith/restructure/src/io/nio-server.lisp Sat Feb 10 18:52:32 2007
@@ -68,7 +68,7 @@
(defun start-server (connection-type
&key
(protocol :inet)
- (port (+ (random 60000) 1024))
+ (port 0) ;//if set then listen
(host "127.0.0.1")
(accept-connection #'trivial-accept))
(let (sock
@@ -76,7 +76,7 @@
(client-hash (make-hash-table :test 'eql))
)
- (when (not (null connection-type))
+ (when (not (eql port 0))
(format t "Binding to ~A:~A~%" host port)
(setq sock (ecase protocol
(:inet (make-inet-socket))
@@ -161,6 +161,7 @@
(when new-fd
#+nio-debug (format-log t "nio-server:start-server - adding connection to nio thread ~A~%" new-fd)
(setf (gethash (async-fd-read-fd new-fd) client-hash) new-fd)
+ (setf (active-conn a-node) new-fd)
(add-async-fd event-queue new-fd :read-write))))
;loop over async-fd's processing where necessary
Modified: branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp
==============================================================================
--- branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp (original)
+++ branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp Sat Feb 10 18:52:32 2007
@@ -54,6 +54,7 @@
(let ((rpc (format nil "(nio-logger:remote-log \"~A\")" (cl-base64:string-to-base64-string text))))
(nio-utils:format-log t "Toplevel Submitting job~A~%" rpc)
(nio:with-connected-nodes (node)
+ (nio-utils:format-log t "Toplevel sending ~A to ~A~%" rpc node)
(nio-yarpc:remote-execute (nio:active-conn node) rpc #'callback)))))
;Runs a multithreaded system with an IO thread dealing with IO only and a 'job' thread taking and executing jobs
@@ -64,7 +65,7 @@
(setf nio-yarpc:+process-jobs-inline+ nil)
(setf +log-file-name+ out-file)
(nio:load-ips allowed-ips)
- (sb-thread:make-thread #'(lambda()(nio:start-server 'nio-yarpc:yarpc-state-machine :host listen-ip :accept-connection 'nio:check-ip)) :name "nio-server")
+ (sb-thread:make-thread #'(lambda()(nio:start-server 'nio-yarpc:yarpc-state-machine :host listen-ip :port 16323 :accept-connection 'nio:check-ip)) :name "nio-server")
(loop
;;block waiting for jobs
(nio-yarpc:run-job)))
Modified: branches/home/psmith/restructure/src/nio-logger/run-logging-client.lisp
==============================================================================
--- branches/home/psmith/restructure/src/nio-logger/run-logging-client.lisp (original)
+++ branches/home/psmith/restructure/src/nio-logger/run-logging-client.lisp Sat Feb 10 18:52:32 2007
@@ -36,4 +36,4 @@
(sb-thread:make-thread #'(lambda()(nio-logger:tail-log log-file ip)) :name "nio-server")
;;shouldn't be listenting on the client hence nil for accept SM to start-server
- (nio:start-server nil))
+ (nio:start-server 'nio-yarpc:yarpc-client-state-machine))
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp (original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp Sat Feb 10 18:52:32 2007
@@ -41,7 +41,8 @@
:documentation "A map from request-id (a unique id for this request) to remote-job")))
(defclass remote-job()
- ((callback :accessor callback
+ ((callback :initarg :callback
+ :accessor callback
:documentation "A function accepting one argument to call with the result of the remote operation")
(start-time :initform (get-universal-high-res)
:reader start-time
@@ -72,7 +73,7 @@
(defparameter +request-id+ 0)
(defmethod process-outgoing-packet((sm yarpc-client-state-machine))
-#+nio-debug2 (format-log t "yarpc-client-state-machine:process-outgoing-packet called, polling the job-queue ~%")
+#+nio-debug (format-log t "yarpc-client-state-machine:process-outgoing-packet called, polling the job-queue ~%")
(let ((ttd (nio-compat:take (job-queue sm) :blocking-call nil)))
(when ttd
(format-log t "yarpc-client-state-machine:process-outgoing-packet got job ~A ~%" ttd)
@@ -88,4 +89,5 @@
;Execute the call-string on the remote node and call callback with the result
(defmethod remote-execute ((sm yarpc-client-state-machine) call-string callback)
- (nio-compat:add (job-queue sm) '((remote-job callback) call-string)))
+ (format-log t "yarpc-client-state-machine:remote-execute called :sm ~A :call-string ~A :callback ~A~%" sm call-string callback)
+ (nio-compat:add (job-queue sm) (list (remote-job callback) call-string)))
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp (original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp Sat Feb 10 18:52:32 2007
@@ -40,7 +40,7 @@
(defconstant +PACKET-ID-SIZE+ 1)
(defconstant +PACKET-LENGTH-SIZE+ 4)
-;(defconstant +PACKET-REQUEST-ID+ 4)
+(defconstant +PACKET-REQUEST-ID-SIZE+ 4)
(defconstant +yarpc-packet-header-size+
(+ +PACKET-ID-SIZE+ +PACKET-LENGTH-SIZE+))
@@ -53,8 +53,8 @@
(if (<= (- packet-length +yarpc-packet-header-size+) (remaining buf)) ;is the whole packet available in the buffer?
(let* ((packet-request-id (bytebuffer-read-32 buf))
(ret-packet (ecase packet-id
- (0 (progn (format-log t "yarpc-packet-factory:get-packet - got CALL-METHOD-PACKET-ID~%") (call-method-packet (bytebuffer-read-string buf (- packet-length +yarpc-packet-header-size+)) :request-id packet-request-id)))
- (1 (progn (format-log t "yarpc-packet-factory:get-packet - got METHOD-RESPONSE-PACKET-ID~%") (method-response-packet (bytebuffer-read-string buf (- packet-length +yarpc-packet-header-size+)) :request-id packet-request-id))))))
+ (0 (progn (format-log t "yarpc-packet-factory:get-packet - got CALL-METHOD-PACKET-ID~%") (call-method-packet (bytebuffer-read-string buf (- packet-length +yarpc-packet-header-size+ +PACKET-REQUEST-ID-SIZE+)) :request-id packet-request-id)))
+ (1 (progn (format-log t "yarpc-packet-factory:get-packet - got METHOD-RESPONSE-PACKET-ID~%") (method-response-packet (bytebuffer-read-string buf (- packet-length +yarpc-packet-header-size+ +PACKET-REQUEST-ID-SIZE+)) :request-id packet-request-id))))))
(compact buf)
#+nio-debug (format-log t "yarpc-packet-factory:get-packet - after compact ~%~A~%" buf)
#+nio-debug (format-log t "yarpc-packet-factory:get-packet - retuirning packet ~A~%" ret-packet)
@@ -70,10 +70,12 @@
((request-id :initarg :request-id
:reader request-id)))
-(defclass call-method-packet (yarpc-packet)((call-string :initarg :call-string
- :accessor call-string)))
-(defun call-method-packet (call-string)
- (make-instance 'call-method-packet :call-string call-string))
+(defclass call-method-packet (yarpc-packet)
+ ((call-string :initarg :call-string
+ :accessor call-string)))
+
+(defun call-method-packet (call-string &key request-id)
+ (make-instance 'call-method-packet :call-string call-string :request-id request-id))
(defmethod print-object ((packet call-method-packet) stream)
(format stream "#<CALL-METHOD-PACKET ~A >" (call-string packet)))
@@ -103,8 +105,8 @@
((response :initarg :response
:accessor response)))
-(defun method-response-packet (response)
- (make-instance 'method-response-packet :response response))
+(defun method-response-packet (response &key request-id)
+ (make-instance 'method-response-packet :response response :request-id request-id))
(defmethod print-object ((packet method-response-packet) stream)
(format stream "#<METHID-RESPONSE-PACKET ~A >" (response packet)))
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp (original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp Sat Feb 10 18:52:32 2007
@@ -60,17 +60,20 @@
(defun run-job(&key (blocking t))
(format-log t "yarpc-state-machine:run-job - Server toplevel waiting for job~%")
- (destructuring-bind (job request-id result-queue) (nio-compat:take nio-yarpc:job-queue :blocking-call blocking)
- (format-log t "yarpc-state-machine:run-job - Server received job ~A~%" job)
- (nio-compat:add result-queue (list request-id (nio-yarpc:execute-call job)))))
+ (let ((server-job (nio-compat:take nio-yarpc:job-queue :blocking-call blocking)))
+ (when server-job
+ (destructuring-bind (job request-id result-queue) server-job
+ (format-log t "yarpc-state-machine:run-job - Server received job ~A~%" job)
+ (nio-compat:add result-queue (list request-id (nio-yarpc:execute-call job)))))))
(defmethod process-outgoing-packet((sm yarpc-state-machine))
(format-log t "yarpc-state-machine:process-outgoing-packet - called, polling the results-queue ~%" )
- (destructuring-bind (request-id result) (nio-compat:take (result-queue sm) :blocking-call nil)
- (format-log t "yarpc-state-machine:process-outgoing-packet - got :request-id ~A result ~A ~%" request-id result)
- (when result
- (method-response-packet result :request-id request-id))))
+ (let ((server-job (nio-compat:take (result-queue sm) :blocking-call nil)))
+ (when server-job
+ (destructuring-bind (request-id result) server-job
+ (format-log t "yarpc-state-machine:process-outgoing-packet - got :request-id ~A result ~A ~%" request-id result)
+ (method-response-packet result :request-id request-id)))))
;Process a call method packet by placing it in the job-queue
(defmethod process-incoming-packet ((sm yarpc-state-machine) (call call-method-packet))
More information about the Nio-cvs mailing list