More development on the QRes side, by mjr, dschulz, agasparovic, qres-r496122
authorFrancois-Rene Rideau <fare@tunes.org>
Tue, 4 Oct 2011 21:37:27 +0000 (17:37 -0400)
committerFrancois-Rene Rideau <fare@tunes.org>
Tue, 4 Oct 2011 21:37:27 +0000 (17:37 -0400)
with cleanups by swm, eschwartz.

------------------------------------------------------------------------
r496122 | mjr | 2011-09-29 17:33:13 -0400 (Thu, 29 Sep 2011) | 40 lines

Implement transactional enqueuing directly to message broker

Tests that will verify: Added assertion to test-stomp-bypass-oaq
                        Added test-stomp-reconcile-tx-queued-message
                        test-stomp-client unit tests pass using local amq:
                        ran 11 unit tests (70 assertions, 0 failures, 0 errors)

------------------------------------------------------------------------
r492371 | swm | 2011-09-12 14:29:20 -0400 (Mon, 12 Sep 2011) | 61 lines

Clean up code along the way (e.g., remove code marked obsolete,
  get rid of spurious 'quake:' package prefixes, etc).

------------------------------------------------------------------------
r478768 | mjr | 2011-07-13 12:49:27 -0400 (Wed, 13 Jul 2011) | 21 lines

Fix buggy key-mapping code, stop trimming trailing whitespace on recvd frames.

Tests that will verify: test-stomp-client unit tests pass using local amq:
                        ran 10 unit tests (65 assertions, 0 failures, 0 errors)

------------------------------------------------------------------------
r478083 | eschwartz | 2011-07-08 18:23:20 -0400 (Fri, 08 Jul 2011) | 28 lines

Replace buggy defconstant with defvar.

------------------------------------------------------------------------
r475933 | dschulz | 2011-06-27 14:13:23 -0400 (Mon, 27 Jun 2011) | 25 lines

Add a header-mangling option to cl-stomp, and use it in our stomp code.

AMQ has a lot of assumptions baked in around how thier headers are
formatted. (no "-" or "." characters) This is all in the JMS spec, and
this brings our stomp implementation up to code WRT all that junk. We
mangle and demangle header strings in the same way that AMQ's stomp
implementation does.

------------------------------------------------------------------------
r474659 | mjr | 2011-06-21 13:31:31 -0400 (Tue, 21 Jun 2011) | 54 lines

Tests that will verify: test-stomp-client unit tests pass using local amq:
                        ran 10 unit tests (65 assertions, 0 failures, 0 errors)

------------------------------------------------------------------------
r471116 | agasparovic | 2011-06-03 11:32:18 -0400 (Fri, 03 Jun 2011) | 18 lines

Added support for message selectors to the CL-STOMP library

CL-STOMP now allows subscribing to a destination with an SQL 92-style selector
string, which will filter messages by their properties (headers) at the
broker. It also supports identifying a subscription with a string, which may be
used later in conjunction with a new UNSUBSCRIBE-ID method. This is useful in
cases of overlapping subscriptions to the same destination, but with differing
selectors; a client can either UNSUBSCRIBE all subscriptions to the destination
or one by id.

------------------------------------------------------------------------
r468880 | mjr | 2011-05-24 11:37:48 -0400 (Tue, 24 May 2011) | 21 lines

Handle malformed stomp frame headers

Tests that will verify: test-stomp-client unit tests pass using local amq:
                        ran 8 unit tests (56 assertions, 0 failures, 0 errors)

cl-stomp.lisp

index b67641e..b63b165 100644 (file)
            :set-header
            :get-destination
            :set-destination
+           :pass-through-key-mapping-strategy
+           :amq-key-mapping-strategy
            :register
+           :unregister-id
            :register-error
            :subscribe
            :unsubscribe
