Skip to content
cl-stomp.lisp 12.3 KiB
Newer Older
keith.irwin's avatar
keith.irwin committed
;; -*- mode: lisp; -*-

(defpackage :cl-stomp

  (:nicknames :stomp)

  (:use :cl
keith.irwin's avatar
keith.irwin committed
        :cl-user)
keith.irwin's avatar
keith.irwin committed

  (:export :frame
keith.irwin's avatar
keith.irwin committed
           :set-header
           :get-header
           :set-destination
           :get-destination
           :frame-body-of
           :frame-name-of
           :stomp-connection
           :make-connection
           :connect
           :register
           :post
           :start
           :stop))
keith.irwin's avatar
keith.irwin committed

(in-package :cl-stomp)

;;-------------------------------------------------------------------------
;; socket interface

(defun ip-addr-of (host-name)
  (let ((host (sb-bsd-sockets:get-host-by-name host-name)))
    (sb-bsd-sockets:host-ent-address host)))

(defun inet-addr (ip-address)
  (sb-bsd-sockets:make-inet-address ip-address))

(defun sk-connect (sock address port)
  (let ((ip-address (if (stringp address) (inet-addr address) address)))
    (sb-bsd-sockets:socket-connect sock ip-address port)))

(defun sk-file-desc (sock)
  (sb-bsd-sockets:socket-file-descriptor sock))

(defun sk-close (sock)
  (let ((fd (sb-bsd-sockets:socket-file-descriptor sock)))
    (sb-sys:invalidate-descriptor fd)
    (sb-bsd-sockets:socket-close sock)))

(defun sk-make ()
  (let ((sock (make-instance 'sb-bsd-sockets:inet-socket 
                             :type :stream :protocol :tcp)))
    (setf (sb-bsd-sockets:sockopt-reuse-address sock) t)
    sock))

;;-------------------------------------------------------------------------
;; convenience utils

