Improvements to the CL-STOMP library, by Scott McKay <swm@itasoftware.com> qres-r452867
authorFrancois-Rene Rideau <fare@tunes.org>
Fri, 1 Apr 2011 20:12:07 +0000 (16:12 -0400)
committerFrancois-Rene Rideau <fare@tunes.org>
Fri, 1 Apr 2011 20:12:07 +0000 (16:12 -0400)
 - Make the API functions more robust against bad inputs
 - Fix some inefficiencies
 - Generally make it use better Lisp style
 - Get the example code working

Testing: The STOMP tests that Matt already wrote still work correctly.

cl-stomp-example.asd
cl-stomp.asd
cl-stomp.lisp
example.lisp

index 8f1b776..44edb1f 100644 (file)
@@ -1,7 +1,8 @@
 ;; -*- mode: lisp; -*-
 
+(in-package :asdf)
+
 (defsystem :example
     :version "0.1.0"
-    :depends-on (cl-ppcre cl-stomp)
-    :components
-    ((:file "example")))
+    :depends-on (cl-stomp)
+    :components ((:file "example")))
index 854b5a3..6339dc6 100644 (file)
@@ -7,10 +7,8 @@
 
 (defsystem cl-stomp
   :description "Implements the STOMP protocol for connecting to a message broker."
-  :author ""
+  :author "Keith Irwin, Matt Reklaitis"
   :version ""
-  :licence ""
-  :depends-on (cl-ppcre usocket)
+  :licence "MIT-style License"
+  :depends-on (usocket)
   :components ((:file "cl-stomp")))
-
-
index d312535..c80abce 100644 (file)
@@ -1,7 +1,7 @@
-;; -*- mode: lisp; indent-tabs-mode: nil; -*-
-;;
-;; Available under MIT-style License. see COPYING.
-;;
+;;; -*- mode: lisp; indent-tabs-mode: nil; -*-
+;;;
+;;; Available under MIT-style License. see COPYING.
+;;;
 
 (defpackage :cl-stomp
 
            :start
            :stop
            :frame
-           :frame-body-of
-           :frame-name-of
-           :frame-headers-of
-           :set-header
+           :frame-body
+           :frame-name
+           :frame-headers
            :get-header
-           :set-destination
+           :set-header
            :get-destination
+           :set-destination
            :register
            :register-error
            :subscribe
 
 (in-package :cl-stomp)
 
-;;-------------------------------------------------------------------------
-;; convenience utils
+
+;;;-------------------------------------------------------------------------
+;;; Convenience utils
 
 #+nil
 (defun log-debug (fmt &rest args)
+  (fresh-line *standard-output*)
   (apply #'format *standard-output* fmt args)
-  (finish-output *standard-output*))
+  (fresh-line *standard-output*)
+  (force-output *standard-output*))
 
 #-nil
 (defun log-debug (fmt &rest args)
   (map 'string #'code-char bytes))
 
 (defun string-to-bytes (string)
-  (map 'sequence 'char-code string))
+  (map 'sequence #'char-code string))
 
