(make-header (line)
(multiple-value-bind (before after)
(string-split line ":")
- (list (string-strip before) (string-strip after))))
+ (list (string-strip (string before)) (string-strip (string after)))))
;; Frame name is first line
(get-name (stream)
(get-line stream))
(format stream "~A~%" name)
(dolist (header headers)
(format stream "~A:~A~%" (first header) (second header)))
- (format stream "~%~A~%~A~%" body (code-char 0))))
+ (format stream "~%~A~A" body (code-char 0))))
(defun header= (header1 header2)
"Case insensitive comparison function for headers."
:initarg :port)
(stream :initform nil
:initarg :stream)
+ (encoding :initform :utf-8) ;only utf-8 is currently supported
(registrations :type list
:initform ()
:initarg :registrations)
do (with-slots (destination client-ack?) reg
(subscribe conn destination client-ack?)))
;; The receive loop
- (setf terminate nil)
(let ((recvbuf '()))
(loop until terminate
do (let ((sock (car (usocket:wait-for-input socket :timeout 1))))
(when sock
(let ((newbuf (append recvbuf (receive conn))))
(setf recvbuf (process-receive-buffer conn newbuf)))))))
- (log-debug "Terminated")))))
+ (disconnect conn)
+ (log-debug "Terminated")
+ (setf terminate nil)))))
(defmethod stop ((conn connection))
"Gracefully terminates the current receive loop and closes the connection to the message broker."
(check-type passcode (or null string))
(sending-frame (conn frame "CONNECT"
"login" username
- "passcode" passcode)
- ))
+ "passcode" passcode)))
(defmethod disconnect ((conn connection))
(with-slots (stream) conn
- (when (and stream (open-stream-p stream))
- (sending-frame (conn frame "DISCONNECT")
- )
- (close stream)
+ (when stream
+ (when (open-stream-p stream)
+ (sending-frame (conn frame "DISCONNECT"))
+ (close stream))
(setf stream nil))))
(defmethod send ((conn connection) (frame frame))
(send conn (with-output-to-string (stream)
- ;; We use 'print' because the 'print-object' method on frames
+ ;; We use 'princ' because the 'print-object' method on frames
;; does its output using the STOMP protocol
- (print frame stream))))
+ (princ frame stream))))
(defmethod send ((conn connection) (string string))
- (with-slots (stream) conn
- (write-sequence (babel:string-to-octets string :encoding :utf-8) stream)
+ (with-slots (stream encoding) conn
+ (write-sequence (babel:string-to-octets string :encoding encoding) stream)
(finish-output stream)))
(defmethod receive ((conn connection))
"Called whenever there's activity on the connection stream.
- Reads from the stream and returns the received buffer as a list of bytes."
+ Reads from the stream and returns the received buffer as a list of bytes,
+ or NIL if the connection has been closed by the broker."
(with-slots (stream) conn
- (let ((buffer (loop for b = (read-byte stream nil 'eof)
- while (listen stream)
- collect b)))
+ (let ((buffer (loop while (listen stream)
+ as b = (read-byte stream nil 'eof)
+ unless (eql b 'eof)
+ collect b)))
(if (> (length buffer) 0)
;; Return the buffer
buffer
;; so close things down
(progn
(log-debug "Nothing to read from socket, closing.")
- (disconnect conn))))))
+ (stop conn)
+ nil)))))
(defmethod process-receive-buffer ((conn connection) buffer)
"Try to extract and process frame(s) from buffer. Returns unprocessed buffer."
(log-debug "Frame: ~A" frame)
(apply-callbacks conn frame))
(extract-frame ()
- ;;Identify frames by looking for NULLs
- ;;This is safe with utf-8 because a 0 will never appear within multibyte characters
- ;;TODO: Use content-length header when provided instead of relying on NULL delimiter
+ ;; Identify frames by looking for NULLs
+ ;; This is safe with UTF-8 because a 0 will never appear within multibyte characters
+ ;;--- TODO: Use content-length header when provided instead of relying on NULL delimiter
(let ((pos (position 0 buffer)))
(when pos
- (let* ((framebytes (subseq buffer 0 pos))
- (framevector (coerce framebytes '(vector (unsigned-byte 8))))
- (framestring (babel:octets-to-string framevector :encoding :utf-8)))
- (process-frame (make-frame-from-string (string-strip framestring)))
- (setf buffer (subseq buffer (+ pos 1))))))))
+ (with-slots (encoding) conn
+ (let* ((framevector (coerce buffer '(vector (unsigned-byte 8))))
+ (framestring (babel:octets-to-string framevector
+ :start 0 :end pos :encoding encoding)))
+ (process-frame (make-frame-from-string (string-strip framestring)))
+ (setf buffer (subseq buffer (1+ pos)))))))))
(loop while (extract-frame))
buffer))
+(defun destination= (actual registered)
+ "Returns T if the REGISTERED destination matches the ACTUAL destination."
+ ;;--- TODO: Implement wildcard matching? (NOTE: not all message brokers support wildcard matching)
+ (string-equal actual registered))
+
(defmethod apply-callbacks ((conn connection) (frame frame))
"Send FRAME to any matching registered callbacks."
(with-slots (registrations error-callback) conn
(let ((dest (get-destination frame)))
(loop for reg in registrations
do (with-slots (callback destination) reg
- (when (and callback (string-equal dest destination))
+ (when (and callback (destination= dest destination))
(funcall callback frame))))))))
(defmethod register ((conn connection) callback (destination string) &optional client-ack?)
(defmethod unsubscribe ((conn connection) (destination string))
(sending-frame (conn frame "UNSUBSCRIBE"
- "destination" destination)
- ))
+ "destination" destination)))
(defmethod post ((conn connection) (message string) (destination string) &optional headers)
(sending-frame (conn frame "SEND"
"destination" destination)
(loop for (key value) in headers
- do (set-header frame key value))
- (setf (frame-body frame) (string-strip message))))
+ unless (header= key "destination") ;don't overwrite the destination set above
+ do (set-header frame key value))
+ (setf (frame-body frame) message)))
(defmethod ack ((conn connection) (for-frame frame) &optional transaction)
"Send the client ack for FRAME and optional TRANSACTION"
(check-type transaction (or null string))
(sending-frame (conn frame "ACK"
"message-id" (get-header for-frame "message-id")
- "transaction" transaction)
- ))
+ "transaction" transaction)))
(defmethod begin ((conn connection) (transaction string))
"Begin a transaction with name TRANSACTION"
(sending-frame (conn frame "BEGIN"
- "transaction" transaction)
- ))
+ "transaction" transaction)))
(defmethod commit ((conn connection) (transaction string))
"Commit a transaction with name TRANSACTION"
(sending-frame (conn frame "COMMIT"
- "transaction" transaction)
- ))
+ "transaction" transaction)))
;; Naming this method 'abort' is not a good idea, so calling it 'rollback' instead
(defmethod rollback ((conn connection) (transaction string))
"Abort a transaction with name TRANSACTION."
(sending-frame (conn frame "ABORT"
- "transaction" transaction)
- ))
+ "transaction" transaction)))