/[de-setf-amqp]/rabbitmq/message.lisp
ViewVC logotype

Contents of /rabbitmq/message.lisp

Parent Directory Parent Directory | Revision Log Revision Log


Revision 3 - (show annotations)
Tue Feb 23 09:05:39 2010 UTC (4 years, 1 month ago) by janderson
File size: 9748 byte(s)
Merge commit 'remotes/github/master' into remotes/git-svn
1 ;;;-* Package: rabbitmq; -*-
2
3 (in-package "RABBITMQ")
4
5 ;; MESSAGE.LISP
6 ;; Nick Levine, Ravenbrook Limited, 2007-09-21
7 ;; James Anderson, setf.de, 2010-02-04
8 ;;
9 ;; 1. INTRODUCTION
10 ;;
11 ;; This document is to implement a class to emulate RabbitMQ messages.
12 ;;
13 ;; See Appendix C below for copyright and license.
14
15
16 ;; 2. MESSAGE
17
18 (defclass message ()
19 ((body
20 :initform nil :initarg :body
21 :accessor message-raw-body)
22 (properties
23 :initform nil :initarg :properties
24 :accessor message-properties)
25 (content-type
26 :initform nil :initarg :content-type
27 :accessor message-content-type
28 :type (or (member t nil) mime:mime-type)
29 :documentation "A MIME-TYPE instance specifies the encoding type.")
30 (envelope
31 :initform nil :initarg :envelope
32 :accessor message-envelope)))
33
34
35 ;;; timestamp epoch shifts -
36 ;;; Instance field/slot values are universal times and buffer accessors perform necessary
37 ;;; epoch shift integral with the access. As auch, decoded property/argument values are
38 ;;; universal times. If any code needs times in UNIX epoch, it shouldn't store or pass the values
39 ;;; through here."
40
41 (defun message-timestamp (message)
42 (getf (message-properties message) :timestamp))
43
44 (defun (setf message-timestamp) (new-value message)
45 (setf (getf (message-properties message) :timestamp) new-value))
46
47
48 (defclass outgoing-message (message)
49 ((properties
50 :initform (new-basic-properties))))
51
52 (defclass incoming-message (message)
53 ())
54
55 (defclass envelope ()
56 ((exchange
57 :initform nil :initarg :exchange
58 :accessor envelope-exchange)
59 (routing-key
60 :initform nil :initarg :routing-key
61 :accessor envelope-routing-key)
62 (delivery-tag
63 :initform nil :initarg :delivery-tag
64 :accessor envelope-delivery-tag)))
65
66 (defun make-envelope (&key exchange routing-key delivery-tag &allow-other-keys)
67 (make-instance 'envelope :exchange exchange
68 :routing-key routing-key
69 :delivery-tag delivery-tag))
70
71 (defun new-basic-properties ()
72 `(:delivery-mode 1 :priority 0))
73
74 (defun new-message (&key timestamp)
75 (let ((message (make-instance 'outgoing-message)))
76 (setf (message-timestamp message)
77 (case timestamp
78 ((t) (get-universal-time))
79 ((nil) 0)
80 (otherwise timestamp)))
81 message))
82
83
84
85 ;; 3. PROPERTIES
86 ;;
87 ;; TBD (if anyone ever wants them): clusterId and headers
88
89 (defun message-body (message)
90 (etypecase (message-content-type message)
91 (null nil)
92 ((eql t) (message-raw-body message))
93 (mime:text/* (message-body-string message))
94 (mime:application/octet-stream (message-body-data message))))
95
96 (defun (setf message-body) (new-value message)
97 (typecase new-value
98 (null
99 (setf (message-content-type message) nil)
100 (setf (message-raw-body message) nil))
101 (string
102 (setf (message-content-type message) mime:text/plain)
103 (setf (message-body-string message) new-value))
104 (simple-vector
105 (setf (message-content-type message) mime:application/octet-stream)
106 (setf (message-body-data message) new-value))
107 (otherwise
108 (setf (message-content-type message) t)
109 (setf (message-raw-body message) new-value))))
110
111
112 (defun message-body-size (message)
113 (length (message-body message)))
114
115
116 (defmacro def-message-property (accessor property)
117 `(progn
118 (defmethod ,accessor ((message message))
119 (getf (message-properties message) ',property))
120 (defmethod (setf ,accessor) (value (message message))
121 (setf (getf (message-properties message) ',property) value))
122 ',accessor))
123
124 ;; "The Basic class provides methods that support an industry-standard messaging model."
125
126 (def-message-property message-id :message-id)
127 (def-message-property message-application-id :app-id)
128 (def-message-property message-content-encoding :content-encoding)
129 (def-message-property message-correlation-id :correlation-id)
130 (def-message-property message-delivery-mode :delivery-mode)
131 (def-message-property message-expiration :expiration)
132 (def-message-property message-reply-to :reply-to)
133 (def-message-property message-priority :priority)
134 (def-message-property message-type :type)
135 (def-message-property message-user-id :userId)
136
137 (defun message-origin (message)
138 (format nil "~a/~a"
139 (message-reply-to message)
140 (message-id message)))
141
142
143 (defun message-delivery-persistent (message)
144 (eql (message-delivery-mode message) 2))
145
146 (defun (setf message-delivery-persistent) (new-value message)
147 (setf (message-delivery-mode message)
148 (if new-value 2 1))
149 new-value)
150
151
152 (defun message-raw-message-content-type (message)
153 (let ((type (message-content-type message)))
154 (when type (symbol-name (type-of type)))))
155
156 (defun (setf message-raw-message-content-type) (type message)
157 (setf (message-content-type message)
158 (etypecase type
159 ((or null (eql t)) type)
160 (string (mime:mime-type type)))))
161
162
163
164 (defun message-exchange (message)
165 (envelope-exchange (message-envelope message)))
166
167 (defun message-routing-key (message)
168 (envelope-routing-key (message-envelope message)))
169
170 (defun message-delivery-tag (message)
171 (envelope-delivery-tag (message-envelope message)))
172
173
174 ;; 4. METHODS
175
176 (defgeneric message-body-string (message)
177 (:method ((message string)) message)
178 (:method ((message vector))
179 (map 'string #'code-char message))
180 (:method ((message message))
181 (message-body-string (message-raw-body message))))
182
183 (defmethod (setf message-body-string) (new-value (self outgoing-message))
184 (setf (message-raw-body self)
185 (map 'vector #'char-code new-value))
186 new-value)
187
188 (defun message-body-data (message &key (element-type t))
189 (let* ((body (or (message-raw-body message)
190 (return-from message-body-data
191 nil)))
192 (data (make-array (length body) :element-type element-type)))
193 (typecase data
194 (simple-string (map-into data #'(lambda (x) (code-char x)) body))
195 (t (replace data body)))
196 data))
197
198 (defmethod (setf message-body-data) (new-value (self outgoing-message))
199 (let* ((length (length new-value)))
200 (setf (message-raw-body self)
201 (typecase new-value
202 (simple-vector (make-array length :element-type '(unsigned-byte 8)
203 :initial-contents new-value))
204 (simple-string (map 'vector #'char-code new-value))
205 (t (map 'vector #'(lambda (x) (assert (typep x '(unsigned-byte 8))) x)
206 new-value)))))
207 new-value)
208
209 (defun message-first-byte (message)
210 (let ((raw-body (message-raw-body message)))
211 (when raw-body
212 (aref raw-body 0))))
213
214
215 (defun full-publish (message channel ticket exchange routing-key mandatory immediate)
216 (declare (ignore ticket))
217 (amqp:request-publish (amqp:basic channel)
218 :exchange exchange
219 :routing-key routing-key
220 :mandatory mandatory
221 :immediate immediate
222 :body (message-body message)))
223
224 (defun publish (message channel exchange routing-key)
225 (full-publish message channel (channel-ticket channel) exchange routing-key nil nil))
226
227 (defun destroy-message (message)
228 (declare (ignore message))
229 nil)
230
231 (defun full-consume-queue (channel ticket queue consumer-tag no-local no-ack exclusive)
232 (declare (ignore ticket))
233 (amqp:request-consume (amqp:basic channel)
234 :queue queue
235 :consumer-tag consumer-tag
236 :no-local no-local
237 :no-ack no-ack
238 :exclusive exclusive))
239
240
241 (defun consume-queue (channel queue)
242 (with-alive-channel (channel)
243 (let ((consumer (make-instance 'queueingconsumer. :channel channel)))
244 ;; allow server to generate the consumerTag
245 (Channel.basicConsume channel (channel-ticket channel) queue nil consumer))))
246
247 (defun full-cancel-queue (channel consumer-tag)
248 (amqp:request-cancel channel (amqp:basic channel)
249 :consumer-tag consumer-tag))
250
251 (defun cancel-queue (channel &key (consumer-tag (channel-consumer-tag channel)))
252 (full-cancel-queue channel consumer-tag))
253
254
255 (defun acknowledge-delivery (channel message)
256 (let ((delivery-tag (message-delivery-tag message)))
257 (amqp:request-ack (amqp:basic channel)
258 :delivery-tag delivery-tag
259 :multiple nil)))
260
261
262 ;; A. REFERENCES
263 ;; [1] [org.levine.rabbitmq](http://www.nicklevine.org/cl-rabbit/)
264 ;; [2] http://www.rabbitmq.com/releases/rabbitmq-java-client/v1.7.1/rabbitmq-java-client-javadoc-1.7.1/
265 ;;
266 ;; B. HISTORY
267 ;;
268 ;; 2007-09-21 NDL Created.
269 ;; 2010-02-04 JAA Emulation / de.setf.amqp.
270 ;;
271 ;;
272 ;; C. COPYRIGHT
273 ;;
274 ;; Copyright (c) 2007 Wiinz Limited.
275 ;; Copyright (c) 2010 james.anderson@setf.de
276 ;;
277 ;; See `rabbitmq.asd` for the license terms for the original org.levine.rabbitmq package.
278
279 ;;; This file is part of the `de.setf.amqp.rabbitmq` library module.
280 ;;; (c) 2010 [james anderson](mailto:james.anderson@setf.de)
281 ;;;
282 ;;; `de.setf.amqp.rabbitmq` is free software: you can redistribute it and/or modify
283 ;;; it under the terms of the GNU General Public License as published by
284 ;;; the Free Software Foundation as version 3 of the License.
285 ;;;
286 ;;; `de.setf.amqp.rabbitmq` is distributed in the hope that it will be useful,
287 ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
288 ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
289 ;;; GNU General Public License for more details.
290 ;;;
291 ;;; You should have received a copy of the GNU General Public License
292 ;;; along with `de.setf.amqp.rabbitmq`. If not, see the GNU [site](http://www.gnu.org/licenses/).

  ViewVC Help
Powered by ViewVC 1.1.5