Fixes from the QRes tree, by mjr qres-r468880
authorFrancois-Rene Rideau <fare@tunes.org>
Tue, 24 May 2011 21:05:19 +0000 (17:05 -0400)
committerFrancois-Rene Rideau <fare@tunes.org>
Tue, 24 May 2011 21:05:19 +0000 (17:05 -0400)
------------------------------------------------------------------------
r468880 | mjr | 2011-05-24 11:37:48 -0400 (Tue, 24 May 2011) | 21 lines

Handle malformed stomp frame headers

Reviewer: gordon

Tests that will verify: test-stomp-client unit tests pass using local amq:
                        ran 8 unit tests (56 assertions, 0 failures, 0 errors)

------------------------------------------------------------------------
r467053 | swm | 2011-05-16 12:56:20 -0400 (Mon, 16 May 2011) | 7 lines

Try to make the open-source code that we publish adhere
to the Lisp conventions we internally "mandate" and which
will likely become Google's own Lisp style conevntions.

No functional change.

------------------------------------------------------------------------
r466830 | mjr | 2011-05-13 16:33:20 -0400 (Fri, 13 May 2011) | 22 lines

Add logic to stomp handler for failed queued messages.

Reviewer: dschulz

Tests that will verify: updated test-stomp-client unit tests:
                        test-stomp-client unit tests pass using local amq:
                        ran 7 unit tests (49 assertions, 0 failures, 0 errors)

------------------------------------------------------------------------
r465514 | mjr | 2011-05-09 15:27:06 -0400 (Mon, 09 May 2011) | 20 lines

Tweak disconnecting from stomp message broker

Reviewer: gordon

Tests that will verify: test-stomp-client unit tests pass using local amq:
ran 7 unit tests (39 assertions, 0 failures, 0 errors)

------------------------------------------------------------------------
r463376 | mjr | 2011-04-26 19:21:28 -0400 (Tue, 26 Apr 2011) | 3 lines

Add comment about wildcard matching

------------------------------------------------------------------------
r461393 | mjr | 2011-04-19 12:50:16 -0400 (Tue, 19 Apr 2011) | 22 lines

Fix stomp receive loop
Add response message expiration header
Use locking to avoid race condition when stopping stomp

Reviewer: gordon

Tests that will verify: added test-stomp-response-expiration,
  test-stomp-monitor now reliably passes

cl-stomp.lisp

index 0bc375c..b67641e 100644 (file)
            (make-header (line)
              (multiple-value-bind (before after) 
                  (string-split line ":")
-               (list (string-strip before) (string-strip after))))
+               (list (string-strip (string before)) (string-strip (string after)))))
            ;; Frame name is first line
            (get-name (stream)
              (get-line stream))
     (format stream "~A~%" name)
     (dolist (header headers)
       (format stream "~A:~A~%" (first header) (second header)))
-    (format stream "~%~A~%~A~%" body (code-char 0))))
+    (format stream "~%~A~A" body (code-char 0))))
 
 (defun header= (header1 header2)
   "Case insensitive comparison function for headers."
          :initarg :port)
    (stream :initform nil
            :initarg :stream)
+   (encoding :initform :utf-8)           ;only utf-8 is currently supported
    (registrations :type list
                   :initform ()
                   :initarg :registrations)
               do (with-slots (destination client-ack?) reg
                    (subscribe conn destination client-ack?)))
         ;; The receive loop
