[nio-cvs] r41 - in branches/home/psmith/restructure: . src/buffer src/compat src/io src/protocol/yarpc src/statemachine
psmith at common-lisp.net
psmith at common-lisp.net
Thu Jan 18 04:01:11 UTC 2007
Author: psmith
Date: Wed Jan 17 23:01:11 2007
New Revision: 41
Modified:
branches/home/psmith/restructure/run-yarpc.lisp
branches/home/psmith/restructure/src/buffer/buffer.lisp
branches/home/psmith/restructure/src/compat/concurrent-queue.lisp
branches/home/psmith/restructure/src/io/async-fd.lisp
branches/home/psmith/restructure/src/io/async-socket.lisp
branches/home/psmith/restructure/src/io/nio-server.lisp
branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp
branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp
branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp
branches/home/psmith/restructure/src/statemachine/nio-sm-package.lisp
branches/home/psmith/restructure/src/statemachine/state-machine.lisp
Log:
Yarpc working end-to-end
Modified: branches/home/psmith/restructure/run-yarpc.lisp
==============================================================================
--- branches/home/psmith/restructure/run-yarpc.lisp (original)
+++ branches/home/psmith/restructure/run-yarpc.lisp Wed Jan 17 23:01:11 2007
@@ -2,11 +2,10 @@
(require :asdf)
(require :nio-yarpc)
-(let ((jobq (nio-compat:concurrent-queue)))
- (sb-thread:make-thread #'(lambda()(nio:start-server 'identity 'identity #'(lambda()(nio-yarpc:yarpc-state-machine jobq)) :host "127.0.0.1")) :name "nio-server")
- (format t "server toplevel waiting for job~%" )
- (loop
+(sb-thread:make-thread #'(lambda()(nio:start-server 'identity 'identity 'nio-yarpc:yarpc-state-machine :host "127.0.0.1")) :name "nio-server")
+(loop
;;block waiting for jobs
- (multiple-value-bind (job result-queue) (nio-compat:take jobq)
+ (format t "Server toplevel waiting for job~%" )
+ (destructuring-bind (job result-queue) (nio-compat:take nio-yarpc:job-queue)
(format t "Server received job ~A~%" job)
- (nio-compat:add result-queue (nio-yarpc:execute-call job)))))
+ (nio-compat:add result-queue (nio-yarpc:execute-call job))))
Modified: branches/home/psmith/restructure/src/buffer/buffer.lisp
==============================================================================
--- branches/home/psmith/restructure/src/buffer/buffer.lisp (original)
+++ branches/home/psmith/restructure/src/buffer/buffer.lisp Wed Jan 17 23:01:11 2007
@@ -55,25 +55,45 @@
;;Utils by slyrus (http://paste.lisp.org/display/11149)
(defun hex-dump-byte (address)
- (format nil "~2,'0X"
- (sb-alien:deref
- (sb-alien:sap-alien
- (sb-alien::int-sap address)
- (* (sb-alien:unsigned 8))))))
+ (format nil "~2,'0X" (byte-value address)))
+
+(defun byte-value (address)
+ (sb-alien:deref
+ (sb-alien:sap-alien
+ (sb-alien::int-sap address)
+ (* (sb-alien:unsigned 8)))))
(defun hex-dump-memory (start-address length)
(loop for i from start-address below (+ start-address length)
collect (format nil (hex-dump-byte i))))
+;;-- end utils
+
+
+(defun pretty-hex-dump (start-address length)
+; (format t "start: ~A length ~A~%" start-address length)
+ (with-output-to-string (str)
+ (let ((rows (floor (/ length 16))))
+; (format t "rows: ~A remainder ~A~%" rows remainder)
+ (dotimes (row-index (+ 1 rows))
+ (format str "~A~%"
+ (with-output-to-string (readable)
+ (dotimes (column-index 16)
+ (let ((address (+ start-address (* row-index 16) column-index)))
+ ; (format t "Current address : ~A~%" address)
+ (if (>= address (+ start-address length))
+ (progn
+ (format str "--")
+ (format readable "--"))
+ (progn
+ (format str (if (eql column-index 7) "~A " "~A ") (hex-dump-byte address))
+ (format readable "~A" (code-char (byte-value address)))))))))))))
(defun make-uint8-seq (size)
"Make uint8 sequence."
(make-sequence '(vector (unsigned-byte 8)) size :initial-element 0))
-;;-- end utils
-
-
;;A buffer that deals with bytes
(defclass byte-buffer (buffer)())
@@ -83,7 +103,7 @@
(defmethod print-object ((byte-buffer byte-buffer) stream)
(with-slots (capacity position limit buf) byte-buffer
- (format stream "<byte-buffer :capacity ~A :position ~A :limit ~A :buf ~%~A>~%" capacity position limit (if buf (hex-dump-memory (cffi:pointer-address buf) limit) nil))))
+ (format stream "<byte-buffer :capacity ~A :position ~A :limit ~A :buf ~%~A>~%" capacity position limit (if buf (pretty-hex-dump (cffi:pointer-address buf) limit) nil))))
(defmethod free-buffer((byte-buffer byte-buffer))
(with-slots (capacity position limit buf) byte-buffer
Modified: branches/home/psmith/restructure/src/compat/concurrent-queue.lisp
==============================================================================
--- branches/home/psmith/restructure/src/compat/concurrent-queue.lisp (original)
+++ branches/home/psmith/restructure/src/compat/concurrent-queue.lisp Wed Jan 17 23:01:11 2007
@@ -47,7 +47,7 @@
`(if ,a-buffer
(let ((head (car ,a-buffer)))
(setf ,a-buffer (cdr ,a-buffer))
-#+nio-debug (format t "reader ~A woke, read ~A as ~A~%" sb-thread:*current-thread* head ,loc)
+#+nio-debug (format t "concurent-queue:take - (~A) read ~A at ~A~%" sb-thread:*current-thread* head ,loc)
head)
nil))
@@ -64,6 +64,7 @@
;Append the element to the tail of this queue
(defmethod add ((queue concurrent-queue) elt)
+#+nio-debug (format t "concurent-queue:add - (~A) adding ~A~%" sb-thread:*current-thread* elt)
(sb-thread:with-mutex ((buffer-lock queue))
(setf (buffer queue) (append (buffer queue) (list elt)) )
(sb-thread:condition-notify (buffer-queue queue))))
Modified: branches/home/psmith/restructure/src/io/async-fd.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/async-fd.lisp (original)
+++ branches/home/psmith/restructure/src/io/async-fd.lisp Wed Jan 17 23:01:11 2007
@@ -125,11 +125,12 @@
(error 'read-error)))
((= new-bytes 0)
- nil);;(throw 'end-of-file nil))
+ nil);;(throw 'end-of-file nil)
(t
;;Update buffer position
- (inc-position foreign-read-buffer new-bytes))))))
+ (inc-position foreign-read-buffer new-bytes)
+ (setf (read-ready state-machine) nil))))))
(defun close-async-fd (async-fd)
"Close ASYNC-FD's fd after everything has been written from write-queue."
Modified: branches/home/psmith/restructure/src/io/async-socket.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/async-socket.lisp (original)
+++ branches/home/psmith/restructure/src/io/async-socket.lisp Wed Jan 17 23:01:11 2007
@@ -178,7 +178,7 @@
-(defun socket-accept (socket-fd connection-factory)
+(defun socket-accept (socket-fd connection-type)
"Accept connection from SOCKET-FD. Allocates and returns socket structure denoting the connection."
(flet ((parse-inet6-addr (addr)
@@ -202,7 +202,7 @@
;; accept connection
(let* ((res (%accept socket-fd addr len))
;; (async-socket-fd (make-instance 'async-socket-fd :read-fd res :write-fd res)))
- (async-socket-fd (create-state-machine connection-factory res res (make-instance 'async-socket-fd))))
+ (async-socket-fd (create-state-machine connection-type res res (make-instance 'async-socket-fd))))
(unless (< res 0)
(let ((len-value (mem-ref len :unsigned-int)))
Modified: branches/home/psmith/restructure/src/io/nio-server.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/nio-server.lisp (original)
+++ branches/home/psmith/restructure/src/io/nio-server.lisp Wed Jan 17 23:01:11 2007
@@ -55,7 +55,7 @@
-(defun start-server (connection-handler accept-filter connection-factory
+(defun start-server (connection-handler accept-filter connection-type
&key
(protocol :inet)
(port (+ (random 60000) 1024))
@@ -99,7 +99,7 @@
(cond
;; new connection
((= fd sock)
- (let ((async-fd (socket-accept fd connection-factory)))
+ (let ((async-fd (socket-accept fd connection-type)))
#+nio-debug (format t "start-server - New conn: ~A~%" async-fd)
(cond
((null async-fd)
Modified: branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp (original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp Wed Jan 17 23:01:11 2007
@@ -32,7 +32,7 @@
yarpc-state-machine-factory get-packet-factory
;; yarpc-state-machine
- yarpc-state-machine
+ yarpc-state-machine job-queue
;to be moved
test-rpc test-rpc-list test-rpc-string execute-call
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp (original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp Wed Jan 17 23:01:11 2007
@@ -38,7 +38,7 @@
:documentation "The queue used to hand off work from an external thread to the io thread")
(result-queue :initform (nio-compat:concurrent-queue)
:accessor result-queue
- :documentation "The queue used to hand off work from an external thread to the io thread")))
+ :documentation "The queue used to return results from the io thread to an external thread")))
(defun yarpc-client-state-machine ()
(make-instance 'yarpc-client-state-machine))
@@ -55,26 +55,26 @@
(defconstant STATE-INITIALISED 0)
(defconstant STATE-SENT-REQUEST 1)
-(defparameter state STATE-INITIALISED)
-
(defmethod process-outgoing-packet((sm yarpc-client-state-machine))
(format t "process-outgoing-packet called, polling the job-queue ~%")
(let ((packet (nio-compat:take (job-queue sm) :blocking-call nil)))
(when packet
(format t "process-outgoing-packet got job ~A ~%" packet)
- (setf state STATE-SENT-REQUEST))
+ (setf (state sm) STATE-SENT-REQUEST))
packet))
(defmethod process-incoming-packet ((sm yarpc-client-state-machine) (response method-response-packet))
- (assert (eql state STATE-SENT-REQUEST))
+ (assert (eql (state sm) STATE-SENT-REQUEST))
(format t "yarpc-client-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm response)
- (nio-compat:add (result-queue sm) response)
- (setf state STATE-INITIALISED))
+ (let* ((*package* (find-package :nio-yarpc))
+ (result (read-from-string (response response))))
+ (nio-compat:add (result-queue sm) result)
+ (setf (state sm) STATE-INITIALISED)))
;Called from an external thread i.e. *not* the nio thread
;Blocks calling thread on the remote m/c's response
(defmethod remote-execute ((sm yarpc-client-state-machine) call-string)
; (queue-outgoing-packet
- (assert (eql state STATE-INITIALISED))
+ (assert (eql (state sm) STATE-INITIALISED))
(nio-compat:add (job-queue sm) (make-instance 'call-method-packet :call-string call-string))
(nio-compat:take (result-queue sm)))
\ No newline at end of file
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp (original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp Wed Jan 17 23:01:11 2007
@@ -33,20 +33,13 @@
;; A server that processes remote procedure calls and returns results
;;
(defclass yarpc-state-machine (state-machine)
- ((job-queue :initarg :job-queue
- :initform (error "Must supply a job queue to write work to.")
- :accessor job-queue
- :documentation "The queue used to hand off work from the NIO thread to an external thread for execution")
+ (
(result-queue :initform (nio-compat:concurrent-queue)
:accessor result-queue
:documentation "The queue used to return results from an external thread to the nio thread")))
-(defun yarpc-state-machine (read-fd write-fd socket job-queue)
- (let ((sm (make-instance 'yarpc-state-machine :read-fd read-fd :write-fd write-fd :socket socket :job-queue job-queue)))
- (nio-buffer:clear (foreign-read-buffer sm))
- (nio-buffer:clear (foreign-write-buffer sm))
- (format t "yarpc-state-machine - Created ~S~%" sm)
- sm))
+(defparameter job-queue (nio-compat:concurrent-queue)
+ "The queue used to hand off work from the NIO thread to an external thread for execution")
(defparameter yarpc-pf (yarpc-packet-factory))
@@ -59,27 +52,18 @@
(defconstant STATE-INITIALISED 0)
(defconstant STATE-SEND-RESPONSE 1)
-(defparameter state STATE-INITIALISED)
-
-
(defmethod process-outgoing-packet((sm yarpc-state-machine))
(format t "yarpc-state-machine: process-outgoing-packet called, polling the results-queue ~%")
- (let ((packet (nio-compat:take (result-queue sm) :blocking-call nil)))
- (format t "yarpc-state-machine: process-outgoing-packet got result ~A ~%" packet)
- packet))
-
+ (let ((result (nio-compat:take (result-queue sm) :blocking-call nil)))
+ (format t "yarpc-state-machine: process-outgoing-packet got result ~A ~%" result)
+ (when result
+ (method-response-packet result))))
-;Process a call method packet, returns
+;Process a call method packet by placing it in the job-queue
(defmethod process-incoming-packet ((sm yarpc-state-machine) (call call-method-packet))
- (assert (eql state STATE-INITIALISED))
+ (assert (eql (state sm) STATE-INITIALISED))
(format t "yarpc-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm call)
- (nio-compat:add (job-queue sm) (cons (call-string call) (result-queue sm))))
-
-
-;Called from an external thread i.e. *not* the nio thread
-;Blocks waiting for a job (call-string,result-queue) to process and return the result into the result queue
-;(defmethod get-job ((sm yarpc-state-machine))
-; (values (nio-compat:take (job-queue sm)) (result-queue sm)))
+ (nio-compat:add job-queue (list (call-string call) (result-queue sm))))
Modified: branches/home/psmith/restructure/src/statemachine/nio-sm-package.lisp
==============================================================================
--- branches/home/psmith/restructure/src/statemachine/nio-sm-package.lisp (original)
+++ branches/home/psmith/restructure/src/statemachine/nio-sm-package.lisp Wed Jan 17 23:01:11 2007
@@ -29,5 +29,5 @@
(:export
;; state-machine
- state-machine packet-factory get-packet-factory get-packet process-outgoing-packet process-incoming-packet
+ state-machine packet-factory get-packet-factory get-packet process-outgoing-packet process-incoming-packet state
))
Modified: branches/home/psmith/restructure/src/statemachine/state-machine.lisp
==============================================================================
--- branches/home/psmith/restructure/src/statemachine/state-machine.lisp (original)
+++ branches/home/psmith/restructure/src/statemachine/state-machine.lisp Wed Jan 17 23:01:11 2007
@@ -37,7 +37,9 @@
;This way only the protocols packet heirarchy knows about binary representations and
; the SM can deal with protocol logic and state maintenance
;
-(defclass state-machine (async-fd)())
+(defclass state-machine (async-fd)
+ ((state :initform 0
+ :accessor state)))
(defmethod print-object ((sm state-machine) stream)
(format stream "#<STATE-MACHINE ~A >" (call-next-method sm nil)))
More information about the Nio-cvs mailing list