Implementation of the STOMP protocol by Matt Reklaitis <mjr@itasoftware.com>
authorFrancois-Rene Rideau <fare@tunes.org>
Tue, 15 Mar 2011 20:00:37 +0000 (16:00 -0400)
committerFrancois-Rene Rideau <fare@tunes.org>
Tue, 15 Mar 2011 20:00:37 +0000 (16:00 -0400)
With authorization of the original author and of ITA,
this code is made available under an MIT-style license.

COPYING [new file with mode: 0644]
README [new file with mode: 0644]
cl-stomp.asd
cl-stomp.lisp

diff --git a/COPYING b/COPYING
new file mode 100644 (file)
index 0000000..95c1422
--- /dev/null
+++ b/COPYING
@@ -0,0 +1,20 @@
+Copyright (c) 2007-2011 Keith Irwin, Matt Reklaitis
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/README b/README
new file mode 100644 (file)
index 0000000..dd234c1
--- /dev/null
+++ b/README
@@ -0,0 +1,5 @@
+cl-stomp: Common-Lisp interface to ActiveMQ using the STOMP protocol
+
+This code is available under an MIT-style license. See COPYING.
+It was written by Matt J. Reklaitis at ITA Software, based on code
+originally published by Keith Irwin in 2007 at http://cl-stomp.googlecode.com
index 32e0a00..854b5a3 100644 (file)
@@ -1,8 +1,16 @@
-;; -*- mode: lisp; -*-
+;; -*- mode: lisp; indent-tabs-mode: nil; -*-
+;;
+;; Available under MIT-style License. see COPYING.
+;;
+
+(in-package :asdf)
+
+(defsystem cl-stomp
+  :description "Implements the STOMP protocol for connecting to a message broker."
+  :author ""
+  :version ""
+  :licence ""
+  :depends-on (cl-ppcre usocket)
+  :components ((:file "cl-stomp")))
 
-(defsystem :cl-stomp
-    :version "0.1.0"
-    :depends-on (cl-ppcre)
-    :components
-    ((:file "cl-stomp")))
 
index f401aa0..d312535 100644 (file)
@@ -1,4 +1,7 @@
 ;; -*- mode: lisp; indent-tabs-mode: nil; -*-
+;;
+;; Available under MIT-style License. see COPYING.
+;;
 
 (defpackage :cl-stomp
 
   (:use :cl
         :cl-user)
 
-  (:export :frame
+  (:export :make-connection
+           :start
+           :stop
+           :frame
+           :frame-body-of
+           :frame-name-of
+           :frame-headers-of
            :set-header
            :get-header
            :set-destination
            :get-destination
-           :frame-body-of
-           :frame-name-of
-           :stomp-connection
-           :make-connection
-           :connect
            :register
+           :register-error
+           :subscribe
+           :unsubscribe
            :post
-           :start
-           :stop))
+           :ack
+           :begin
+           :commit
+           :rollback))
 
 (in-package :cl-stomp)
 
 ;;-------------------------------------------------------------------------
-;; socket interface
-
-(defun ip-addr-of (host-name)
-  (let ((host (sb-bsd-sockets:get-host-by-name host-name)))
-    (sb-bsd-sockets:host-ent-address host)))
-
-(defun inet-addr (ip-address)
-  (sb-bsd-sockets:make-inet-address ip-address))
-
-(defun sk-connect (sock address port)
-  (let ((ip-address (if (stringp address) (inet-addr address) address)))
-    (sb-bsd-sockets:socket-connect sock ip-address port)))
-
-(defun sk-file-desc (sock)
-  (sb-bsd-sockets:socket-file-descriptor sock))
-
-(defun sk-close (sock)
-  (let ((fd (sb-bsd-sockets:socket-file-descriptor sock)))
-    (sb-sys:invalidate-descriptor fd)
-    (sb-bsd-sockets:socket-close sock)))
-
-(defun sk-make ()
-  (let ((sock (make-instance 'sb-bsd-sockets:inet-socket 
-                             :type :stream :protocol :tcp)))
-    (setf (sb-bsd-sockets:sockopt-reuse-address sock) t)
-    sock))
-
-;;-------------------------------------------------------------------------
 ;; convenience utils
 
