[nio-cvs] r35 - in branches/home/psmith/restructure: . src/buffer src/event src/io src/protocol/yarpc src/statemachine
psmith at common-lisp.net
psmith at common-lisp.net
Mon Jan 15 04:00:43 UTC 2007
Author: psmith
Date: Sun Jan 14 23:00:39 2007
New Revision: 35
Added:
branches/home/psmith/restructure/run-yarpc-client.lisp
Modified:
branches/home/psmith/restructure/src/buffer/buffer.lisp
branches/home/psmith/restructure/src/buffer/nio-buffer-package.lisp
branches/home/psmith/restructure/src/event/epoll.lisp
branches/home/psmith/restructure/src/io/async-fd.lisp
branches/home/psmith/restructure/src/io/nio-package.lisp
branches/home/psmith/restructure/src/io/nio-server.lisp
branches/home/psmith/restructure/src/io/packet.lisp
branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp
branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp
branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp
branches/home/psmith/restructure/src/statemachine/nio-sm-package.lisp
branches/home/psmith/restructure/src/statemachine/state-machine.lisp
Log:
yarpc - Send packet OK
Added: branches/home/psmith/restructure/run-yarpc-client.lisp
==============================================================================
--- (empty file)
+++ branches/home/psmith/restructure/run-yarpc-client.lisp Sun Jan 14 23:00:39 2007
@@ -0,0 +1,11 @@
+(push :nio-debug *features*)
+(require :asdf)
+(require :nio-yarpc)
+
+(sb-thread:make-thread #'(lambda()(nio:start-server 'identity 'identity 'nio-yarpc:yarpc-state-machine :host "127.0.0.1" :port 9897)) :name "nio-server")
+(sleep 4)
+(let ((sm (nio:add-connection "127.0.0.1" 16323 'nio-yarpc:yarpc-state-machine)))
+(format t "toplevel adding conn ~A~%" sm)
+(format t "Result of remote-execute ~A~%" (nio-yarpc:remote-execute sm "(test-rpc-list)")))
+
+
Modified: branches/home/psmith/restructure/src/buffer/buffer.lisp
==============================================================================
--- branches/home/psmith/restructure/src/buffer/buffer.lisp (original)
+++ branches/home/psmith/restructure/src/buffer/buffer.lisp Sun Jan 14 23:00:39 2007
@@ -131,17 +131,24 @@
(sb-ext:octets-to-string (bytebuffer-read-vector bb num-bytes-to-read) :external-format external-format))
+;grrr...
+;(defmethod bytebuffer-write-byte ((bb byte-buffer) value)
+; (cffi:%mem-set value (buffer-buf bb) :unsigned-char position)
+; (inc-position bb 1))
+
;; Write bytes from vector vec to bytebuffer
(defmethod bytebuffer-write-vector((bb byte-buffer) vec)
:documentation "Returns number of bytes written to bytebuffer"
- (if (> (remaining bb) 0)
- 0
+ (format t "bytebuffer-write-vector - called with ~A ~A"bb vec)
+; (if (> (remaining bb) 0)
+; 0
(progn
- (clear bb)
- (let ((bytes-written (cffi:mem-write-vector vec (buffer-buf bb) :unsigned-char)))
- (format t "bytebuffer-write-vector - byteswritten: ~A" bytes-written)
+; (clear bb)
+ (let ((bytes-written (cffi:mem-write-vector vec (buffer-buf bb) :unsigned-char (length vec) (buffer-position bb))))
+ (format t "bytebuffer-write-vector - byteswritten: ~A~%" bytes-written)
(inc-position bb bytes-written)
- bytes-written))))
+ bytes-written)))
+;)
;; Writes data from string str to bytebuffer using specified encoding
;TODO move string-to-octets into nio-compat
Modified: branches/home/psmith/restructure/src/buffer/nio-buffer-package.lisp
==============================================================================
--- branches/home/psmith/restructure/src/buffer/nio-buffer-package.lisp (original)
+++ branches/home/psmith/restructure/src/buffer/nio-buffer-package.lisp Sun Jan 14 23:00:39 2007
@@ -27,5 +27,5 @@
(defpackage :nio-buffer (:use :cl)
(:export
- byte-buffer free-buffer remaining inc-position get-string buffer-buf bytebuffer-write-vector bytebuffer-write-string bytebuffer-read-vector bytebuffer-read-string flip
+ byte-buffer free-buffer remaining inc-position get-string buffer-buf bytebuffer-write-vector bytebuffer-write-string bytebuffer-read-vector bytebuffer-read-string flip clear buffer-position
))
Modified: branches/home/psmith/restructure/src/event/epoll.lisp
==============================================================================
--- branches/home/psmith/restructure/src/event/epoll.lisp (original)
+++ branches/home/psmith/restructure/src/event/epoll.lisp Sun Jan 14 23:00:39 2007
@@ -76,14 +76,14 @@
#+nio-debug (format t "poll-events called with :event-queue ~A~%" event-queue)
(with-foreign-object (events 'epoll-event +epoll-size+)
(memzero events (* +epoll-event-size+ +epoll-size+))
- (loop for res = (%epoll-wait event-queue events +epoll-size+ -1)
+ (loop for res = (%epoll-wait event-queue events +epoll-size+ 1000)
do
(progn
#+nio-debug (format t "poll-events - dealing with ~A~%" res)
(case res
(-1 (error 'poll-error))
- (0 nil)
+ (return nil)
(t
(let ((idents nil))
(loop for i from 0 below res do
Modified: branches/home/psmith/restructure/src/io/async-fd.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/async-fd.lisp (original)
+++ branches/home/psmith/restructure/src/io/async-fd.lisp Sun Jan 14 23:00:39 2007
@@ -47,18 +47,19 @@
(read-fd :initarg :read-fd
:accessor read-fd)
- (foreign-read-buffer :initform (byte-buffer 4096))
- (foreign-write-buffer :initform (byte-buffer 4096)
+ (foreign-read-buffer :initform (byte-buffer 1024)
+ :accessor foreign-read-buffer)
+ (foreign-write-buffer :initform (byte-buffer 1024)
:accessor foreign-write-buffer)
;; (lisp-read-buffer :initform (make-uint8-seq 1024))
;; (lisp-read-buffer-write-ptr :initform 0)
- (read-ready-p :initform nil
- :accessor read-ready-p
+ (read-ready :initform nil
+ :accessor read-ready
:documentation "Have we been notified as read ready and not received EAGAIN from %read?")
- (write-ready-p :initform nil
- :accessor write-ready-p
+ (write-ready :initform nil
+ :accessor write-ready
:documentation "Have we been notified as write ready and not received EAGAIN from %write?")
(close-pending :initform nil)
@@ -73,21 +74,28 @@
(defmethod print-object ((async-fd async-fd) stream)
- (with-slots (read-fd write-fd) async-fd
- (format stream "#<ASYNC-FD r/w fd: ~D/~D.>"
- read-fd write-fd)))
+ (with-slots (socket read-fd write-fd) async-fd
+ (format stream "#<ASYNC-FD :socket ~D :read-fd ~D :write-fd ~D.>"
+ socket read-fd write-fd)))
+;;Implement this in concrete SM for read
+(defgeneric process-read (async-fd))
+
+;;Implement this in concrete SM for read
+(defgeneric process-write (async-fd))
-;;SM factory
+
+
+;Loop over state machines calling process-outgoing-packets via state-machine::process-write
+
+;;SM factory
(defun create-state-machine(sm-type read-fd write-fd socket)
(let ((sm (make-instance sm-type :read-fd read-fd :write-fd write-fd :socket socket)))
(format t "create-state-machine - Created ~S~%" sm)
- (nio-buffer:flip (foreign-write-buffer sm))
+ (nio-buffer:clear (foreign-read-buffer sm))
+ (nio-buffer:clear (foreign-write-buffer sm))
sm))
-;;Implement this in concrete SM for read
-(defgeneric process-read (async-fd))
-
;;override this in concrete SM for close
;(defmethod process-close((async-fd async-fd)reason)())
(defmethod process-close((async-fd async-fd)reason)())
@@ -108,9 +116,9 @@
(with-slots (foreign-read-buffer read-fd) state-machine
(format t "read-more called with ~A~%" state-machine)
-#+nio-debug (format t "read-more - calling read()~%")
+#+nio-debug (format t "read-more - calling read() into ~A~%" foreign-read-buffer)
(let ((new-bytes (%read read-fd (buffer-buf foreign-read-buffer) (remaining foreign-read-buffer))))
-#+nio-debug (format t "read-more : Read ~A bytes~%" new-bytes)
+#+nio-debug (format t "read-more : Read ~A bytes into ~A~%" new-bytes foreign-read-buffer)
(cond
((< new-bytes 0)
(progn
@@ -124,13 +132,8 @@
nil);;(throw 'end-of-file nil))
(t
- (progn
;;Update buffer position
- (inc-position foreign-read-buffer new-bytes)
-
-#+nio-debug (format t "read-more prior to process :buffer ~A~%" foreign-read-buffer)
- (process-read state-machine)))))))
-
+ (inc-position foreign-read-buffer new-bytes))))))
(defun close-async-fd (async-fd)
"Close ASYNC-FD's fd after everything has been written from write-queue."
@@ -149,33 +152,38 @@
(defun write-more (async-fd)
"Write data from ASYNC-FD's write bytebuffer"
-#+nio-debug (format t "write-more called with ~A~%" async-fd)
+#+nio-debug (format t "async-fd:write-more - called with ~A~%" async-fd)
(with-slots (write-fd foreign-write-buffer close-pending) async-fd
- (setf (write-ready-p async-fd) t)
-#+nio-debug (format t "foreign-write-buffer b4 flip ~A~%" foreign-write-buffer)
+#+nio-debug (format t "async-fd:write-more - foreign-write-buffer b4 flip ~A~%" foreign-write-buffer)
(nio-buffer:flip foreign-write-buffer)
-#+nio-debug (format t "foreign-write-buffer after flip ~A~%" foreign-write-buffer)
+#+nio-debug (format t "async-fd:write-more -foreign-write-buffer after flip ~A~%" foreign-write-buffer)
(let ((now-written 0))
(do ((total-written 0))
((or (eql now-written -1) (eql (remaining foreign-write-buffer) 0)) total-written)
(progn
(setf now-written (%write write-fd (buffer-buf foreign-write-buffer) (remaining foreign-write-buffer)))
-
- (format t "after write :foreign-write-buffer ~A :now-written ~A :total-written ~A ~%" foreign-write-buffer now-written total-written)
+
(when (not (eql now-written -1))
(inc-position foreign-write-buffer now-written)
- (incf total-written now-written))))
+ (incf total-written now-written)))
+#+nio-debug (format t "async-fd:write-more - after write :foreign-write-buffer ~A :now-written ~A :total-written ~A ~%" foreign-write-buffer now-written total-written)
+ )
+
+ (if (eql now-written -1)
;;Deal with failure
- (when (eql now-written -1)
(let ((err (get-errno)))
(format t "write-more - write returned -1 :errno ~A~%" err)
(unless (eql err 11) ;; eagain - failed to write whole buffer need to wait for next notify
(let ((err-cond (make-instance 'write-error :error err)))
(close err-cond)
- (error err-cond))))))
+ (error err-cond))))
+ ;;update buffers
+ (if (eql (remaining foreign-write-buffer) 0)
+ (clear foreign-write-buffer)
+ (error 'not-implemented-yet))))
#+nio-debug (format t "write buffer after write :~A~%" foreign-write-buffer)
(when (eql (remaining foreign-write-buffer) 0)
Modified: branches/home/psmith/restructure/src/io/nio-package.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/nio-package.lisp (original)
+++ branches/home/psmith/restructure/src/io/nio-package.lisp Sun Jan 14 23:00:39 2007
@@ -29,13 +29,13 @@
(:export
;; async-fd.lisp
- async-fd process-read foreign-read-buffer foreign-write-buffer close-sm
+ async-fd process-read process-write foreign-read-buffer foreign-write-buffer close-sm
;; async-socket.lisp
;;nio-server
- start-server
+ start-server add-connection
;;packet
- packet
+ packet write-bytes
))
Modified: branches/home/psmith/restructure/src/io/nio-server.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/nio-server.lisp (original)
+++ branches/home/psmith/restructure/src/io/nio-server.lisp Sun Jan 14 23:00:39 2007
@@ -33,6 +33,28 @@
;; (format t "Accepting connection from ~S:~D [~A].~%" host port proto)
t)
+;TODO thread safety
+(defparameter +connected-sockets+ nil
+ "List of sockets that have been connected and are awaiting addition to the event-notification system")
+
+;loop over hashtable
+(defun process-async-fds (client-hash)
+ (maphash #'(lambda (k async-fd)
+ (format t "Dealing with ~a => ~a~%" k async-fd)
+
+ ;process reads
+ (when (read-ready async-fd) (read-more async-fd))
+ (when (> (buffer-position (foreign-read-buffer async-fd)) 0)
+ (process-read async-fd))
+
+ ;process-writes
+ (process-write async-fd)
+ (when (write-ready async-fd) (write-more async-fd)))
+ client-hash))
+
+
+
+
(defun start-server (connection-handler accept-filter connection-type
&key
(protocol :inet)
@@ -70,9 +92,8 @@
(declare (ignore cond))
(format t "Poll-error, exiting..~%")
(throw 'poll-error-exit nil))))
-
- (loop for unix-epoll-events = (poll-events event-queue) do
-
+
+ (loop for unix-epoll-events = (poll-events event-queue) do
(loop for (fd . event) in unix-epoll-events do
(cond
@@ -113,10 +134,41 @@
(force-close-async-fd async-fd)
(throw 'error-exit nil))))
- (when (read-event-p event) (read-more async-fd))
- (when (write-event-p event) (write-more async-fd))
- )))
- ))
- )))))
+ (when (read-event-p event) (setf (read-ready async-fd) t))
+ (when (write-event-p event) (setf (write-ready async-fd) t))))))))
+ (format t "Process client adds~%")
+
+ ;add outgoing sockets to event queue
+ (format t "start-server::sockets enqueued ~A~%" +connected-sockets+)
+ (loop for new-fd in +connected-sockets+ do
+ (format t "Dealing with ~A~%" new-fd)
+ (setf (gethash (async-fd-read-fd new-fd) client-hash) new-fd)
+ (add-async-fd event-queue new-fd :read-write))
+
+ ;TODO thread safety
+ (setf +connected-sockets+ nil)
+
+ ;loop over async-fd's processing where necessary
+ (process-async-fds client-hash)
+ ))))
(ignore-errors
(close-fd sock))))
+
+
+(defun add-connection (host port connection-type
+ &key
+ (protocol :inet)
+
+ )
+ (let ((sock nil))
+ (setq sock (ecase protocol
+ (:inet (make-inet-socket))
+ (:inet6 (make-inet6-socket))))
+
+ (if (connect-inet-socket sock host port)
+ (let ((sm (create-state-machine connection-type sock sock sock)))
+ (push sm +connected-sockets+)
+ (format t "add-connection::sockets enqueued ~A~%" +connected-sockets+)
+ (return-from add-connection sm))
+ (format t "Connect failed!!~A ~%" (get-errno)))))
+
\ No newline at end of file
Modified: branches/home/psmith/restructure/src/io/packet.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/packet.lisp (original)
+++ branches/home/psmith/restructure/src/io/packet.lisp Sun Jan 14 23:00:39 2007
@@ -28,13 +28,13 @@
;; state-machines instantiate packets for the associated protocol
;; either based on incomming data from a packet factory or in
-;; preperation for sending a packet for the current protocol.
+;; preparation for sending a packet for the current protocol.
+;;
+;; All concete packets implement write-bytes for xfer to the io layer
-;; All concete packets implement get-bytes for xfer to the io layer
(defclass packet ()
())
-(defmethod get-bytes((a-packet packet))
- ())
-
+;Implement in concrete
+(defgeneric write-bytes(packet nio-buffer))
Modified: branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp (original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp Sun Jan 14 23:00:39 2007
@@ -29,5 +29,5 @@
(:export
;; yarpc-state-machine
- yarpc-state-machine yarpc-state-machine-factory test-rpc test-rpc-list test-rpc-string get-packet-factory
+ yarpc-state-machine yarpc-state-machine-factory test-rpc test-rpc-list test-rpc-string get-packet-factory remote-execute
))
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp (original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp Sun Jan 14 23:00:39 2007
@@ -35,11 +35,11 @@
(defun yarpc-packet-factory ()
(make-instance 'yarpc-packet-factory))
-(defconstant CALL-METHOD-PACKET-ID 0)
+(defconstant CALL-METHOD-PACKET-ID #x0)
(defconstant METHOD-RESPONSE-PACKET-ID 1)
(defmethod get-packet ((pf yarpc-packet-factory) buf)
- (nio-buffer:flip buf)
+ (flip buf)
; (format t "get-packet::read string - ~A~%" (bytebuffer-read-string buf (remaining buf)))
(if (>= (remaining buf) 1) ;; First byte denotes packet ID
(ecase (elt (bytebuffer-read-vector buf 1) 0)
@@ -49,6 +49,17 @@
(defclass call-method-packet (packet)((call-string :initarg :call
:accessor get-call-string)))
+(defmethod print-object ((packet call-method-packet) stream)
+ (format stream "#<CALL-METHOD-PACKET ~A >" (get-call-string packet)))
+
+(defmethod write-bytes((packet call-method-packet) buf)
+ (format t "yarpc-packet-factory:write-bytes - writing ~A to ~A~%" packet buf)
+; (nio-buffer:flip buf)
+ (nio-buffer:bytebuffer-write-vector buf #(#x0))
+ (nio-buffer:bytebuffer-write-string buf (get-call-string packet))
+ (format t "yarpc-packet-factory:write-bytes - written ~A~%" buf) )
+
+
(defclass method-response-packet (packet)())
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp (original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp Sun Jan 14 23:00:39 2007
@@ -40,7 +40,10 @@
;; (test-rpc "who" 2 's)
;; response - who 2 'S
;;
-(defclass yarpc-state-machine (state-machine)())
+(defclass yarpc-state-machine (state-machine)
+ ((outgoing-packet :initarg :outgoing-packet
+ :accessor outgoing-packet
+ :initform nil)))
(defun yarpc-state-machine ()
(make-instance 'yarpc-state-machine))
@@ -88,8 +91,16 @@
(define-condition authorization-error (error) ())
+
+(defmethod process-outgoing-packet((sm yarpc-state-machine))
+ (format t "process-outgoing-packet called~%")
+ (let ((packet (outgoing-packet sm)))
+ (setf (outgoing-packet sm) nil)
+ packet))
+
+
;Process a call method packet, returns
-(defmethod process-packet ((sm yarpc-state-machine) (call call-method-packet))
+(defmethod process-incomming-packet ((sm yarpc-state-machine) (call call-method-packet))
;todo change state, create method-response packet and return it
;(assert (eql state 0))
(handler-case
@@ -111,3 +122,7 @@
(apply (first rpc-call-list) (rest rpc-call-list))
(error 'authorization-error))))
+
+(defmethod remote-execute ((sm yarpc-state-machine) call-string)
+ (setf (outgoing-packet sm) (make-instance 'call-method-packet :call call-string)))
+
\ No newline at end of file
Modified: branches/home/psmith/restructure/src/statemachine/nio-sm-package.lisp
==============================================================================
--- branches/home/psmith/restructure/src/statemachine/nio-sm-package.lisp (original)
+++ branches/home/psmith/restructure/src/statemachine/nio-sm-package.lisp Sun Jan 14 23:00:39 2007
@@ -29,5 +29,5 @@
(:export
;; state-machine
- state-machine packet-factory get-packet-factory get-packet
+ state-machine packet-factory get-packet-factory get-packet process-outgoing-packet process-incoming-packet
))
Modified: branches/home/psmith/restructure/src/statemachine/state-machine.lisp
==============================================================================
--- branches/home/psmith/restructure/src/statemachine/state-machine.lisp (original)
+++ branches/home/psmith/restructure/src/statemachine/state-machine.lisp Sun Jan 14 23:00:39 2007
@@ -32,7 +32,7 @@
;Base class for state machines
;
;Converts incomming data between bytes and packets using the supplied packet-factory.
-;Converts outgoing data between packets and bytes using the get-bytes method on packet.
+;Converts outgoing data between packets and bytes using the write-bytes method on packet.
;
;This way only the protocols packet heirarchy knows about binary representations and
; the SM can deal with protocol logic and state maintenance
@@ -42,21 +42,32 @@
(defmethod print-object ((sm state-machine) stream)
(format stream "#<STATE-MACHINE ~A >" (call-next-method sm nil)))
-(defgeneric process-packet(state-machine packet))
+(defgeneric process-incomming-packet(state-machine packet))
+
+
+(defgeneric process-outgoing-packet(state-machine))
+
(defgeneric get-packet-factory(state-machine))
+;The connection is read ready.
+;Use the packet factory to obtain any valid packet and pass it through
(defmethod process-read((sm state-machine))
- (with-slots (foreign-read-buffer foreign-write-buffer) sm
+ (with-slots (foreign-read-buffer) sm
(let ((incomming-packet (get-packet (get-packet-factory sm) foreign-read-buffer)))
(format t "state-machine::process-read - incomming packet: ~A~%" incomming-packet)
(when incomming-packet
- (multiple-value-bind (ret-packet close) (process-packet sm incomming-packet)
- (format t "state-machine::process-read - return packet: ~A~%" ret-packet)
- (when ret-packet (put-packet ret-packet foreign-write-buffer))
- (if close
- (close-sm sm)
- ))))))
+ (when (not (process-incomming-packet sm incomming-packet))
+ (close-sm sm))))))
+
+;The connection is write ready.
+;See if theres anything ready to be written in the SM
+(defmethod process-write((sm state-machine))
+ (with-slots (foreign-write-buffer) sm
+ (let ((outgoing-packet (process-outgoing-packet sm)))
+ (format t "state-machine::process-write - outgoing packet: ~A~%" outgoing-packet)
+ (when outgoing-packet (write-bytes outgoing-packet foreign-write-buffer)))))
+
@@ -65,7 +76,3 @@
; Get the packet in buf using the packet factory
(defgeneric get-packet (packet-factory buf))
-
-; Write the packet to the buffer
-(defun put-packet (packet buf)
- (nio-buffer:bytebuffer-write-vector buf (get-bytes packet)))
More information about the Nio-cvs
mailing list