:set-header
:get-destination
:set-destination
+ :pass-through-key-mapping-strategy
+ :amq-key-mapping-strategy
:register
+ :unregister-id
:register-error
:subscribe
:unsubscribe
+ :unsubscribe-id
:post
:ack
:begin
(declare (ignore fmt args)))
(defun string-strip (string)
- "Remove spaces, tabs and line enders from a string."
+ "Remove spaces, tabs and line enders from both ends of a string."
(check-type string string)
- (string-trim '(#\Space #\NewLine #\Return #\Tab #\Nul) string))
+ (string-trim '(#\Space #\NewLine #\Return #\Tab #\Nul #\Linefeed) string))
+
+(defun string-left-strip (string)
+ "Remove spaces, tabs and line enders from the beginning of a string."
+ (check-type string string)
+ (string-left-trim '(#\Space #\NewLine #\Return #\Tab #\Nul #\Linefeed) string))
(defun string-split (string delim)
"Splits STRING at the first occurrence of DELIM and returns the substrings before and after it.
(defgeneric stop (connection)
(:documentation "Stop the connection with STOMP."))
-(defgeneric register (connection callback destination &optional client-ack?)
+(defgeneric register (connection callback destination &key selector id client-ack?)
(:documentation "Register a listener for messages to a destination.
CALLBACK should be a function which accepts a frame argument.
- If CLIENT-ACK? is T, the client is responsible for sending ACK."))
+ SELECTOR can be used to provide an SQL 92 selector for filtering
+ messages. An ID may be given for later use with UNREGISTER-ID
+ to support overlapping subscriptions using selectors with the
+ same destination. If CLIENT-ACK? is T, the client is responsible
+ for sending ACK."))
+
+(defgeneric unregister-id (connection id)
+ (:documentation "Unregister a callback by id."))
(defgeneric register-error (connection callback)
(:documentation "Register a listener for STOMP error frames."))
-(defgeneric subscribe (connection destination &optional client-ack?)
- (:documentation "Subscribe to a topic or queue."))
+(defgeneric subscribe (connection destination &key selector id client-ack?)
+ (:documentation "Subscribe to a topic or queue.
+ SELECTOR can be used to provide an SQL 92 selector for filtering
+ messages. An ID may be given for later use with UNSUBSCRIBE-ID
+ to support overlapping subscriptions using selectors with the
+ same destination."))
(defgeneric unsubscribe (connection destination)
- (:documentation "Unsubscribe from a topic or queue."))
+ (:documentation "Unsubscribe from a topic or queue by name.
+ Unsubscribing does not unregister any callbacks."))
+
+(defgeneric unsubscribe-id (connection id)
+ (:documentation "Unsubscribe from a topic or queue by id.
+ Unsubscribing does not unregister any callbacks."))
(defgeneric post (connection message destination &optional headers)
(:documentation "Post a message to a destination.
:initarg :body
:accessor frame-body)))
-(defun make-frame-from-string (string)
+(defun make-frame-from-string (string connection)
"Construct a frame by parsing STRING according to the STOMP protocol."
;; Declare some useful local functions
(labels ((get-line (stream)
(unless (eql line 'eof)
line)))
(make-header (line)
- (multiple-value-bind (before after)
- (string-split line ":")
- (list (string-strip (string before)) (string-strip (string after)))))
+ (with-slots (key-mapping-strategy) connection
+ (multiple-value-bind (before after)
+ (string-split line ":")
+ (list (demangle-key key-mapping-strategy (string-strip (string before)))
+ (string-strip (string after))))))
;; Frame name is first line
(get-name (stream)
(get-line stream))
(progn ,@body)
(send ,connection ,vframe)))
-;; Note that this is not a pretty-printing function
-;; The format of the output is what's expected by the STOMP protocol
-(defmethod print-object ((frame frame) stream)
- "Print FRAME to STREAM according to the STOMP protocol."
+(defmethod print-object ((frame frame) stream)
(with-slots (name headers body) frame
- (format stream "~A~%" name)
- (dolist (header headers)
- (format stream "~A:~A~%" (first header) (second header)))
- (format stream "~%~A~A" body (code-char 0))))
-
+ (format stream "~A~%~A~%~A~%" name headers body)))
+
+(defmethod render-frame ((frame frame) connection)
+ (with-output-to-string (stream)
+ (with-slots (name headers body) frame
+ (format stream "~A~%" name)
+ (with-slots (key-mapping-strategy) connection
+ (dolist (header headers)
+ (format stream
+ "~A:~A~%"
+ (first header)
+ ;;(mangle-key key-mapping-strategy (first header))
+ (second header)))
+ (format stream "~%~A~A" body (code-char 0))))))
+
(defun header= (header1 header2)
"Case insensitive comparison function for headers."
(string-equal (string header1) (string header2)))
(defmethod set-header ((frame frame) (key string) value)
"Add a header named KEY to FRAME with VALUE, which can be of any type.
If the header already exists, VALUE replaces the existing value."
- (when value
+ (when value
(with-slots (headers) frame
(if (not (assoc key headers :test #'header=))
(setf headers (append (list (list key value)) headers))
(push (list key value) result)
(push header result)))
(setf headers result))))))
-
+
(defmethod get-destination ((frame frame))
"Return the destination header for FRAME."
(get-header frame "destination"))
"Set the destination header for FRAME."
(set-header frame "destination" destination))
+(defmethod get-subscription ((frame frame))
+ "Get the subscription header for FRAME, if one exists."
+ (get-header frame "subscription"))
+
+(defmethod set-selector ((frame frame) (selector string))
+ "Specify a 'selector' header for FRAME."
+ (set-header frame "selector" selector))
+
+(defmethod set-id ((frame frame) (id string))
+ "Specify an 'id' header for FRAME."
+ (set-header frame "id" id))
+
(defmethod set-client-ack ((frame frame))
- "Specify a 'client' ack header for FRAME"
+ "Specify a 'client' ack header for FRAME."
(set-header frame "ack" "client"))
(defmethod error-frame-p ((frame frame))
:initform nil
:initarg :callback)
(destination :type string ;the topic/queue name
- :initarg :destination)
+ :initarg :destination)
+ (selector :type (or null string) ;an SQL 92 selector, if provided
+ :initarg :selector)
+ (id :type (or null string) ;a subscription id, if provided
+ :initarg :id)
(client-ack? :initarg :client-ack?))) ;use client (or auto) ack?
;;;-------------------------------------------------------------------------
+;;; Key-mapping strategies
+
+(defgeneric mangle-key (strategy key))
+(defgeneric demangle-key (strategy key))
+
+
+(defclass pass-through-key-mapping-strategy () ())
+
+(defmethod mangle-key ((strategy pass-through-key-mapping-strategy) key)
+ key)
+
+(defmethod demangle-key ((strategy pass-through-key-mapping-strategy) key)
+ key)
+
+
+(defclass amq-key-mapping-strategy () ())
+
+(defvar *replacement-pairs*
+ '(("-" . "_HYPHEN_")
+ ("." . "_DOT_")))
+
+(defmethod mangle-key ((strategy amq-key-mapping-strategy) key)
+ (declare (ignore strategy))
+ (let ((str key))
+ (loop for pair in *replacement-pairs*
+ do (setf str (string-replace (car pair) (cdr pair) str)))
+ str))
+
+(defmethod demangle-key ((strategy amq-key-mapping-strategy) key)
+ (declare (ignore strategy))
+ (let ((str key))
+ (loop for pair in *replacement-pairs*
+ do (setf str (string-replace (cdr pair) (car pair) str)))
+ str))
+
+(defun string-replace (search replace string)
+ (loop for start = (search search (or result string)
+ :start2 (if start (1+ start) 0))
+ while start
+ as result = (concatenate 'string
+ (subseq (or result string) 0 start)
+ replace
+ (subseq (or result string) (+ start (length search))))
+ finally (return-from string-replace (or result string))))
+
+
+;;;-------------------------------------------------------------------------
;;; Connections
(defclass connection ()
(registrations :type list
:initform ()
:initarg :registrations)
+ (key-mapping-strategy :initarg :key-mapping-strategy
+ :initform (make-instance 'pass-through-key-mapping-strategy))
(error-callback :type (or null function)
:initform nil)
(terminate :initform nil)))
-(defun make-connection (host port)
+(defun make-connection (host port
+ &key (key-mapping-strategy 'pass-through-key-mapping-strategy))
(check-type host string)
(check-type port integer)
(make-instance 'connection
:host host
- :port port))
+ :port port
+ :key-mapping-strategy (make-instance key-mapping-strategy)))
+
+
;;;-------------------------------------------------------------------------
;;; Implementation of the API
(connect conn username passcode)
;; Send SUBSCRIBE frames
(loop for reg in registrations
- do (with-slots (destination client-ack?) reg
- (subscribe conn destination client-ack?)))
+ do (with-slots (destination selector id client-ack?) reg
+ (subscribe conn destination
+ :selector selector :id id :client-ack? client-ack?)))
;; The receive loop
(let ((recvbuf '()))
(loop until terminate
(defmethod send ((conn connection) (frame frame))
(send conn (with-output-to-string (stream)
- ;; We use 'princ' because the 'print-object' method on frames
- ;; does its output using the STOMP protocol
- (princ frame stream))))
+ (write-string (render-frame frame conn) stream))))
(defmethod send ((conn connection) (string string))
(with-slots (stream encoding) conn
(let* ((framevector (coerce buffer '(vector (unsigned-byte 8))))
(framestring (babel:octets-to-string framevector
:start 0 :end pos :encoding encoding)))
- (process-frame (make-frame-from-string (string-strip framestring)))
+ (process-frame (make-frame-from-string (string-left-strip framestring) conn))
(setf buffer (subseq buffer (1+ pos)))))))))
(loop while (extract-frame))
buffer))
(if (error-frame-p frame)
(when error-callback
(funcall error-callback frame))
- (let ((dest (get-destination frame)))
+ (let ((dest (get-destination frame))
+ (subscription (get-subscription frame)))
(loop for reg in registrations
- do (with-slots (callback destination) reg
- (when (and callback (destination= dest destination))
+ do (with-slots (callback destination id) reg
+ (when (and callback
+ ;; one or both could be nil
+ (string-equal subscription id)
+ ;; destination= will not return T for registrations using wildcards
+ ;; or temporary destinations, so allow a matching non-nil id to be
+ ;; sufficient for applying the callback
+ (or id (destination= dest destination)))
(funcall callback frame))))))))
-(defmethod register ((conn connection) callback (destination string) &optional client-ack?)
+(defmethod register ((conn connection) callback (destination string) &key selector id client-ack?)
"Register a callback for a destination. A subscription to the destination using the
optional client-ack? is issued for all callbacks as part of connecting to the MQ server."
(check-type callback (or null function))
(with-slots (stream registrations) conn
(when stream
- (subscribe conn destination client-ack?))
+ (subscribe conn destination :selector selector :id id :client-ack? client-ack?))
(setf registrations (append registrations (list (make-instance 'registration
:callback callback
:destination destination
+ :selector selector
+ :id id
:client-ack? client-ack?))))))
+(defmethod unregister-id ((conn connection) id)
+ (with-slots (stream registrations) conn
+ (when stream
+ (unsubscribe-id conn id))
+ (setf registrations (remove-if #'(lambda (reg)
+ (string-equal id (slot-value reg 'id))) registrations))))
+
(defmethod register-error ((conn connection) callback)
"Register an error callback for STOMP error frames."
(check-type callback (or null function))
(with-slots (error-callback) conn
(setf error-callback callback)))
-(defmethod subscribe ((conn connection) (destination string) &optional client-ack?)
+(defmethod subscribe ((conn connection) (destination string) &key selector id client-ack?)
(sending-frame (conn frame "SUBSCRIBE"
"destination" destination)
+ (when selector
+ (set-selector frame selector))
+ (when id
+ (set-id frame id))
(when client-ack?
(set-client-ack frame))))
(sending-frame (conn frame "UNSUBSCRIBE"
"destination" destination)))
+(defmethod unsubscribe-id ((conn connection) (id string))
+ (sending-frame (conn frame "UNSUBSCRIBE"
+ "id" id)))
+
(defmethod post ((conn connection) (message string) (destination string) &optional headers)
(sending-frame (conn frame "SEND"
"destination" destination)