;; -*- mode: lisp; -*-
;; Package definition for the STOMP client.  Only COMMON-LISP is used;
;; SBCL-specific functionality is referenced with explicit package
;; prefixes (sb-bsd-sockets:, sb-sys:, sb-ext:) throughout the file.
(defpackage :cl-stomp
  (:nicknames :stomp)
  (:use :cl)
  ;; Public API: frame accessors followed by connection operations.
  (:export :set-header
           :get-header
           :set-destination
           :get-destination
           :frame-body-of
           :frame-name-of
           :stomp-connection
           :make-connection
           :connect
           :register
           :post
           :start
           :stop))
;; Everything below is defined in the CL-STOMP package.
(in-package :cl-stomp)
;;-------------------------------------------------------------------------
;; socket interface
(defun ip-addr-of (host-name)
  "Look up HOST-NAME with sb-bsd-sockets:get-host-by-name and return the
address stored in the resulting host entry."
  (sb-bsd-sockets:host-ent-address
   (sb-bsd-sockets:get-host-by-name host-name)))
(defun inet-addr (ip-address)
"Return the result of sb-bsd-sockets:make-inet-address applied to IP-ADDRESS (SK-CONNECT calls this when given a string)."
(sb-bsd-sockets:make-inet-address ip-address))
(defun sk-connect (sock address port)
  "Connect SOCK to ADDRESS on PORT.  A string ADDRESS is first converted
with INET-ADDR; any other value is handed to socket-connect unchanged."
  (sb-bsd-sockets:socket-connect sock
                                 (typecase address
                                   (string (inet-addr address))
                                   (t address))
                                 port))
(defun sk-file-desc (sock)
"Return the file descriptor of SOCK as reported by sb-bsd-sockets:socket-file-descriptor."
(sb-bsd-sockets:socket-file-descriptor sock))
(defun sk-close (sock)
  "Invalidate the file descriptor of SOCK with sb-sys:invalidate-descriptor,
then close SOCK.  Returns whatever socket-close returns."
  ;; The descriptor is invalidated before the socket itself is closed.
  (sb-sys:invalidate-descriptor
   (sb-bsd-sockets:socket-file-descriptor sock))
  (sb-bsd-sockets:socket-close sock))
(defun sk-make ()
  "Create and return a new TCP stream socket (an sb-bsd-sockets:inet-socket)
with the reuse-address socket option switched on."
  (let ((new-socket (make-instance 'sb-bsd-sockets:inet-socket
                                   :type :stream
                                   :protocol :tcp)))
    (setf (sb-bsd-sockets:sockopt-reuse-address new-socket) t)
    new-socket))
;;-------------------------------------------------------------------------
;; convenience utils
(defun log-debug (fmt &rest args)
  "Write ARGS formatted according to the control string FMT to
*STANDARD-OUTPUT*, then flush that stream."
  (let ((out *standard-output*))
    ;; ~? processes FMT as a nested control string with ARGS as its arguments.
    (format out "~?" fmt args)
    (finish-output out)))