+#+nil
 (defun log-debug (fmt &rest args)
   (apply #'format *standard-output* fmt args)
   (finish-output *standard-output*))
 
+#-nil
+(defun log-debug (fmt &rest args)
+  (declare (ignore fmt args)))
+
 (defun string-from-bytes (bytes)
   (map 'string #'code-char bytes))
 
+(defun string-to-bytes (string)
+  (map 'sequence 'char-code string))
+
 (defun string-strip (str)
   "Remove spaces, tabs and line enders from a string."
   (declare (string str))
   (let ((result (make-array '(0) :element-type 'character :fill-pointer 0 :adjustable t)))
     (with-output-to-string (stream result)
       (dolist (line lines)
-        (write-line line stream)))
-    (string-strip result)))
-
-(defun string-split (str &key (delim " ") (limit nil))
-  "Returns a list of words in STR broken at the DELIM boundary."
-
-  (labels ((pop-word (str &key (delim " "))
-             "Returns the first word and rest of STR broken at DELIM."
-             (let ((start (position delim str :test 'string=)))
-               (if (null start)
-                   (values str nil)
-                   (let ((start2 (min (length str)
-                                      (+ 1 start))))
-                     (values (subseq str 0 start)
-                             (subseq str start2))))))
-
-           (splitter (str delim limit count)
-             (if (null str)
-                 nil
-                 (if (and (not (null limit))
-                          (>= count limit))
-                     (list str)
-                     (multiple-value-bind (word rest) (pop-word str :delim delim)
-                       (if (not (zerop (length word)))
-                           (append (list (string-strip word)) 
-                                   (splitter rest delim limit (incf count)))
-                           (splitter rest delim limit (incf count))))))))
-
-    (splitter str delim limit 0)))
+        (write-line line stream)))))
+
+(defun string-split (string delim)
+  "Splits STRING at the first occurance of DELIM and returns the before and after subsequences.
+   If DELIM is not found in STRING, returns STRING and NIL."
+  (when string
+    (let ((start (search delim string :test 'string=)))
+      (if (null start)
+        (values string nil)
+        (let ((start2 (min (length string) (+ (length delim) start))))
+          (values (subseq string 0 start)
+                  (subseq string start2)))))))
 
 (defun string-lines (string)
   (let ((result ()))
          :do (setf result (pushnew line result))))
     (nreverse result)))
 
-(defun string-contains (string str-or-char-list)
-  "Returns true if STR-OR-CHAR-LIST is contained in STRING."
-  (declare (type string string))
-  (search (coerce str-or-char-list 'string) string :test #'string=))
-
 ;;-------------------------------------------------------------------------
 ;; frame
 
-(defgeneric set-destination (frame destination)
-  (:documentation "Set the destination header for the frame."))
-
-(defgeneric get-header (frame key)
-  (:documentation ""))
-
-(defgeneric get-destination (frame)
-  (:documentation ""))
-
-(defgeneric set-header (frame key value)
-  (:documentation ""))
-
 (defclass frame ()
   ((name
     :initform "MESSAGE"
     :accessor frame-body-of)))
 
 (defun make-frame (string)
-
+  "Construct a frame by parsing STRING according to protocol."
   ;; declare some useful local functions
   (labels (
+           (get-line (stream)
+             (let ((line (read-line stream nil 'eof)))
+               (unless (eql line 'eof)
+                 line)))
 
            (make-header (line)
-             (map 'list #'string-strip (string-split line :delim ":" :limit 1)))
+             (multiple-value-bind (before after) 
+                 (string-split line ":")
+               (list (string-strip before) (string-strip after))))
 
            ;; frame name is first line
-           (find-name (source)
-             (first source))
+           (get-name (stream)
+             (get-line stream))
 
            ;; frame headers are second lines through to empty line
-           (find-headers (source)
-             (let ((lines (rest source)))
-               (loop
-                  :for line :in lines
-                  :while (> (length line) 0)
-                  :collect (make-header line))))
+           (get-headers (stream)
+             (loop
+               :for line = (get-line stream)
+               :while (> (length line) 0)
+               :collect (make-header line)))
 
            ;; frame body is all lines after the empty line
-           (find-body (source)
-             (let* ((lines (reverse source))
-                    (result (loop
-                               :for line :in lines
-                               :while (> (length line) 0)
-                               :collect line)))
-               (string-join (nreverse result)))))
-
-    (let ((lines (string-lines string)))
-      (let ((name (find-name lines))
-            (headers (find-headers lines))
-            (body (find-body lines)))
+           (get-body (stream)
+             (coerce (loop
+                       :for c = (read-char stream nil 'eof)
+                       :while (not (eql c 'eof))
+                       :collect c)
+               'string)))
+
+    (with-input-from-string (stream string)
+      (let ((name (get-name stream))
+            (headers (get-headers stream))
+            (body (get-body stream)))
         (make-instance 'frame :name name :headers headers :body body)))))
 
 (defmethod print-object ((self frame) stream)
+  "Print FRAME to STREAM according to protocol."
   (with-slots (name headers body) self
     (format stream "~A~%" name)
     (dolist (header headers)
     (format stream "~%~a~%~a~%" body (code-char 0))))
 
 (defmethod set-destination ((self frame) destination)
+  "Set the destination header for FRAME."
   (set-header self "destination" destination))
 
+(defmethod set-client-ack ((self frame))
+  "Specify a 'client' ack header for FRAME"
+  (set-header self "ack" "client"))
+
+(defun header= (header1 header2)
+  "Case insensitive comparison func for headers."
+  (string= (string-downcase header1) (string-downcase header2)))
+
 (defmethod get-header ((self frame) key)
+  "Return the value of the header named KEY, or NIL if it doesn't exist."
   (with-slots (headers) self
-    (second (assoc key headers :test #'string=))))
+    (second (assoc key headers :test #'header=))))
 
 (defmethod get-destination ((self frame))
+  "Return the value of the destination header."
   (get-header self "destination"))
 
 (defmethod set-header ((self frame) key value)
-  (with-slots (headers) self
-    (if (not (assoc key headers :test #'string=))
+  "Add a header named KEY to FRAME with VALUE.
+   If the header already exists, VALUE is appended to the existing value(s)."
+  (when value
+    (with-slots (headers) self
+      (if (not (assoc key headers :test #'header=))
         (setf headers (append (list (list key value)) headers))
         (let ((result))
           (dolist (header headers)
-            (if (string= (first header) key)
-                (push (list key value) result)
-                (push header result)))
-          (format t "result: ~s~%" result)
-          (setf headers result)))))
+            (if (header= (first header) key)
+              (push (list key value) result)
+              (push header result)))
+          (setf headers result))))))
+
+(defmethod error-frame-p ((self frame))
+  (string= (string-downcase (frame-name-of self)) "error"))
 
 ;;-------------------------------------------------------------------------
 ;; stomp
 
-(defgeneric connect (connection &key user pass)
-  (:documentation "Connect to the stomp server."))
+(defgeneric get-header (frame key)
+  (:documentation ""))
+
+(defgeneric set-header (frame key value)
+  (:documentation ""))
 
-(defgeneric disconnect (connection)
-  (:documentation "Disconnection from the stomp server."))
+(defgeneric get-destination (frame)
+  (:documentation ""))
 
-(defgeneric send (connection frame)
-  (:documentation "Send a string to stomp."))
+(defgeneric set-destination (frame destination)
+  (:documentation "Set the destination header for the frame."))
 
-(defgeneric post (connection message destination)
+(defgeneric post (connection message destination &optional headers)
   (:documentation "Post a message to a destination."))
 
-(defgeneric register (connection callback destination)
-  (:documentation "Register a listener for messages to a destination."))
+(defgeneric ack (connection frame &optional transaction)
+  (:documentation "Send the client ack for frame."))
 
-(defgeneric receive (connection)
-  (:documentation "Read data from stomp."))
+(defgeneric begin (connection transaction)
+  (:documentation "Start a transaction for the given name"))
 
-(defgeneric subscribe (connection destination)
-  (:documentation "Subscribe to a topic."))
+(defgeneric commit (connection transaction)
+  (:documentation "Commit a transaction for the given name"))
 
-(defgeneric start (connection)
-  (:documentation "Start listening for messages from stomp."))
+(defgeneric rollback (connection transaction)
+  (:documentation "Abort a transaction for the given name"))
+
+(defgeneric register (connection callback destination &optional client-ack?)
+  (:documentation "Register a listener for messages to a destination.
+                   Callback should be a lambda which accepts a frame argument.
+                   If client-ack? is t, client is responsible for sending ACK."))
+
+(defgeneric register-error (connection callback)
+  (:documentation "Register a listener for STOMP error frames."))
+
+(defgeneric subscribe (connection destination &optional client-ack?)
+  (:documentation "Subscribe to a topic or queue."))
 
-(defgeneric apply-callbacks (instance frame)
-  (:documentation "Send message to registered callbacks."))
+(defgeneric unsubscribe (connection destination)
+  (:documentation "Unsubscribe from a topic or queue."))
+
+(defgeneric start (connection &key username passcode)
+  (:documentation "Start listening for messages from stomp."))
 
 (defgeneric stop (connection)
   (:documentation "Stop the connection with stomp."))
 
-(defclass stomp-connection ()
+(defclass registration ()
+  ((callback    :initarg :callback)       ; The callback func
+   (destination :initarg :destination)    ; The topic/queue name
+   (client-ack? :initarg :client-ack?)))  ; Use client (or auto) ack?
+
+(defclass connection ()
   ((host
     :initform "localhost"
-    :initarg :host
-    :accessor host-of)
+    :initarg :host)
    (port
     :initform 61613
-    :initarg :port
-    :accessor port-of)
-   (ip
-    :initform "127.0.0.1"
-    :initarg :ip
-    :accessor ip-of)
-   (socket
-    :initform nil
-    :initarg :socket
-    :accessor socket-of)
-   (fd
-    :initform nil
-    :initarg :fd
-    :accessor fd-of)
-   (handler
+    :initarg :port)
+   (stream
     :initform nil
-    :initarg :handler
-    :accessor handler-of)
-   (callbacks
-    :initform ()
-    :initarg :callbacks
-    :accessor callbacks-of)
+    :initarg :stream)
+   (registrations
+    :initform '()
+    :initarg :registrations)
+   (error-callback
+     :initform nil)
    (terminate
     :initform nil)))
 
 (defun make-connection (host port)
-  (make-instance 'stomp-connection :host host :port port))
-
-(defmethod connect ((self stomp-connection) &key user pass)
-  (handler-case 
-
-      ;; open the socket and send the connect string
-
-      (with-slots (host port ip socket fd handler) self
-        (setf ip (ip-addr-of host))
-        (setf socket (sk-make))
-        (sk-connect socket ip port)
-        (setf fd (sk-file-desc socket))
-        (setf handler (sb-sys:add-fd-handler fd :input (lambda (x)
-                                                         (declare (ignore x))
-                                                         (receive self))))
+  (make-instance 'connection :host host :port port))
+
+(defmethod start ((self connection) &key username passcode)
+  "Connects to the message broker, sends subscriptions for any existing registrations,
+   and enters a receive loop."
+  (handler-bind
+      ((t (lambda (e)
+            (disconnect self)
+            (log-debug "error: ~a" e))))
+    (with-slots (host port stream registrations terminate) self
+      (usocket:with-client-socket (socket strm host port
+                                    :protocol :stream
+                                    :element-type '(unsigned-byte 8))
+        (setf stream strm)
+        ;;send CONNECT frame
         (let ((frame (make-instance 'frame :name "CONNECT")))
-          (if user
-              (set-header frame "login" user))
-          (if pass
-              (set-header frame "passcode" pass))
-          (send self frame)))
-
-    ;; just log the error for now
-
-    (condition (c)
-      (setf (socket-of self) nil)
-      (log-debug "error-on-connect: ~a" c))))
-
-(defmethod subscribe ((self stomp-connection) destination)
-  (let ((frame (make-instance 'frame :name "SUBSCRIBE")))
-    (set-destination frame destination)
-    (send self frame)))
-
-(defmethod start ((self stomp-connection))
-  (with-slots (terminate) self
-    (setf terminate nil)
-    (loop
-       :until terminate
-       :do (sb-sys:serve-all-events 1))
-    (log-debug "terminated")))
-    
-(defmethod stop ((self stomp-connection))
+          (set-header frame "login" username)
+          (set-header frame "passcode" passcode)
+          (send self frame))
+        
+        ;;send SUBSCRIBE frames
+        (loop 
+          :for reg :in registrations
+          :do (with-slots (destination client-ack?) reg
+                (subscribe self destination client-ack?)))
+
+        ;;receive loop
+        (setf terminate nil)
+        (let ((recvbuf ""))
+          (loop
+            :until terminate
+            :do (let ((sock (car (usocket:wait-for-input socket :timeout 0))))
+                  (if sock
+                    (let* ((recvstr (receive self))
+                           (newbuf (if (= (length recvbuf) 0)  ;; only call string-join when necessary
+                                     recvstr
+                                     (string-join (list recvbuf recvstr)))))
+                      (setf recvbuf (process_receive_buffer self newbuf)))
+                    (sleep 0.1)))))
+        (log-debug "terminated")))))
+        
+(defmethod stop ((self connection))
+  "Gracefully terminates the current receive loop and closes the connection to the message broker."
   (with-slots (terminate) self
     (setf terminate t)))
 
-(defmethod send ((self stomp-connection) (frame frame))
-  (with-slots (fd) self
-    (let ((stream (sb-sys:make-fd-stream fd :output t :element-type 'character :buffering :none)))
-      (print frame stream)
-      (finish-output stream))))
-
-(defmethod send ((self stomp-connection) string)
-  (with-slots (fd) self
-    (let ((stream (sb-sys:make-fd-stream fd :output t :element-type '(unsigned-byte 8)
-                                         :buffering :none)))
-      (write-sequence (sb-ext:string-to-octets string) stream)
-      (finish-output stream))))
-
-(defmethod apply-callbacks ((self stomp-connection) (frame frame))
-  (with-slots (callbacks) self
-    (let ((destination (get-destination frame)))
-      (loop 
-         :for (dest func) :in callbacks
-         :do (if (string= dest destination)
-                 (funcall func frame))))))
+(defmethod apply-callbacks ((self connection) (frame frame))
+  "Send FRAME to any matching registered callbacks."
+  (with-slots (registrations error-callback) self
+    (if (error-frame-p frame)
+      (when error-callback
+        (funcall error-callback frame))
+      (let ((dest (get-destination frame)))
+        (loop 
+          :for reg :in registrations
+          :do (with-slots (callback destination) reg
+                (if (string= dest destination)
+                  (funcall callback frame))))))))
+
+(defmethod send ((self connection) (frame frame))
+  (send self (with-output-to-string (stream)
+               (print frame stream))))
+
+(defmethod send ((self connection) string)
+  (with-slots (stream) self
+    (write-sequence (string-to-bytes string) stream)
+    (finish-output stream)))
   
-(defmethod receive ((self stomp-connection))
-  "Called whenever there's activity on the file descriptor."
-  (with-slots (socket fd handler callbacks) self
-    (let ((str (sb-sys:make-fd-stream fd :input t :element-type '(unsigned-byte 8)
-                                      :buffering :full)))
-      (let ((buffer (loop 
-                       :for b = (read-byte str nil 'eof)
-                       :while (listen str)
-                       :collect b)))
+(defmethod receive ((self connection))
+  "Called whenever there's activity on the connection stream.
+   Reads from the stream and returns the received buffer as a string."
+  (with-slots (stream) self
+      (let ((buffer (loop
+                      :for b = (read-byte stream nil 'eof)
+                      :while (listen stream)
+                      :collect b)))
 
         (if (> (length buffer) 0)
+          
+          ;; return the buffer as a string
+          (string-from-bytes buffer)
+
+          ;; otherwise, it means the other end has terminated,
+          ;; so close things down
+          (progn
+            (log-debug "nothing to read from socket, closing.~%")
+            (disconnect self))))))
+
+(defmethod process_receive_buffer ((self connection) buffer)
+  "Try to extract and process frame(s) from recvbuf.  Returns unprocessed buffer."
+  (labels ((process-frame (frame)
+             (log-debug "frame: ~a" frame)
+             (apply-callbacks self frame))
+
+            (extract-frame ()
+              (multiple-value-bind (before after)
+                  (string-split buffer (string (code-char 0)))
+                (when after
+                  ;;got one
+                  (process-frame (make-frame before))
+                  (setf buffer after)))))
+
+    (loop 
+      :while (extract-frame))
+
+    buffer))
+
+(defmethod subscribe ((self connection) destination &optional client-ack?)
+  (let ((frame (make-instance 'frame :name "SUBSCRIBE")))
+    (set-destination frame destination)
+    (when client-ack?
+      (set-client-ack frame))
+    (send self frame)))
 
-            ;; if we got something, send it to all the
-            ;; registered callbacks
-
-            (let ((frame (make-frame (string-from-bytes buffer))))
-              (apply-callbacks self frame))
-
-            ;; otherwise, it means the other end has terminated,
-            ;; so close things down (eventually go into a sleep/wait
-            ;; loop in case of reconnection
+(defmethod unsubscribe ((self connection) destination)
+  (let ((frame (make-instance 'frame :name "UNSUBSCRIBE")))
+    (set-destination frame destination)
+    (send self frame)))
 
-            (handler-case
-                (progn
-                  (log-debug "nothing to read from socket~%")
-                  (disconnect self)
-                  (sb-sys:remove-fd-handler handler)
-                  (sk-close socket))
-              (condition (c)
-                (log-debug "close: ~a" c))))))))
+(defmethod register ((self connection) callback destination &optional client-ack?)
+  "Register a callback func for a destination.  A subscription to the destination using
+   the optional client-ack? is issued for all callbacks as part of connecting to the mq server."
+  (with-slots (stream registrations) self
+    (if stream
+      (subscribe self destination client-ack?))
+    (setf registrations (append registrations (list (make-instance 'registration 
+                                                      :callback callback
+                                                      :destination destination
+                                                      :client-ack? client-ack?))))))
+
+(defmethod register-error ((self connection) callback)
+  "Register a callback func for STOMP error frames."
+  (with-slots (error-callback) self
+    (setf error-callback callback)))
+
+(defmethod post ((self connection) message destination &optional headers)
+  (let ((frame (make-instance 'frame :name "SEND" :body (string-strip message))))
+    (set-destination frame destination)
+    (loop 
+      :for (key value) :in headers
+      :do (set-header frame key value))
+    (send self frame)))
 
-(defmethod register ((self stomp-connection) callback destination)
-  (with-slots (callbacks) self
-    (subscribe self destination)
-    (setf callbacks (append callbacks (list (list destination callback))))))
+(defmethod ack ((self connection) frame &optional transaction)
+  "Send the client ack for FRAME and optional TRANSACTION"
+  (let ((ack-frame (make-instance 'frame :name "ACK")))
+    (set-header ack-frame "message-id" (get-header frame "message-id"))
+    (set-header ack-frame "transaction" transaction)
+    (send self ack-frame)))
+
+(defmethod begin ((self connection) transaction)
+  "Begin a transaction with name TRANSACTION"
+  (let ((frame (make-instance 'frame :name "BEGIN")))
+    (set-header frame "transaction" transaction)
+    (send self frame)))
 
-(defmethod post ((self stomp-connection) message destination)
-  (let ((frame (make-instance 'frame :name "SEND" :body message)))
-    (set-destination frame destination)
+(defmethod commit ((self connection) transaction)
+  "Commit a transaction with name TRANSACTION"
+  (let ((frame (make-instance 'frame :name "COMMIT")))
+    (set-header frame "transaction" transaction)
     (send self frame)))
 
-(defmethod disconnect ((self stomp-connection))
-  (let ((frame (make-instance 'frame :name "DISCONNECT")))
-    (send self frame))
-  (with-slots (handler socket) self
-    (handler-case
-        (sb-sys:remove-fd-handler handler)
-      (condition (c)
-        (log-debug "discconnect-handler-error: ~a" c)))
-    (handler-case
-        (sk-close socket)
-      (condition (c)
-        (log-debug "disconnect-socket-error: ~a" c)))))
+;; naming this method 'abort' is not a good idea, so naming it 'rollback' instead
+(defmethod rollback ((self connection) transaction)
+  "Abort a transaction with name TRANSACTION"
+  (let ((frame (make-instance 'frame :name "ABORT")))
+    (set-header frame "transaction" transaction)
+    (send self frame)))
 
+(defmethod disconnect ((self connection))
+  (with-slots (stream) self
+    (when (and stream (open-stream-p stream))
+      (let ((frame (make-instance 'frame :name "DISCONNECT")))
+        (send self frame))
+      (close stream)
+      (setf stream nil))))