+           :unsubscribe-id
            :post
            :ack
            :begin
   (declare (ignore fmt args)))
 
 (defun string-strip (string)
-  "Remove spaces, tabs and line enders from a string."
+  "Remove spaces, tabs and line enders from both ends of a string."
   (check-type string string)
-  (string-trim '(#\Space #\NewLine #\Return #\Tab #\Nul) string))
+  (string-trim '(#\Space #\NewLine #\Return #\Tab #\Nul #\Linefeed) string))
+
+(defun string-left-strip (string)
+  "Remove spaces, tabs and line enders from the beginning of a string."
+  (check-type string string)
+  (string-left-trim '(#\Space #\NewLine #\Return #\Tab #\Nul #\Linefeed) string))
 
 (defun string-split (string delim)
   "Splits STRING at the first occurrence of DELIM and returns the substrings before and after it.
 (defgeneric stop (connection)
   (:documentation "Stop the connection with STOMP."))
 
-(defgeneric register (connection callback destination &optional client-ack?)
+(defgeneric register (connection callback destination &key selector id client-ack?)
   (:documentation "Register a listener for messages to a destination.
                    CALLBACK should be a function which accepts a frame argument.
-                   If CLIENT-ACK? is T, the client is responsible for sending ACK."))
+                   SELECTOR can be used to provide an SQL 92 selector for filtering
+                   messages. An ID may be given for later use with UNREGISTER-ID
+                   to support overlapping subscriptions using selectors with the
+                   same destination. If CLIENT-ACK? is T, the client is responsible
+                   for sending ACK."))
+
+(defgeneric unregister-id (connection id)
+  (:documentation "Unregister a callback by id."))
 
 (defgeneric register-error (connection callback)
   (:documentation "Register a listener for STOMP error frames."))
 
-(defgeneric subscribe (connection destination &optional client-ack?)
-  (:documentation "Subscribe to a topic or queue."))
+(defgeneric subscribe (connection destination &key selector id client-ack?)
+  (:documentation "Subscribe to a topic or queue.
+                   SELECTOR can be used to provide an SQL 92 selector for filtering
+                   messages. An ID may be given for later use with UNSUBSCRIBE-ID
+                   to support overlapping subscriptions using selectors with the
+                   same destination."))
 
 (defgeneric unsubscribe (connection destination)
-  (:documentation "Unsubscribe from a topic or queue."))
+  (:documentation "Unsubscribe from a topic or queue by name.
+                   Unsubscribing does not unregister any callbacks."))
+
+(defgeneric unsubscribe-id (connection id)
+  (:documentation "Unsubscribe from a topic or queue by id.
+                   Unsubscribing does not unregister any callbacks."))
 
 (defgeneric post (connection message destination &optional headers)
   (:documentation "Post a message to a destination.
          :initarg :body
          :accessor frame-body)))
 
-(defun make-frame-from-string (string)
+(defun make-frame-from-string (string connection)
   "Construct a frame by parsing STRING according to the STOMP protocol."
   ;; Declare some useful local functions
   (labels ((get-line (stream)
                (unless (eql line 'eof)
                  line)))
            (make-header (line)
-             (multiple-value-bind (before after) 
-                 (string-split line ":")
-               (list (string-strip (string before)) (string-strip (string after)))))
+             (with-slots (key-mapping-strategy) connection
+               (multiple-value-bind (before after) 
+                   (string-split line ":")
+                 (list (demangle-key key-mapping-strategy (string-strip (string before))) 
+                       (string-strip (string after))))))
            ;; Frame name is first line
            (get-name (stream)
              (get-line stream))
      (progn ,@body)
      (send ,connection ,vframe)))
 
-;; Note that this is not a pretty-printing function
-;; The format of the output is what's expected by the STOMP protocol
-(defmethod print-object ((frame frame) stream)
-  "Print FRAME to STREAM according to the STOMP protocol."
+(defmethod print-object ((frame frame) stream) 
   (with-slots (name headers body) frame
-    (format stream "~A~%" name)
-    (dolist (header headers)
-      (format stream "~A:~A~%" (first header) (second header)))
-    (format stream "~%~A~A" body (code-char 0))))
-
+    (format stream "~A~%~A~%~A~%" name headers body)))
+
+(defmethod render-frame ((frame frame) connection)
+  (with-output-to-string (stream)
+    (with-slots (name headers body) frame
+      (format stream "~A~%" name)
+      (with-slots (key-mapping-strategy) connection
+        (dolist (header headers)
+          (format stream 
+                  "~A:~A~%" 
+                  (first header)
+                  ;;(mangle-key key-mapping-strategy (first header))
+                  (second header)))
+        (format stream "~%~A~A" body (code-char 0))))))
+  
 (defun header= (header1 header2)
   "Case insensitive comparison function for headers."
   (string-equal (string header1) (string header2)))
 (defmethod set-header ((frame frame) (key string) value)
   "Add a header named KEY to FRAME with VALUE, which can be of any type.
    If the header already exists, VALUE replaces the existing value."
-  (when value
+  (when value    
     (with-slots (headers) frame
       (if (not (assoc key headers :test #'header=))
         (setf headers (append (list (list key value)) headers))
               (push (list key value) result)
               (push header result)))
           (setf headers result))))))
-
+  
 (defmethod get-destination ((frame frame))
   "Return the destination header for FRAME."
   (get-header frame "destination"))
   "Set the destination header for FRAME."
   (set-header frame "destination" destination))
 
+(defmethod get-subscription ((frame frame))
+  "Get the subscription header for FRAME, if one exists."
+  (get-header frame "subscription"))
+
+(defmethod set-selector ((frame frame) (selector string))
+  "Specify a 'selector' header for FRAME."
+  (set-header frame "selector" selector))
+
+(defmethod set-id ((frame frame) (id string))
+  "Specify an 'id' header for FRAME."
+  (set-header frame "id" id))
+
 (defmethod set-client-ack ((frame frame))
-  "Specify a 'client' ack header for FRAME"
+  "Specify a 'client' ack header for FRAME."
   (set-header frame "ack" "client"))
 
 (defmethod error-frame-p ((frame frame))
                 :initform nil
                 :initarg :callback)
    (destination :type string             ;the topic/queue name
-                :initarg :destination) 
+                :initarg :destination)
+   (selector    :type (or null string)   ;an SQL 92 selector, if provided
+                :initarg :selector)
+   (id          :type (or null string)   ;a subscription id, if provided
+                :initarg :id)
    (client-ack? :initarg :client-ack?))) ;use client (or auto) ack?
 
 
 ;;;-------------------------------------------------------------------------
+;;; Key-mapping strategies
+
+(defgeneric mangle-key (strategy key))
+(defgeneric demangle-key (strategy key))
+
+
+(defclass pass-through-key-mapping-strategy () ())
+
+(defmethod mangle-key ((strategy pass-through-key-mapping-strategy) key)
+  key)
+
+(defmethod demangle-key ((strategy pass-through-key-mapping-strategy) key)
+  key)
+
+
+(defclass amq-key-mapping-strategy () ())
+
+(defvar *replacement-pairs* 
+  '(("-" . "_HYPHEN_")
+    ("." . "_DOT_")))
+
+(defmethod mangle-key ((strategy amq-key-mapping-strategy) key)
+  (declare (ignore strategy))
+  (let ((str key))
+    (loop for pair in *replacement-pairs*          
+          do (setf str (string-replace (car pair) (cdr pair) str)))    
+    str))
+
+(defmethod demangle-key ((strategy amq-key-mapping-strategy) key)
+  (declare (ignore strategy))
+  (let ((str key))
+    (loop for pair in *replacement-pairs*          
+          do (setf str (string-replace (cdr pair) (car pair) str)))    
+    str))
+
+(defun string-replace (search replace string)
+  (loop for start = (search search (or result string)
+                            :start2 (if start (1+ start) 0))
+        while start
+        as result = (concatenate 'string
+                      (subseq (or result string) 0 start)
+                      replace
+                      (subseq (or result string) (+ start (length search))))
+        finally (return-from string-replace (or result string))))
+
+
+;;;-------------------------------------------------------------------------
 ;;; Connections
 
 (defclass connection ()
    (registrations :type list
                   :initform ()
                   :initarg :registrations)
+   (key-mapping-strategy :initarg :key-mapping-strategy
+                         :initform (make-instance 'pass-through-key-mapping-strategy))
    (error-callback :type (or null function)
                    :initform nil)
    (terminate :initform nil)))
 
-(defun make-connection (host port)
+(defun make-connection (host port
+                        &key (key-mapping-strategy 'pass-through-key-mapping-strategy))
   (check-type host string)
   (check-type port integer)
   (make-instance 'connection
     :host host
-    :port port))
+    :port port
+    :key-mapping-strategy (make-instance key-mapping-strategy)))
+
+
 
 ;;;-------------------------------------------------------------------------
 ;;; Implementation of the API
         (connect conn username passcode)
         ;; Send SUBSCRIBE frames
         (loop for reg in registrations
-              do (with-slots (destination client-ack?) reg
-                   (subscribe conn destination client-ack?)))
+              do (with-slots (destination selector id client-ack?) reg
+                   (subscribe conn destination
+                              :selector selector :id id :client-ack? client-ack?)))
         ;; The receive loop
         (let ((recvbuf '()))
           (loop until terminate
 
 (defmethod send ((conn connection) (frame frame))
   (send conn (with-output-to-string (stream)
-               ;; We use 'princ' because the 'print-object' method on frames
-               ;; does its output using the STOMP protocol
-               (princ frame stream))))
+               (write-string (render-frame frame conn) stream))))
 
 (defmethod send ((conn connection) (string string))
   (with-slots (stream encoding) conn 
                    (let* ((framevector (coerce buffer '(vector (unsigned-byte 8))))
                           (framestring (babel:octets-to-string framevector 
                                          :start 0 :end pos :encoding encoding)))
-                     (process-frame (make-frame-from-string (string-strip framestring)))
+                     (process-frame (make-frame-from-string (string-left-strip framestring) conn))
                      (setf buffer (subseq buffer (1+ pos)))))))))
     (loop while (extract-frame))
     buffer))
     (if (error-frame-p frame)
       (when error-callback
         (funcall error-callback frame))
-      (let ((dest (get-destination frame)))
+      (let ((dest (get-destination frame))
+            (subscription (get-subscription frame)))
         (loop for reg in registrations
-              do (with-slots (callback destination) reg
-                   (when (and callback (destination= dest destination))
+              do (with-slots (callback destination id) reg
+                   (when (and callback
+                              ;; one or both could be nil
+                              (string-equal subscription id)
+                              ;; destination= will not return T for registrations using wildcards
+                              ;; or temporary destinations, so allow a matching non-nil id to be
+                              ;; sufficient for applying the callback
+                              (or id (destination= dest destination)))
                      (funcall callback frame))))))))
 
-(defmethod register ((conn connection) callback (destination string) &optional client-ack?)
+(defmethod register ((conn connection) callback (destination string) &key selector id client-ack?)
   "Register a callback for a destination.  A subscription to the destination using the
    optional client-ack? is issued for all callbacks as part of connecting to the MQ server."
   (check-type callback (or null function))
   (with-slots (stream registrations) conn
     (when stream
-      (subscribe conn destination client-ack?))
+      (subscribe conn destination :selector selector :id id :client-ack? client-ack?))
     (setf registrations (append registrations (list (make-instance 'registration 
                                                       :callback callback
                                                       :destination destination
+                                                      :selector selector
+                                                      :id id
                                                       :client-ack? client-ack?))))))
 
+(defmethod unregister-id ((conn connection) id)
+  (with-slots (stream registrations) conn
+    (when stream
+      (unsubscribe-id conn id))
+    (setf registrations (remove-if #'(lambda (reg) 
+                                       (string-equal id (slot-value reg 'id))) registrations))))
+
 (defmethod register-error ((conn connection) callback)
   "Register an error callback for STOMP error frames."
   (check-type callback (or null function))
   (with-slots (error-callback) conn
     (setf error-callback callback)))
 
-(defmethod subscribe ((conn connection) (destination string) &optional client-ack?)
+(defmethod subscribe ((conn connection) (destination string) &key selector id client-ack?)
   (sending-frame (conn frame "SUBSCRIBE"
                        "destination" destination)
+    (when selector
+      (set-selector frame selector))
+    (when id
+      (set-id frame id))
     (when client-ack?
       (set-client-ack frame))))
 
   (sending-frame (conn frame "UNSUBSCRIBE"
                        "destination" destination)))
 
+(defmethod unsubscribe-id ((conn connection) (id string))
+  (sending-frame (conn frame "UNSUBSCRIBE"
+                       "id" id)))
+
 (defmethod post ((conn connection) (message string) (destination string) &optional headers)
   (sending-frame (conn frame "SEND"
                        "destination" destination)