/[de-setf-amqp]/rabbitmq/channel.lisp
ViewVC logotype

Contents of /rabbitmq/channel.lisp

Parent Directory Parent Directory | Revision Log Revision Log


Revision 3 - (show annotations)
Tue Feb 23 09:05:39 2010 UTC (4 years, 1 month ago) by janderson
File size: 16276 byte(s)
Merge commit 'remotes/github/master' into remotes/git-svn
1 ;;;-* Package: rabbitmq; -*-
2 ;; $Id: //info.ravenbrook.com/user/ndl/lisp/cl-rabbit/channel.lisp#2 $
3
4 (in-package :rabbitmq)
5
6 ;; CHANNEL.LISP
7 ;; Nick Levine, Ravenbrook Limited, 2007-09-20
8 ;; James Anderson, setf.de, 2010-02-04
9 ;;
10 ;; 1. INTRODUCTION
11 ;;
12 ;; The purpose of this document is to implement a lisp interface to AMQP channels, consistent with the
13 ;; RabbitMQ API. It emulates the original com.nicklevine.rabbitmq version, which was layered over
14 ;; RabbitMQ/Java
15 ;;
16 ;; See Appendix C below for copyright and license.
17
18
19 ;; 2. OPEN & CLOSE
20
21 (defun new-channel (connection)
22 (amqp:connection.channel connection :number t))
23
24
25 (defmacro with-alive-channel ((channel &key (if-dead :error)) &body body)
26 (rebinding (channel)
27 `(if (channel-alive ,channel)
28 (progn ,@body)
29 ,@(case if-dead
30 ((:error)
31 `((progn (channel-not-alive ,channel)
32 ;; prevent tail call, aid debugging
33 nil)))))))
34
35 (defun channel-not-alive (channel)
36 (error 'channel-not-alive :channel channel))
37
38 (define-condition channel-not-alive (error)
39 ((channel :reader channel-not-alive-channel :initform nil :initarg :channel))
40 (:report (lambda (condition stream)
41 (format stream "Channel~@[ ~a~] is no longer alive"
42 (channel-not-alive-channel condition)))))
43
44 (defmacro with-channel ((channel connection) &body body)
45 (rebinding (connection)
46 `(multiple-value-prog1
47 (let ((,channel (new-channel ,connection)))
48 (unwind-protect
49 (progn ,@body)
50 (destroy-channel ,channel)))
51 (check-connection-alive ,connection))))
52
53 (defun destroy-channel (channel &key (code 0) (message "closed by application"))
54 (with-alive-channel (channel :if-dead nil)
55 (handler-case (amqp:request-close channel
56 :reply-code code
57 :reply-text message)
58 (channel-not-alive () ())))
59 channel)
60
61
62 ;; 3. CHANNEL SUBCLASS
63
64 (defclass channel ()
65 ((the-consumer
66 :initform nil
67 :reader the-consumer :writer setf-the-consumer)))
68
69 ;;; adjust the amqp:channel class to fit with the jfli-based class
70 (interpose-superclass 'channel 'amqp:channel)
71
72 (defmethod channel-ticket ((channel channel))
73 (amqp.utility:channel-ticket channel))
74
75 ;;; define a consumer and a deilvery
76
77 (defclass queueingconsumer. ()
78 ((queue :initform (make-instance 'amqp.utility:queue)
79 :reader consumer-queue)
80 (channel
81 :initform nil :initarg :channel
82 :accessor consumer-channel))
83 (:documentation "The queueing consumer interacts with the channel to accept,
84 parse and queue incoming messages"))
85
86
87 (defmethod consumer-next-delivery ((consumer queueingconsumer.))
88 (unless (consumer-empty-p consumer)
89 (amqp.utility:dequeue (consumer-queue consumer))))
90
91
92 ;;;!!! this needs to look through the queue to see if there is any pending i/o
93 ;;;!!! andd , if so, read it and push it through the processing pipeline
94 ;;;!!! once there is nothing wating the count is up-to-date
95
96 (defmethod consumer-empty-p ((consumer QueueingConsumer.))
97 (and (amqp.utility:collection-empty-p (consumer-queue consumer))
98 (let ((count 0))
99 (amqp:command-loop ((consumer-channel consumer) :wait nil)
100 (amqp:deliver ((class amqp:basic) &key &allow-other-keys)
101 ;; just count, but don't handle
102 (print (incf count))
103 ;; allow the next handler to process and queue the result
104 nil))
105 (zerop count))))
106
107
108 (defmethod consumer-empty-p ((consumer null))
109 t)
110
111 (defmethod consumer-arrived-count ((consumer QueueingConsumer.))
112 (if (consumer-empty-p consumer)
113 0
114 (amqp.utility:collection-size (consumer-queue consumer))))
115
116 (defmethod consumer-arrived-count ((consumer null))
117 0)
118
119
120 (defclass queueingconsumer$delivery (incoming-message)
121 ()
122 (:documentation "The delivery class holds the delivered payload."))
123
124
125
126 (defmethod Channel.basicConsume ((channel AMQP-1-1-0-8-0:channel) ticket queue no-ack consumer)
127 (check-type consumer queueingconsumer.)
128 (amqp:request-consume (amqp:basic channel) :ticket ticket :no-ack no-ack :queue queue)
129 (setf (de.setf.amqp.implementation::channel-command channel 'amqp:deliver)
130 #'(lambda (channel class method &rest args)
131 (let* ((body (apply #'amqp:respond-to-deliver class args))
132 (message (make-instance 'queueingconsumer$delivery
133 :body body
134 :content-type (amqp.utility:class-mime-type class)
135 :envelope (apply #'make-envelope
136 (amqp.utility:method-arguments method))
137 :properties (amqp.utility:class-properties class)))
138 (consumer (the-consumer channel)))
139 (when consumer
140 (amqp.utility:enqueue message (consumer-queue consumer)))
141 message)))
142 (setf (consumer-channel consumer) channel)
143 (setf-the-consumer consumer channel)
144 channel)
145
146
147 (defmethod Channel.basicConsume ((channel AMQP-1-1-0-9-1:channel) (ticket t) queue no-ack consumer)
148 (flet ((delivery-handler (channel class method &rest args)
149 (print :handler)
150 (let* ((body (apply #'amqp:respond-to-deliver class args))
151 (message (make-instance 'queueingconsumer$delivery
152 :body body
153 :content-type (amqp.utility:class-mime-type class)
154 :envelope (apply #'make-envelope
155 (amqp.utility:method-arguments method))
156 :properties (amqp.utility:class-properties class)))
157 (consumer (the-consumer channel)))
158 (amqp:log :error class "delivered: ~s" message)
159 (when consumer
160 (amqp.utility:enqueue message (consumer-queue consumer)))
161 message)))
162 (check-type consumer queueingconsumer.)
163 (amqp:request-consume (amqp:basic channel) :no-ack no-ack :queue queue)
164 (setf (de.setf.amqp.implementation::channel-command channel 'amqp:deliver)
165 #'delivery-handler)
166 (setf (consumer-channel consumer) channel)
167 (setf-the-consumer consumer channel)
168 channel))
169
170 (defmethod Channel.basicCancel ((channel channel) consumer-tag no-wait)
171 (prog1 (amqp:request-cancel (amqp:basic channel) :consumer-tag consumer-tag :no-wait no-wait)
172 (setf (slot-value channel 'the-consumer) nil)))
173
174
175 ;; 4. MESSAGES
176
177 (defun full-next-message (channel consumer nowait)
178 (with-alive-channel (channel)
179 (when nowait
180 (when (consumer-empty-p consumer)
181 (return-from full-next-message
182 nil)))
183 ;; TBD ?? -- shouldn't there be some waiting to do when nowait is
184 ;; false? I no longer remember my intent, and this function
185 ;; doesn't have a mirror in libamq so I can't peek at that -- NDL
186 ;; 2007-09-28
187 (let ((delivery (consumer-next-delivery consumer))
188 (basic (amqp:basic channel)))
189 (unless (amqp:basic-no-ack basic)
190 (acknowledge-delivery channel delivery))
191 delivery)))
192
193 (defun next-message (channel)
194 (full-next-message channel (the-consumer channel) t))
195
196 (defmethod consumer-empty-p ((channel channel))
197 (consumer-empty-p (the-consumer channel)))
198
199 (defun channel-arrived-count (channel)
200 (with-alive-channel (channel)
201 (consumer-arrived-count (the-consumer channel))))
202
203 (defun channel-arrived-count-or-nil (channel)
204 (let ((count (channel-arrived-count channel)))
205 (when (plusp count)
206 count)))
207
208 (defun channel-wait (channel timeout)
209 (when (zerop timeout)
210 (error "~s called with zerop timeout. If you really wanted that, call ~s instead"
211 'channel-wait 'channel-wait-forever))
212 (with-alive-channel (channel)
213 (let ((consumer (the-consumer channel)))
214 (assert consumer ()
215 "No consumer present to satisfy wait criteria: ~s." channel)
216 (let ((deadline (+ timeout (get-internal-run-time))))
217 (loop (when (or (>= (get-internal-run-time) deadline)
218 (not (consumer-empty-p consumer)))
219 (return))
220 (sleep 0.01))))))
221
222 (defun channel-wait-forever (channel)
223 (with-alive-channel (channel)
224 (let ((consumer (the-consumer channel)))
225 (assert consumer ()
226 "No consumer present to satisfy wait criteria: ~s." channel)
227 (loop (unless (consumer-empty-p consumer)
228 (return))
229 (sleep 0.01)))))
230
231
232 ;; 5. PROPERTIES
233
234 (defun channel-alive (channel)
235 (open-stream-p channel))
236
237 (defun channel-consumer-count (channel)
238 (values 1
239 (list (the-consumer channel))))
240
241 (defun channel-consumer-tag (channel)
242 (amqp:basic-consumer-tag (amqp:basic channel)))
243
244
245 ;; 6. EXCHANGE
246
247 (defmacro with-exchange-type ((type) &body body)
248 `(let ((,type (ecase ,type
249 ((:fanout) "fanout") ;; ignores routing-key
250 ((:direct) "direct") ;; exact match on routing-key
251 ((:topic) "topic") ;; pattern matching on routing-key
252 )))
253 ,@body))
254
255 (defgeneric full-declare-exchange (channel ticket exchange type passive durable auto-delete no-wait arguments)
256 (:method ((channel AMQP-1-1-0-9-1:channel) ticket exchange type passive durable auto-delete no-wait arguments)
257 (declare (ignore ticket auto-delete))
258 (with-alive-channel (channel)
259 (amqp:request-declare (amqp:exchange channel)
260 :exchange exchange
261 :type type
262 :passive passive
263 :durable durable
264 :no-wait no-wait
265 :arguments arguments)))
266 (:method ((channel AMQP-1-1-0-8-0:channel) ticket exchange type passive durable auto-delete no-wait arguments)
267 (declare (ignore no-wait))
268 (with-alive-channel (channel)
269 (amqp:request-declare (amqp:exchange channel)
270 :ticket ticket
271 :exchange exchange
272 :type type
273 :passive passive
274 :durable durable
275 :auto-delete auto-delete
276 :arguments arguments))))
277
278 (defun declare-exchange (channel exchange type)
279 (with-exchange-type (type)
280 (full-declare-exchange channel (channel-ticket channel) exchange type nil nil nil nil nil)))
281
282 (defun full-test-exchange (connection exchange type durable auto-delete arguments)
283 (with-channel (channel connection)
284 (trapping-not-found
285 (full-declare-exchange channel (channel-ticket channel) exchange type t durable auto-delete nil arguments)
286 t)))
287
288 (defun test-exchange (connection exchange type)
289 (with-exchange-type (type)
290 (full-test-exchange connection exchange type nil nil nil)))
291
292
293 (defgeneric full-delete-exchange (channel ticket exchange if-unused)
294 (:method ((channel AMQP-1-1-0-9-1:channel) ticket exchange if-unused)
295 (declare (ignore ticket))
296 (with-alive-channel (channel)
297 (amqp:request-delete (amqp:exchange channel) :exchange exchange :if-unused if-unused)))
298 (:method ((channel AMQP-1-1-0-8-0:channel) ticket exchange if-unused)
299 (with-alive-channel (channel)
300 (amqp:request-delete (amqp:exchange channel) :ticket ticket :exchange exchange :if-unused if-unused))))
301
302 (defun delete-exchange (channel exchange)
303 (full-delete-exchange channel (channel-ticket channel) exchange nil))
304
305
306 ;; 7. QUEUE
307
308 (defgeneric full-declare-queue (channel ticket queue passive durable exclusive auto-delete arguments)
309 (:method ((channel AMQP-1-1-0-9-1:channel) ticket queue passive durable exclusive auto-delete arguments)
310 (declare (ignore ticket))
311 (with-alive-channel (channel)
312 (amqp:request-declare (amqp:queue channel)
313 :queue queue
314 :passive passive
315 :durable durable
316 :exclusive exclusive
317 :auto-delete auto-delete
318 :arguments arguments)))
319 (:method ((channel AMQP-1-1-0-8-0:channel) ticket queue passive durable exclusive auto-delete arguments)
320 (with-alive-channel (channel)
321 (amqp:request-declare (amqp:queue channel)
322 :ticket ticket
323 :queue queue
324 :passive passive
325 :durable durable
326 :exclusive exclusive
327 :auto-delete auto-delete
328 :arguments arguments))))
329
330 (defun declare-queue (channel queue)
331 (full-declare-queue channel (channel-ticket channel) queue nil nil nil nil nil))
332
333 (defun full-test-queue (connection queue durable exclusive auto-delete arguments)
334 (with-channel (channel connection)
335 (trapping-not-found
336 (amqp:request-declare (amqp:queue channel)
337 :ticket (channel-ticket channel)
338 :queue queue
339 :passive t
340 :durable durable
341 :exclusive exclusive
342 :auto-delete auto-delete
343 :arguments arguments)
344 t)))
345
346 (defun test-queue (connection queue)
347 (full-test-queue connection queue nil nil nil nil))
348
349 (defun full-delete-queue (channel ticket queue if-unused if-empty)
350 (with-alive-channel (channel)
351 (amqp:request-delete (amqp:queue channel)
352 :ticket ticket
353 :queue queue
354 :if-unused if-unused
355 :if-empty if-empty)))
356
357 (defun delete-queue (channel queue)
358 (full-delete-queue channel (channel-ticket channel) queue nil nil))
359
360
361 (defgeneric full-bind-queue (channel ticket queue exchange routing-key arguments)
362 (:method ((channel AMQP-1-1-0-9-1:channel) ticket queue exchange routing-key arguments)
363 (declare (ignore ticket))
364 (with-alive-channel (channel)
365 (amqp:request-bind (amqp:queue channel)
366 :queue queue
367 :exchange exchange
368 :routing-key routing-key
369 :arguments arguments)))
370 (:method ((channel AMQP-1-1-0-8-0:channel) ticket queue exchange routing-key arguments)
371 (with-alive-channel (channel)
372 (amqp:request-bind (amqp:queue channel)
373 :ticket ticket
374 :queue queue
375 :exchange exchange
376 :routing-key routing-key
377 :arguments arguments))))
378
379 (defun bind-queue (channel queue exchange routing-key)
380 (full-bind-queue channel (channel-ticket channel) queue exchange routing-key nil))
381
382
383
384 ;; A. REFERENCES
385 ;; [1] [org.levine.rabbitmq](http://www.nicklevine.org/cl-rabbit/)
386 ;; [2] http://www.rabbitmq.com/releases/rabbitmq-java-client/v1.7.1/rabbitmq-java-client-javadoc-1.7.1/
387 ;;
388 ;; B. HISTORY
389 ;;
390 ;; 2007-09-20 NDL Created.
391 ;; 2010-02-04 JAA Emulation / de.setf.amqp.
392 ;;
393 ;;
394 ;; C. COPYRIGHT
395 ;;
396 ;; Copyright (c) 2007 Wiinz Limited.
397 ;; Copyright (c) 2010 james.anderson@setf.de
398 ;;
399 ;; See `rabbitmq.asd` for the license terms for the original org.levine.rabbitmq package.
400
401 ;;; This file is part of the `de.setf.amqp.rabbitmq` library module.
402 ;;; (c) 2010 [james anderson](mailto:james.anderson@setf.de)
403 ;;;
404 ;;; `de.setf.amqp.rabbitmq` is free software: you can redistribute it and/or modify
405 ;;; it under the terms of the GNU General Public License as published by
406 ;;; the Free Software Foundation as version 3 of the License.
407 ;;;
408 ;;; `de.setf.amqp.rabbitmq` is distributed in the hope that it will be useful,
409 ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
410 ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
411 ;;; GNU General Public License for more details.
412 ;;;
413 ;;; You should have received a copy of the GNU General Public License
414 ;;; along with `de.setf.amqp.rabbitmq`. If not, see the GNU [site](http://www.gnu.org/licenses/).

  ViewVC Help
Powered by ViewVC 1.1.5