-        (setf terminate nil)
         (let ((recvbuf '()))
           (loop until terminate
                 do (let ((sock (car (usocket:wait-for-input socket :timeout 1))))
                      (when sock
                        (let ((newbuf (append recvbuf (receive conn))))
                          (setf recvbuf (process-receive-buffer conn newbuf)))))))
-        (log-debug "Terminated")))))
+        (disconnect conn)
+        (log-debug "Terminated")
+        (setf terminate nil)))))
         
 (defmethod stop ((conn connection))
   "Gracefully terminates the current receive loop and closes the connection to the message broker."
   (check-type passcode (or null string))
   (sending-frame (conn frame "CONNECT"
                        "login" username
-                       "passcode" passcode)
-    ))
+                       "passcode" passcode)))
 
 (defmethod disconnect ((conn connection))
   (with-slots (stream) conn
-    (when (and stream (open-stream-p stream))
-      (sending-frame (conn frame "DISCONNECT")
-        )
-      (close stream)
+    (when stream 
+      (when (open-stream-p stream)
+        (sending-frame (conn frame "DISCONNECT"))
+        (close stream))
       (setf stream nil))))
 
 (defmethod send ((conn connection) (frame frame))
   (send conn (with-output-to-string (stream)
-               ;; We use 'print' because the 'print-object' method on frames
+               ;; We use 'princ' because the 'print-object' method on frames
                ;; does its output using the STOMP protocol
-               (print frame stream))))
+               (princ frame stream))))
 
 (defmethod send ((conn connection) (string string))
-  (with-slots (stream) conn 
-    (write-sequence (babel:string-to-octets string :encoding :utf-8) stream)
+  (with-slots (stream encoding) conn 
+    (write-sequence (babel:string-to-octets string :encoding encoding) stream)
     (finish-output stream)))
 
 (defmethod receive ((conn connection))
   "Called whenever there's activity on the connection stream.
-   Reads from the stream and returns the received buffer as a list of bytes."
+   Reads from the stream and returns the received buffer as a list of bytes,
+    or NIL if the connection has been closed by the broker."
   (with-slots (stream) conn
-    (let ((buffer (loop for b = (read-byte stream nil 'eof)
-                        while (listen stream)
-                        collect b)))
+    (let ((buffer (loop while (listen stream)
+                        as b = (read-byte stream nil 'eof)
+                        unless (eql b 'eof)
+                          collect b)))
       (if (> (length buffer) 0)
         ;; Return the buffer
         buffer
         ;; so close things down
         (progn
           (log-debug "Nothing to read from socket, closing.")
-          (disconnect conn))))))
+          (stop conn)
+          nil)))))
 
 (defmethod process-receive-buffer ((conn connection) buffer)
   "Try to extract and process frame(s) from buffer.  Returns unprocessed buffer."
              (log-debug "Frame: ~A" frame)
              (apply-callbacks conn frame))
            (extract-frame ()
-             ;;Identify frames by looking for NULLs
-             ;;This is safe with utf-8 because a 0 will never appear within multibyte characters
-             ;;TODO: Use content-length header when provided instead of relying on NULL delimiter
+             ;; Identify frames by looking for NULLs
+             ;; This is safe with UTF-8 because a 0 will never appear within multibyte characters
+             ;;--- TODO: Use content-length header when provided instead of relying on NULL delimiter
              (let ((pos (position 0 buffer)))
                (when pos
-                 (let* ((framebytes (subseq buffer 0 pos))
-                        (framevector (coerce framebytes '(vector (unsigned-byte 8))))
-                        (framestring (babel:octets-to-string framevector :encoding :utf-8)))
-                   (process-frame (make-frame-from-string (string-strip framestring)))
-                   (setf buffer (subseq buffer (+ pos 1))))))))
+                 (with-slots (encoding) conn 
+                   (let* ((framevector (coerce buffer '(vector (unsigned-byte 8))))
+                          (framestring (babel:octets-to-string framevector 
+                                         :start 0 :end pos :encoding encoding)))
+                     (process-frame (make-frame-from-string (string-strip framestring)))
+                     (setf buffer (subseq buffer (1+ pos)))))))))
     (loop while (extract-frame))
     buffer))
 
+(defun destination= (actual registered)
+  "Returns T if the REGISTERED destination matches the ACTUAL destination."
+  ;;--- TODO: Implement wildcard matching? (NOTE: not all message brokers support wildcard matching)
+  (string-equal actual registered))
+
 (defmethod apply-callbacks ((conn connection) (frame frame))
   "Send FRAME to any matching registered callbacks."
   (with-slots (registrations error-callback) conn
       (let ((dest (get-destination frame)))
         (loop for reg in registrations
               do (with-slots (callback destination) reg
-                   (when (and callback (string-equal dest destination))
+                   (when (and callback (destination= dest destination))
                      (funcall callback frame))))))))
 
 (defmethod register ((conn connection) callback (destination string) &optional client-ack?)
 
 (defmethod unsubscribe ((conn connection) (destination string))
   (sending-frame (conn frame "UNSUBSCRIBE"
-                       "destination" destination)
-    ))
+                       "destination" destination)))
 
 (defmethod post ((conn connection) (message string) (destination string) &optional headers)
   (sending-frame (conn frame "SEND"
                        "destination" destination)
     (loop for (key value) in headers
-          do (set-header frame key value))
-    (setf (frame-body frame) (string-strip message))))
+          unless (header= key "destination")    ;don't overwrite the destination set above
+            do (set-header frame key value))
+    (setf (frame-body frame) message)))
 
 (defmethod ack ((conn connection) (for-frame frame) &optional transaction)
   "Send the client ack for FRAME and optional TRANSACTION"
   (check-type transaction (or null string))
   (sending-frame (conn frame "ACK"
                        "message-id" (get-header for-frame "message-id")
-                       "transaction" transaction)
-    ))
+                       "transaction" transaction)))
 
 (defmethod begin ((conn connection) (transaction string))
   "Begin a transaction with name TRANSACTION"
   (sending-frame (conn frame "BEGIN"
-                       "transaction" transaction)
-    ))
+                       "transaction" transaction)))
 
 (defmethod commit ((conn connection) (transaction string))
   "Commit a transaction with name TRANSACTION"
   (sending-frame (conn frame "COMMIT"
-                       "transaction" transaction)
-    ))
+                       "transaction" transaction)))
 
 ;; Naming this method 'abort' is not a good idea, so calling it 'rollback' instead
 (defmethod rollback ((conn connection) (transaction string))
   "Abort a transaction with name TRANSACTION."
   (sending-frame (conn frame "ABORT"
-                       "transaction" transaction)
-    ))
+                       "transaction" transaction)))