;; -*- mode: lisp; indent-tabs-mode: nil; -*-
+;;
+;; Available under MIT-style License. see COPYING.
+;;
(defpackage :cl-stomp
(:use :cl
:cl-user)
- (:export :frame
+ (:export :make-connection
+ :start
+ :stop
+ :frame
+ :frame-body-of
+ :frame-name-of
+ :frame-headers-of
:set-header
:get-header
:set-destination
:get-destination
- :frame-body-of
- :frame-name-of
- :stomp-connection
- :make-connection
- :connect
:register
+ :register-error
+ :subscribe
+ :unsubscribe
:post
- :start
- :stop))
+ :ack
+ :begin
+ :commit
+ :rollback))
(in-package :cl-stomp)
;;-------------------------------------------------------------------------
-;; socket interface
-
-(defun ip-addr-of (host-name)
- (let ((host (sb-bsd-sockets:get-host-by-name host-name)))
- (sb-bsd-sockets:host-ent-address host)))
-
-(defun inet-addr (ip-address)
- (sb-bsd-sockets:make-inet-address ip-address))
-
-(defun sk-connect (sock address port)
- (let ((ip-address (if (stringp address) (inet-addr address) address)))
- (sb-bsd-sockets:socket-connect sock ip-address port)))
-
-(defun sk-file-desc (sock)
- (sb-bsd-sockets:socket-file-descriptor sock))
-
-(defun sk-close (sock)
- (let ((fd (sb-bsd-sockets:socket-file-descriptor sock)))
- (sb-sys:invalidate-descriptor fd)
- (sb-bsd-sockets:socket-close sock)))
-
-(defun sk-make ()
- (let ((sock (make-instance 'sb-bsd-sockets:inet-socket
- :type :stream :protocol :tcp)))
- (setf (sb-bsd-sockets:sockopt-reuse-address sock) t)
- sock))
-
-;;-------------------------------------------------------------------------
;; convenience utils
+#+nil
(defun log-debug (fmt &rest args)
(apply #'format *standard-output* fmt args)
(finish-output *standard-output*))
+#-nil
+(defun log-debug (fmt &rest args)
+ (declare (ignore fmt args)))
+
(defun string-from-bytes (bytes)
(map 'string #'code-char bytes))
+(defun string-to-bytes (string)
+ (map 'sequence 'char-code string))
+
(defun string-strip (str)
"Remove spaces, tabs and line enders from a string."
(declare (string str))
(let ((result (make-array '(0) :element-type 'character :fill-pointer 0 :adjustable t)))
(with-output-to-string (stream result)
(dolist (line lines)
- (write-line line stream)))
- (string-strip result)))
-
-(defun string-split (str &key (delim " ") (limit nil))
- "Returns a list of words in STR broken at the DELIM boundary."
-
- (labels ((pop-word (str &key (delim " "))
- "Returns the first word and rest of STR broken at DELIM."
- (let ((start (position delim str :test 'string=)))
- (if (null start)
- (values str nil)
- (let ((start2 (min (length str)
- (+ 1 start))))
- (values (subseq str 0 start)
- (subseq str start2))))))
-
- (splitter (str delim limit count)
- (if (null str)
- nil
- (if (and (not (null limit))
- (>= count limit))
- (list str)
- (multiple-value-bind (word rest) (pop-word str :delim delim)
- (if (not (zerop (length word)))
- (append (list (string-strip word))
- (splitter rest delim limit (incf count)))
- (splitter rest delim limit (incf count))))))))
-
- (splitter str delim limit 0)))
+ (write-line line stream)))))
+
+(defun string-split (string delim)
+ "Splits STRING at the first occurance of DELIM and returns the before and after subsequences.
+ If DELIM is not found in STRING, returns STRING and NIL."
+ (when string
+ (let ((start (search delim string :test 'string=)))
+ (if (null start)
+ (values string nil)
+ (let ((start2 (min (length string) (+ (length delim) start))))
+ (values (subseq string 0 start)
+ (subseq string start2)))))))
(defun string-lines (string)
(let ((result ()))
:do (setf result (pushnew line result))))
(nreverse result)))
-(defun string-contains (string str-or-char-list)
- "Returns true if STR-OR-CHAR-LIST is contained in STRING."
- (declare (type string string))
- (search (coerce str-or-char-list 'string) string :test #'string=))
-
;;-------------------------------------------------------------------------
;; frame
-(defgeneric set-destination (frame destination)
- (:documentation "Set the destination header for the frame."))
-
-(defgeneric get-header (frame key)
- (:documentation ""))
-
-(defgeneric get-destination (frame)
- (:documentation ""))
-
-(defgeneric set-header (frame key value)
- (:documentation ""))
-
(defclass frame ()
((name
:initform "MESSAGE"
:accessor frame-body-of)))
(defun make-frame (string)
-
+ "Construct a frame by parsing STRING according to protocol."
;; declare some useful local functions
(labels (
+ (get-line (stream)
+ (let ((line (read-line stream nil 'eof)))
+ (unless (eql line 'eof)
+ line)))
(make-header (line)
- (map 'list #'string-strip (string-split line :delim ":" :limit 1)))
+ (multiple-value-bind (before after)
+ (string-split line ":")
+ (list (string-strip before) (string-strip after))))
;; frame name is first line
- (find-name (source)
- (first source))
+ (get-name (stream)
+ (get-line stream))
;; frame headers are second lines through to empty line
- (find-headers (source)
- (let ((lines (rest source)))
- (loop
- :for line :in lines
- :while (> (length line) 0)
- :collect (make-header line))))
+ (get-headers (stream)
+ (loop
+ :for line = (get-line stream)
+ :while (> (length line) 0)
+ :collect (make-header line)))
;; frame body is all lines after the empty line
- (find-body (source)
- (let* ((lines (reverse source))
- (result (loop
- :for line :in lines
- :while (> (length line) 0)
- :collect line)))
- (string-join (nreverse result)))))
-
- (let ((lines (string-lines string)))
- (let ((name (find-name lines))
- (headers (find-headers lines))
- (body (find-body lines)))
+ (get-body (stream)
+ (coerce (loop
+ :for c = (read-char stream nil 'eof)
+ :while (not (eql c 'eof))
+ :collect c)
+ 'string)))
+
+ (with-input-from-string (stream string)
+ (let ((name (get-name stream))
+ (headers (get-headers stream))
+ (body (get-body stream)))
(make-instance 'frame :name name :headers headers :body body)))))
(defmethod print-object ((self frame) stream)
+ "Print FRAME to STREAM according to protocol."
(with-slots (name headers body) self
(format stream "~A~%" name)
(dolist (header headers)
(format stream "~%~a~%~a~%" body (code-char 0))))
(defmethod set-destination ((self frame) destination)
+ "Set the destination header for FRAME."
(set-header self "destination" destination))
+(defmethod set-client-ack ((self frame))
+ "Specify a 'client' ack header for FRAME"
+ (set-header self "ack" "client"))
+
+(defun header= (header1 header2)
+ "Case insensitive comparison func for headers."
+ (string= (string-downcase header1) (string-downcase header2)))
+
(defmethod get-header ((self frame) key)
+ "Return the value of the header named KEY, or NIL if it doesn't exist."
(with-slots (headers) self
- (second (assoc key headers :test #'string=))))
+ (second (assoc key headers :test #'header=))))
(defmethod get-destination ((self frame))
+ "Return the value of the destination header."
(get-header self "destination"))
(defmethod set-header ((self frame) key value)
- (with-slots (headers) self
- (if (not (assoc key headers :test #'string=))
+ "Add a header named KEY to FRAME with VALUE.
+ If the header already exists, VALUE is appended to the existing value(s)."
+ (when value
+ (with-slots (headers) self
+ (if (not (assoc key headers :test #'header=))
(setf headers (append (list (list key value)) headers))
(let ((result))
(dolist (header headers)
- (if (string= (first header) key)
- (push (list key value) result)
- (push header result)))
- (format t "result: ~s~%" result)
- (setf headers result)))))
+ (if (header= (first header) key)
+ (push (list key value) result)
+ (push header result)))
+ (setf headers result))))))
+
+(defmethod error-frame-p ((self frame))
+ (string= (string-downcase (frame-name-of self)) "error"))
;;-------------------------------------------------------------------------
;; stomp
-(defgeneric connect (connection &key user pass)
- (:documentation "Connect to the stomp server."))
+(defgeneric get-header (frame key)
+ (:documentation ""))
+
+(defgeneric set-header (frame key value)
+ (:documentation ""))
-(defgeneric disconnect (connection)
- (:documentation "Disconnection from the stomp server."))
+(defgeneric get-destination (frame)
+ (:documentation ""))
-(defgeneric send (connection frame)
- (:documentation "Send a string to stomp."))
+(defgeneric set-destination (frame destination)
+ (:documentation "Set the destination header for the frame."))
-(defgeneric post (connection message destination)
+(defgeneric post (connection message destination &optional headers)
(:documentation "Post a message to a destination."))
-(defgeneric register (connection callback destination)
- (:documentation "Register a listener for messages to a destination."))
+(defgeneric ack (connection frame &optional transaction)
+ (:documentation "Send the client ack for frame."))
-(defgeneric receive (connection)
- (:documentation "Read data from stomp."))
+(defgeneric begin (connection transaction)
+ (:documentation "Start a transaction for the given name"))
-(defgeneric subscribe (connection destination)
- (:documentation "Subscribe to a topic."))
+(defgeneric commit (connection transaction)
+ (:documentation "Commit a transaction for the given name"))
-(defgeneric start (connection)
- (:documentation "Start listening for messages from stomp."))
+(defgeneric rollback (connection transaction)
+ (:documentation "Abort a transaction for the given name"))
+
+(defgeneric register (connection callback destination &optional client-ack?)
+ (:documentation "Register a listener for messages to a destination.
+ Callback should be a lambda which accepts a frame argument.
+ If client-ack? is t, client is responsible for sending ACK."))
+
+(defgeneric register-error (connection callback)
+ (:documentation "Register a listener for STOMP error frames."))
+
+(defgeneric subscribe (connection destination &optional client-ack?)
+ (:documentation "Subscribe to a topic or queue."))
-(defgeneric apply-callbacks (instance frame)
- (:documentation "Send message to registered callbacks."))
+(defgeneric unsubscribe (connection destination)
+ (:documentation "Unsubscribe from a topic or queue."))
+
+(defgeneric start (connection &key username passcode)
+ (:documentation "Start listening for messages from stomp."))
(defgeneric stop (connection)
(:documentation "Stop the connection with stomp."))
-(defclass stomp-connection ()
+(defclass registration ()
+ ((callback :initarg :callback) ; The callback func
+ (destination :initarg :destination) ; The topic/queue name
+ (client-ack? :initarg :client-ack?))) ; Use client (or auto) ack?
+
+(defclass connection ()
((host
:initform "localhost"
- :initarg :host
- :accessor host-of)
+ :initarg :host)
(port
:initform 61613
- :initarg :port
- :accessor port-of)
- (ip
- :initform "127.0.0.1"
- :initarg :ip
- :accessor ip-of)
- (socket
- :initform nil
- :initarg :socket
- :accessor socket-of)
- (fd
- :initform nil
- :initarg :fd
- :accessor fd-of)
- (handler
+ :initarg :port)
+ (stream
:initform nil
- :initarg :handler
- :accessor handler-of)
- (callbacks
- :initform ()
- :initarg :callbacks
- :accessor callbacks-of)
+ :initarg :stream)
+ (registrations
+ :initform '()
+ :initarg :registrations)
+ (error-callback
+ :initform nil)
(terminate
:initform nil)))
(defun make-connection (host port)
- (make-instance 'stomp-connection :host host :port port))
-
-(defmethod connect ((self stomp-connection) &key user pass)
- (handler-case
-
- ;; open the socket and send the connect string
-
- (with-slots (host port ip socket fd handler) self
- (setf ip (ip-addr-of host))
- (setf socket (sk-make))
- (sk-connect socket ip port)
- (setf fd (sk-file-desc socket))
- (setf handler (sb-sys:add-fd-handler fd :input (lambda (x)
- (declare (ignore x))
- (receive self))))
+ (make-instance 'connection :host host :port port))
+
+(defmethod start ((self connection) &key username passcode)
+ "Connects to the message broker, sends subscriptions for any existing registrations,
+ and enters a receive loop."
+ (handler-bind
+ ((t (lambda (e)
+ (disconnect self)
+ (log-debug "error: ~a" e))))
+ (with-slots (host port stream registrations terminate) self
+ (usocket:with-client-socket (socket strm host port
+ :protocol :stream
+ :element-type '(unsigned-byte 8))
+ (setf stream strm)
+ ;;send CONNECT frame
(let ((frame (make-instance 'frame :name "CONNECT")))
- (if user
- (set-header frame "login" user))
- (if pass
- (set-header frame "passcode" pass))
- (send self frame)))
-
- ;; just log the error for now
-
- (condition (c)
- (setf (socket-of self) nil)
- (log-debug "error-on-connect: ~a" c))))
-
-(defmethod subscribe ((self stomp-connection) destination)
- (let ((frame (make-instance 'frame :name "SUBSCRIBE")))
- (set-destination frame destination)
- (send self frame)))
-
-(defmethod start ((self stomp-connection))
- (with-slots (terminate) self
- (setf terminate nil)
- (loop
- :until terminate
- :do (sb-sys:serve-all-events 1))
- (log-debug "terminated")))
-
-(defmethod stop ((self stomp-connection))
+ (set-header frame "login" username)
+ (set-header frame "passcode" passcode)
+ (send self frame))
+
+ ;;send SUBSCRIBE frames
+ (loop
+ :for reg :in registrations
+ :do (with-slots (destination client-ack?) reg
+ (subscribe self destination client-ack?)))
+
+ ;;receive loop
+ (setf terminate nil)
+ (let ((recvbuf ""))
+ (loop
+ :until terminate
+ :do (let ((sock (car (usocket:wait-for-input socket :timeout 0))))
+ (if sock
+ (let* ((recvstr (receive self))
+ (newbuf (if (= (length recvbuf) 0) ;; only call string-join when necessary
+ recvstr
+ (string-join (list recvbuf recvstr)))))
+ (setf recvbuf (process_receive_buffer self newbuf)))
+ (sleep 0.1)))))
+ (log-debug "terminated")))))
+
+(defmethod stop ((self connection))
+ "Gracefully terminates the current receive loop and closes the connection to the message broker."
(with-slots (terminate) self
(setf terminate t)))
-(defmethod send ((self stomp-connection) (frame frame))
- (with-slots (fd) self
- (let ((stream (sb-sys:make-fd-stream fd :output t :element-type 'character :buffering :none)))
- (print frame stream)
- (finish-output stream))))
-
-(defmethod send ((self stomp-connection) string)
- (with-slots (fd) self
- (let ((stream (sb-sys:make-fd-stream fd :output t :element-type '(unsigned-byte 8)
- :buffering :none)))
- (write-sequence (sb-ext:string-to-octets string) stream)
- (finish-output stream))))
-
-(defmethod apply-callbacks ((self stomp-connection) (frame frame))
- (with-slots (callbacks) self
- (let ((destination (get-destination frame)))
- (loop
- :for (dest func) :in callbacks
- :do (if (string= dest destination)
- (funcall func frame))))))
+(defmethod apply-callbacks ((self connection) (frame frame))
+ "Send FRAME to any matching registered callbacks."
+ (with-slots (registrations error-callback) self
+ (if (error-frame-p frame)
+ (when error-callback
+ (funcall error-callback frame))
+ (let ((dest (get-destination frame)))
+ (loop
+ :for reg :in registrations
+ :do (with-slots (callback destination) reg
+ (if (string= dest destination)
+ (funcall callback frame))))))))
+
+(defmethod send ((self connection) (frame frame))
+ (send self (with-output-to-string (stream)
+ (print frame stream))))
+
+(defmethod send ((self connection) string)
+ (with-slots (stream) self
+ (write-sequence (string-to-bytes string) stream)
+ (finish-output stream)))
-(defmethod receive ((self stomp-connection))
- "Called whenever there's activity on the file descriptor."
- (with-slots (socket fd handler callbacks) self
- (let ((str (sb-sys:make-fd-stream fd :input t :element-type '(unsigned-byte 8)
- :buffering :full)))
- (let ((buffer (loop
- :for b = (read-byte str nil 'eof)
- :while (listen str)
- :collect b)))
+(defmethod receive ((self connection))
+ "Called whenever there's activity on the connection stream.
+ Reads from the stream and returns the received buffer as a string."
+ (with-slots (stream) self
+ (let ((buffer (loop
+ :for b = (read-byte stream nil 'eof)
+ :while (listen stream)
+ :collect b)))
(if (> (length buffer) 0)
+
+ ;; return the buffer as a string
+ (string-from-bytes buffer)
+
+ ;; otherwise, it means the other end has terminated,
+ ;; so close things down
+ (progn
+ (log-debug "nothing to read from socket, closing.~%")
+ (disconnect self))))))
+
+(defmethod process_receive_buffer ((self connection) buffer)
+ "Try to extract and process frame(s) from recvbuf. Returns unprocessed buffer."
+ (labels ((process-frame (frame)
+ (log-debug "frame: ~a" frame)
+ (apply-callbacks self frame))
+
+ (extract-frame ()
+ (multiple-value-bind (before after)
+ (string-split buffer (string (code-char 0)))
+ (when after
+ ;;got one
+ (process-frame (make-frame before))
+ (setf buffer after)))))
+
+ (loop
+ :while (extract-frame))
+
+ buffer))
+
+(defmethod subscribe ((self connection) destination &optional client-ack?)
+ (let ((frame (make-instance 'frame :name "SUBSCRIBE")))
+ (set-destination frame destination)
+ (when client-ack?
+ (set-client-ack frame))
+ (send self frame)))
- ;; if we got something, send it to all the
- ;; registered callbacks
-
- (let ((frame (make-frame (string-from-bytes buffer))))
- (apply-callbacks self frame))
-
- ;; otherwise, it means the other end has terminated,
- ;; so close things down (eventually go into a sleep/wait
- ;; loop in case of reconnection
+(defmethod unsubscribe ((self connection) destination)
+ (let ((frame (make-instance 'frame :name "UNSUBSCRIBE")))
+ (set-destination frame destination)
+ (send self frame)))
- (handler-case
- (progn
- (log-debug "nothing to read from socket~%")
- (disconnect self)
- (sb-sys:remove-fd-handler handler)
- (sk-close socket))
- (condition (c)
- (log-debug "close: ~a" c))))))))
+(defmethod register ((self connection) callback destination &optional client-ack?)
+ "Register a callback func for a destination. A subscription to the destination using
+ the optional client-ack? is issued for all callbacks as part of connecting to the mq server."
+ (with-slots (stream registrations) self
+ (if stream
+ (subscribe self destination client-ack?))
+ (setf registrations (append registrations (list (make-instance 'registration
+ :callback callback
+ :destination destination
+ :client-ack? client-ack?))))))
+
+(defmethod register-error ((self connection) callback)
+ "Register a callback func for STOMP error frames."
+ (with-slots (error-callback) self
+ (setf error-callback callback)))
+
+(defmethod post ((self connection) message destination &optional headers)
+ (let ((frame (make-instance 'frame :name "SEND" :body (string-strip message))))
+ (set-destination frame destination)
+ (loop
+ :for (key value) :in headers
+ :do (set-header frame key value))
+ (send self frame)))
-(defmethod register ((self stomp-connection) callback destination)
- (with-slots (callbacks) self
- (subscribe self destination)
- (setf callbacks (append callbacks (list (list destination callback))))))
+(defmethod ack ((self connection) frame &optional transaction)
+ "Send the client ack for FRAME and optional TRANSACTION"
+ (let ((ack-frame (make-instance 'frame :name "ACK")))
+ (set-header ack-frame "message-id" (get-header frame "message-id"))
+ (set-header ack-frame "transaction" transaction)
+ (send self ack-frame)))
+
+(defmethod begin ((self connection) transaction)
+ "Begin a transaction with name TRANSACTION"
+ (let ((frame (make-instance 'frame :name "BEGIN")))
+ (set-header frame "transaction" transaction)
+ (send self frame)))
-(defmethod post ((self stomp-connection) message destination)
- (let ((frame (make-instance 'frame :name "SEND" :body message)))
- (set-destination frame destination)
+(defmethod commit ((self connection) transaction)
+ "Commit a transaction with name TRANSACTION"
+ (let ((frame (make-instance 'frame :name "COMMIT")))
+ (set-header frame "transaction" transaction)
(send self frame)))
-(defmethod disconnect ((self stomp-connection))
- (let ((frame (make-instance 'frame :name "DISCONNECT")))
- (send self frame))
- (with-slots (handler socket) self
- (handler-case
- (sb-sys:remove-fd-handler handler)
- (condition (c)
- (log-debug "discconnect-handler-error: ~a" c)))
- (handler-case
- (sk-close socket)
- (condition (c)
- (log-debug "disconnect-socket-error: ~a" c)))))
+;; naming this method 'abort' is not a good idea, so naming it 'rollback' instead
+(defmethod rollback ((self connection) transaction)
+ "Abort a transaction with name TRANSACTION"
+ (let ((frame (make-instance 'frame :name "ABORT")))
+ (set-header frame "transaction" transaction)
+ (send self frame)))
+(defmethod disconnect ((self connection))
+ (with-slots (stream) self
+ (when (and stream (open-stream-p stream))
+ (let ((frame (make-instance 'frame :name "DISCONNECT")))
+ (send self frame))
+ (close stream)
+ (setf stream nil))))