-(defun string-strip (str)
+(defun string-strip (string)
   "Remove spaces, tabs and line enders from a string."
-  (declare (string str))
-  (string-trim (list #\Space #\NewLine #\Return #\Tab #\Nul) str))
+  (check-type string string)
+  (string-trim '(#\Space #\NewLine #\Return #\Tab #\Nul) string))
 
 (defun string-join (lines)
   (let ((result (make-array '(0) :element-type 'character :fill-pointer 0 :adjustable t)))
@@ -63,7 +66,7 @@
         (write-line line stream)))))
 
 (defun string-split (string delim)
-  "Splits STRING at the first occurance of DELIM and returns the before and after subsequences.
+  "Splits STRING at the first occurrence of DELIM and returns the substrings before and after it.
    If DELIM is not found in STRING, returns STRING and NIL."
   (when string
     (let ((start (search delim string :test 'string=)))
           (values (subseq string 0 start)
                   (subseq string start2)))))))
 
-(defun string-lines (string)
-  (let ((result ()))
-    (with-input-from-string (stream string)
-      (loop 
-         :for line = (read-line stream nil 'eof)
-         :while (not (eql line 'eof))
-         :do (setf result (pushnew line result))))
-    (nreverse result)))
 
-;;-------------------------------------------------------------------------
-;; frame
+;;;-------------------------------------------------------------------------
+;;; The CL-STOMP API
+
+(defgeneric get-header (frame key)
+  (:documentation "Return the value of the header named KEY, or NIL if it doesn't exist."))
+
+(defgeneric set-header (frame key value)
+  (:documentation "Add a header named KEY to FRAME with VALUE.
+                   If the header already exists, VALUE replaces the existing value."))
+
+(defgeneric get-destination (frame)
+  (:documentation "Return the destination header for FRAME."))
+
+(defgeneric set-destination (frame destination)
+  (:documentation "Set the destination header for FRAME."))
+
+(defgeneric start (connection &key username passcode)
+  (:documentation "Start listening for messages from STOMP."))
+
+(defgeneric stop (connection)
+  (:documentation "Stop the connection with STOMP."))
+
+(defgeneric register (connection callback destination &optional client-ack?)
+  (:documentation "Register a listener for messages to a destination.
+                   CALLBACK should be a function which accepts a frame argument.
+                   If CLIENT-ACK? is T, the client is responsible for sending ACK."))
+
+(defgeneric register-error (connection callback)
+  (:documentation "Register a listener for STOMP error frames."))
+
+(defgeneric subscribe (connection destination &optional client-ack?)
+  (:documentation "Subscribe to a topic or queue."))
+
+(defgeneric unsubscribe (connection destination)
+  (:documentation "Unsubscribe from a topic or queue."))
+
+(defgeneric post (connection message destination &optional headers)
+  (:documentation "Post a message to a destination.
+                   HEADERS is an alist of header name and value pairs."))
+
+(defgeneric ack (connection frame &optional transaction)
+  (:documentation "Send the client an ACK for frame."))
+
+(defgeneric begin (connection transaction)
+  (:documentation "Start a transaction with the given name."))
+
+(defgeneric commit (connection transaction)
+  (:documentation "Commit a transaction with the given name."))
+
+(defgeneric rollback (connection transaction)
+  (:documentation "Abort a transaction with the given name."))
+
+
+;;;-------------------------------------------------------------------------
+;;; Frames
 
 (defclass frame ()
-  ((name
-    :initform "MESSAGE"
-    :initarg :name
-    :accessor frame-name-of)
-   (headers
-    :initform ()
-    :initarg :headers
-    :accessor frame-headers-of)
-   (body
-    :initform ""
-    :initarg :body
-    :accessor frame-body-of)))
-
-(defun make-frame (string)
-  "Construct a frame by parsing STRING according to protocol."
-  ;; declare some useful local functions
-  (labels (
-           (get-line (stream)
+  ((name :type string
+         :initform "MESSAGE"
+         :initarg :name
+         :accessor frame-name)
+   (headers :type list
+            :initform ()
+            :initarg :headers
+            :accessor frame-headers)
+   (body :type string
+         :initform ""
+         :initarg :body
+         :accessor frame-body)))
+
+(defun make-frame-from-string (string)
+  "Construct a frame by parsing STRING according to the STOMP protocol."
+  ;; Declare some useful local functions
+  (labels ((get-line (stream)
              (let ((line (read-line stream nil 'eof)))
                (unless (eql line 'eof)
                  line)))
-
            (make-header (line)
              (multiple-value-bind (before after) 
                  (string-split line ":")
                (list (string-strip before) (string-strip after))))
-
-           ;; frame name is first line
+           ;; Frame name is first line
            (get-name (stream)
              (get-line stream))
-
-           ;; frame headers are second lines through to empty line
+           ;; Frame headers are second lines through to empty line
            (get-headers (stream)
-             (loop
-               :for line = (get-line stream)
-               :while (> (length line) 0)
-               :collect (make-header line)))
-
-           ;; frame body is all lines after the empty line
+             (loop for line = (get-line stream)
+                   while (> (length line) 0)
+                   collect (make-header line)))
+           ;; Frame body is all the lines after the empty line
            (get-body (stream)
-             (coerce (loop
-                       :for c = (read-char stream nil 'eof)
-                       :while (not (eql c 'eof))
-                       :collect c)
-               'string)))
-
+             (coerce (loop for c = (read-char stream nil 'eof)
+                           while (not (eql c 'eof))
+                           collect c)
+                     'string)))
     (with-input-from-string (stream string)
       (let ((name (get-name stream))
             (headers (get-headers stream))
             (body (get-body stream)))
-        (make-instance 'frame :name name :headers headers :body body)))))
-
-(defmethod print-object ((self frame) stream)
-  "Print FRAME to STREAM according to protocol."
-  (with-slots (name headers body) self
+        (make-instance 'frame
+          :name name
+          :headers headers
+          :body body)))))
+
+;; Makes a frame with the given name and headers,
+;; evaluates the body,
+;; and then sends the frame over the connection
+(defmacro sending-frame ((connection vframe name &rest headers) &body body)
+  `(let ((,vframe (make-instance 'frame :name ,name)))
+     ,@(loop for (key val) on headers by #'cddr
+             collect (list 'set-header vframe key val))
+     (progn ,@body)
+     (send ,connection ,vframe)))
+
+;; Note that this is not a pretty-printing function
+;; The format of the output is what's expected by the STOMP protocol
+(defmethod print-object ((frame frame) stream)
+  "Print FRAME to STREAM according to the STOMP protocol."
+  (with-slots (name headers body) frame
     (format stream "~A~%" name)
     (dolist (header headers)
-      (format stream "~a:~a~%" (first header) (second header)))
-    (format stream "~%~a~%~a~%" body (code-char 0))))
-
-(defmethod set-destination ((self frame) destination)
-  "Set the destination header for FRAME."
-  (set-header self "destination" destination))
-
-(defmethod set-client-ack ((self frame))
-  "Specify a 'client' ack header for FRAME"
-  (set-header self "ack" "client"))
+      (format stream "~A:~A~%" (first header) (second header)))
+    (format stream "~%~A~%~A~%" body (code-char 0))))
 
 (defun header= (header1 header2)
-  "Case insensitive comparison func for headers."
-  (string= (string-downcase header1) (string-downcase header2)))
+  "Case insensitive comparison function for headers."
+  (string-equal (string header1) (string header2)))
 
-(defmethod get-header ((self frame) key)
+(defmethod get-header ((frame frame) (key string))
   "Return the value of the header named KEY, or NIL if it doesn't exist."
-  (with-slots (headers) self
+  (with-slots (headers) frame
     (second (assoc key headers :test #'header=))))
 
-(defmethod get-destination ((self frame))
-  "Return the value of the destination header."
-  (get-header self "destination"))
-
-(defmethod set-header ((self frame) key value)
-  "Add a header named KEY to FRAME with VALUE.
-   If the header already exists, VALUE is appended to the existing value(s)."
+(defmethod set-header ((frame frame) (key string) value)
+  "Add a header named KEY to FRAME with VALUE, which can be of any type.
+   If the header already exists, VALUE replaces the existing value."
   (when value
-    (with-slots (headers) self
+    (with-slots (headers) frame
       (if (not (assoc key headers :test #'header=))
         (setf headers (append (list (list key value)) headers))
-        (let ((result))
+        (let ((result ()))
           (dolist (header headers)
             (if (header= (first header) key)
               (push (list key value) result)
               (push header result)))
           (setf headers result))))))
 
-(defmethod error-frame-p ((self frame))
-  (string= (string-downcase (frame-name-of self)) "error"))
-
-;;-------------------------------------------------------------------------
-;; stomp
-
-(defgeneric get-header (frame key)
-  (:documentation ""))
-
-(defgeneric set-header (frame key value)
-  (:documentation ""))
-
-(defgeneric get-destination (frame)
-  (:documentation ""))
-
-(defgeneric set-destination (frame destination)
-  (:documentation "Set the destination header for the frame."))
-
-(defgeneric post (connection message destination &optional headers)
-  (:documentation "Post a message to a destination."))
+(defmethod get-destination ((frame frame))
+  "Return the destination header for FRAME."
+  (get-header frame "destination"))
 
-(defgeneric ack (connection frame &optional transaction)
-  (:documentation "Send the client ack for frame."))
-
-(defgeneric begin (connection transaction)
-  (:documentation "Start a transaction for the given name"))
-
-(defgeneric commit (connection transaction)
-  (:documentation "Commit a transaction for the given name"))
-
-(defgeneric rollback (connection transaction)
-  (:documentation "Abort a transaction for the given name"))
+(defmethod set-destination ((frame frame) (destination string))
+  "Set the destination header for FRAME."
+  (set-header frame "destination" destination))
 
-(defgeneric register (connection callback destination &optional client-ack?)
-  (:documentation "Register a listener for messages to a destination.
-                   Callback should be a lambda which accepts a frame argument.
-                   If client-ack? is t, client is responsible for sending ACK."))
+(defmethod set-client-ack ((frame frame))
+  "Specify a 'client' ack header for FRAME"
+  (set-header frame "ack" "client"))
 
-(defgeneric register-error (connection callback)
-  (:documentation "Register a listener for STOMP error frames."))
+(defmethod error-frame-p ((frame frame))
+  (string-equal (frame-name frame) "error"))
 
-(defgeneric subscribe (connection destination &optional client-ack?)
-  (:documentation "Subscribe to a topic or queue."))
 
-(defgeneric unsubscribe (connection destination)
-  (:documentation "Unsubscribe from a topic or queue."))
+;;;-------------------------------------------------------------------------
+;;; Registrations
 
-(defgeneric start (connection &key username passcode)
-  (:documentation "Start listening for messages from stomp."))
+(defclass registration ()
+  ((callback    :type (or null function) ;the callback function
+                :initform nil
+                :initarg :callback)
+   (destination :type string             ;the topic/queue name
+                :initarg :destination) 
+   (client-ack? :initarg :client-ack?))) ;use client (or auto) ack?
 
-(defgeneric stop (connection)
-  (:documentation "Stop the connection with stomp."))
 
-(defclass registration ()
-  ((callback    :initarg :callback)       ; The callback func
-   (destination :initarg :destination)    ; The topic/queue name
-   (client-ack? :initarg :client-ack?)))  ; Use client (or auto) ack?
+;;;-------------------------------------------------------------------------
+;;; Connections
 
 (defclass connection ()
-  ((host
-    :initform "localhost"
-    :initarg :host)
-   (port
-    :initform 61613
-    :initarg :port)
-   (stream
-    :initform nil
-    :initarg :stream)
-   (registrations
-    :initform '()
-    :initarg :registrations)
-   (error-callback
-     :initform nil)
-   (terminate
-    :initform nil)))
+  ((host :type string
+         :initform "localhost"
+         :initarg :host)
+   (port :type integer
+         :initform 61613
+         :initarg :port)
+   (stream :initform nil
+           :initarg :stream)
+   (registrations :type list
+                  :initform ()
+                  :initarg :registrations)
+   (error-callback :type (or null function)
+                   :initform nil)
+   (terminate :initform nil)))
 
 (defun make-connection (host port)
-  (make-instance 'connection :host host :port port))
+  (check-type host string)
+  (check-type port integer)
+  (make-instance 'connection
+    :host host
+    :port port))
+
+;;;-------------------------------------------------------------------------
+;;; Implementation of the API
 
-(defmethod start ((self connection) &key username passcode)
+(defmethod start ((conn connection) &key username passcode)
   "Connects to the message broker, sends subscriptions for any existing registrations,
    and enters a receive loop."
   (handler-bind
       ((t (lambda (e)
-            (disconnect self)
-            (log-debug "error: ~a" e))))
-    (with-slots (host port stream registrations terminate) self
+            (disconnect conn)
+            (log-debug "Error: ~A" e))))
+    (with-slots (host port stream registrations terminate) conn
       (usocket:with-client-socket (socket strm host port
-                                    :protocol :stream
-                                    :element-type '(unsigned-byte 8))
+                                   :protocol :stream
+                                   :element-type '(unsigned-byte 8))
         (setf stream strm)
-        ;;send CONNECT frame
-        (let ((frame (make-instance 'frame :name "CONNECT")))
-          (set-header frame "login" username)
-          (set-header frame "passcode" passcode)
-          (send self frame))
-        
-        ;;send SUBSCRIBE frames
-        (loop 
-          :for reg :in registrations
-          :do (with-slots (destination client-ack?) reg
-                (subscribe self destination client-ack?)))
-
-        ;;receive loop
+        ;; Send CONNECT frame
+        (connect conn username passcode)
+        ;; Send SUBSCRIBE frames
+        (loop for reg in registrations
+              do (with-slots (destination client-ack?) reg
+                   (subscribe conn destination client-ack?)))
+        ;; The receive loop
         (setf terminate nil)
         (let ((recvbuf ""))
-          (loop
-            :until terminate
-            :do (let ((sock (car (usocket:wait-for-input socket :timeout 0))))
-                  (if sock
-                    (let* ((recvstr (receive self))
-                           (newbuf (if (= (length recvbuf) 0)  ;; only call string-join when necessary
-                                     recvstr
-                                     (string-join (list recvbuf recvstr)))))
-                      (setf recvbuf (process_receive_buffer self newbuf)))
-                    (sleep 0.1)))))
-        (log-debug "terminated")))))
+          (loop until terminate
+                do (let ((sock (car (usocket:wait-for-input socket :timeout 1))))
+                     (when sock
+                       (let* ((recvstr (receive conn))
+                              (newbuf (if (= (length recvbuf) 0) ;only call string-join when necessary
+                                        recvstr
+                                        (string-join (list recvbuf recvstr)))))
+                         (setf recvbuf (process-receive-buffer conn newbuf)))))))
+        (log-debug "Terminated")))))
         
-(defmethod stop ((self connection))
+(defmethod stop ((conn connection))
   "Gracefully terminates the current receive loop and closes the connection to the message broker."
-  (with-slots (terminate) self
+  (with-slots (terminate) conn
     (setf terminate t)))
 
-(defmethod apply-callbacks ((self connection) (frame frame))
-  "Send FRAME to any matching registered callbacks."
-  (with-slots (registrations error-callback) self
-    (if (error-frame-p frame)
-      (when error-callback
-        (funcall error-callback frame))
-      (let ((dest (get-destination frame)))
-        (loop 
-          :for reg :in registrations
-          :do (with-slots (callback destination) reg
-                (if (string= dest destination)
-                  (funcall callback frame))))))))
-
-(defmethod send ((self connection) (frame frame))
-  (send self (with-output-to-string (stream)
+(defmethod connect ((conn connection) &optional username passcode)
+  (check-type username (or null string))
+  (check-type passcode (or null string))
+  (sending-frame (conn frame "CONNECT"
+                       "login" username
+                       "passcode" passcode)
+    ))
+
+(defmethod disconnect ((conn connection))
+  (with-slots (stream) conn
+    (when (and stream (open-stream-p stream))
+      (sending-frame (conn frame "DISCONNECT")
+        )
+      (close stream)
+      (setf stream nil))))
+
+(defmethod send ((conn connection) (frame frame))
+  (send conn (with-output-to-string (stream)
+               ;; We use 'print' because the 'print-object' method on frames
+               ;; does its output using the STOMP protocol
                (print frame stream))))
 
-(defmethod send ((self connection) string)
-  (with-slots (stream) self
+(defmethod send ((conn connection) (string string))
+  (with-slots (stream) conn 
     (write-sequence (string-to-bytes string) stream)
     (finish-output stream)))
-  
-(defmethod receive ((self connection))
+
+(defmethod receive ((conn connection))
   "Called whenever there's activity on the connection stream.
    Reads from the stream and returns the received buffer as a string."
-  (with-slots (stream) self
-      (let ((buffer (loop
-                      :for b = (read-byte stream nil 'eof)
-                      :while (listen stream)
-                      :collect b)))
-
-        (if (> (length buffer) 0)
-          
-          ;; return the buffer as a string
-          (string-from-bytes buffer)
-
-          ;; otherwise, it means the other end has terminated,
-          ;; so close things down
-          (progn
-            (log-debug "nothing to read from socket, closing.~%")
-            (disconnect self))))))
-
-(defmethod process_receive_buffer ((self connection) buffer)
+  (with-slots (stream) conn
+    (let ((buffer (loop for b = (read-byte stream nil 'eof)
+                        while (listen stream)
+                        collect b)))
+      (if (> (length buffer) 0)
+        ;; Return the buffer as a string
+        (string-from-bytes buffer)
+        ;; Otherwise, it means the other end has terminated,
+        ;; so close things down
+        (progn
+          (log-debug "Nothing to read from socket, closing.")
+          (disconnect conn))))))
+
+(defmethod process-receive-buffer ((conn connection) buffer)
   "Try to extract and process frame(s) from recvbuf.  Returns unprocessed buffer."
   (labels ((process-frame (frame)
-             (log-debug "frame: ~a" frame)
-             (apply-callbacks self frame))
-
-            (extract-frame ()
-              (multiple-value-bind (before after)
-                  (string-split buffer (string (code-char 0)))
-                (when after
-                  ;;got one
-                  (process-frame (make-frame before))
-                  (setf buffer after)))))
-
-    (loop 
-      :while (extract-frame))
-
+             (log-debug "Frame: ~A" frame)
+             (apply-callbacks conn frame))
+           (extract-frame ()
+             (multiple-value-bind (before after)
+                 (string-split buffer (string (code-char 0)))
+               (when after
+                 ;; Got one
+                 (process-frame (make-frame-from-string (string-strip before)))
+                 (setf buffer after)))))
+    (loop while (extract-frame))
     buffer))
 
-(defmethod subscribe ((self connection) destination &optional client-ack?)
-  (let ((frame (make-instance 'frame :name "SUBSCRIBE")))
-    (set-destination frame destination)
-    (when client-ack?
-      (set-client-ack frame))
-    (send self frame)))
-
-(defmethod unsubscribe ((self connection) destination)
-  (let ((frame (make-instance 'frame :name "UNSUBSCRIBE")))
-    (set-destination frame destination)
-    (send self frame)))
-
-(defmethod register ((self connection) callback destination &optional client-ack?)
-  "Register a callback func for a destination.  A subscription to the destination using
-   the optional client-ack? is issued for all callbacks as part of connecting to the mq server."
-  (with-slots (stream registrations) self
-    (if stream
-      (subscribe self destination client-ack?))
+(defmethod apply-callbacks ((conn connection) (frame frame))
+  "Send FRAME to any matching registered callbacks."
+  (with-slots (registrations error-callback) conn
+    (if (error-frame-p frame)
+      (when error-callback
+        (funcall error-callback frame))
+      (let ((dest (get-destination frame)))
+        (loop for reg in registrations
+              do (with-slots (callback destination) reg
+                   (when (and callback (string-equal dest destination))
+                     (funcall callback frame))))))))
+
+(defmethod register ((conn connection) callback (destination string) &optional client-ack?)
+  "Register a callback for a destination.  A subscription to the destination using the
+   optional client-ack? is issued for all callbacks as part of connecting to the MQ server."
+  (check-type callback (or null function))
+  (with-slots (stream registrations) conn
+    (when stream
+      (subscribe conn destination client-ack?))
     (setf registrations (append registrations (list (make-instance 'registration 
                                                       :callback callback
                                                       :destination destination
                                                       :client-ack? client-ack?))))))
 
-(defmethod register-error ((self connection) callback)
-  "Register a callback func for STOMP error frames."
-  (with-slots (error-callback) self
+(defmethod register-error ((conn connection) callback)
+  "Register an error callback for STOMP error frames."
+  (check-type callback (or null function))
+  (with-slots (error-callback) conn
     (setf error-callback callback)))
 
-(defmethod post ((self connection) message destination &optional headers)
-  (let ((frame (make-instance 'frame :name "SEND" :body (string-strip message))))
-    (set-destination frame destination)
-    (loop 
-      :for (key value) :in headers
-      :do (set-header frame key value))
-    (send self frame)))
+(defmethod subscribe ((conn connection) (destination string) &optional client-ack?)
+  (sending-frame (conn frame "SUBSCRIBE"
+                       "destination" destination)
+    (when client-ack?
+      (set-client-ack frame))))
+
+(defmethod unsubscribe ((conn connection) (destination string))
+  (sending-frame (conn frame "UNSUBSCRIBE"
+                       "destination" destination)
+    ))
+
+(defmethod post ((conn connection) (message string) (destination string) &optional headers)
+  (sending-frame (conn frame "SEND"
+                       "destination" destination)
+    (loop for (key value) in headers
+          do (set-header frame key value))
+    (setf (frame-body frame) (string-strip message))))
 
-(defmethod ack ((self connection) frame &optional transaction)
+(defmethod ack ((conn connection) (for-frame frame) &optional transaction)
   "Send the client ack for FRAME and optional TRANSACTION"
-  (let ((ack-frame (make-instance 'frame :name "ACK")))
-    (set-header ack-frame "message-id" (get-header frame "message-id"))
-    (set-header ack-frame "transaction" transaction)
-    (send self ack-frame)))
+  (check-type transaction (or null string))
+  (sending-frame (conn frame "ACK"
+                       "message-id" (get-header for-frame "message-id")
+                       "transaction" transaction)
+    ))
 
-(defmethod begin ((self connection) transaction)
+(defmethod begin ((conn connection) (transaction string))
   "Begin a transaction with name TRANSACTION"
-  (let ((frame (make-instance 'frame :name "BEGIN")))
-    (set-header frame "transaction" transaction)
-    (send self frame)))
+  (sending-frame (conn frame "BEGIN"
+                       "transaction" transaction)
+    ))
 
-(defmethod commit ((self connection) transaction)
+(defmethod commit ((conn connection) (transaction string))
   "Commit a transaction with name TRANSACTION"
-  (let ((frame (make-instance 'frame :name "COMMIT")))
-    (set-header frame "transaction" transaction)
-    (send self frame)))
-
-;; naming this method 'abort' is not a good idea, so naming it 'rollback' instead
-(defmethod rollback ((self connection) transaction)
-  "Abort a transaction with name TRANSACTION"
-  (let ((frame (make-instance 'frame :name "ABORT")))
-    (set-header frame "transaction" transaction)
-    (send self frame)))
-
-(defmethod disconnect ((self connection))
-  (with-slots (stream) self
-    (when (and stream (open-stream-p stream))
-      (let ((frame (make-instance 'frame :name "DISCONNECT")))
-        (send self frame))
-      (close stream)
-      (setf stream nil))))
+  (sending-frame (conn frame "COMMIT"
+                       "transaction" transaction)
+    ))
+
+;; Naming this method 'abort' is not a good idea, so calling it 'rollback' instead
+(defmethod rollback ((conn connection) (transaction string))
+  "Abort a transaction with name TRANSACTION."
+  (sending-frame (conn frame "ABORT"
+                       "transaction" transaction)
+    ))
index bc6d5c3..f34e224 100644 (file)
@@ -1,48 +1,48 @@
-;; -*- mode: lisp; indent-tabs-mode: nil; -*-
+;;; -*- mode: lisp; indent-tabs-mode: nil; -*-
 
-(defpackage :example
+(defpackage :stomp-example
   (:use :cl
-       :cl-user))
+        :cl-user))
+
+(in-package :stomp-example)
 
-(in-package :example)
 
 (defparameter *stomp* nil)
 (defparameter *health-request* "/topic/health-request")
 (defparameter *health-response* "/topic/health-response")
-(defparameter *host* "localhost")
-(defparameter *port* 61613)
+(defparameter *host* "mjr")     ;or "localhost"
+(defparameter *port* 61613)     ;by convention
 (defparameter *counter* 0)
 
 (defun out (fmt &rest arguments)
   (apply #'format *standard-output* fmt arguments)
-  (finish-output *standard-output*)
-  (force-output *standard-output*))
+  (finish-output *standard-output*))
 
 (defun callback (frame)
   (incf *counter*)
-  (out "[~a]~%" (stomp:frame-body-of frame))
+  (out "[~a]~%" (stomp:frame-body frame))
   (let ((msg (format nil "<pong><name>foo</name><count>~a</count></pong>" *counter*)))
     (stomp:post *stomp* msg *health-response*)))
 
 (defun bark (frame)
-  (let ((body (stomp:frame-body-of frame)))
+  (let ((body (stomp:frame-body frame)))
     (out "--> bark : ~a~%" body)))
 
 (defun chirp (frame)
-  (let ((body (stomp:frame-body-of frame)))
+  (let ((body (stomp:frame-body frame)))
     (out "--> chirp: ~a~%" body)))
 
 (defun run ()
   (setf *stomp* (stomp:make-connection *host* *port*))
-  (stomp:connect *stomp*)
   (stomp:register *stomp* #'callback *health-request*)
   (stomp:register *stomp* #'chirp *health-response*)
   (stomp:register *stomp* #'bark *health-response*)
   (stomp:start *stomp*))
 
 (defun start ()
-  (sb-thread:make-thread (lambda () (run)) :name "stomp-subscribe"))
+  #+sbcl (sb-thread:make-thread (lambda () (run)) :name "stomp-subscribe")
+  #+ccl  (ccl:process-run-function "stomp-subscribe" (lambda () (run))))
 
 (defun stop ()
-  (cl-stomp:stop *stomp*))
-
+  (stomp:stop *stomp*)
+  (setf *stomp* nil))