/[de-setf-amqp]/commands.lisp
ViewVC logotype

Contents of /commands.lisp

Parent Directory Parent Directory | Revision Log Revision Log


Revision 3 - (show annotations)
Tue Feb 23 09:05:39 2010 UTC (4 years, 1 month ago) by janderson
File size: 33250 byte(s)
Merge commit 'remotes/github/master' into remotes/git-svn
1 ;;; -*- Package: de.setf.amqp.implementation; -*-
2
3 (in-package :de.setf.amqp.implementation)
4
5 (document :file
6 (description "This file defines the protocol operators for AMQP `class` and `METHOD` entities for the
7 'de.setf.amqp' library.")
8 (copyright
9 "Copyright 2010 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved"
10 "'de.setf.amqp' is free software: you can redistribute it and/or modify it under the terms of version 3
11 of the GNU Affero General Public License as published by the Free Software Foundation.
12
13 'setf.amqp' is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
14 implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
15 See the Affero General Public License for more details.
16
17 A copy of the GNU Affero General Public License should be included with 'de.setf.amqp' as `AMQP:agpl.txt`.
18 If not, see the GNU [site](http://www.gnu.org/licenses/).")
19
20 (long-description "Each (object . method) combination corresponds to several operators, which act in concert
21 to implement the protocol:
22
23 - `respond-to-_method_` performs the command on a client object in response to a broker message.
24 This includes changes to instance state, open/close side effects for `connection` and `channel` methods,
25 instantiating and binding, or releasing any related, as well as any requisite broker message responses
26 as confirmation or as further processing.
27
28 - `request-_method_` issues the request to the broker, together with any client object operations
29 required by the protocol.
30
31 - `send-_method_` encodes frames and performs transport-level operations to send the command to
32 the broker. This delegates to protocol-specific methods, which encode the respective arguments, and to
33 the network device operations for the network stream functions.
34
35 - no explicit receive operators are defined, as messages are self-describing and decoded accordingly.
36 Application code is written in terms of `command-case` or `command-loop` statements which dispatch based
37 on received commands type.
38
39 The `def-amqp-command` forms below define the protocol class and the generic method operators.
40 The `:response` and `:request` clauses include methods as appropriate to whether both the broker and
41 the client implement the operation. A `:request` clause automatically defines a `send-` operator.
42 An additional (possibly blank) `:send` clause can be included if sending must be supported in addition to
43 a complete command request.
44
45 The respective respond-to and request operators are implemented in two layers.
46 The interface operator, which uses the elementary name, is implemented in terms of a second
47 operator: `channel-respond-to-`, or `channel-request-`, which requires an additional initial argument,
48 the `channel`. The delegation call interposes the respective `objects-channel` value as this initial
49 argument. The specialized methods are defined with `amqp:channel` as the initial specialization.
50
51 The interface architecture makes it possible for applications to alter the api behavior by specializing
52 just the channel, just the protocol class, or both."))
53
54
55
56
(defmacro response-function (name)
  "For use as the initform for method response functions.
Expands into a form which yields NAME if that symbol is fbound at the time the form is
evaluated. Otherwise it yields `default-channel-respond-to`, the default response
function, which signals an error."
  (let ((quoted-name `',name))
    `(if (fboundp ,quoted-name)
         ,quoted-name
         'default-channel-respond-to)))
61
62
(defgeneric default-channel-respond-to (channel class &rest args)
  (:documentation "The fallback protocol response operator.
`response-function` designates it whenever the specific response operator for a command
is not defined. It logs the attempted response and signals a not-implemented error.")

  (:method :before ((channel t) (class t) &rest args)
    "Log the class and the arguments of the response which is about to be attempted."
    (declare (dynamic-extent args))
    (amqp:log* default-channel-respond-to class args))

  (:method ((channel amqp:channel) (class t) &rest args)
    "No specific response is implemented for CLASS: signal a not-implemented error."
    (amqp:not-implemented-error
     :message-string "Unimplemented method: ~s . ~s"
     :message-arguments (list class args))))
72
73
(def-amqp-command amqp:ack (class &key delivery-tag multiple)
  (:documentation "C-->S : acknowledge one or more messages.
The request methods just send the command and return the class instance;
no reply is awaited.")

  (:request
   ;; Both methods name the keywords in the lambda list only, and pass the
   ;; complete argument list through to send-ack.
   (:method ((basic amqp::basic) &rest ack-args &key delivery-tag multiple)
     (declare (ignore delivery-tag multiple))
     (apply #'amqp:send-ack basic ack-args)
     basic)

   (:method ((file amqp::file) &rest ack-args &key delivery-tag multiple)
     (declare (ignore delivery-tag multiple))
     (apply #'amqp:send-ack file ack-args)
     file)))
87
88
(def-amqp-command amqp:alert (class &key reply-code reply-text details)
  (:documentation "C<--S : send a non-fatal warning message : Async , carries content ")

  (:response
   (:method ((class amqp::channel) &rest alert-args)
     "Do nothing more than log the message: the arguments are ignored here and the
 channel is returned."
     (declare (ignore alert-args))
     class)))
97
98
(def-amqp-command amqp:bind (class &key ticket queue exchange routing-key no-wait arguments)
  (:documentation "C-->S: Bind queue to an exchange")

  (:request
   (:method ((queue-class amqp::queue) &rest args
             &key ticket queue exchange routing-key no-wait arguments)
     "Check the argument types, send the bind command for the exchange and queue,
 and process responses until the first reply: a bind-ok is logged at debug level,
 anything else is logged as a warning. Returns the queue instance."
     (declare (dynamic-extent args))

     (assert-argument-types amqp:bind
       (ticket integer nil)
       (queue (or string amqp:queue))
       (exchange (or string amqp:exchange))
       (routing-key string nil)
       (no-wait amqp:bit nil)
       (arguments list nil))

     ;; Both designators may be instances as well as strings: pass each through its
     ;; accessor before sending (presumably this yields the name string).
     (let* ((exchange-name (amqp:exchange-exchange exchange))
            (queue-name (amqp:queue-queue queue)))
       ;; the leading keywords take precedence over any duplicates in args
       (apply #'amqp::send-bind queue-class
              :exchange exchange-name
              :queue queue-name
              args))

     (command-loop (queue-class)
       (amqp:bind-ok (queue)
         (amqp:log :debug queue "bound.")
         (return-from command-loop))
       (t
        (class &rest args)
        (amqp:log :warn class "Unexpected response: ~s . ~s" class args)
        (return-from command-loop)))

     queue-class)))
128
129
(def-amqp-command amqp:bind-ok (class &key)
  (:documentation "C<--S : Confirm bind successful.
 This command appears as the eventual response to a Bind, and should be processed
 synchronously by a request-bind. If one appears independently, log it and continue.")

  (:response
   (:method ((queue amqp:queue) &key)
     "Nothing to do beyond logging: return the queue and continue."
     queue)))
140
141
(def-amqp-command amqp:cancel (class &rest args &key consumer-tag no-wait)
  (:documentation "C-->S :
 This method cancels a consumer. This does not affect already delivered messages, but it does mean the
 server will not send any more messages for that consumer. The client may receive an arbitrary number of
 messages in between sending the cancel method and receiving the cancel-ok reply.")

  (:request
   (:method ((basic amqp::basic) &rest args &key consumer-tag no-wait)
     "Check the argument types, send the cancel, and read commands until the
 cancel-ok arrives. Returns the basic instance."
     (declare (dynamic-extent args))

     (assert-argument-types amqp:cancel
       (consumer-tag (amqp:string 8) nil)
       (no-wait amqp:bit nil))

     (apply #'amqp:send-cancel basic args)

     ;; Content which is still in transit is skipped: only the cancel-ok
     ;; terminates the loop.
     (command-loop (basic)
       (amqp:cancel-ok (class)
         (return-from command-loop))
       (amqp:header (frame)
         t)
       (amqp:body (frame)
         t))

     basic)))
163
164
(def-amqp-command amqp:cancel-ok (class &key consumer-tag)
  (:documentation "C<--S : confirm a canceled consumer.
 This command appears as the eventual response to Cancel and should be processed
 synchronously by a request-cancel. If one appears independently, log it and continue.")

  (:response
   (:method ((basic amqp::basic) &key consumer-tag)
     "Nothing to do beyond logging: return the basic instance and continue."
     (declare (ignore consumer-tag))
     basic)))
176
177
(def-amqp-command amqp:close (class &key reply-code reply-text class-id method-id)
  (:documentation "C<->S : request a connection or a channel close")

  (:request
   (:method ((channel amqp:channel)
             &key (reply-code 0) (reply-text "") (class-id 0) (method-id 0))
     "Perform a local channel close and forward the request to the broker.
 Invoked ambivalently with close->device-close. The channel state indicates the
 progress: if it is already :close-channel, then the stream close is in progress.
 The stream close follows the protocol close. The broker request entails a
 synchronous close-ok response."

     ;; record the state on entry and mark the channel as closing in one step
     (let ((prior-state (shiftf (channel-state channel) amqp.s:close-channel)))
       (etypecase prior-state
         ((or amqp.s:use-channel amqp.s:close-channel)
          (when (connected-channel-p channel)
            (amqp::send-close channel
                              :reply-code reply-code
                              :reply-text reply-text
                              :class-id class-id
                              :method-id method-id)

            ;; drain any pending content until the broker confirms the close
            (command-loop (channel)
              (amqp:header (basic &rest args)
                (declare (dynamic-extent args))
                (amqp:log :debug basic "Draining closed channel: ~s . ~s" basic args)
                nil)
              (amqp:body (basic &rest args)
                (declare (dynamic-extent args))
                (amqp:log :debug basic "Draining closed channel: ~s . ~s" basic args)
                nil)
              (amqp:close-ok (channel &key &allow-other-keys)
                (return-from command-loop)))

            ;; once the channel is flushed, close the stream if that's not already in progress
            (unless (typep prior-state 'amqp.s:close-channel)
              (device-close channel nil)))))
       channel))

   (:method ((connection amqp:connection)
             &key (reply-code 0) (reply-text "")
                  (class-id (amqp:class-id connection))
                  (method-id 0))
     "Perform a local connection close and forward the request to the broker.
 Then close the local stream."

     ;; record the state on entry and mark the connection as closing in one step
     (let ((prior-state (shiftf (connection-state connection) amqp.s:close-connection)))
       (etypecase prior-state
         ;; if in use, or closing due to stream close, then send the close, and
         ;; check whether to close the stream.
         ((or amqp.s:use-connection amqp.s:close-connection)
          (amqp::send-close connection
                            :reply-code reply-code
                            :reply-text reply-text
                            :class-id class-id
                            :method-id method-id)

          ;; the confirmation is read through channel number 0 of the connection
          (command-loop ((connection.channel connection :number 0))
            (amqp:close-ok (connection)
              (return-from command-loop)))

          ;; once the connection is flushed, if the initial state was in use, close the stream
          (when (typep prior-state 'amqp.s:use-connection)
            (close connection)
            ;; once it has been closed, reset to the initial state
            (setf (connection-state connection) amqp.s:open-connection)))

         ;; if, eg. already closing, do nothing
         (amqp.s:connection-state ))
       connection)))

  (:response
   (:method ((channel amqp:channel) &key &allow-other-keys)
     "Perform a remotely requested close on the channel by sending the ok to the
 server, then disconnecting the channel and closing the local stream.
 The channel is returned only if it was still connected; otherwise the result is nil."

     (when (connected-channel-p channel)
       (amqp::send-close-ok channel)

       ;; disconnect it and close the stream
       (disconnect-channel (channel-connection channel) channel)
       (close channel)
       channel))

   (:method ((connection amqp:connection) &key reply-code reply-text class-id method-id)
     "Perform a remotely requested connection close by sending the ok to the server
 and closing the local stream. Always returns the connection."
     (declare (ignore reply-code reply-text class-id method-id))

     (when (open-stream-p connection)
       (amqp::send-close-ok connection)

       ;; once the response is sent, close the stream
       (close connection))
     connection)))
268
269
(def-amqp-command amqp:close-ok (class &key)
  (:documentation "C<->S : confirm a channel or connection close : Sync response to Close.
 This command appears as the eventual response to Close and should be processed
 synchronously together with that. If one appears independently, ignore it.")

  (:request
   ;; a request just sends the confirmation and returns the instance
   (:method ((connection amqp:connection) &key)
     (amqp::send-close-ok connection)
     connection)

   (:method ((channel amqp:channel) &key)
     (amqp::send-close-ok channel)
     channel))

  (:response
   ;; an independent close-ok is ignored: the instance is returned unchanged
   (:method ((connection amqp:connection) &key)
     connection)

   (:method ((channel amqp:channel) &key)
     channel)))
290
291
(def-amqp-command amqp:commit (class &key)
  (:documentation "C-->S : Commit the current transaction.")

  (:request
   (:method ((tx amqp:tx) &key)
     "Send the command, wait for the commit-ok response, and return the tx instance."

     (amqp::send-commit tx)
     (command-loop (tx)
       (amqp:commit-ok (class) (return-from command-loop)))
     ;; return the issuing class, consistent with the rollback and select requests
     tx)))
302
303
(def-amqp-command amqp:commit-ok (class &key)
  (:documentation "C<--S : Confirm a transaction as a synchronous response to Commit.
 This command appears as the eventual response to Commit and should be processed
 synchronously together with that. If one appears independently, ignore it.")

  (:response
   ;; an independent commit-ok is ignored: the tx instance is returned unchanged
   (:method ((tx amqp:tx) &key)
     tx)))
312
313
(def-amqp-command amqp:consume (class &key queue consumer-tag no-local no-ack exclusive no-wait arguments)
  (:documentation "C-->S : Create a consumer for a given queue.

 CLASS : amqp:basic : a basic class instance bound to a channel.

 The passed basic instance mediates a consume request on the channel and is
 returned as a handle to mediate responses. In a synchronous application,
 the channel owner can proceed directly to process deliver replies. In an
 event-driven application, the owner can register a handler for future
 deliver commands and process them either as polled or asynchronous events.")

  (:request
   (:method ((basic amqp:basic) &rest args
             &key queue consumer-tag no-local no-ack exclusive no-wait arguments)
     "Check the argument types, send the consume for the designated queue, and wait
 for the consume-ok. Returns the basic instance."
     (declare (dynamic-extent args))

     (assert-argument-types amqp:consume
       (queue (or (amqp:string 8) amqp:queue))
       (consumer-tag (amqp:string 8) nil)
       (no-local amqp:bit nil)
       (no-ack amqp:bit nil)
       (exclusive amqp:bit nil)
       (no-wait amqp:bit nil)
       (arguments list nil))

     ;; The queue may be given as an instance: pass it through the accessor before
     ;; sending. The leading :queue takes precedence over the one in args.
     (let ((queue-name (amqp:queue-queue queue)))
       (apply #'amqp::send-consume basic :queue queue-name args))

     (command-loop (basic)
       (amqp:consume-ok ((class amqp:basic) &key consumer-tag)
         (amqp:log :debug class "consume ok: ~s" consumer-tag)
         (return-from command-loop)))

     ;; once the request is acknowledged, return the issuing class
     basic)))
348
349
(def-amqp-command amqp:consume-ok (class &key consumer-tag)
  (:documentation "C<--S : Confirm a consume. Sync response to Consume.
 This command appears as the eventual response to Consume and should be processed
 synchronously together with that. If one appears independently, ignore it.")

  (:response
   ;; an independent consume-ok is ignored: the basic instance is returned unchanged
   (:method ((basic amqp:basic) &key consumer-tag)
     (declare (ignore consumer-tag))
     basic)))
359
360
(def-amqp-command amqp:declare (class &key ticket queue exchange passive durable exclusive auto-delete no-wait arguments
                                      type)
  (:documentation "C-->S : Request the broker to declare an exchange or a queue,
 and create it if needed.")

  (:request
   (:method ((exchange amqp:exchange) &rest declare-args)
     "Send the declare for the exchange and wait for the declare-ok.
 Returns the exchange."
     (declare (dynamic-extent declare-args))
     (apply #'amqp::send-declare exchange declare-args)
     (command-loop (exchange)
       (amqp:declare-ok ((class amqp:exchange) &key )
         (return-from command-loop)))
     exchange)

   (:method ((queue amqp:queue) &rest declare-args)
     "Send the declare for the queue, wait for the declare-ok, and log the values
 it carries. Returns the queue."
     (apply #'amqp::send-declare queue declare-args)
     (command-loop (queue)
       ;; nb. within this clause the keyword parameter `queue` shadows the method's queue
       (amqp:declare-ok ((class amqp:queue) &key queue message-count consumer-count)
         (amqp:log :debug queue "queue declared: ~a ~a ~a" queue message-count consumer-count)
         (return-from command-loop)))
     queue)))
381
382
(def-amqp-command amqp:Declare-ok (class &key queue message-count consumer-count)
  (:documentation "C<--S : Confirm a declare. Sync response to Declare.
 This command appears as the eventual response to Declare and should be processed
 synchronously together with that. If one appears independently, ignore it.")

  (:response
   ;; an independent declare-ok is ignored: the instance is returned unchanged
   (:method ((class amqp:object) &rest ok-args)
     (declare (dynamic-extent ok-args) (ignore ok-args))
     class)))
392
393
(def-amqp-command amqp:Delete (class &key queue if-unused if-empty)
  (:documentation "C-->S : Request the broker to delete an exchange or a queue.")

  (:request
   (:method ((exchange amqp:exchange) &rest args)
     "Send the delete for the exchange and wait for the delete-ok.
 Returns the exchange."
     (declare (dynamic-extent args))
     (apply #'amqp::send-delete exchange args)
     (command-loop (exchange)
       (amqp:delete-ok (class) (return-from command-loop)))
     exchange)

   (:method ((queue amqp:queue) &rest args)
     "Send the delete for the queue and wait for the delete-ok.
 Returns the queue."
     (declare (dynamic-extent args))
     ;; this method must issue a delete - not a declare, which would (re)create
     ;; the queue - and must await the matching delete-ok confirmation.
     (apply #'amqp::send-delete queue args)
     (command-loop (queue)
       (amqp:delete-ok (class) (return-from command-loop)))
     queue)))
411
412
(def-amqp-command amqp:delete-ok (class &key queue message-count)
  (:documentation "C<--S : Confirm a delete.")

  (:response
   ;; nothing to do: the arguments are ignored and the instance is returned unchanged
   (:method ((class amqp:object) &rest ok-args)
     (declare (dynamic-extent ok-args) (ignore ok-args))
     class)))
420
421
(def-amqp-command amqp:deliver (class &key consumer-tag delivery-tag redelivered exchange routing-key)
  (:documentation "C<--S : notify a client of an incoming consumer message.
 CLASS : The client class to which the message is being delivered.
 A read frame generates an immediate basic instance, which then delegates
 further processing based on the connection's mode:
 :queue causes the entire message to be read and enqueued as a raw sequence
 :stream causes the connection/channel to be placed in content mode, with
 adjustments to stream parameters for future reading.")

  (:response
   (:method ((basic amqp:basic) &rest deliver-args)
     "Read the content through the channel of the basic instance, acknowledge it
 if the channel is set to acknowledge messages, and return the (primary) result of
 the read."
     (declare (dynamic-extent deliver-args))
     (let* ((channel (object-channel basic))
            (content (apply #'device-read-content channel deliver-args)))
       ;; the acknowledgement follows the read
       (when (channel-acknowledge-messages channel)
         (amqp::send-ack basic))
       content))))
438
439
(def-amqp-command amqp:Flow (class &key active)
  (:documentation "C<->S : enable/disable flow from peer : Sync request ")

  (:response
   (:method ((channel amqp:channel) &key active)
     "Confirm the flow request with a flow-ok, then signal the channel's stop or
 start flow condition according to ACTIVE, which must be 0 or 1."

     (amqp::send-flow-ok channel :active active)
     (ecase active
       (0
        (signal (channel-condition channel 'channel-flow-stop-condition)))
       (1
        (signal (channel-condition channel 'channel-flow-start-condition))))))

  (:request
   (:method ((channel amqp:channel) &key active)
     "Send the flow request and return the channel without waiting for the flow-ok."
     (amqp::send-flow channel :active active)
     ;; what happens now? the flow-ok appears in the content stream?
     channel)))
456
457
(def-amqp-command amqp:Flow-Ok (class &key active)
  (:documentation "C<->S : confirm a flow method : Async response to Flow
 This command appears as the eventual response to Flow and should be processed
 synchronously together with that. If one appears independently, ignore it.")

  (:response
   ;; an independent flow-ok is ignored: the channel is returned unchanged
   (:method ((channel amqp:channel) &key active)
     (declare (ignore active))
     channel))

  ;; the empty clause is needed to define the send operator for the response
  (:send ))
469
470
(def-amqp-command amqp:get (object &key queue no-ack body)
  (:documentation "C-->S : C:GET ( S:GET-OK content / S:GET-EMPTY )
 Request the 'next' message for the given queue.
 OBJECT : (or amqp:channel amqp:basic amqp:queue) : designates the queue

 Resolves the given object to the queue and encodes a Basic.Get with the appropriate arguments.
 Processes the responses get-ok and get-empty. If the reply is -ok invoke `device-read-content`
 and return the result. If -empty, return nil.")

  (:request
   (:method ((channel amqp:channel) &rest args)
     "Delegate to the method for the channel's basic instance."
     (declare (dynamic-extent args))
     (apply #'channel-request-get channel (amqp:channel.basic channel) args))

   (:method ((queue-class amqp:queue) &rest args &key queue no-ack body)
     "Delegate to the method for the channel's basic instance, with the queue
 instance itself as the designated queue unless a :queue argument is supplied."
     (declare (dynamic-extent args) (ignore no-ack body))
     ;; The specialized argument is the queue; it was formerly named `channel` and
     ;; never used, so that the request carried just the - normally absent -
     ;; :queue keyword value. An explicit :queue still takes precedence.
     (apply #'channel-request-get amqp:channel (amqp:channel.basic amqp:channel)
            :queue (or queue queue-class)
            args))

   (:method ((basic amqp:basic) &rest args &key queue no-ack (body nil body-s))
     "Send the get for the designated queue and process the single reply:
 for get-empty return nil; for get-ok read the content, acknowledge it unless the
 basic instance is marked no-ack, and return the result of the read."
     (declare (dynamic-extent args))
     (assert-argument-type amqp:get queue (or string amqp:queue))
     (setf queue (amqp:queue-queue queue))
     (setf (channel-acknowledge-messages (object-channel basic)) (not no-ack))
     ;; :body is consumed locally by the content reader; it is not passed to send-get
     (when body-s
       (setf args (copy-list args))
       (remf args :body))
     (apply #'amqp::send-get basic :queue queue args)

     (command-case (basic)
       (amqp:get-empty ((basic amqp:basic) &rest get-empty-args)
         (declare (dynamic-extent get-empty-args))
         (amqp:log :debug basic "respond-to-get, get-empty: ~s" get-empty-args)
         (return-from command-case nil))
       (amqp:get-ok ((basic amqp:basic) &rest get-ok-args
                     &key delivery-tag redelivered exchange routing-key message-count)
         (declare (dynamic-extent get-ok-args)
                  (ignore redelivered exchange routing-key message-count))
         (amqp:log :debug basic "respond-to-get, get-ok: ~s" get-ok-args)
         (let ((channel (object-channel basic)))
           (return-from command-case
             (prog1 (apply #'device-read-content channel :body body get-ok-args)
               (unless (amqp:basic-no-ack basic)
                 (amqp::send-ack basic :delivery-tag delivery-tag))))))))))
516
517
(def-amqp-command amqp:get-ok (class &key delivery-tag redelivered exchange routing-key message-count)
  (:documentation "C<--S : provide client with a message")

  (:response
   (:method ((basic amqp:basic) &rest get-ok-args)
     "Read the content through the channel of the basic instance and return the
 primary value of the read."
     (declare (dynamic-extent get-ok-args))
     ;; nb. do not ack a get-ok
     (values (apply #'device-read-content (object-channel basic) get-ok-args)))))
527
528
(def-amqp-command amqp:Get-Empty (class &key)
  (:documentation "C<--S : indicate no message available")

  (:response
   ;; there is no content to read: the response yields nil
   (:method ((basic amqp:basic) &key)
     nil)))
535
536
(def-amqp-command amqp:open (class &key virtual-host)
  (:documentation "C-->S : open a connection or channel for use : Sync request , carries content.
 If on a connection, it specifies the virtual host name. On a channel, the id is in the header.")

  (:request
   (:method ((class amqp:connection) &rest args
             &key virtual-host &allow-other-keys)
     "Set up the connection for a given virtual host: send the open, wait for the
 open-ok, and pass it to respond-to-open-ok. Returns the connection."
     (declare (dynamic-extent args))
     (assert (stringp virtual-host) ()
             "The required virtual-host must be a string: ~s" virtual-host)
     (apply #'amqp::send-open class args)
     (command-loop (class)
       (amqp:open-ok (class &rest args)
         (declare (dynamic-extent args))
         (apply #'amqp::respond-to-open-ok class args)
         (return-from command-loop)))
     class)

   (:method ((class amqp:channel) &rest args)
     "Send the open for the channel, wait for the open-ok, and log it.
 Returns the channel."
     (apply #'amqp::send-open class args)
     (command-loop (class)
       ; qpid answers with a channel command
       (amqp:open-ok (class &rest args)
         (amqp:log :debug class "Opened: ~s" args)
         (return-from command-loop)))
     ;; return the issuing instance, consistent with the connection method
     class)))
563
564
(def-amqp-command amqp:Open-Ok (class &key)
  (:documentation "C<--S : signal that connection is ready")

  (:response
   ;; any arguments are accepted and ignored: the instance is returned unchanged
   (:method ((connection amqp::connection) &key &allow-other-keys)
     connection)
   (:method ((channel amqp::channel) &key &allow-other-keys)
     channel)))
573
574
(def-amqp-command amqp:Publish (class &key body exchange routing-key mandatory immediate)
  (:documentation "C-->S : publish a message :
 This method publishes a message to a specific exchange. The message will be routed to queues as
 defined by the exchange configuration and distributed to any active consumers when the transaction, if
 any, is committed.")

  (:request
   (:method ((channel amqp:channel) &rest publish-args)
     "The class' channel state is set to use-channel.body.output, the stream is cleared,
 and the encoding is asserted. If a body is supplied, then it is written. Otherwise the
 channel is left available as a stream."
     (declare (dynamic-extent publish-args))
     ;; delegate to the channel's basic class
     (apply #'amqp::request-publish (amqp:channel.basic channel) publish-args))

   (:method ((basic amqp:basic) &rest args &key body exchange routing-key mandatory immediate)
     "Send the publish command for the exchange and then write the content through
 the channel of the basic instance. Returns the result of the content write."
     (declare (ignore routing-key mandatory immediate))
     (let ((exchange-name (amqp:exchange-exchange exchange))) ; coerce to a string
       ;; cache for possible use in chunk headers
       (setf (basic-exchange basic) exchange-name)
       ;; a non-null body is passed positionally to the content writer only:
       ;; drop it from the arguments on a fresh copy of the list
       (when body
         (setf args (copy-list args))
         (remf args :body))
       (apply #'amqp::send-publish basic :exchange exchange-name args)
       (apply #'device-write-content (object-channel basic) body
              :exchange exchange-name args)))))
603
604
;; No :request or :response clause is specified for purge: only the command is defined.
(def-amqp-command amqp:purge (class &key queue no-wait)
  (:documentation "C-->S : purge a queue : remove its messages which are not awaiting acknowledgment."))
607
608
;; No :request or :response clause is specified for purge-ok: only the command is defined.
(def-amqp-command amqp:purge-ok (class &key message-count)
  (:documentation "C<--S : confirm a purge : Sync response to Purge."))
611
612
(def-amqp-command amqp:qos (class &key prefetch-size prefetch-count global)
  (:documentation "C-->S : specify quality of service")

  (:request
   (:method ((basic amqp:basic) &rest qos-args)
     "Send the qos command and wait for the qos-ok. Returns the basic instance."
     (apply #'amqp::send-qos basic qos-args)
     (command-loop (basic)
       (amqp:qos-ok (basic)
         (return-from command-loop)))
     basic)))
622
(def-amqp-command amqp:qos-ok (class &key)
  (:documentation "C<--S : confirm the requested qos")

  (:response
   ;; nothing to do: the basic instance is returned unchanged
   (:method ((basic amqp:basic) &key)
     basic)))
629
(def-amqp-command amqp:recover (class &key requeue)
  (:documentation "C-->S : redeliver unacknowledged messages")

  (:request
   (:method ((basic amqp:basic) &rest recover-args)
     "Send the recover command and wait for the recover-ok. Returns the basic instance."
     (apply #'amqp::send-recover basic recover-args)
     (command-loop (basic)
       (amqp:recover-ok (basic)
         (return-from command-loop)))
     basic)))
639
(def-amqp-command amqp:recover-async (class &key requeue)
  (:documentation "C-->S : redeliver unacknowledged messages, without confirmation")

  (:request
   (:method ((basic amqp:basic) &rest recover-args)
     "Send the recover-async command and return the basic instance; no reply is awaited."
     (apply #'amqp::send-recover-async basic recover-args)
     basic)))
647
(def-amqp-command amqp:recover-ok (class &key )
  (:documentation "C<--S : confirm a recover")

  (:response
   ;; nothing to do: the basic instance is returned unchanged
   (:method ((basic amqp:basic) &key)
     basic)))
654
655
;; No :request or :response clause is specified for Redirect: only the command is
;; defined, and its documentation is left empty.
(def-amqp-command amqp:Redirect (class &key)
  (:documentation ""))
658
659
;; No :request or :response clause is specified for Reject: only the command is defined.
(def-amqp-command amqp:Reject (class &key delivery-tag multiple)
  (:documentation "C-->S : reject an incoming message"))
662
663
(def-amqp-command amqp:request (class &key realm exclusive passive active write read)
  (:documentation "C-->S : request access to a realm")

  (:request
   (:method ((access amqp:access) &rest request-args)
     "Send the request command and wait for the request-ok. Returns the access instance."
     (apply #'amqp::send-request access request-args)
     (command-loop (access)
       (amqp:request-ok (access)
         (return-from command-loop)))
     access)))
673
(def-amqp-command amqp:request-ok (class &key)
  (:documentation "C<--S : confirm an access request")

  (:response
   ;; the ticket is accepted but not used here: the access instance is returned unchanged
   (:method ((access amqp:access) &key ticket)
     (declare (ignore ticket))
     access)))
681
682
;; No :request or :response clause is specified for Return: only the command is defined.
(def-amqp-command amqp:Return (class &key reply-code reply-text exchange routing-key)
  (:documentation "C<--S : return a failed message"))
685
686
(def-amqp-command amqp:rollback (class &key)
  (:documentation "C-->S : Roll back the current transaction.")

  (:request
   (:method ((tx amqp:tx) &key)
     "Send the command, wait for the rollback-ok response, and return the tx instance."

     (amqp::send-rollback tx)
     (command-loop (tx)
       (amqp:rollback-ok ((tx amqp:tx))
         (return-from command-loop)))
     tx)))
698
699
(def-amqp-command amqp:rollback-ok (class &key queue message-count)
  (:documentation "C<--S : Confirm a rollback as a synchronous response to Rollback.
 This command appears as the eventual response to Rollback and should be processed
 synchronously together with that. If one appears independently, ignore it.")

  (:response
   ;; The rollback request awaits this command for an amqp:tx instance. Provide the
   ;; matching response method, as commit-ok and select-ok do, so that an
   ;; independent rollback-ok has an applicable method.
   (:method ((tx amqp:tx) &key &allow-other-keys)
     tx)
   ;; the original connection and channel methods are retained
   (:method ((class amqp::connection) &key &allow-other-keys)
     class)
   (:method ((class amqp::channel) &key &allow-other-keys)
     class)))
707
708
(def-amqp-command amqp:Secure (class &key challenge)
  (:documentation "C<--S : security mechanism challenge ")

  (:response
   (:method ((connection amqp:connection) &key &allow-other-keys)
     "Answer the challenge with a secure-ok, the response of which is the userinfo
 component of the connection's uri. The challenge itself is not examined."
     (let ((userinfo (uri-userinfo (connection-uri connection))))
       (amqp::send-secure-ok connection :response userinfo)))))
715
716
(def-amqp-command amqp:Secure-Ok (class &key response)
  (:documentation "C->S : security mechanism response")

  (:request
   ;; NOTE(review): this method ignores the response and sends nothing; the
   ;; secure-ok is sent directly by the response to Secure - confirm that this
   ;; request is intended to be a no-op.
   (:method ((connection amqp:connection) &key response)
     (declare (ignore response))
     connection)))
724
725
(def-amqp-command amqp:select (class &key)
  (:documentation "C-->S : Select transaction mode.")

  (:request
   (:method ((tx amqp:tx) &key)
     "Send the command, wait for the select-ok response, and return the tx instance."

     (amqp::send-select tx)
     (command-loop (tx)
       (amqp:select-ok ((tx amqp:tx))
         (return-from command-loop)))
     tx)))
737
738
(def-amqp-command amqp:select-ok (class &key)
  (:documentation "C<--S : Confirm a transaction as a synchronous response to Select.
 This command appears as the eventual response to Select and should be processed
 synchronously together with that. If one appears independently, ignore it.")

  (:response
   ;; an independent select-ok is ignored: the tx instance is returned unchanged
   (:method ((tx amqp:tx) &key)
     tx)))
747
748
749 ;; SASL rfc4422
750 ;; . anonymous rfc4606
751 ;; . plain rfc4616
752 ;; QPID configuration : http://qpid.apache.org/qpid-design-authentication.html
(def-amqp-command amqp:start (class &key version-major version-minor server-properties mechanisms locales)
  (:documentation "C<--S : start connection negotiation")

  (:response
   (:method ((connection amqp:connection)
             &key version-major version-minor server-properties mechanisms locales)
     "Record the server properties, settle the locale and the security mechanism,
 and answer with a start-ok. A locale or mechanism which is already set on the
 connection must appear in the server's list, otherwise an error is signaled.
 If none is set, the first entry of the server's space-separated list is adopted.
 Returns the connection."
     (declare (ignore version-major version-minor))
     (with-slots (amqp:locale amqp:mechanism) connection
       (setf (amqp:connection-server-properties connection) server-properties)

       ;; locale: verify a preset value, else adopt the server's first offer
       (cond (amqp:locale
              (unless (search amqp:locale locales)
                (error "Specified locale not supported by server: ~s: ~s"
                       amqp:locale locales)))
             ((stringp (setf amqp:locale (first (split-string " " locales)))))
             (t
              (error "No locale available.")))

       ;; mechanism: verify a preset value, else adopt the server's first offer
       (cond (amqp:mechanism
              (unless (search amqp:mechanism mechanisms)
                (error "Specified mechanism not supported by server: ~s: ~s"
                       amqp:mechanism mechanisms)))
             ((stringp (setf amqp:mechanism (first (split-string " " mechanisms)))))
             (t
              (error "No mechanism available.")))

       ;; the response is encoded as NUL user NUL password, with absent
       ;; components as empty strings
       (amqp::send-start-ok connection
                            :client-properties nil
                            :mechanism amqp:mechanism
                            :response (format nil "~c~a~c~a"
                                              #\null (or (uri-user (connection-uri connection)) "")
                                              #\null (or (uri-password (connection-uri connection)) ""))
                            :locale amqp:locale)
       connection))))
785
786
(def-amqp-command amqp:start-ok (class &key client-properties mechanism response locale)
  (:documentation "C->S : select security mechanism and locale")

  (:request
   (:method ((connection amqp:connection) &rest start-ok-args)
     "Send the start-ok with the given arguments; the result is that of the send."
     (declare (dynamic-extent start-ok-args))
     (apply #'amqp::send-start-ok connection start-ok-args))))
794
795
(def-amqp-command amqp:tune (class &key channel-max frame-max heartbeat)
  (:documentation "C<--S : propose connection tuning parameters")

  (:response
   (:method ((connection amqp:connection) &key channel-max frame-max heartbeat)
     "Apply the proposed tuning parameters to the connection and confirm them
 with a tune-ok.
 A positive CHANNEL-MAX is limited to *max-channels*; it is an error if a channel
 with a number at or beyond that limit is already registered, otherwise the
 connection's channel array is resized to hold CHANNEL-MAX + 1 entries.
 A positive FRAME-MAX must not be less than the connection's frame size.
 The heartbeat is recorded on the connection."
     (when (> channel-max 0)
       (setf channel-max (min channel-max *max-channels*))
       ;; position-if yields nil when no channel is registered, in which case
       ;; nothing can conflict with the new limit
       (let ((highest-active (position-if #'identity (get-connection-channels connection)
                                          :from-end t)))
         (when (and highest-active (<= channel-max highest-active))
           (amqp:not-allowed-error :connection connection
                                   "Attempt to tune an active connection: ~s." connection)))
       ;; The resize belongs to the valid path: it was formerly nested in the error
       ;; branch, after the error call, so that it was never performed.
       (setf-connection-channels (adjust-array (get-connection-channels connection)
                                               (1+ channel-max) :initial-element nil)
                                 connection))
     (when (> frame-max 0)
       (assert (>= frame-max (connection-frame-max connection)) ()
               "Connection frame size too small: ~s, ~s" connection frame-max))
     (setf (connection-heartbeat connection) heartbeat)
     (amqp::send-tune-ok connection :channel-max channel-max :frame-max frame-max :heartbeat heartbeat))))
814
815
(def-amqp-command amqp:tune-ok (class &key channel-max frame-max heartbeat)
  (:documentation "C->S : negotiate connection tuning parameters")

  (:request
   (:method ((connection amqp:connection) &rest tune-ok-args)
     "Send the tune-ok with the given arguments; the result is that of the send."
     (apply #'amqp::send-tune-ok connection tune-ok-args))))
822
823
(def-amqp-command amqp:unbind (class &key queue exchange routing-key arguments)
  (:documentation "C<->S : unbind a queue from an exchange")

  (:request
   (:method ((queue amqp:queue) &rest unbind-args)
     "Send the unbind command and wait for the unbind-ok. Returns the queue."
     (apply #'amqp::send-unbind queue unbind-args)
     (command-loop (queue)
       (amqp:unbind-ok ((class amqp:queue))
         (return-from command-loop)))
     queue))

  (:response
   ;; the arguments are ignored and the queue is returned unchanged
   (:method ((queue amqp::queue) &rest unbind-args)
     (declare (ignore unbind-args))
     queue)))
839
(def-amqp-command amqp:unbind-ok (class &key)
  (:documentation "C<->S : confirm an unbind")

  (:request
   (:method ((queue amqp::queue) &rest ok-args)
     "Send the unbind-ok with the given arguments; the result is that of the send."
     (apply #'amqp::send-unbind-ok queue ok-args)))

  (:response
   ;; the arguments are ignored and the queue is returned unchanged
   (:method ((queue amqp::queue) &rest ok-args)
     (declare (ignore ok-args))
     queue)))
851
852
853 ;;;
854 ;;; convenience operators
855
(defgeneric call-with-consumer (operator channel &key queue consumer-tag no-local no-ack exclusive no-wait arguments)
  (:documentation "Request a consumer on CHANNEL with the given consume arguments and
 then apply OPERATOR to the basic instance and the arguments of each deliver command
 which arrives. The loop has no exit of its own: the operator must transfer out.")

  (:method (operator (channel amqp:channel) &rest consume-args)
    (declare (dynamic-extent consume-args))

    (apply #'amqp:request-consume channel consume-args)
    (command-loop (channel)
      ;; up to the caller to transfer out
      (amqp:deliver ((basic amqp:basic) &rest deliver-args)
        (apply operator basic deliver-args)))))
866

  ViewVC Help
Powered by ViewVC 1.1.5