/[de-setf-amqp]/commands.lisp
ViewVC logotype

Contents of /commands.lisp

Parent Directory Parent Directory | Revision Log Revision Log


Revision 3 - (hide annotations)
Tue Feb 23 09:05:39 2010 UTC (4 years, 1 month ago) by janderson
File size: 33250 byte(s)
Merge commit 'remotes/github/master' into remotes/git-svn
1 janderson 3 ;;; -*- Package: de.setf.amqp.implementation; -*-
2    
3     (in-package :de.setf.amqp.implementation)
4    
5     (document :file
6     (description "This file defines the protocol operators for AMQP `class` and `METHOD` entities for the
7     'de.setf.amqp' library.")
8     (copyright
9     "Copyright 2010 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved"
10     "'de.setf.amqp' is free software: you can redistribute it and/or modify it under the terms of version 3
11     of the GNU Affero General Public License as published by the Free Software Foundation.
12    
13     'setf.amqp' is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
14     implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
15     See the Affero General Public License for more details.
16    
17     A copy of the GNU Affero General Public License should be included with 'de.setf.amqp' as `AMQP:agpl.txt`.
18     If not, see the GNU [site](http://www.gnu.org/licenses/).")
19    
20     (long-description "Each (object . method) combination corresponds to several operators, which act in concert
21     to implement the protocol:
22    
23     - `respond-to-_method_` peforms the command on a client object in response to a broker message.
24     This includes changes to instance state, open/close side effects for `connection` and `channel` methods,
25     instantiating and binding, or releasing any related, as well as any requisite broker message responses
26     as confirmation or as further processing.
27    
28     - `request-_method_` issues the request to the broker, together with any client object operations
29     required by the protocol.
30    
31     - `send-_method_` encodes frames and performs transport-level operations to send the command to
32     the broker. This delegates to protocol-specific methods, which encode the respective arguments, and to
33     the network device operations for the network stream functions.
34    
35     - no explicit receive operators are defined, as messages are self-describing and decoded accordingly.
36     Application code is writtein in terms of `command-case` or `command-loop` statements which dispatch based
37     on received commands type.
38    
39     The `def-amqp-command` forms below define the protocol class and the generic method operators.
40     The `:response` and `:request` clauses include methods as appropriate to whether both the broker and
41     the client implement the operation. A `:request` clause automatically defined a `send-` operators.
42     An additional (possibly blank) `:send` clause can be included if sending must be supported in addition to
43     a complete command request.
44    
45     The respective respond-to and and request operators are implemented in two layers.
46     The interface operator, which uses the elementary name, is implemented in terms of a second
47     operator: `channel-respond-to-`, or `channel-request-`, which requires an additional initial argument,
48     the `channel`. The delegation call interposes the respective `objects-channel` value as this initial
49     argument. The specialized methods are defined with `amqp:channel` as the initial specialization.
50    
51     The interface architecture makes it possible for applications to alter the api behavior by specializing
52     just the channel, just the protocol class, or both."))
53    
54    
55    
56    
57     (defmacro response-function (name)
58     "For use as the intiform for method response functions, if the target is defined, ok. Otherwise use instead
59     the decult response function, which signals an error."
60     `(if (fboundp ',name) ',name 'default-channel-respond-to))
61    
62    
63     (defgeneric default-channel-respond-to
64     (channel class &rest args)
65     (:documentation "the base protocol response operator for alert.")
66     (:method :before ((channel t) (class t) &rest args) "a before method logs the response-to-be and updates the class instance."
67     (declare (dynamic-extent args))
68     (amqp:log* default-channel-respond-to class args))
69     (:method ((channel amqp:channel) (class t) &rest args)
70     (amqp:not-implemented-error :message-string "Unimplemented method: ~s . ~s"
71     :message-arguments (list class args))))
72    
73    
74     (def-amqp-command amqp:ack (class &key delivery-tag multiple)
75     (:documentation "C-->S : acknowledge one of more messages")
76    
77     (:request
78     (:method ((class amqp::basic) &rest args &key delivery-tag multiple)
79     (declare (ignore delivery-tag multiple))
80     (apply #'amqp:send-ack class args)
81     class)
82    
83     (:method ((class amqp::file) &rest args &key delivery-tag multiple)
84     (declare (ignore delivery-tag multiple))
85     (apply #'amqp:send-ack class args)
86     class)))
87    
88    
89     (def-amqp-command amqp:alert (class &key reply-code reply-text details)
90     (:documentation "C<--S : send a non-fatal warning message : Async , carries content ")
91    
92     (:response
93     (:method ((class amqp::channel) &rest args)
94     (declare (ignore args))
95     "Do nothing more than log the message."
96     class)))
97    
98    
99     (def-amqp-command amqp:bind (class &key ticket queue exchange routing-key no-wait arguments)
100     (:documentation "C-->S: Bind queue to an exchange")
101    
102     (:request
103     (:method ((queue-class amqp::queue) &rest args &key ticket queue exchange routing-key no-wait arguments)
104     (declare (dynamic-extent args))
105    
106     (assert-argument-types amqp:bind
107     (ticket integer nil)
108     (queue (or string amqp:queue))
109     (exchange (or string amqp:exchange))
110     (routing-key string nil)
111     (no-wait amqp:bit nil)
112     (arguments list nil))
113    
114     (setf exchange (amqp:exchange-exchange exchange))
115     (setf queue (amqp:queue-queue queue))
116    
117     (apply #'amqp::send-bind queue-class :exchange exchange :queue queue
118     args)
119     (command-loop (queue-class)
120     (amqp:bind-ok (queue)
121     (amqp:log :debug queue "bound.")
122     (return-from command-loop))
123     (t
124     (class &rest args)
125     (amqp:log :warn class "Unexpected response: ~s . ~s" class args)
126     (return-from command-loop)))
127     queue-class)))
128    
129    
130     (def-amqp-command amqp:bind-ok (class &key)
131     (:documentation "C<--S : Confirm bind successful.
132     This command appears as eventual response to a Bind, and should be processed
133     synchronously by a request-bind. If one appears independently, log it.
134     and continue.")
135    
136     (:response
137     (:method ((queue amqp:queue) &key)
138     "Simply log and continue."
139     queue)))
140    
141    
142     (def-amqp-command amqp:cancel (class &rest args &key consumer-tag no-wait)
143     (:documentation "C-->S :
144     This method cancels a consumer. This does not affect already delivered messages, but it does mean the
145     server will not send any more messages for that consumer. The client may receive an arbitrary number of
146     messages in between sending the cancel method and receiving the cancel-ok reply.")
147    
148     (:request
149     (:method ((basic amqp::basic) &rest args &key consumer-tag no-wait)
150     (declare (dynamic-extent args))
151     (assert-argument-types amqp:cancel
152     (consumer-tag (amqp:string 8) nil)
153     (no-wait amqp:bit nil))
154    
155     (apply #'amqp:send-cancel basic args)
156    
157     (command-loop (basic)
158     ;; skip everything except the -ok
159     (amqp:cancel-ok (class) (return-from command-loop))
160     (amqp:header (frame) t)
161     (amqp:body (frame) t))
162     basic)))
163    
164    
165     (def-amqp-command amqp:cancel-ok (class &key consumer-tag)
166     (:documentation "C<--S : confirm a canceled consumer.
167     This command appears as eventual response to Cancel and should be processed
168     synchronously by a request-cancel. If one appears independently, log it.
169     and continue.")
170    
171     (:response
172     (:method ((basic amqp::basic) &key consumer-tag)
173     (declare (ignore consumer-tag))
174     "Simply log and continue."
175     basic)))
176    
177    
178     (def-amqp-command amqp:close (class &key reply-code reply-text class-id method-id)
179     (:documentation "C<->S : request a connection or a channel close")
180    
181     (:request
182     (:method ((channel amqp:channel) &key (reply-code 0) (reply-text "") (class-id 0) (method-id 0))
183     "Perform a local channel close and forward the reauest to the broker.
184     Invoked ambivalently with close->device-close. The channel
185     state indicates the progress: if it's :close-channel, then the stream close is
186     in progress. stream close after the protocol close. The broker request
187     entails a synchronous close-ok response."
188    
189     (let ((initial-state (shiftf (channel-state channel) amqp.s:close-channel)))
190     (etypecase initial-state
191     ((or amqp.s:use-channel amqp.s:close-channel)
192     (when (connected-channel-p channel)
193     (amqp::send-close channel
194     :reply-code reply-code
195     :reply-text reply-text
196     :class-id class-id
197     :method-id method-id)
198     (command-loop (channel)
199     (amqp:header (basic &rest args)
200     (declare (dynamic-extent args))
201     (amqp:log :debug basic "Draining closed channel: ~s . ~s" basic args)
202     nil)
203     (amqp:body (basic &rest args)
204     (declare (dynamic-extent args))
205     (amqp:log :debug basic "Draining closed channel: ~s . ~s" basic args)
206     nil)
207     (amqp:close-ok (channel &key &allow-other-keys) (return-from command-loop)))
208    
209     ;; once the channel is flushed, close the stream if that's not already in progress
210     (unless (typep initial-state 'amqp.s:close-channel)
211     (device-close channel nil)))))
212     channel))
213    
214     (:method ((connection amqp:connection) &key (reply-code 0) (reply-text "")
215     (class-id (amqp:class-id connection))
216     (method-id 0))
217     "Perform a local connection close and forward the request to the broker.
218     Then close the local stream."
219    
220     (let ((initial-state (shiftf (connection-state connection) amqp.s:close-connection)))
221     (etypecase initial-state
222     ;; if in use, or closing due to stream close, then send the close, and
223     ;; check whether to close the stream.
224     ((or amqp.s:use-connection amqp.s:close-connection)
225     (amqp::send-close connection
226     :reply-code reply-code
227     :reply-text reply-text
228     :class-id class-id
229     :method-id method-id)
230    
231     (command-loop ((connection.channel connection :number 0))
232     (amqp:close-ok (connection) (return-from command-loop)))
233    
234     ;; once the connection is flushed, if the initial state was in use, close the stream
235     (typecase initial-state
236     (amqp.s:use-connection
237     (close connection)
238     ;; once it has been closed, reset to the initial state
239     (setf (connection-state connection) amqp.s:open-connection))))
240     ;; if, eg. already closing, do nothing
241     (amqp.s:connection-state ))
242     connection)))
243    
244     (:response
245     (:method ((channel amqp:channel) &key &allow-other-keys)
246     "Perform a remotely requested on the channel by sending the ok to the server and
247     disconnecting and closing the local stream."
248    
249     (when (connected-channel-p channel)
250     (amqp::send-close-ok channel)
251    
252     ;; disconnect it and close the stream
253     (disconnect-channel (channel-connection channel) channel)
254     (close channel)
255     channel))
256    
257     (:method ((connection amqp:connection) &key reply-code reply-text class-id method-id)
258     "Perform a remotely requested connection close by sending the ok to the server and
259     closing the local stream."
260     (declare (ignore reply-code reply-text class-id method-id))
261    
262     (when (open-stream-p connection)
263     (amqp::send-close-ok connection)
264    
265     ;; once the response is sent, close the stream
266     (close connection))
267     connection)))
268    
269    
270     (def-amqp-command amqp:close-ok (class &key)
271     (:documentation "C<->S : confirm a channel or connection close close : Sync response to Close.
272     This command appears as the eventual response to Cancel and should be processes
273     synchronously together with that. I one appears independently, ignore it.")
274    
275     (:request
276     (:method ((class amqp:connection) &key)
277     (amqp::send-close-ok class)
278     class)
279    
280     (:method ((class amqp:channel) &key)
281     (amqp::send-close-ok class)
282     class))
283    
284     (:response
285     (:method ((class amqp:connection) &key)
286     class)
287    
288     (:method ((class amqp:channel) &key)
289     class)))
290    
291    
292     (def-amqp-command amqp:commit (class &key)
293     (:documentation "C-->S : Commit the current transaction.")
294    
295     (:request
296     (:method ((tx amqp:tx) &key)
297     "Send the command and wait for the response."
298    
299     (amqp::send-commit tx)
300     (command-loop (tx)
301     (amqp:commit-ok (class) (return-from command-loop))))))
302    
303    
304     (def-amqp-command amqp:commit-ok (class &key)
305     (:documentation "C<--S : Confirm a transaction as a syncronous response to Commit
306     This command appears as eventual response to Commit and should be processed
307     synchronously together with that. I one appears independently, ignore it.")
308    
309     (:response
310     (:method ((tx amqp:tx) &key)
311     tx)))
312    
313    
314     (def-amqp-command amqp:consume (class &key queue consumer-tag no-local no-ack exclusive no-wait arguments)
315     (:documentation "C-->S : Create a consumer for a given queue.
316    
317     CLASS : amqp:basic : a basic class instance bound to a channel.
318    
319     The passed basic instance mediates a consume request on the channel and is
320     returned as a handle to mediate responses. In a synchronous application,
321     the channel owner can proceed directly to process deliver replies. In an
322     event-driven application, the owner can register a handler for future
323     deliver commands and process them either as polled or asynchronous events.")
324    
325     (:request
326     (:method ((basic amqp:basic) &rest args &key queue consumer-tag no-local no-ack exclusive no-wait arguments)
327     (declare (dynamic-extent args))
328    
329     (assert-argument-types amqp:consume
330     (queue (or (amqp:string 8) amqp:queue))
331     (consumer-tag (amqp:string 8) nil)
332     (no-local amqp:bit nil)
333     (no-ack amqp:bit nil)
334     (exclusive amqp:bit nil)
335     (no-wait amqp:bit nil)
336     (arguments list nil))
337    
338     (setf queue (amqp:queue-queue queue))
339     (apply #'amqp::send-consume basic :queue queue args)
340    
341     (command-loop (basic)
342     (amqp:consume-ok ((class amqp:basic) &key consumer-tag)
343     (amqp:log :debug class "consume ok: ~s" consumer-tag)
344     (return-from command-loop)))
345    
346     ;; once the request is acknowledged, return the issuing class
347     basic)))
348    
349    
350     (def-amqp-command amqp:consume-ok (class &key consumer-tag)
351     (:documentation "C<--S : Confirm a consume. Sync response to Commit
352     This command appears as eventual response to Consume and should be processed
353     synchronously together with that. If one appears independently, ignore it.")
354    
355     (:response
356     (:method ((basic amqp:basic) &key consumer-tag)
357     (declare (ignore consumer-tag))
358     basic)))
359    
360    
361     (def-amqp-command amqp:declare (class &key ticket queue exchange passive durable exclusive auto-delete no-wait arguments
362     type)
363     (:documentation "C-->S : Request the broker to declare an exchange or a queue,
364     and create it if needed.")
365    
366     (:request
367     (:method ((exchange amqp:exchange) &rest args)
368     (declare (dynamic-extent args))
369     (apply #'amqp::send-declare exchange args)
370     (command-loop (exchange)
371     (amqp:declare-ok ((class amqp:exchange) &key ) (return-from command-loop)))
372     exchange)
373    
374     (:method ((queue amqp:queue) &rest args)
375     (apply #'amqp::send-declare queue args)
376     (command-loop (queue)
377     (amqp:declare-ok ((class amqp:queue) &key queue message-count consumer-count)
378     (amqp:log :debug queue "queue declared: ~a ~a ~a" queue message-count consumer-count)
379     (return-from command-loop)))
380     queue)))
381    
382    
383     (def-amqp-command amqp:Declare-ok (class &key queue message-count consumer-count)
384     (:documentation "C<--S : Confirm a declare. Sync response to Declare.
385     This command appears as eventual response to Declare and should be processed
386     synchronously together with that. I one appears independently, ignore it.")
387    
388     (:response
389     (:method ((class amqp:object) &rest args)
390     (declare (dynamic-extent args) (ignore args))
391     class)))
392    
393    
394     (def-amqp-command amqp:Delete (class &key queue if-unused if-empty)
395     (:documentation "C-->S : ")
396    
397     (:request
398     (:method ((exchange amqp:exchange) &rest args)
399     (declare (dynamic-extent args))
400     (apply #'amqp::send-delete exchange args)
401     (command-loop (exchange)
402     (amqp:delete-ok (class) (return-from command-loop)))
403     exchange)
404    
405     (:method ((queue amqp:queue) &rest args)
406     (declare (dynamic-extent args))
407     (apply #'amqp::send-declare queue args)
408     (command-loop (queue)
409     (amqp:declare-ok (class) (return-from command-loop)))
410     queue)))
411    
412    
413     (def-amqp-command amqp:delete-ok (class &key queue message-count)
414     (:documentation "C<--S : ")
415    
416     (:response
417     (:method ((class amqp:object) &rest args)
418     (declare (dynamic-extent args) (ignore args))
419     class)))
420    
421    
422     (def-amqp-command amqp:deliver (class &key consumer-tag delivery-tag redelivered exchange routing-key)
423     (:documentation "C<--S : notify a client of an incoming consumer message.
424     CLASS : The client class to which the message is being delivered.
425     A read frame generates an immediate basic instance, which then delegates
426     further processing based on the connection's mode:
427     :queue causes the entire message to be read and enqueued as a raw sequence
428     :stream causes the connection/channel to be placed in content mode to, with
429     adjustments to stream parameters for future reading.")
430    
431     (:response
432     (:method ((basic amqp:basic) &rest args)
433     (declare (dynamic-extent args))
434     (let ((channel (object-channel basic)))
435     (prog1 (apply #'device-read-content channel args)
436     (when (channel-acknowledge-messages channel)
437     (amqp::send-ack basic)))))))
438    
439    
440     (def-amqp-command amqp:Flow (class &key active)
441     (:documentation "C<->S : enable/disable flow from peer : Sync request ")
442    
443     (:response
444     (:method ((channel amqp:channel) &key active)
445    
446     (amqp::send-flow-ok channel :active active)
447     (ecase active
448     (0 (signal (channel-condition channel 'channel-flow-stop-condition)))
449     (1 (signal (channel-condition channel 'channel-flow-start-condition))))))
450    
451     (:request
452     (:method ((channel amqp:channel) &key active)
453     (amqp::send-flow channel :active active)
454     ;; what happens now? the flow-ok appears in the content stream?
455     channel)))
456    
457    
458     (def-amqp-command amqp:Flow-Ok (class &key active)
459     (:documentation "C<->S : confirm a flow method : Async response to Flow
460     This command appears as eventual response to Flow and should be processed
461     synchronously together with that. I one appears independently, ignore it.")
462    
463     (:response
464     (:method ((class amqp:channel) &key active)
465     (declare (ignore active))
466     class))
467    
468     (:send )) ; needed for the send rsponse
469    
470    
471     (def-amqp-command amqp:get (object &key queue no-ack body)
472     (:documentation "C-->S : C:GET ( S:GET-OK content / S:GET-EMPTY )
473     Request the 'next' message for the given queue.
474     OBJECT : (or amqp:channel amqp:basic amqp:queue) : designates the queue
475    
476     Resolves the given object to the queue and encodes a Basic.Get with the appropriate arguments.
477     Processes the responses get-ok and get-empty. If the reply is -ok invoke `device-read-content`
478     and return the result. If -empty, return nil.")
479    
480     (:request
481     (:method ((channel amqp:channel) &rest args)
482     (declare (dynamic-extent args))
483     (apply #'channel-request-get channel (amqp:channel.basic channel) args))
484    
485     (:method ((channel amqp:queue) &rest args &key queue no-ack body)
486     (declare (dynamic-extent args) (ignore no-ack body))
487     (apply #'channel-request-get amqp:channel (amqp:channel.basic amqp:channel)
488     :queue queue
489     args))
490    
491     (:method ((basic amqp:basic) &rest args &key queue no-ack (body nil body-s))
492     (declare (dynamic-extent args))
493     (assert-argument-type amqp:get queue (or string amqp:queue))
494     (setf queue (amqp:queue-queue queue))
495     (setf (channel-acknowledge-messages (object-channel basic)) (not no-ack))
496     (when body-s
497     (setf args (copy-list args))
498     (remf args :body))
499     (apply #'amqp::send-get basic :queue queue args)
500    
501     (command-case (basic)
502     (amqp:get-empty ((basic amqp:basic) &rest get-empty-args)
503     (declare (dynamic-extent get-empty-args))
504     (amqp:log :debug basic "respond-to-get, get-empty: ~s" get-empty-args)
505     (return-from command-case nil))
506     (amqp:get-ok ((basic amqp:basic) &rest get-ok-args
507     &key delivery-tag redelivered exchange routing-key message-count)
508     (declare (dynamic-extent get-ok-args)
509     (ignore redelivered exchange routing-key message-count))
510     (amqp:log :debug basic "respond-to-get, get-ok: ~s" get-ok-args)
511     (let ((channel (object-channel basic)))
512     (return-from command-case
513     (prog1 (apply #'device-read-content channel :body body get-ok-args)
514     (unless (amqp:basic-no-ack basic)
515     (amqp::send-ack basic :delivery-tag delivery-tag))))))))))
516    
517    
518     (def-amqp-command amqp:get-ok (class &key delivery-tag redelivered exchange routing-key message-count)
519     (:documentation "C<--S : provide client with a message")
520    
521     (:response
522     (:method ((basic amqp:basic) &rest args)
523     (declare (dynamic-extent args))
524     (let ((channel (object-channel basic)))
525     ;;; nb. do not ack a get-ok
526     (prog1 (apply #'device-read-content channel args))))))
527    
528    
529     (def-amqp-command amqp:Get-Empty (class &key)
530     (:documentation "C<--S : indicate no message available")
531    
532     (:response
533     (:method ((basic amqp:basic) &key)
534     nil)))
535    
536    
537     (def-amqp-command amqp:open (class &key virtual-host)
538     (:documentation "C-->S : open a connection or channel for use : Sync request , carries content.
539     If on a connection, it specifies the virtual host name. On a channel, the id is in the header.")
540    
541     (:request
542     (:method ((class amqp:connection) &rest args
543     &key virtual-host &allow-other-keys)
544     "Set-Up the connection for a given virutal host"
545     (declare (dynamic-extent args))
546     (assert (stringp virtual-host) ()
547     "The required virtual-host must be a string: ~s" virtual-host)
548     (apply #'amqp::send-open class args)
549     (command-loop (class)
550     (amqp:open-ok (class &rest args)
551     (declare (dynamic-extent args))
552     (apply #'amqp::respond-to-open-ok class args)
553     (return-from command-loop)))
554     class)
555    
556     (:method ((class amqp:channel) &rest args)
557     (apply #'amqp::send-open class args)
558     (command-loop (class)
559     ; qpid answers with a channel command
560     (amqp:open-ok (class &rest args)
561     (amqp:log :debug class "Opened: ~s" args)
562     (return-from command-loop))))))
563    
564    
565     (def-amqp-command amqp:Open-Ok (class &key)
566     (:documentation "C<--S : signal that connection is ready")
567    
568     (:response
569     (:method ((class amqp::connection) &key &allow-other-keys)
570     class)
571     (:method ((class amqp::channel) &key &allow-other-keys)
572     class)))
573    
574    
575     (def-amqp-command amqp:Publish (class &key body exchange routing-key mandatory immediate)
576     (:documentation "C-->S : publish a message :
577     This method publishes a message to a specific exchange. The message will be routed to queues as
578     defined by the exchange configuration and distributed to any active consumers when the transaction, if
579     any, is committed.")
580    
581     (:request
582     (:method ((channel amqp:channel) &rest args)
583     "The class' channel is state is set to use-channel.body.output, the stream is cleared,
584     and the encoding is asserted. If a body is supplied, then, it is written. Otherwise the
585     channel is left available as a stream."
586    
587     (declare (dynamic-extent args))
588     ;; delegate to the channel's basic class
589     (apply #'amqp::request-publish (amqp:channel.basic channel)
590     args))
591    
592     (:method ((basic amqp:basic) &rest args &key body exchange routing-key mandatory immediate)
593     (declare (ignore routing-key mandatory immediate))
594     (setf exchange (amqp:exchange-exchange exchange)) ; coerce to a string
595     (setf (basic-exchange basic) exchange) ; cache for possible use in chunk headers
596     (when body
597     (setf args (copy-list args))
598     (remf args :body))
599     (apply #'amqp::send-publish basic :exchange exchange args)
600    
601     (let ((channel (object-channel basic)))
602     (apply #'device-write-content channel body :exchange exchange args)))))
603    
604    
605     (def-amqp-command amqp:purge (class &key queue no-wait)
606     (:documentation "C<->S : "))
607    
608    
609     (def-amqp-command amqp:purge-ok (class &key message-count)
610     (:documentation "C<->S : "))
611    
612    
613     (def-amqp-command amqp:qos (class &key prefetch-size prefetch-count global)
614     (:documentation "C-->S : ")
615    
616     (:request
617     (:method ((basic amqp:basic) &rest args)
618     (apply #'amqp::send-qos basic args)
619     (command-loop (basic)
620     (amqp:qos-ok (basic) (return-from command-loop)))
621     basic)))
622    
623     (def-amqp-command amqp:qos-ok (class &key)
624     (:documentation "C<-S : ")
625    
626     (:response
627     (:method ((class amqp:basic) &key)
628     class)))
629    
630     (def-amqp-command amqp:recover (class &key requeue)
631     (:documentation "C-->S : ")
632    
633     (:request
634     (:method ((basic amqp:basic) &rest args)
635     (apply #'amqp::send-recover basic args)
636     (command-loop (basic)
637     (amqp:recover-ok (basic) (return-from command-loop)))
638     basic)))
639    
640     (def-amqp-command amqp:recover-async (class &key requeue)
641     (:documentation "C-->S : ")
642    
643     (:request
644     (:method ((basic amqp:basic) &rest args)
645     (apply #'amqp::send-recover-async basic args)
646     basic)))
647    
648     (def-amqp-command amqp:recover-ok (class &key )
649     (:documentation "C<-S : ")
650    
651     (:response
652     (:method ((class amqp:basic) &key)
653     class)))
654    
655    
656     (def-amqp-command amqp:Redirect (class &key)
657     (:documentation ""))
658    
659    
660     (def-amqp-command amqp:Reject (class &key delivery-tag multiple)
661     (:documentation "C-->S : reject an incoming message"))
662    
663    
664     (def-amqp-command amqp:request (class &key realm exclusive passive active write read)
665     (:documentation "C-->S : ")
666    
667     (:request
668     (:method ((access amqp:access) &rest args)
669     (apply #'amqp::send-request access args)
670     (command-loop (access)
671     (amqp:request-ok (access) (return-from command-loop)))
672     access)))
673    
674     (def-amqp-command amqp:request-ok (class &key)
675     (:documentation "C<-S : ")
676    
677     (:response
678     (:method ((access amqp:access) &key ticket)
679     (declare (ignore ticket))
680     access)))
681    
682    
683     (def-amqp-command amqp:Return (class &key reply-code reply-text exchange routing-key)
684     (:documentation "C<--S : return a failed message"))
685    
686    
687     (def-amqp-command amqp:rollback (class &key)
688     (:documentation "C-->S : ")
689    
690     (:request
691     (:method ((tx amqp:tx) &key)
692     "Send the command and wait for the response."
693    
694     (amqp::send-rollback tx)
695     (command-loop (tx)
696     (amqp:rollback-ok ((tx amqp:tx)) (return-from command-loop)))
697     tx)))
698    
699    
700     (def-amqp-command amqp:rollback-ok (class &key queue message-count)
701     (:documentation "C<--S : ")
702     (:response
703     (:method ((class amqp::connection) &key &allow-other-keys)
704     class)
705     (:method ((class amqp::channel) &key &allow-other-keys)
706     class)))
707    
708    
709     (def-amqp-command amqp:Secure (class &key challenge)
710     (:documentation "C<--S : security mechanism challenge ")
711    
712     (:response
713     (:method ((connection amqp:connection) &key &allow-other-keys)
714     (amqp::send-secure-ok connection :response (uri-userinfo (connection-uri connection))))))
715    
716    
717     (def-amqp-command amqp:Secure-Ok (class &key response)
718     (:documentation "C->S : security mechanism response")
719    
720     (:request
721     (:method ((connection amqp:connection) &key response)
722     (declare (ignore response))
723     connection)))
724    
725    
726     (def-amqp-command amqp:select (class &key)
727     (:documentation "C-->S : Select transaction mode.")
728    
729     (:request
730     (:method ((tx amqp:tx) &key)
731     "Send the command and wait for the response."
732    
733     (amqp::send-select tx)
734     (command-loop (tx)
735     (amqp:select-ok ((tx amqp:tx)) (return-from command-loop)))
736     tx)))
737    
738    
739     (def-amqp-command amqp:select-ok (class &key)
740     (:documentation "C<--S : Confirm a transaction as a syncronous response to select
741     This command appears as eventual response to select and should be processed
742     synchronously together with that. I one appears independently, ignore it.")
743    
744     (:response
745     (:method ((tx amqp:tx) &key)
746     tx)))
747    
748    
749     ;; SASL rfc4422
750     ;; . anonymous rfc4606
751     ;; . plain rfc4616
752     ;; QPID configuration : http://qpid.apache.org/qpid-design-authentication.html
753     (def-amqp-command amqp:start (class &key version-major version-minor server-properties mechanisms locales)
754     (:documentation "C<--S : start connection negotiation")
755    
756     (:response
757     (:method ((connection amqp:connection)
758     &key version-major version-minor server-properties mechanisms locales)
759     (declare (ignore version-major version-minor))
760     (with-slots (amqp:locale amqp:mechanism) connection
761     (setf (amqp:connection-server-properties connection) server-properties)
762     (cond (amqp:locale
763     (unless (search amqp:locale locales)
764     (error "Specified locale not supported by server: ~s: ~s"
765     amqp:locale locales)))
766     ((stringp (setf amqp:locale (first (split-string " " locales)))))
767     (t
768     (error "No locale available.")))
769     (cond (amqp:mechanism
770     (unless (search amqp:mechanism mechanisms)
771     (error "Specified mechanism not supported by server: ~s: ~s"
772     amqp:mechanism mechanisms)))
773     ((stringp (setf amqp:mechanism (first (split-string " " mechanisms)))))
774     (t
775     (error "No mechanism available.")))
776    
777     (amqp::send-start-ok connection
778     :client-properties nil
779     :mechanism amqp:mechanism
780     :response (format nil "~c~a~c~a"
781     #\null (or (uri-user (connection-uri connection)) "")
782     #\null (or (uri-password (connection-uri connection)) ""))
783     :locale amqp:locale)
784     connection))))
785    
786    
787     (def-amqp-command amqp:start-ok (class &key client-properties mechanism response locale)
788     (:documentation "C->S : select security mechanism and locale")
789    
790     (:request
791     (:method ((connection amqp:connection) &rest args)
792     (declare (dynamic-extent args))
793     (apply #'amqp::send-start-ok connection args))))
794    
795    
796     (def-amqp-command amqp:tune (class &key channel-max frame-max heartbeat)
797     (:documentation "C<--S : propose connection tuning parameters")
798    
799     (:response
800     (:method ((connection amqp:connection) &key channel-max frame-max heartbeat)
801     (when (> channel-max 0)
802     (setf channel-max (min channel-max *max-channels*))
803     (unless (> channel-max (position-if #'identity (get-connection-channels connection) :from-end t))
804     (amqp:not-allowed-error :connection connection
805     "Attempt to tune an active connection: ~s." connection)
806     (setf-connection-channels (adjust-array (get-connection-channels connection)
807     (1+ channel-max) :initial-element nil)
808     connection)))
809     (when (> frame-max 0)
810     (assert (>= frame-max (connection-frame-max connection)) ()
811     "Connection frame size too small: ~s, ~s" connection frame-max))
812     (setf (connection-heartbeat connection) heartbeat)
813     (amqp::send-tune-ok connection :channel-max channel-max :frame-max frame-max :heartbeat heartbeat))))
814    
815    
816     (def-amqp-command amqp:tune-ok (class &key channel-max frame-max heartbeat)
817     (:documentation "C->S : negotiate connection tuning parameters")
818    
819     (:request
820     (:method ((connection amqp:connection) &rest args)
821     (apply 'amqp::send-tune-ok connection args))))
822    
823    
824     (def-amqp-command amqp:unbind (class &key queue exchange routing-key arguments)
825     (:documentation "C<->S : ")
826    
827     (:request
828     (:method ((class amqp:queue) &rest args)
829     (apply #'amqp::send-unbind class args)
830     (command-loop (class)
831     (amqp:unbind-ok ((class amqp:queue))
832     (return-from command-loop)))
833     class))
834    
835     (:response
836     (:method ((queue amqp::queue) &rest args)
837     (declare (ignore args))
838     queue)))
839    
840     (def-amqp-command amqp:unbind-ok (class &key)
841     (:documentation "C<->S : ")
842    
843     (:request
844     (:method ((queue amqp::queue) &rest args)
845     (apply 'amqp::send-unbind-ok queue args)))
846    
847     (:response
848     (:method ((queue amqp::queue) &rest args)
849     (declare (ignore args))
850     queue)))
851    
852    
853     ;;;
854     ;;; convenience operators
855    
856     (defgeneric call-with-consumer (operator channel &key queue consumer-tag no-local no-ack exclusive no-wait arguments)
857    
858     (:method (operator (channel amqp:channel) &rest args)
859     (declare (dynamic-extent args))
860    
861     (apply #'amqp:request-consume channel args)
862     (command-loop (channel)
863     ;; up to the caller to rtansfer out
864     (amqp:deliver ((basic amqp:basic) &rest args)
865     (apply operator basic args)))))
866    

  ViewVC Help
Powered by ViewVC 1.1.5