Newer
Older
Francois-Rene Rideau
committed
;;; -*- mode: lisp; indent-tabs-mode: nil; -*-
;;;
;;; Available under MIT-style License. see COPYING.
;;;
(defpackage :cl-stomp
(:nicknames :stomp)
(:use :cl
Francois-Rene Rideau
committed
(:export :make-connection
:start
:stop
:frame
Francois-Rene Rideau
committed
:frame-body
:frame-name
:frame-headers
Francois-Rene Rideau
committed
:set-header
Francois-Rene Rideau
committed
:set-destination
:pass-through-key-mapping-strategy
:amq-key-mapping-strategy
:unregister-id
Francois-Rene Rideau
committed
:register-error
:subscribe
:unsubscribe
:unsubscribe-id
Francois-Rene Rideau
committed
:ack
:begin
:commit
:rollback))
Francois-Rene Rideau
committed
;;;-------------------------------------------------------------------------
;;; Convenience utils
Francois-Rene Rideau
committed
#+nil
Francois-Rene Rideau
committed
(fresh-line *standard-output*)
Francois-Rene Rideau
committed
(fresh-line *standard-output*)
(force-output *standard-output*))
Francois-Rene Rideau
committed
#-nil
(defun log-debug (fmt &rest args)
(declare (ignore fmt args)))
Francois-Rene Rideau
committed
(defun string-strip (string)
"Remove spaces, tabs and line enders from both ends of a string."
Francois-Rene Rideau
committed
(check-type string string)
(string-trim '(#\Space #\NewLine #\Return #\Tab #\Nul #\Linefeed) string))
(defun string-left-strip (string)
"Remove spaces, tabs and line enders from the beginning of a string."
(check-type string string)
(string-left-trim '(#\Space #\NewLine #\Return #\Tab #\Nul #\Linefeed) string))
Francois-Rene Rideau
committed
(defun string-split (string delim)
Francois-Rene Rideau
committed
"Splits STRING at the first occurrence of DELIM and returns the substrings before and after it.
Francois-Rene Rideau
committed
If DELIM is not found in STRING, returns STRING and NIL."
(when string
(let ((start (search delim string :test 'string=)))
(if (null start)
(values string nil)
(let ((start2 (min (length string) (+ (length delim) start))))
(values (subseq string 0 start)
(subseq string start2)))))))
Francois-Rene Rideau
committed
;;;-------------------------------------------------------------------------
;;; The CL-STOMP API
(defgeneric get-header (frame key)
(:documentation "Return the value of the header named KEY, or NIL if it doesn't exist."))
(defgeneric set-header (frame key value)
(:documentation "Add a header named KEY to FRAME with VALUE.
If the header already exists, VALUE replaces the existing value."))
(defgeneric get-destination (frame)
(:documentation "Return the destination header for FRAME."))
(defgeneric set-destination (frame destination)
(:documentation "Set the destination header for FRAME."))
(defgeneric start (connection &key username passcode)
(:documentation "Start listening for messages from STOMP."))
(defgeneric stop (connection)
(:documentation "Stop the connection with STOMP."))
(defgeneric register (connection callback destination &key selector id client-ack?)
Francois-Rene Rideau
committed
(:documentation "Register a listener for messages to a destination.
CALLBACK should be a function which accepts a frame argument.
SELECTOR can be used to provide an SQL 92 selector for filtering
messages. An ID may be given for later use with UNREGISTER-ID
to support overlapping subscriptions using selectors with the
same destination. If CLIENT-ACK? is T, the client is responsible
for sending ACK."))
(defgeneric unregister-id (connection id)
(:documentation "Unregister a callback by id."))
Francois-Rene Rideau
committed
(defgeneric register-error (connection callback)
(:documentation "Register a listener for STOMP error frames."))
(defgeneric subscribe (connection destination &key selector id client-ack?)
(:documentation "Subscribe to a topic or queue.
SELECTOR can be used to provide an SQL 92 selector for filtering
messages. An ID may be given for later use with UNSUBSCRIBE-ID
to support overlapping subscriptions using selectors with the
same destination."))
Francois-Rene Rideau
committed
(defgeneric unsubscribe (connection destination)
(:documentation "Unsubscribe from a topic or queue by name.
Unsubscribing does not unregister any callbacks."))
(defgeneric unsubscribe-id (connection id)
(:documentation "Unsubscribe from a topic or queue by id.
Unsubscribing does not unregister any callbacks."))
Francois-Rene Rideau
committed
(defgeneric post (connection message destination &optional headers)
(:documentation "Post a message to a destination.
HEADERS is an alist of header name and value pairs."))
(defgeneric ack (connection frame &optional transaction)
(:documentation "Send the client an ACK for frame."))
(defgeneric begin (connection transaction)
(:documentation "Start a transaction with the given name."))
(defgeneric commit (connection transaction)
(:documentation "Commit a transaction with the given name."))
(defgeneric rollback (connection transaction)
(:documentation "Abort a transaction with the given name."))
;;;-------------------------------------------------------------------------
;;; Frames
Francois-Rene Rideau
committed
((name :type string
:initform "MESSAGE"
:initarg :name
:accessor frame-name)
(headers :type list
:initform ()
:initarg :headers
:accessor frame-headers)
(body :type string
:initform ""
:initarg :body
:accessor frame-body)))
(defun make-frame-from-string (string connection)
Francois-Rene Rideau
committed
"Construct a frame by parsing STRING according to the STOMP protocol."
;; Declare some useful local functions
(labels ((get-line (stream)
Francois-Rene Rideau
committed
(let ((line (read-line stream nil 'eof)))
(unless (eql line 'eof)
line)))
(with-slots (key-mapping-strategy) connection
(multiple-value-bind (before after)
(string-split line ":")
(list (demangle-key key-mapping-strategy (string-strip (string before)))
(string-strip (string after))))))
Francois-Rene Rideau
committed
;; Frame name is first line
Francois-Rene Rideau
committed
(get-name (stream)
(get-line stream))
Francois-Rene Rideau
committed
;; Frame headers are second lines through to empty line
Francois-Rene Rideau
committed
(get-headers (stream)
Francois-Rene Rideau
committed
(loop for line = (get-line stream)
while (> (length line) 0)
collect (make-header line)))
;; Frame body is all the lines after the empty line
Francois-Rene Rideau
committed
(get-body (stream)
Francois-Rene Rideau
committed
(coerce (loop for c = (read-char stream nil 'eof)
while (not (eql c 'eof))
collect c)
'string)))
Francois-Rene Rideau
committed
(with-input-from-string (stream string)
(let ((name (get-name stream))
(headers (get-headers stream))
(body (get-body stream)))
Francois-Rene Rideau
committed
(make-instance 'frame
:name name
:headers headers
:body body)))))
;; Makes a frame with the given name and headers,
;; evaluates the body,
;; and then sends the frame over the connection
(defmacro sending-frame ((connection vframe name &rest headers) &body body)
`(let ((,vframe (make-instance 'frame :name ,name)))
,@(loop for (key val) on headers by #'cddr
collect (list 'set-header vframe key val))
(progn ,@body)
(send ,connection ,vframe)))
(defmethod print-object ((frame frame) stream)
Francois-Rene Rideau
committed
(with-slots (name headers body) frame
(format stream "~A~%~A~%~A~%" name headers body)))
(defmethod render-frame ((frame frame) connection)
(with-output-to-string (stream)
(with-slots (name headers body) frame
(format stream "~A~%" name)
(with-slots (key-mapping-strategy) connection
(dolist (header headers)
(format stream
"~A:~A~%"
(first header)
;;(mangle-key key-mapping-strategy (first header))
(second header)))
(format stream "~%~A~A" body (code-char 0))))))
Francois-Rene Rideau
committed
(defun header= (header1 header2)
Francois-Rene Rideau
committed
"Case insensitive comparison function for headers."
(string-equal (string header1) (string header2)))
Francois-Rene Rideau
committed
Francois-Rene Rideau
committed
(defmethod get-header ((frame frame) (key string))
Francois-Rene Rideau
committed
"Return the value of the header named KEY, or NIL if it doesn't exist."
Francois-Rene Rideau
committed
(with-slots (headers) frame
Francois-Rene Rideau
committed
(second (assoc key headers :test #'header=))))
Francois-Rene Rideau
committed
(defmethod set-header ((frame frame) (key string) value)
"Add a header named KEY to FRAME with VALUE, which can be of any type.
If the header already exists, VALUE replaces the existing value."
(when value
Francois-Rene Rideau
committed
(with-slots (headers) frame
Francois-Rene Rideau
committed
(if (not (assoc key headers :test #'header=))
Francois-Rene Rideau
committed
(let ((result ()))
Francois-Rene Rideau
committed
(if (header= (first header) key)
(push (list key value) result)
(push header result)))
(setf headers result))))))
Francois-Rene Rideau
committed
(defmethod get-destination ((frame frame))
"Return the destination header for FRAME."
(get-header frame "destination"))
Francois-Rene Rideau
committed
(defmethod set-destination ((frame frame) (destination string))
"Set the destination header for FRAME."
(set-header frame "destination" destination))
Francois-Rene Rideau
committed
(defmethod get-subscription ((frame frame))
"Get the subscription header for FRAME, if one exists."
(get-header frame "subscription"))
(defmethod set-selector ((frame frame) (selector string))
"Specify a 'selector' header for FRAME."
(set-header frame "selector" selector))
(defmethod set-id ((frame frame) (id string))
"Specify an 'id' header for FRAME."
(set-header frame "id" id))
Francois-Rene Rideau
committed
(defmethod set-client-ack ((frame frame))
"Specify a 'client' ack header for FRAME."
Francois-Rene Rideau
committed
(set-header frame "ack" "client"))
Francois-Rene Rideau
committed
Francois-Rene Rideau
committed
(defmethod error-frame-p ((frame frame))
(string-equal (frame-name frame) "error"))
Francois-Rene Rideau
committed
Francois-Rene Rideau
committed
;;;-------------------------------------------------------------------------
;;; Registrations
Francois-Rene Rideau
committed
Francois-Rene Rideau
committed
(defclass registration ()
((callback :type (or null function) ;the callback function
:initform nil
:initarg :callback)
(destination :type string ;the topic/queue name
:initarg :destination)
(selector :type (or null string) ;an SQL 92 selector, if provided
:initarg :selector)
(id :type (or null string) ;a subscription id, if provided
:initarg :id)
Francois-Rene Rideau
committed
(client-ack? :initarg :client-ack?))) ;use client (or auto) ack?
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
;;;-------------------------------------------------------------------------
;;; Key-mapping strategies
(defgeneric mangle-key (strategy key))
(defgeneric demangle-key (strategy key))
(defclass pass-through-key-mapping-strategy () ())
(defmethod mangle-key ((strategy pass-through-key-mapping-strategy) key)
key)
(defmethod demangle-key ((strategy pass-through-key-mapping-strategy) key)
key)
(defclass amq-key-mapping-strategy () ())
(defvar *replacement-pairs*
'(("-" . "_HYPHEN_")
("." . "_DOT_")))
(defmethod mangle-key ((strategy amq-key-mapping-strategy) key)
(declare (ignore strategy))
(let ((str key))
(loop for pair in *replacement-pairs*
do (setf str (string-replace (car pair) (cdr pair) str)))
str))
(defmethod demangle-key ((strategy amq-key-mapping-strategy) key)
(declare (ignore strategy))
(let ((str key))
(loop for pair in *replacement-pairs*
do (setf str (string-replace (cdr pair) (car pair) str)))
str))
(defun string-replace (search replace string)
(loop for start = (search search (or result string)
:start2 (if start (1+ start) 0))
while start
as result = (concatenate 'string
(subseq (or result string) 0 start)
replace
(subseq (or result string) (+ start (length search))))
finally (return-from string-replace (or result string))))
Francois-Rene Rideau
committed
;;;-------------------------------------------------------------------------
;;; Connections
Francois-Rene Rideau
committed
(defclass connection ()
Francois-Rene Rideau
committed
((host :type string
:initform "localhost"
:initarg :host)
(port :type integer
:initform 61613
:initarg :port)
(stream :initform nil
:initarg :stream)
(encoding :initform :utf-8) ;only utf-8 is currently supported
Francois-Rene Rideau
committed
(registrations :type list
:initform ()
:initarg :registrations)
(key-mapping-strategy :initarg :key-mapping-strategy
:initform (make-instance 'pass-through-key-mapping-strategy))
Francois-Rene Rideau
committed
(error-callback :type (or null function)
:initform nil)
(terminate :initform nil)))
(defun make-connection (host port
&key (key-mapping-strategy 'pass-through-key-mapping-strategy))
Francois-Rene Rideau
committed
(check-type host string)
(check-type port integer)
(make-instance 'connection
:host host
:port port
:key-mapping-strategy (make-instance key-mapping-strategy)))
Francois-Rene Rideau
committed
;;;-------------------------------------------------------------------------
;;; Implementation of the API
Francois-Rene Rideau
committed
Francois-Rene Rideau
committed
(defmethod start ((conn connection) &key username passcode)
Francois-Rene Rideau
committed
"Connects to the message broker, sends subscriptions for any existing registrations,
and enters a receive loop."
(handler-bind
((t (lambda (e)
Francois-Rene Rideau
committed
(disconnect conn)
(log-debug "Error: ~A" e))))
(with-slots (host port stream registrations terminate) conn
Francois-Rene Rideau
committed
(usocket:with-client-socket (socket strm host port
Francois-Rene Rideau
committed
:protocol :stream
:element-type '(unsigned-byte 8))
Francois-Rene Rideau
committed
(setf stream strm)
Francois-Rene Rideau
committed
;; Send CONNECT frame
(connect conn username passcode)
;; Send SUBSCRIBE frames
(loop for reg in registrations
do (with-slots (destination selector id client-ack?) reg
(subscribe conn destination
:selector selector :id id :client-ack? client-ack?)))
Francois-Rene Rideau
committed
;; The receive loop
Francois-Rene Rideau
committed
(let ((recvbuf '()))
Francois-Rene Rideau
committed
(loop until terminate
do (let ((sock (car (usocket:wait-for-input socket :timeout 1))))
(when sock
Francois-Rene Rideau
committed
(let ((newbuf (append recvbuf (receive conn))))
Francois-Rene Rideau
committed
(setf recvbuf (process-receive-buffer conn newbuf)))))))
(disconnect conn)
(log-debug "Terminated")
(setf terminate nil)))))
Francois-Rene Rideau
committed
Francois-Rene Rideau
committed
(defmethod stop ((conn connection))
Francois-Rene Rideau
committed
"Gracefully terminates the current receive loop and closes the connection to the message broker."
Francois-Rene Rideau
committed
(with-slots (terminate) conn
Francois-Rene Rideau
committed
(defmethod connect ((conn connection) &optional username passcode)
(check-type username (or null string))
(check-type passcode (or null string))
(sending-frame (conn frame "CONNECT"
"login" username
Francois-Rene Rideau
committed
(defmethod disconnect ((conn connection))
(with-slots (stream) conn
(when stream
(when (open-stream-p stream)
(sending-frame (conn frame "DISCONNECT"))
(close stream))
Francois-Rene Rideau
committed
(setf stream nil))))
(defmethod send ((conn connection) (frame frame))
(send conn (with-output-to-string (stream)
(write-string (render-frame frame conn) stream))))
Francois-Rene Rideau
committed
Francois-Rene Rideau
committed
(defmethod send ((conn connection) (string string))
(with-slots (stream encoding) conn
(write-sequence (babel:string-to-octets string :encoding encoding) stream)
Francois-Rene Rideau
committed
(finish-output stream)))
Francois-Rene Rideau
committed
(defmethod receive ((conn connection))
Francois-Rene Rideau
committed
"Called whenever there's activity on the connection stream.
Reads from the stream and returns the received buffer as a list of bytes,
or NIL if the connection has been closed by the broker."
Francois-Rene Rideau
committed
(with-slots (stream) conn
(let ((buffer (loop while (listen stream)
as b = (read-byte stream nil 'eof)
unless (eql b 'eof)
collect b)))
Francois-Rene Rideau
committed
(if (> (length buffer) 0)
Francois-Rene Rideau
committed
;; Return the buffer
buffer
Francois-Rene Rideau
committed
;; Otherwise, it means the other end has terminated,
;; so close things down
(progn
(log-debug "Nothing to read from socket, closing.")
Francois-Rene Rideau
committed
(defmethod process-receive-buffer ((conn connection) buffer)
Francois-Rene Rideau
committed
"Try to extract and process frame(s) from buffer. Returns unprocessed buffer."
Francois-Rene Rideau
committed
(labels ((process-frame (frame)
Francois-Rene Rideau
committed
(log-debug "Frame: ~A" frame)
(apply-callbacks conn frame))
(extract-frame ()
;; Identify frames by looking for NULLs
;; This is safe with UTF-8 because a 0 will never appear within multibyte characters
;;--- TODO: Use content-length header when provided instead of relying on NULL delimiter
Francois-Rene Rideau
committed
(let ((pos (position 0 buffer)))
(when pos
(with-slots (encoding) conn
(let* ((framevector (coerce buffer '(vector (unsigned-byte 8))))
(framestring (babel:octets-to-string framevector
:start 0 :end pos :encoding encoding)))
(process-frame (make-frame-from-string (string-left-strip framestring) conn))
(setf buffer (subseq buffer (1+ pos)))))))))
Francois-Rene Rideau
committed
(loop while (extract-frame))
Francois-Rene Rideau
committed
buffer))
(defun destination= (actual registered)
"Returns T if the REGISTERED destination matches the ACTUAL destination."
;;--- TODO: Implement wildcard matching? (NOTE: not all message brokers support wildcard matching)
(string-equal actual registered))
Francois-Rene Rideau
committed
(defmethod apply-callbacks ((conn connection) (frame frame))
"Send FRAME to any matching registered callbacks."
(with-slots (registrations error-callback) conn
(if (error-frame-p frame)
(when error-callback
(funcall error-callback frame))
(let ((dest (get-destination frame))
(subscription (get-subscription frame)))
Francois-Rene Rideau
committed
(loop for reg in registrations
do (with-slots (callback destination id) reg
(when (and callback
;; one or both could be nil
(string-equal subscription id)
;; destination= will not return T for registrations using wildcards
;; or temporary destinations, so allow a matching non-nil id to be
;; sufficient for applying the callback
(or id (destination= dest destination)))
Francois-Rene Rideau
committed
(funcall callback frame))))))))
(defmethod register ((conn connection) callback (destination string) &key selector id client-ack?)
Francois-Rene Rideau
committed
"Register a callback for a destination. A subscription to the destination using the
optional client-ack? is issued for all callbacks as part of connecting to the MQ server."
(check-type callback (or null function))
(with-slots (stream registrations) conn
(when stream
(subscribe conn destination :selector selector :id id :client-ack? client-ack?))
Francois-Rene Rideau
committed
(setf registrations (append registrations (list (make-instance 'registration
:callback callback
:destination destination
:selector selector
:id id
Francois-Rene Rideau
committed
:client-ack? client-ack?))))))
(defmethod unregister-id ((conn connection) id)
(with-slots (stream registrations) conn
(when stream
(unsubscribe-id conn id))
(setf registrations (remove-if #'(lambda (reg)
(string-equal id (slot-value reg 'id))) registrations))))
Francois-Rene Rideau
committed
(defmethod register-error ((conn connection) callback)
"Register an error callback for STOMP error frames."
(check-type callback (or null function))
(with-slots (error-callback) conn
Francois-Rene Rideau
committed
(setf error-callback callback)))
(defmethod subscribe ((conn connection) (destination string) &key selector id client-ack?)
Francois-Rene Rideau
committed
(sending-frame (conn frame "SUBSCRIBE"
"destination" destination)
(when selector
(set-selector frame selector))
(when id
(set-id frame id))
Francois-Rene Rideau
committed
(when client-ack?
(set-client-ack frame))))
(defmethod unsubscribe ((conn connection) (destination string))
(sending-frame (conn frame "UNSUBSCRIBE"
Francois-Rene Rideau
committed
(defmethod unsubscribe-id ((conn connection) (id string))
(sending-frame (conn frame "UNSUBSCRIBE"
"id" id)))
Francois-Rene Rideau
committed
(defmethod post ((conn connection) (message string) (destination string) &optional headers)
(sending-frame (conn frame "SEND"
"destination" destination)
(loop for (key value) in headers
unless (header= key "destination") ;don't overwrite the destination set above
do (set-header frame key value))
(setf (frame-body frame) message)))
Francois-Rene Rideau
committed
(defmethod ack ((conn connection) (for-frame frame) &optional transaction)
Francois-Rene Rideau
committed
"Send the client ack for FRAME and optional TRANSACTION"
Francois-Rene Rideau
committed
(check-type transaction (or null string))
(sending-frame (conn frame "ACK"
"message-id" (get-header for-frame "message-id")
Francois-Rene Rideau
committed
Francois-Rene Rideau
committed
(defmethod begin ((conn connection) (transaction string))
Francois-Rene Rideau
committed
"Begin a transaction with name TRANSACTION"
Francois-Rene Rideau
committed
(sending-frame (conn frame "BEGIN"
Francois-Rene Rideau
committed
(defmethod commit ((conn connection) (transaction string))
Francois-Rene Rideau
committed
"Commit a transaction with name TRANSACTION"
Francois-Rene Rideau
committed
(sending-frame (conn frame "COMMIT"
Francois-Rene Rideau
committed
;; Naming this method 'abort' is not a good idea, so calling it 'rollback' instead
(defmethod rollback ((conn connection) (transaction string))
"Abort a transaction with name TRANSACTION."
(sending-frame (conn frame "ABORT"