(defun string-from-bytes (bytes)
  "Return a string whose characters are the results of CODE-CHAR applied
to each element of the sequence BYTES, in order."
  (let ((text (make-string (length bytes))))
    (map-into text #'code-char bytes)))
(defun string-strip (str)
  "Return STR with leading and trailing spaces, tabs, newlines, carriage
returns and NUL characters removed."
  (declare (string str))
  (let ((trimmed-characters '(#\Space #\Newline #\Return #\Tab #\Nul)))
    (string-trim trimmed-characters str)))
(defun string-join (lines)
  "Concatenate the elements of the list LINES, each followed by a newline,
and return the result after passing it through STRING-STRIP (so leading
and trailing whitespace, including the final newline, is removed)."
  ;; NOTE(review): the form that wrote each line to the stream was missing
  ;; from this function; a newline separator is assumed because the callers
  ;; pass lines produced by STRING-LINES -- confirm against the upstream file.
  (string-strip
   (with-output-to-string (stream)
     (dolist (line lines)
       (format stream "~a~%" line)))))
(defun string-split (str &key (delim " ") (limit nil))
  "Returns a list of the words of STR, broken at each occurrence of DELIM.
Empty words are dropped and every kept word is passed through STRING-STRIP.
When LIMIT is non-nil, splitting stops after LIMIT breaks and whatever is
left of STR is returned, untouched, as the final element."
  (let ((words '()))
    (loop
      :with remaining = str
      :for count :from 0
      :until (null remaining)
      :do (if (and limit (>= count limit))
              ;; Break budget used up: keep the remainder verbatim and stop.
              (progn
                (push remaining words)
                (return))
              (let* ((cut (position delim remaining :test 'string=))
                     (word (if cut
                               (subseq remaining 0 cut)
                               remaining)))
                ;; Continue after the delimiter, or finish if there was none.
                (setf remaining (if cut
                                    (subseq remaining (+ 1 cut))
                                    nil))
                (unless (zerop (length word))
                  (push (string-strip word) words)))))
    (nreverse words)))
(defun string-lines (string)
  "Return the lines of STRING, as read by READ-LINE, as a list of strings
in their original order.  Duplicate lines are kept.  An empty STRING
yields NIL."
  (with-input-from-string (stream string)
    (loop
      ;; The symbol EOF is the end-of-input marker; READ-LINE only ever
      ;; returns strings otherwise, so it cannot collide with real data.
      :for line = (read-line stream nil 'eof)
      :until (eq line 'eof)
      :collect line)))
(defun string-contains (string str-or-char-list)
  "Returns the index at which STR-OR-CHAR-LIST (a string or a list of
characters) first occurs inside STRING, or NIL when it does not occur."
  (declare (type string string))
  (let ((needle (coerce str-or-char-list 'string)))
    ;; Elements are characters, so a case-sensitive character comparison
    ;; is all that is needed.
    (search needle string :test #'char=)))
;;-------------------------------------------------------------------------
;; frame
;; Generic functions operating on FRAME objects.
(defgeneric set-destination (frame destination)
(:documentation "Set the destination header for the frame."))
(defgeneric get-header (frame key)
(:documentation "Return the value of the header named KEY in the frame, or NIL if there is none."))
(defgeneric get-destination (frame)
(:documentation "Return the value of the destination header of the frame."))
(defgeneric set-header (frame key value)
(:documentation "Set the header named KEY in the frame to VALUE, adding it if absent."))
;; A single protocol frame: a name (command), a list of headers and a body.
(defclass frame ()
;; Frame name/command string; defaults to "MESSAGE".
((name
:initform "MESSAGE"
:initarg :name
:accessor frame-name-of)
;; List of headers, each stored as a two-element list (key value);
;; see GET-HEADER and SET-HEADER.
(headers
:initform ()
:initarg :headers
:accessor frame-headers-of)
;; Body text of the frame; defaults to the empty string.
(body
:initform ""
:initarg :body
:accessor frame-body-of)))
(defun make-frame (string)
  "Parse STRING, the text of one frame, into a FRAME instance.

The first line is the frame name.  The following lines, up to the first
empty line, are headers of the form key:value; each becomes a list of
stripped strings (only the first colon separates key from value).  All
lines after that first empty line form the body, joined with STRING-JOIN.
A frame without an empty line has an empty body."
  (flet ((blank-line-p (line)
           (zerop (length line)))
         (parse-header (line)
           (mapcar #'string-strip (string-split line :delim ":" :limit 1))))
    (let* ((lines (string-lines string))
           (after-name (rest lines))
           ;; Tail of the line list starting at the header/body separator.
           ;; Splitting at the FIRST blank line keeps blank lines that
           ;; occur inside the body.
           (separator (member-if #'blank-line-p after-name))
           (header-lines (ldiff after-name separator))
           (body-lines (rest separator)))
      (make-instance 'frame
                     :name (first lines)
                     :headers (mapcar #'parse-header header-lines)
                     :body (string-join body-lines)))))
(defmethod print-object ((self frame) stream)
  "Write SELF to STREAM: the name on its own line, one key:value line per
header, an empty line, the body, a newline, a NUL character and a final
newline."
  (format stream "~A~%" (slot-value self 'name))
  ;; Each header is a list whose first two elements are key and value.
  (loop
    :for (key value) :in (slot-value self 'headers)
    :do (format stream "~a:~a~%" key value))
  (format stream "~%~a~%~a~%" (slot-value self 'body) (code-char 0)))
(defmethod set-destination ((self frame) destination)
"Store DESTINATION as the value of the \"destination\" header of SELF via SET-HEADER."
(set-header self "destination" destination))
(defmethod get-header ((self frame) key)
  "Return the value stored for the header whose name is STRING= to KEY,
or NIL when SELF has no such header."
  (let ((entry (assoc key (slot-value self 'headers) :test #'string=)))
    ;; Entries are (key value) lists; SECOND of NIL is NIL.
    (second entry)))
(defmethod get-destination ((self frame))
"Return the value of the \"destination\" header of SELF via GET-HEADER."
(get-header self "destination"))
(defmethod set-header ((self frame) key value)
  "Set the header named KEY of SELF to VALUE and return the new header list.

If a header whose name is STRING= to KEY already exists, every such entry
is replaced in place and the order of the headers is left unchanged.
Otherwise a new (KEY VALUE) entry is added at the front of the list."
  (with-slots (headers) self
    (setf headers
          (if (assoc key headers :test #'string=)
              ;; Rebuild the list in its original order, swapping in the
              ;; new value wherever the key matches.
              (mapcar (lambda (header)
                        (if (string= (first header) key)
                            (list key value)
                            header))
                      headers)
              (cons (list key value) headers)))))
;;-------------------------------------------------------------------------
;; stomp
;; Generic functions making up the connection protocol.
(defgeneric connect (connection &key user pass)
(:documentation "Connect to the stomp server, optionally logging in with USER and PASS."))
(defgeneric disconnect (connection)
(:documentation "Disconnect from the stomp server."))
(defgeneric send (connection frame)
(:documentation "Send a frame, or a string, to stomp."))
(defgeneric post (connection message destination)
(:documentation "Post a message to a destination."))
(defgeneric register (connection callback destination)
(:documentation "Register a listener for messages to a destination."))
(defgeneric receive (connection)
(:documentation "Read data from stomp."))
(defgeneric subscribe (connection destination)
(:documentation "Subscribe to a destination."))
(defgeneric start (connection)
(:documentation "Start listening for messages from stomp."))
(defgeneric apply-callbacks (instance frame)
(:documentation "Send the frame to the callbacks registered for its destination."))
(defgeneric stop (connection)
(:documentation "Ask a running START loop on the connection to terminate."))
;; State of one client connection to a stomp server.
(defclass stomp-connection ()
;; Host name of the server; resolved to an address by CONNECT.
((host
:initform "localhost"
:initarg :host
:accessor host-of)
;; TCP port of the server.
(port
:initform 61613
:initarg :port
:accessor port-of)
;; Server address; CONNECT overwrites it with the result of IP-ADDR-OF.
(ip
:initform "127.0.0.1"
:initarg :ip
:accessor ip-of)
;; The socket created by CONNECT, or NIL.
(socket
:initform nil
:initarg :socket
:accessor socket-of)
;; File descriptor of the socket, used to build fd-streams for I/O.
(fd
:initform nil
:initarg :fd
:accessor fd-of)
;; Handler object returned by sb-sys:add-fd-handler in CONNECT.
(handler
:initform nil
:initarg :handler
:accessor handler-of)
;; List of (destination callback) entries added by REGISTER.
(callbacks
:initform ()
:initarg :callbacks
:accessor callbacks-of)
;; Flag polled by START; STOP sets it to T.  It has no initarg or accessor.
(terminate
:initform nil)))
(defun make-connection (host port)
"Return a new STOMP-CONNECTION for HOST and PORT. No network activity happens here; see CONNECT."
(make-instance 'stomp-connection :host host :port port))
(defmethod connect ((self stomp-connection) &key user pass)
  "Open a socket to the host and port of SELF, install an input handler
on its file descriptor that calls RECEIVE, and send a CONNECT frame.

USER and PASS, when non-nil, are sent as the \"login\" and \"passcode\"
headers.  Errors are not propagated: on failure the socket (if any) is
closed, the socket slot is reset to NIL and the error is logged with
LOG-DEBUG."
  (handler-case
      ;; open the socket and send the connect string
      (with-slots (host port ip socket fd handler) self
        (setf ip (ip-addr-of host))
        (setf socket (sk-make))
        (sk-connect socket ip port)
        (setf fd (sk-file-desc socket))
        ;; RECEIVE is driven by the event loop in START whenever the
        ;; descriptor becomes readable.
        (setf handler (sb-sys:add-fd-handler fd :input
                                             (lambda (x)
                                               (declare (ignore x))
                                               (receive self))))
        (let ((frame (make-instance 'frame :name "CONNECT")))
          (when user
            (set-header frame "login" user))
          (when pass
            (set-header frame "passcode" pass))
          (send self frame)))
    ;; just log the error for now
    ;; Only ERRORs are trapped; warnings and other non-error conditions
    ;; signalled underneath must not abort the connection attempt.
    (error (c)
      ;; Release the descriptor instead of merely dropping the reference.
      (when (socket-of self)
        (ignore-errors (sk-close (socket-of self))))
      (setf (socket-of self) nil)
      (log-debug "error-on-connect: ~a" c))))
(defmethod subscribe ((self stomp-connection) destination)
  "Build a SUBSCRIBE frame whose destination header is DESTINATION and
send it over SELF."
  (let ((request (make-instance 'frame :name "SUBSCRIBE")))
    (set-destination request destination)
    (send self request)))
(defmethod start ((self stomp-connection))
  "Clear the terminate flag of SELF, then call sb-sys:serve-all-events
with an argument of 1 repeatedly until the flag becomes true (see STOP).
Logs \"terminated\" on exit."
  (setf (slot-value self 'terminate) nil)
  ;; The flag is re-read before every iteration.
  (do ()
      ((slot-value self 'terminate))
    (sb-sys:serve-all-events 1))
  (log-debug "terminated"))
(defmethod stop ((self stomp-connection))
  "Set the terminate flag of SELF so that the loop in START exits."
  (setf (slot-value self 'terminate) t))
(defmethod send ((self stomp-connection) (frame frame))
  "Write FRAME, rendered by its PRINT-OBJECT method, to the file
descriptor of SELF through an unbuffered character stream."
  (with-slots (fd) self
    (let ((stream (sb-sys:make-fd-stream fd :output t
                                         :element-type 'character
                                         :buffering :none)))
      ;; PRINC emits exactly what PRINT-OBJECT writes.  PRINT would add a
      ;; newline before and a space after the frame, and that space ends
      ;; up between frames on the wire.
      (princ frame stream)
      (finish-output stream))))
(defmethod send ((self stomp-connection) string)
  "Encode STRING with sb-ext:string-to-octets and write the resulting
octets to the file descriptor of SELF."
  (with-slots (fd) self
    ;; NOTE(review): the end of the make-fd-stream call was missing here;
    ;; :buffering :none is assumed, matching the FRAME method of SEND --
    ;; confirm against the upstream file.
    (let ((stream (sb-sys:make-fd-stream fd :output t
                                         :element-type '(unsigned-byte 8)
                                         :buffering :none)))
      (write-sequence (sb-ext:string-to-octets string) stream)
      (finish-output stream))))
(defmethod apply-callbacks ((self stomp-connection) (frame frame))
  "Call, with FRAME as the only argument, every registered callback whose
destination is STRING= to the destination header of FRAME."
  (let ((target (get-destination frame)))
    ;; Each entry is a (destination function) list, as stored by REGISTER.
    (dolist (entry (slot-value self 'callbacks))
      (let ((dest (first entry))
            (func (second entry)))
        (when (string= dest target)
          (funcall func frame))))))
(defmethod receive ((self stomp-connection))
  "Called whenever there's activity on the file descriptor.

Reads the bytes currently available on the descriptor, parses them with
MAKE-FRAME and passes the frame to APPLY-CALLBACKS.  If nothing could be
read, the peer is taken to have closed the connection: SELF is
disconnected, the fd handler removed and the socket closed, with any
error during that shutdown logged rather than propagated."
  (with-slots (socket fd handler) self
    ;; NOTE(review): the end of the make-fd-stream call, the binding of
    ;; BUFFER and the emptiness test were missing from this method.  They
    ;; are restored from the surviving forms and the comments below;
    ;; :buffering :none is assumed, matching SEND -- confirm upstream.
    (let* ((str (sb-sys:make-fd-stream fd :input t
                                       :element-type '(unsigned-byte 8)
                                       :buffering :none))
           ;; NOTE(review): LISTEN is tested after each READ-BYTE, so the
           ;; byte read when no further input is pending is not collected.
           (buffer (loop
                     :for b = (read-byte str nil 'eof)
                     :while (listen str)
                     :collect b)))
      (if (> (length buffer) 0)
          ;; if we got something, send it to all the
          ;; registered callbacks
          (let ((frame (make-frame (string-from-bytes buffer))))
            (apply-callbacks self frame))
          ;; otherwise, it means the other end has terminated,
          ;; so close things down (eventually go into a sleep/wait
          ;; loop in case of reconnection)
          (handler-case
              (progn
                (log-debug "nothing to read from socket~%")
                (disconnect self)
                (sb-sys:remove-fd-handler handler)
                (sk-close socket))
            (condition (c)
              (log-debug "close: ~a" c)))))))
(defmethod register ((self stomp-connection) callback destination)
  "Subscribe SELF to DESTINATION, then append a (DESTINATION CALLBACK)
entry to the end of the callback list of SELF.  Returns the new list."
  (subscribe self destination)
  (let ((entry (list destination callback)))
    (setf (slot-value self 'callbacks)
          (append (slot-value self 'callbacks) (list entry)))))
(defmethod post ((self stomp-connection) message destination)
  "Build a SEND frame with MESSAGE as its body and DESTINATION as its
destination header, and send it over SELF."
  (let ((outgoing (make-instance 'frame :name "SEND" :body message)))
    (set-destination outgoing destination)
    (send self outgoing)))
(defmethod disconnect ((self stomp-connection))
(let ((frame (make-instance 'frame :name "DISCONNECT")))
(send self frame))
(with-slots (handler socket) self
(handler-case