(defun log-debug (fmt &rest args)
  (apply #'format *standard-output* fmt args)
  (finish-output *standard-output*))

(defun string-from-bytes (bytes)
  (map 'string #'code-char bytes))

(defun string-strip (str)
  "Remove spaces, tabs and line enders from a string."
  (declare (string str))
  (string-trim (list #\Space #\NewLine #\Return #\Tab #\Nul) str))

(defun string-join (lines)
  (let ((result (make-array '(0) :element-type 'character :fill-pointer 0 :adjustable t)))
    (with-output-to-string (stream result)
      (dolist (line lines)
keith.irwin's avatar
keith.irwin committed
        (write-line line stream)))
keith.irwin's avatar
keith.irwin committed
    (string-strip result)))

(defun string-split (str &key (delim " ") (limit nil))
  "Returns a list of words in STR broken at the DELIM boundary."

  (labels ((pop-word (str &key (delim " "))
keith.irwin's avatar
keith.irwin committed
             "Returns the first word and rest of STR broken at DELIM."
             (let ((start (position delim str :test 'string=)))
               (if (null start)
                   (values str nil)
                   (let ((start2 (min (length str)
                                      (+ 1 start))))
                     (values (subseq str 0 start)
                             (subseq str start2))))))

           (splitter (str delim limit count)
             (if (null str)
                 nil
                 (if (and (not (null limit))
                          (>= count limit))
                     (list str)
                     (multiple-value-bind (word rest) (pop-word str :delim delim)
                       (if (not (zerop (length word)))
                           (append (list (string-strip word)) 
                                   (splitter rest delim limit (incf count)))
                           (splitter rest delim limit (incf count))))))))
keith.irwin's avatar
keith.irwin committed

    (splitter str delim limit 0)))

(defun string-lines (string)
  (let ((result ()))
    (with-input-from-string (stream string)
      (loop 
keith.irwin's avatar
keith.irwin committed
         :for line = (read-line stream nil 'eof)
         :while (not (eql line 'eof))
         :do (setf result (pushnew line result))))
keith.irwin's avatar
keith.irwin committed
    (nreverse result)))

(defun string-contains (string str-or-char-list)
  "Returns true if STR-OR-CHAR-LIST is contained in STRING."
  (declare (type string string))
  (search (coerce str-or-char-list 'string) string :test #'string=))

;;-------------------------------------------------------------------------
;; frame

(defgeneric set-destination (frame destination)
  (:documentation "Set the destination header for the frame."))

(defgeneric get-header (frame key)
  (:documentation ""))

(defgeneric get-destination (frame)
  (:documentation ""))

(defgeneric set-header (frame key value)
  (:documentation ""))

(defclass frame ()
  ((name
    :initform "MESSAGE"
    :initarg :name
    :accessor frame-name-of)
   (headers
    :initform ()
    :initarg :headers
    :accessor frame-headers-of)
   (body
    :initform ""
    :initarg :body
    :accessor frame-body-of)))

(defun make-frame (string)

  ;; declare some useful local functions
  (labels (

keith.irwin's avatar
keith.irwin committed
           (make-header (line)
             (map 'list #'string-strip (string-split line :delim ":" :limit 1)))

           ;; frame name is first line
           (find-name (source)
             (first source))

           ;; frame headers are second lines through to empty line
           (find-headers (source)
             (let ((lines (rest source)))
               (loop
                  :for line :in lines
                  :while (> (length line) 0)
                  :collect (make-header line))))

           ;; frame body is all lines after the empty line
           (find-body (source)
             (let* ((lines (reverse source))
                    (result (loop
                               :for line :in lines
                               :while (> (length line) 0)
                               :collect line)))
               (string-join (nreverse result)))))
keith.irwin's avatar
keith.irwin committed

    (let ((lines (string-lines string)))
      (let ((name (find-name lines))
keith.irwin's avatar
keith.irwin committed
            (headers (find-headers lines))
            (body (find-body lines)))
        (make-instance 'frame :name name :headers headers :body body)))))
keith.irwin's avatar
keith.irwin committed

(defmethod print-object ((self frame) stream)
  (with-slots (name headers body) self
    (format stream "~A~%" name)
    (dolist (header headers)
      (format stream "~a:~a~%" (first header) (second header)))
    (format stream "~%~a~%~a~%" body (code-char 0))))

(defmethod set-destination ((self frame) destination)
  (set-header self "destination" destination))

(defmethod get-header ((self frame) key)
  (with-slots (headers) self
    (second (assoc key headers :test #'string=))))

(defmethod get-destination ((self frame))
  (get-header self "destination"))

(defmethod set-header ((self frame) key value)
  (with-slots (headers) self
    (if (not (assoc key headers :test #'string=))
keith.irwin's avatar
keith.irwin committed
        (setf headers (append (list (list key value)) headers))
        (let ((result))
          (dolist (header headers)
            (if (string= (first header) key)
                (push (list key value) result)
                (push header result)))
          (format t "result: ~s~%" result)
          (setf headers result)))))
keith.irwin's avatar
keith.irwin committed

;;-------------------------------------------------------------------------
;; stomp

(defgeneric connect (connection &key user pass)
  (:documentation "Connect to the stomp server."))

(defgeneric disconnect (connection)
  (:documentation "Disconnection from the stomp server."))

(defgeneric send (connection frame)
  (:documentation "Send a string to stomp."))

(defgeneric post (connection message destination)
  (:documentation "Post a message to a destination."))

(defgeneric register (connection callback destination)
  (:documentation "Register a listener for messages to a destination."))

(defgeneric receive (connection)
  (:documentation "Read data from stomp."))

(defgeneric subscribe (connection destination)
  (:documentation "Subscribe to a topic."))

(defgeneric start (connection)
  (:documentation "Start listening for messages from stomp."))

(defgeneric apply-callbacks (instance frame)
  (:documentation "Send message to registered callbacks."))

(defgeneric stop (connection)
  (:documentation "Stop the connection with stomp."))

(defclass stomp-connection ()
  ((host
    :initform "localhost"
    :initarg :host
    :accessor host-of)
   (port
    :initform 61613
    :initarg :port
    :accessor port-of)
   (ip
    :initform "127.0.0.1"
    :initarg :ip
    :accessor ip-of)
   (socket
    :initform nil
    :initarg :socket
    :accessor socket-of)
   (fd
    :initform nil
    :initarg :fd
    :accessor fd-of)
   (handler
    :initform nil
    :initarg :handler
    :accessor handler-of)
   (callbacks
    :initform ()
    :initarg :callbacks
    :accessor callbacks-of)
   (terminate
    :initform nil)))

(defun make-connection (host port)
  (make-instance 'stomp-connection :host host :port port))

(defmethod connect ((self stomp-connection) &key user pass)
  (handler-case 

      ;; open the socket and send the connect string

      (with-slots (host port ip socket fd handler) self
keith.irwin's avatar
keith.irwin committed
        (setf ip (ip-addr-of host))
        (setf socket (sk-make))
        (sk-connect socket ip port)
        (setf fd (sk-file-desc socket))
        (setf handler (sb-sys:add-fd-handler fd :input (lambda (x)
                                                         (declare (ignore x))
                                                         (receive self))))
        (let ((frame (make-instance 'frame :name "CONNECT")))
          (if user
              (set-header frame "login" user))
          (if pass
              (set-header frame "passcode" pass))
          (send self frame)))
keith.irwin's avatar
keith.irwin committed

    ;; just log the error for now

    (condition (c)
      (setf (socket-of self) nil)
      (log-debug "error-on-connect: ~a" c))))

(defmethod subscribe ((self stomp-connection) destination)
  (let ((frame (make-instance 'frame :name "SUBSCRIBE")))
    (set-destination frame destination)
    (send self frame)))

(defmethod start ((self stomp-connection))
  (with-slots (terminate) self
    (setf terminate nil)
    (loop
       :until terminate
       :do (sb-sys:serve-all-events 1))
    (log-debug "terminated")))
    
(defmethod stop ((self stomp-connection))
  (with-slots (terminate) self
    (setf terminate t)))

(defmethod send ((self stomp-connection) (frame frame))
  (with-slots (fd) self
    (let ((stream (sb-sys:make-fd-stream fd :output t :element-type 'character :buffering :none)))
      (print frame stream)
      (finish-output stream))))

(defmethod send ((self stomp-connection) string)
  (with-slots (fd) self
    (let ((stream (sb-sys:make-fd-stream fd :output t :element-type '(unsigned-byte 8)
keith.irwin's avatar
keith.irwin committed
                                         :buffering :none)))
keith.irwin's avatar
keith.irwin committed
      (write-sequence (sb-ext:string-to-octets string) stream)
      (finish-output stream))))

(defmethod apply-callbacks ((self stomp-connection) (frame frame))
  (with-slots (callbacks) self
    (let ((destination (get-destination frame)))
      (loop 
keith.irwin's avatar
keith.irwin committed
         :for (dest func) :in callbacks
         :do (if (string= dest destination)
                 (funcall func frame))))))
keith.irwin's avatar
keith.irwin committed
  
(defmethod receive ((self stomp-connection))
  "Called whenever there's activity on the file descriptor."
  (with-slots (socket fd handler callbacks) self
    (let ((str (sb-sys:make-fd-stream fd :input t :element-type '(unsigned-byte 8)
keith.irwin's avatar
keith.irwin committed
                                      :buffering :full)))
keith.irwin's avatar
keith.irwin committed
      (let ((buffer (loop 
keith.irwin's avatar
keith.irwin committed
                       :for b = (read-byte str nil 'eof)
                       :while (listen str)
                       :collect b)))
keith.irwin's avatar
keith.irwin committed

keith.irwin's avatar
keith.irwin committed
        (if (> (length buffer) 0)
keith.irwin's avatar
keith.irwin committed

keith.irwin's avatar
keith.irwin committed
            ;; if we got something, send it to all the
            ;; registered callbacks
keith.irwin's avatar
keith.irwin committed

keith.irwin's avatar
keith.irwin committed
            (let ((frame (make-frame (string-from-bytes buffer))))
              (apply-callbacks self frame))
keith.irwin's avatar
keith.irwin committed

keith.irwin's avatar
keith.irwin committed
            ;; otherwise, it means the other end has terminated,
            ;; so close things down (eventually go into a sleep/wait
            ;; loop in case of reconnection
keith.irwin's avatar
keith.irwin committed

keith.irwin's avatar
keith.irwin committed
            (handler-case
                (progn
                  (log-debug "nothing to read from socket~%")
                  (disconnect self)
                  (sb-sys:remove-fd-handler handler)
                  (sk-close socket))
              (condition (c)
                (log-debug "close: ~a" c))))))))
keith.irwin's avatar
keith.irwin committed

(defmethod register ((self stomp-connection) callback destination)
  (with-slots (callbacks) self
    (subscribe self destination)
    (setf callbacks (append callbacks (list (list destination callback))))))

(defmethod post ((self stomp-connection) message destination)
  (let ((frame (make-instance 'frame :name "SEND" :body message)))
    (set-destination frame destination)
    (send self frame)))

(defmethod disconnect ((self stomp-connection))
  (let ((frame (make-instance 'frame :name "DISCONNECT")))
    (send self frame))
  (with-slots (handler socket) self
    (handler-case
keith.irwin's avatar
keith.irwin committed
        (sb-sys:remove-fd-handler handler)
keith.irwin's avatar
keith.irwin committed
      (condition (c)
keith.irwin's avatar
keith.irwin committed
        (log-debug "discconnect-handler-error: ~a" c)))
keith.irwin's avatar
keith.irwin committed
    (handler-case
keith.irwin's avatar
keith.irwin committed
        (sk-close socket)
keith.irwin's avatar
keith.irwin committed
      (condition (c)
keith.irwin's avatar
keith.irwin committed
        (log-debug "disconnect-socket-error: ~a" c)))))
keith.irwin's avatar
keith.irwin committed