Contents of /processing.lisp

Revision 3
Tue Feb 23 09:05:39 2010 UTC (4 years, 1 month ago) by janderson
File size: 39083 byte(s)
Merge commit 'remotes/github/master' into remotes/git-svn
;;; -*- Package: de.setf.amqp.implementation; -*-

(in-package :de.setf.amqp.implementation)

(document :file
  (description "This file defines the AMQP input processing pipeline for the 'de.setf.amqp' library.")
  (copyright
   "Copyright 2010 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved"
   "'de.setf.amqp' is free software: you can redistribute it and/or modify it under the terms of version 3
 of the GNU Affero General Public License as published by the Free Software Foundation.

 'de.setf.amqp' is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
 implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 See the Affero General Public License for more details.

 A copy of the GNU Affero General Public License should be included with 'de.setf.amqp' as `AMQP:agpl.txt`.
 If not, see the GNU [site](http://www.gnu.org/licenses/).")

  (long-description "The response to connected input is computed through a sequence of function calls
 to decode, dispatch, compute, encode and finally send the response. The default path devolves to the static
 operators defined in `def-amqp-command` forms (see `commands.lisp`). The combination of stages supports
 sufficient specialization to permit an application to bind additional mechanisms into the process with both
 static and dynamic extent, based on both lexical and dynamic definitions, and to achieve both synchronous and
 asynchronous processing.

 The principal distinction is between read- and event-mode processing. The
 operators

 - `amqp:command-case`
 - `amqp:command-loop`
 - `amqp:with-commands`

 initiate explicit processing of read frames for a given channel. Each frame
 is tested against clauses and processed by the first which matches. If none
 matches, the first form, by default, initiates standard processing for the
 connection - which has the effect of an event dispatch. Optionally, the
 form can suppress processing, delegate to some other operator, or signal an
 error. The -ecase version signals an error.

 The alternative path initiates the standard processing for a connection
 based on read frames either as part of a thread's explicit input processing
 loop, or when interrupted by a connection's event process in a
 multi-threaded configuration.

 - `process-frame (connection frame)`
   decodes the frame into a frame class and delegates to process-typed-frame
   with process-decoded-frame as the continuation.
 - `process-typed-frame (connection channel type frame)`
   if the frame is a method, applies process-command via call-with-decoded-arguments.
   otherwise calls it directly with the class respective to the frame type and
   the frame itself.
 - `respond-to-command (connection channel class method-or-frame &rest args)`
   computes the class.method function, and applies it in a dynamic context
   augmented by the channel's response functions. If the command target is
   a class, that is passed; otherwise - for headers and body - the frame
   itself is passed.

 For other takes on processing patterns, see alternative implementations:
 RabbitMQ's java library interposes the AMQCommand [1] class on the channel, which
 acts as a state machine to filter the incoming frames. It releases composed
 commands which combine the operator/arguments, an envelope with content header
 properties, and any content body in one entity, or passes it through an event-invocation
 interface. The filtering means that the library can impose state constraints - eg,
 that all message content is the correct length - w/o interleaving that with application
 processing. On the other hand, it impedes streaming.

 [1]: http://www.rabbitmq.com/releases/rabbitmq-java-client/v1.7.0/rabbitmq-java-client-javadoc-1.7.0/com/rabbitmq/client/impl/AMQCommand.html"))
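
;;; A minimal sketch of the clause matching described above for the
;;; `amqp:command-case` family: a frame is tested against each clause in turn,
;;; the first matching clause handles it, and anything unmatched falls through
;;; to a default. This is an illustration only, not the library's
;;; implementation. The name `example-command-case` and the
;;; (predicate . handler) clause representation are assumptions made for the
;;; sketch; it uses nothing beyond standard Common Lisp.

(defun example-command-case (frame clauses default)
  "Apply the handler of the first clause whose predicate accepts FRAME.
 CLAUSES is a list of (predicate . handler) pairs (a hypothetical
 representation); DEFAULT is applied when no clause matches."
  (loop for (predicate . handler) in clauses
        when (funcall predicate frame)
          return (funcall handler frame)
        finally (return (funcall default frame))))

;;; For example, with clauses for strings and integers, the integer 3 is
;;; handled by the second clause, while the keyword :x reaches the default:
;;;
;;;   (example-command-case 3
;;;                         (list (cons #'stringp (lambda (f) (list :string f)))
;;;                               (cons #'integerp (lambda (f) (list :integer f))))
;;;                         (lambda (f) (list :default f)))
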
(defgeneric process-connection-loop (connection)
  (:documentation "As run in the connection thread loop:
 - write any pending output frames.
 - read any pending input frames and dispatch them through process-frame.
 - at the outset and before i/o, check that the connection is still open.
   if not, return. if i/o fails - an interrupt or network failure closed
   the connection - also return.
 In either case the values returned are the counts of frames read and written.")

  (:method ((connection amqp:connection))
    (let ((in 0) (out 0) (deadline nil))
      (loop
        (flet ((ensure-open (&optional (open-p (open-stream-p connection)))
                 (unless open-p
                   (return-from process-connection-loop (values in out))))
               (heartbeat-needed ()
                 (and deadline
                      (>= (get-universal-time) deadline)))
               (update-heartbeat ()
                 (let ((heartbeat (connection-heartbeat connection)))
                   (setf deadline
                         (when (plusp heartbeat)
                           (+ (get-universal-time) heartbeat))))))
          ;; drain the output queue
          (loop (ensure-open)
                (let ((frame (get-encoded-frame connection)))
                  (unless frame (return))
                  (unless (write-frame connection frame)
                    (ensure-open nil))
                  (incf out)
                  (release-frame frame)
                  (update-heartbeat)))
          (when (heartbeat-needed)
            (send-heartbeat connection))
          ;; read and dispatch whatever input is available
          (loop (ensure-open)
                (unless (stream-listen connection) (return))
                (let ((frame (claim-input-frame connection)))
                  (unless (read-frame connection frame)
                    (ensure-open nil))
                  (process-frame connection frame)
                  (incf in)))
          ;; once all io is finished, step back...
          (bt:thread-yield))))))
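
;;; The heartbeat bookkeeping in the loop above can be read as two small pure
;;; functions. The following is a sketch under that reading, not code used by
;;; the library: the names `example-heartbeat-deadline` and
;;; `example-heartbeat-needed-p` are hypothetical, and the current time is
;;; passed in explicitly rather than read with get-universal-time.

(defun example-heartbeat-deadline (now heartbeat)
  "Return the time at which the next heartbeat is due, or nil when the
 negotiated HEARTBEAT interval is zero, that is, when heartbeats are disabled."
  (when (plusp heartbeat)
    (+ now heartbeat)))

(defun example-heartbeat-needed-p (now deadline)
  "True iff a DEADLINE is set and NOW has reached it."
  (and deadline (>= now deadline) t))

;;; With a 30 second interval, a frame written at time 100 sets the deadline
;;; to 130; a heartbeat is due once the clock reaches that value. With an
;;; interval of 0 there is no deadline, and no heartbeat is ever due.
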

(defun connection-toplevel-loop (&optional (*connection* *connection*))
  (loop
    (handling-connection-errors
      (handling-channel-errors
        (process-connection-loop *connection*)))))


(defgeneric make-connection-thread (connection)
  (:method ((connection amqp:connection))
    (bt:make-thread 'connection-toplevel-loop
                    :name "Socket i/o"
                    ;; bind the special variable by its full name, *connection*
                    :initial-bindings `((*connection* . ,connection)))))

(defgeneric process-channel-command (channel &key wait)
  (:method ((channel amqp:channel) &key (wait t))
    "Read the next frame from the channel, then interpret and respond to it.
 Returns nil if no frame was available."
    (unless (open-stream-p channel)
      (amqp:channel-error :channel channel :message-string "process-channel-command on a closed stream."))
    (let ((frame (get-read-frame channel :wait wait)))
      (when frame
        (process-frame channel frame))))
  (:method ((connection amqp:connection) &key (wait t))
    (process-channel-command (connection.channel connection :number 0) :wait wait))
  (:method ((class amqp-connected-object) &key (wait t))
    (process-channel-command (object-channel class) :wait wait)))

(defgeneric process-channel-loop (channel &key wait)
  (:method ((class amqp:object) &key (wait t))
    "Read successive frames from the context, interpret and respond to them."
    (loop (unless (process-channel-command class :wait wait)
            (return)))))


(defun channel-toplevel-loop (&optional (*channel* *channel*))
  (loop
    (handling-channel-errors
      (process-channel-loop *channel* :wait t))))

(defgeneric process-frame (context frame)
  (:documentation "Given a frame as read from the connection socket, extract the channel and type
 information and process accordingly. This step happens either in the socket i/o process for asynchronous
 configurations, or in synchronous configurations, in a thread which reads a frame for its own channel.
 In the asynchronous case, the aim is to classify the frame, do any immediate and connection/channel-zero
 processing, and queue the remainder for the respective channel's process. In the synchronous case, where
 a process has read-through a get-read-frame call, immediate/c-0 processing is also unconditional, and
 frames for the respective process' channel are returned to the caller. In addition, any frames for channels
 which the process owns are processed in the standard pipeline, while those for other processes are
 queued for the respective channel's thread to process on its own.")

  (:method ((connection amqp:connection) (frame input-frame))
    "Determine the channel - which must exist for read frames, and
 the frame class, then dispatch for those as well as the connection."

    ;; number and channel are extracted from the frame header.
    ;; iff the frame is a method, the type will be an instance of the specialized method class,
    ;; as decoded from the frame data. otherwise - for content header and body and for heartbeat,
    ;; the type is determined by the frame itself.
    (let* ((number (frame-channel-number frame))
           (channel (connection.channel connection :number number))
           (decoder (frame-decoder frame)))
      (process-typed-frame channel decoder frame)))

  (:method ((channel amqp:channel) (frame input-frame))
    ;; use an autonomous decoder to specialize initial processing
    (let* ((decoder (frame-decoder frame)))
      (process-typed-frame channel decoder frame))))

(defgeneric process-typed-frame (channel decoder frame)
  (:documentation "Process frames specific to their type,
 CHANNEL : amqp:channel
 TYPE : amqp:frame-type
 FRAME : amqp:frame

 The supported frame types (as of 0.9r1) are
 METHOD : is decoded and processed as a command
 HEADER : is expected only when a channel is open and receiving a body;
   pass it through unparsed to the current response function
 BODY : is expected when a channel is open and receiving;
   pass it through unparsed to the current response function
 HEARTBEAT : generate a response heartbeat in non-threaded connections
 Others are logged and ignored. In general, the frames' basic attributes are
 decoded for a type specific action. Where the current process is the channel
 owner, this happens directly. Otherwise the frame is queued. In that case, if there
 is no input processing at the moment, the owner process is interrupted
 asynchronously to handle the frame.
 Iff the frame is processed, release it.")

  (:method :around ((*channel* amqp:channel) (type deferrable-frame-decoder) (frame amqp:frame))
    "The :around method compares the channel's thread with the active one, to decide
 whether to process immediately, or to delegate the processing to the other thread."
    (let ((channel-thread (channel-thread *channel*)))
      (cond ((eq (bt:current-thread) channel-thread)
             (call-next-method))
            ((channel-thread *channel*)
             ;; interlock with the respective thread process and interrupt it if necessary
             (enqueue frame (device-read-frames *channel*)
                      :if-empty #'(lambda () (bt:interrupt-thread channel-thread #'process-typed-frame
                                                                  *channel* type frame))))
            (t
             (amqp:log :warn *channel* "process-typed-frame: disowned channel.")
             (release-frame frame)))))

  (:method ((channel amqp:channel) (method amqp:method) (frame amqp:frame))
    "Determine the particular (class x method) combination - first the class, then the
 respective method. This combination will be without any keys to designate a specific cache entry,
 so no history will be apparent in them. They control the method argument decoding, and determine the
 response function and/or intermediate filtering. Processing code must refer to its own instances in order
 to access past properties."
    (let* ((class (amqp:ensure-object channel frame))
           (method (amqp:ensure-method class frame)))
      (setf (channel-content-object channel) nil) ; not in content
      (flet ((call-process-command (class method &rest args)
               (declare (dynamic-extent args))
               (when args
                 (apply #'reinitialize-instance method args))
               (apply #'process-command channel class method args)))
        (declare (dynamic-extent #'call-process-command))
        (unwind-protect (call-with-decoded-arguments #'call-process-command class method frame)
          (release-frame frame)))))

  (:method ((channel amqp:channel) (header amqp:header) (frame amqp:frame))
    "A header frame is received to initiate a message. The channel state should be open in input mode.
 Interpose the implicit basic target class and treat the header type as the method."
    (typecase (channel-state channel)
      (amqp.s:use-channel)
      (t
       (amqp:log :warn channel "~a frame, for inconsistent channel state: ~a, ~a"
                 header (channel-state channel) frame)))
    ;; the content components are singletons, so this will be the last one used
    (let ((object (amqp:ensure-object channel (content-header-class-name frame))))
      (setf (channel-content-object channel) object)
      (flet ((call-process-command (class &rest args)
               (declare (dynamic-extent args))
               (when args
                 (apply #'reinitialize-instance class args))
               (apply #'process-command channel class header :frame frame args)))
        (declare (dynamic-extent #'call-process-command))
        (unwind-protect (call-with-decoded-properties #'call-process-command object frame)
          (release-frame frame)))))

  (:method ((channel amqp:channel) (body amqp:body) (frame amqp:frame))
    "A body frame is received as part of a message. The channel state should be open in body input mode.
 Interpose the implicit basic target object and treat the body type as the method.
 In this case, there are no properties to decode, just manage the frame cache in the object and take
 the next process step."
    (typecase (channel-state channel)
      (amqp.s:use-channel.body.input)
      (t
       (amqp:log :warn channel "~a frame, for inconsistent channel state: ~a, ~a"
                 body (channel-state channel) frame)))
    (let ((object (channel-content-object channel)))
      (unless object
        (amqp:unexpected-frame-error :channel channel
                                     :frame frame
                                     :message-string "Body frame w/o header: ~s."
                                     :message-arguments (list frame)))
      (setf (object-frame object) frame)
      (unwind-protect (process-command channel object body :frame frame)
        (setf (object-frame object) nil)
        (release-frame frame))))

  (:method ((channel amqp:channel) (type amqp:heartbeat) (frame amqp:frame))
    "A received heartbeat causes read-frame to update the last-frame-read
 timestamp. If there is a connection thread, nothing more needs
 to be done. If there is none, then just echo the heartbeat.
 (See [http://qpid.apache.org/configure-broker-and-client-heartbeating.html].)"
    (let ((connection (channel-connection channel)))
      (unless (connection-thread connection)
        (send-heartbeat connection))
      (amqp:log :debug channel "~a frame: ~a" type frame)))

  (:method ((channel amqp:channel) (type unsupported-frame-decoder) (frame amqp:frame))
    "Unsupported frames are logged and ignored."
    (amqp:log :warn channel "Unsupported ~a frame: ~a" type frame))

  (:method ((channel amqp:channel) (type t) (frame amqp:frame))
    "Unknown frames are logged and ignored."
    (amqp:log :error channel "Unknown ~a frame: ~a" type frame)))
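
;;; The three-way decision made by the :around method of process-typed-frame
;;; can be isolated as a small function. This is a sketch for illustration
;;; only: `example-route-frame` is a hypothetical name, and the threads are
;;; stood in for by arbitrary objects compared with eq.

(defun example-route-frame (current-thread owner-thread)
  "Decide what to do with a frame read by CURRENT-THREAD for a channel owned
 by OWNER-THREAD: :process it directly, :enqueue it for the owner, or
 :release it when the channel has no owner."
  (cond ((eq current-thread owner-thread) :process)
        (owner-thread :enqueue)
        (t :release)))

;;; In the method above, the :enqueue case additionally interrupts the owner
;;; thread if its input queue was empty; that part is not modelled here.
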


;;;
;;; decoding utilities

(defgeneric call-with-decoded-arguments (operator class method buffer &rest args)
  (:method ((operator function) class method (frame amqp:frame) &rest args)
    "Given a frame, extract the frame's data buffer and continue."
    (declare (dynamic-extent args))
    (setf (object-frame class) frame)
    (unwind-protect
      (apply #'call-with-decoded-arguments operator class method (frame-data frame)
             args)
      (setf (object-frame class) nil)))
  (:method ((operator function) class (method symbol) (frame t) &rest args)
    (declare (dynamic-extent args))
    (apply #'call-with-decoded-arguments operator class (amqp:ensure-method class method) frame
           args)))

(defgeneric call-with-encoded-arguments (operator class method &rest args)
  (declare (dynamic-extent args))
  (:method (op (class amqp:object) (method symbol) &rest args)
    (declare (dynamic-extent args))
    (apply #'call-with-encoded-arguments op class (amqp:ensure-method class method)
           args))
  (:method (op (class amqp:object) (method fixnum) &rest args)
    (declare (dynamic-extent args))
    (apply #'call-with-encoded-arguments op class (amqp:ensure-method class method)
           args)))


(defgeneric call-with-decoded-properties (operator class frame &rest args)
  (declare (dynamic-extent args))
  (:method ((operator function) class (frame amqp:frame) &rest args)
    "Given a frame, decode the properties from the data buffer and continue."
    (declare (dynamic-extent args))
    (setf (object-frame class) frame)
    (unwind-protect
      (apply #'call-with-decoded-properties operator class (frame-data frame)
             args)
      (setf (object-frame class) nil))))


(defgeneric call-with-encoded-properties (operator class &rest args)
  (declare (dynamic-extent args)))


#+(or ) ;; this is ambiguous: is it the response or request side?
(defgeneric apply-method (method class &rest argument-list)
  (:documentation "Apply the function specific to the combination of
 (class x method) to the class and the given argument list.

 CONNECTION : amqp:connection : the distinction client v/s server connection
   determines the concrete behaviour
 METHOD : amqp:object : the version-specific method instance
 CLASS : amqp:object : the version-specific class instance
 ARGUMENT-LIST : list : the argument list.")
  (declare (dynamic-extent argument-list))

  (:method ((method t) (class t) &rest args)
    (declare (dynamic-extent args))
    (apply (compute-class-method class method) class args))

  (:method ((method amqp:method) (class amqp:object) &rest args)
    (declare (dynamic-extent args))
    (let ((handler (channel-command-handler class)))
      (or (and handler (apply handler class method args))
          (apply (method-request-function method) (object-channel class) class
                 (apply #'list* args))))))


(defmethod send-method ((method t) (class amqp:object) &rest args)
  (declare (dynamic-extent args))
  (flet ((write-encoded-method (frame class method)
           (amqp:log :debug class "send-method: ~a ~a" method frame)
           ;; nb. this places the constraint on connections, that they have their channel-0 bound
           ;; before they send any command to the broker.
           (put-encoded-frame (object-channel class) frame)))
    (declare (dynamic-extent #'write-encoded-method))
    (amqp:log :debug class "send-method: ~a . ~s" method args)
    (apply #'call-with-encoded-arguments
           #'write-encoded-method class method
           args)))

(defmethod encode-method ((method t) (class amqp:object) &rest args)
  (declare (dynamic-extent args))
  (flet ((return-encoded-method (frame class method)
           (declare (ignore class method))
           (return-from encode-method frame)))
    (declare (dynamic-extent #'return-encoded-method))
    (apply #'call-with-encoded-arguments
           #'return-encoded-method class method
           args)))

(defmethod decode-class-properties ((class amqp:object))
  ;; collect the property slots as a plist: (:name value ...)
  (reduce #'nconc
          (mapcar #'(lambda (name)
                      (list (cons-symbol :keyword name)
                            (slot-value class name)))
                  (class-property-slot-names class))))

(defmethod decode-method-arguments ((method amqp:method))
  ;; collect the argument slots as a plist: (:name value ...)
  (reduce #'nconc
          (mapcar #'(lambda (name)
                      (list (cons-symbol :keyword name)
                            (slot-value method name)))
                  (method-argument-slot-names method))))

(defun amqp.u:class-properties (class)
  (decode-class-properties class))

(defun amqp.u:method-arguments (method)
  (decode-method-arguments method))


(defun send-method* (method class &rest args)
  (declare (dynamic-extent args))
  (apply #'send-method method class (apply #'list* args)))

(defgeneric send-heartbeat (connection)
  (:method ((connection amqp:connection))
    "Encode and enqueue a heartbeat frame. Enqueue it just in case something
 else happened between the deadline and now. Most likely it passes through and is written directly.
 Managing deadlines is left to the caller(s)."
    (let ((frame (claim-output-frame connection)))
      (setf-frame-type-class-name 'amqp:heartbeat frame)
      (setf-frame-channel-number 0 frame)
      (setf-frame-track-number 0 frame)
      (setf-frame-size 0 frame)
      (write-frame connection frame)
      (put-encoded-frame connection frame))))

(defmethod send-header ((class amqp:object) &rest args)
  (declare (dynamic-extent args))
  (flet ((write-encoded-properties (frame class)
           (setf-frame-type-class-name 'amqp:header frame)
           (put-encoded-frame (object-channel class) frame)))
    (declare (dynamic-extent #'write-encoded-properties))
    (apply #'call-with-encoded-properties
           #'write-encoded-properties class
           args)))


(document (process-command dynamic-process-command)
  "Interface Operators :


 static / dynamic / lexical / instantial command handler binding

 Each read frame, once typed and decoded, must be 'handled'. This
 can occur in a dynamic context where a frame appears immediately
 subsequent to, and in response to, a command on the same channel, or it can be
 a frame which has appeared asynchronously on a channel for which
 the most recent request was to subscribe to a queue, or it could be
 autonomously sent from the broker - eg, to close a queue or
 return a message.

 The processing mechanism must permit both forms of processing, in
 particular because a given channel may receive a frame in either
 category at a given time. Rather than configure a channel for this,
 the mechanism should interpret the respective context and operate accordingly.
 To achieve this, it integrates the following definition forms:

 with-commands (clauses . body)
   defines a command processor in the current lexical context and
   binds it dynamically, such that it takes first precedence.

 (setf channel-command) (channel method &optional class) (function)
   binds a handler function for that (method x class) for frames to be processed
   in that channel. any extant binding is returned.

 dynamic-process-command (handler channel class type . args)
   is implemented to apply the handler iff it is a function, otherwise to
   apply a channel handler iff it is a function, and otherwise to
   apply the method's respective response function.

 The standard definitions arrange that dynamic-process-command traces
 back through dynamically bound handlers until either one handles the frame -
 in that it returns true, or the binding is null - which is the global default.
 At that point it tries the channel's handler. If there is none, or it declines,
 then the method's own response function is used.")

(defparameter *channel-command-handler* nil
  "When this is bound, it interposes operators before the channel's bound
 command handlers.")

(defgeneric process-command (channel class operation &rest args)
  (:documentation "Process a decoded, read frame as a 'command'. This applies to
 both method and content frames. A method is dispatched to the respective
 respond-to operator.")
  (declare (dynamic-extent args))

  (:method ((channel amqp:channel) (class amqp:object) (operation frame-decoder) &rest args)
    ;; at this point, the frame object has been reified, and any operation arguments or properties
    ;; have been decoded. the frame-decoder indicates either the method or the content element.
    ;; try the available response functions and apply the first found to the
    ;; connection, target class, and the argument list
    (declare (dynamic-extent args))
    (amqp:log :debug channel "process-command: ~a ~a . ~s" class operation args)
    (apply #'dynamic-process-command *channel-command-handler*
           channel class operation args)))

(defgeneric dynamic-process-command (dynamic-handler channel class type &rest args)
  (declare (dynamic-extent args))

  (:method ((operator t) (channel t) (class t) (method amqp:method) &rest args)
    "Given an unhandled method, apply its own operator, for which the initial definition
 is the static response operator."
    (declare (dynamic-extent args))
    (apply (method-response-function method) channel class args))

  (:method ((operator null) (channel amqp:channel) (class t) (type t) &rest args)
    "Given a channel, if it has a handler, try it. If none is present, or it declines,
 continue with the next method, to apply the method's own operator."
    (declare (dynamic-extent args))
    (let ((channel-operator (channel-command-handler channel)))
      (etypecase channel-operator
        (null (call-next-method))
        (function (or (apply channel-operator channel class type args)
                      (call-next-method))))))

  (:method ((operator function) (channel t) (class t) (type t) &rest args)
    (declare (dynamic-extent args))
    (apply operator channel class type args))

  (:method ((operator t) (channel t) (class t) (frame frame-decoder) &rest args)
    ;; this shouldn't happen - it means that a content frame arrived outside
    ;; of a dynamic context which should be integrating it in an earlier operation
    (declare (dynamic-extent args) (ignore args))
    (amqp:unexpected-frame-error :connection (when (typep channel 'amqp:channel) (channel-connection channel))
                                 :channel channel
                                 :frame frame
                                 :message-string "[process-command] frame not processed: ~s, ~s."
                                 :message-arguments (list class frame))))



;;; It would be possible to implement these as methods, installed and removed
;;; for the dynamic extent, but that would mean that the effective
;;; function is continuously recomputed.
;;; The implemented mechanism binds a current handler to a dynamic variable
;;; and retains a previous version in a lexical binding, which it calls
;;; should the new handler not 'accept' the combined (class x method). Should
;;; there be no previous version, it calls the respective channel's command
;;; handler, from which, by default, the static function is invoked as the
;;; default method.

(defun compute-command-clauses (class-var0 method-var0 args-var clauses)
  "Transform the command clauses, each of the form
   (method-name ((class type) . args) . body)
 into cond clauses for inclusion in a command processing operator.
 This does _not_ add any defaults, it just sorts the clauses by subtype.
 The with-commands operator intends to pass anything which falls through to the
 next handler on the dynamic stack."
  (flet ((rewrite-clause (clause)
           (destructuring-bind (method-type (class &rest arglist) &rest body)
                               clause
             (let ((class-var (if (consp class) (first class) class))
                   (class-type (if (consp class) (second class) t))
                   (method-var (gensym (string :method-))))

               `((and (typep ,class-var0 ',class-type) (typep ,method-var0 ',method-type))
                 (let ((,class-var ,class-var0) (,method-var ,method-var0))
                   (declare (ignorable ,class-var ,method-var))
                   (destructuring-bind ,arglist ,args-var
                     ,@body)))))))

    ;; sort by class type and, where the class types are the same, by method type,
    ;; so that more specific clauses are tested first
    (setf clauses (sort (copy-list clauses)
                        #'(lambda (c1 c2)
                            (let* ((m1 (first c1))
                                   (m2 (first c2))
                                   (c1 (first (second c1)))
                                   (c2 (first (second c2))))
                              (setf c1 (if (consp c1) (second c1) t))
                              (setf c2 (if (consp c2) (second c2) t))
                              (if (eql c1 c2)
                                  (subtypep m1 m2)
                                  (subtypep c1 c2))))))
    (mapcar #'rewrite-clause clauses)))
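
;;; A minimal sketch of why the clauses are sorted by subtype before they are
;;; turned into cond clauses: with the most specific type first, a first-match
;;; test selects the most specific applicable clause. This is an illustration
;;; with standard numeric types, not part of the library; the names
;;; `example-sort-by-specificity` and `example-first-matching-type` are
;;; hypothetical. Note the assumption that the types form a chain: for
;;; unrelated types subtypep is only a partial order and the sorted order is
;;; not fully determined.

(defun example-sort-by-specificity (types)
  "Return a fresh list of the type specifiers TYPES, proper subtypes first."
  (sort (copy-list types)
        (lambda (a b)
          (and (subtypep a b) (not (subtypep b a))))))

(defun example-first-matching-type (value types)
  "Return the first type specifier in TYPES of which VALUE is an instance."
  (find-if (lambda (type) (typep value type)) types))

;;; For the chain (number fixnum integer) the sorted order is
;;; (fixnum integer number), so the value 7 matches fixnum first, while 1.5
;;; matches only number:
;;;
;;;   (example-first-matching-type
;;;     7 (example-sort-by-specificity '(number fixnum integer)))
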


(defmacro amqp:with-commands (command-clauses &rest body)
  "Defines a lexically scoped command handler, which is to take precedence over any extant
 dynamic handlers. Each clause has the form
   (method-type (class . arguments) . body)
 These are collected into an operator which shadows the existing global
 definition during the dynamic extent of the form's body. If no clause matches a to-be-processed
 frame, the next handler in the dynamic chain is invoked. (See dynamic-process-command)"

  (let ((handlers-op (gensym (string :command-handler-)))
        (body-op (gensym (string :command-body-)))
        (channel-var (gensym (string :channel-)))
        (class-var (gensym (string :class-)))
        (type-var 'amqp:method)
        (args-var (gensym (string :arglist-))))
    `(flet ((,handlers-op (,channel-var ,class-var ,type-var &rest ,args-var)
              (declare (dynamic-extent ,args-var)
                       ;; this is present to keep the interface uniform. in the
                       ;; dynamic context, the channel instance is known, so there's
                       ;; no reason for it to appear in the clauses
                       (ignore ,channel-var))
              (cond ,@(compute-command-clauses class-var type-var args-var command-clauses)))
            (,body-op () ,@body))
       (declare (dynamic-extent #',handlers-op #',body-op))
       (call-with-command-handlers #',body-op #',handlers-op))))

(defun call-with-command-handlers (op commands-op)
  (declare (dynamic-extent op commands-op))
  ;; retain the handler which was in effect on entry. the new handler is
  ;; tried first; should it decline - by returning nil, the previous handler
  ;; is given the frame through dynamic-process-command.
  (let ((previous-handler *channel-command-handler*))
    (flet ((apply-command-handler (channel class type &rest args)
             (declare (dynamic-extent args))
             (or (apply commands-op channel class type args)
                 (apply #'dynamic-process-command previous-handler
                        channel class type args))))
      (declare (dynamic-extent #'apply-command-handler))
      (let ((*channel-command-handler* #'apply-command-handler))
        (funcall op)))))
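
;;; A minimal, self-contained sketch of the handler chaining pattern used by
;;; call-with-command-handlers: a handler is bound for a dynamic extent, and
;;; whatever it declines (by returning nil) is passed back to the handler which
;;; was in effect before, and ultimately to a default. This is an illustration
;;; under simplifying assumptions, not the library's implementation: the names
;;; `*example-handler*`, `example-dispatch` and `example-call-with-handler` are
;;; hypothetical, a command is any object, and the static response function is
;;; stood in for by a list of the form (:default command).

(defvar *example-handler* nil
  "Hypothetical stand-in for *channel-command-handler*.")

(defun example-dispatch (handler command)
  "Apply HANDLER to COMMAND if it is a function; otherwise use the default."
  (if (functionp handler)
      (funcall handler command)
      (list :default command)))

(defun example-call-with-handler (body new-handler)
  "Call BODY with NEW-HANDLER bound in front of the current handler chain."
  (let ((previous *example-handler*))
    (flet ((chained (command)
             ;; try the new handler first; if it declines, fall back
             (or (funcall new-handler command)
                 (example-dispatch previous command))))
      (let ((*example-handler* #'chained))
        (funcall body)))))

;;; For example, a handler which accepts only :ack handles that command and
;;; lets :close fall through to the default:
;;;
;;;   (example-call-with-handler
;;;     (lambda ()
;;;       (list (example-dispatch *example-handler* :ack)
;;;             (example-dispatch *example-handler* :close)))
;;;     (lambda (command)
;;;       (when (eq command :ack) (list :handled command))))
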



(defmacro command-loop ((channel &key (wait nil wait-s)) &rest command-clauses)
  ;; process frames repeatedly within the scope of the command clauses.
  ;; a clause can leave the loop with (return-from command-loop ...).
  `(block command-loop
     (amqp:with-commands ,command-clauses (process-channel-loop ,channel ,@(when wait-s `(:wait ,wait))))))

(defmacro command-case ((channel &key (wait nil wait-s)) &rest command-clauses)
  ;; process a single frame within the scope of the command clauses.
  `(block command-case
     (amqp:with-commands ,command-clauses (process-channel-command ,channel ,@(when wait-s `(:wait ,wait))))))

#+mcl
(setf (ccl:assq 'command-loop ccl::*fred-special-indent-alist*) 1)
#+mcl
(setf (ccl:assq 'command-case ccl::*fred-special-indent-alist*) 1)


(document (compute-channel-command-handler channel-command-handler channel-command)
  "Instance-scoped Commands :

 Instance-scoped commands are integrated into a channel's command-handler function.
 Each is implemented in a method, specialized on the method's class.
 The operator is defined on demand and the handlers are added/removed
 with the (setf channel-command-handler) operator.")

(defun compute-channel-command-handler (channel)
  (let* ((name (channel-name channel))
         (function (ensure-generic-function name
                                            :lambda-list '(channel class method &rest args)
                                            :declare '((dynamic-extent args)))))
    (c2mop:ensure-method function
                         '(lambda (channel class method &rest args)
                            (declare (ignore channel class method args)
                                     (dynamic-extent args))
                            ;; the default method returns nil
                            nil)
                         :specializers (mapcar #'find-class '(amqp:channel t t)))
    function))

(defmethod channel-command-handler ((channel amqp:channel))
  (with-slots (command-handler) channel
    (or command-handler
        (setf command-handler (compute-channel-command-handler channel)))))

(defmethod (setf channel-command-handler) (function (channel amqp:channel))
  (with-slots (command-handler) channel
    (when (typep command-handler 'generic-function)
      (dolist (method (c2mop:generic-function-methods command-handler))
        ;; remove-method takes the generic function first, then the method
        (remove-method command-handler method)))
    (setf command-handler function)))

(defgeneric (setf channel-command) (method-function channel method-name)
  (:documentation "Binds the respective command in the channel's instance
 context to the given function. If the function is null, then removes the
 command handler. Returns the existing handler, if any.
 nb. uses ensure-method, which requires a lambda expression in order to
 wrap it in a method function, which means function literals are wrapped.")

  (:method :before ((function t) (channel t) (method-name symbol))
    (assert (subtypep method-name 'amqp:method) ()
            "Invalid method name: ~s." method-name))

  (:method ((function null) (channel amqp:channel) (method-name symbol))
    (let* ((operator (channel-command-handler channel))
           (method (when operator
                     (find-method operator nil (list 'amqp:channel 'amqp:object method-name)
                                  nil))))
      (when method
        (remove-method operator method)
        method)))

  (:method ((lambda cons) (channel amqp:channel) (method-name symbol))
    (let* ((operator (channel-command-handler channel))
           (method (when operator
                     (find-method operator nil (list 'amqp:channel 'amqp:object method-name)
                                  nil))))
      ;; replace any existing handler and return it
      (when method (remove-method operator method))
      (c2mop:ensure-method operator lambda
                           :qualifiers nil
                           :specializers (mapcar #'find-class `(amqp:channel amqp:object ,method-name))
                           :lambda-list '(channel class method &rest args))
      method))

  (:method ((function function) (channel amqp:channel) (method-name symbol))
    ;; wrap the function object in a lambda expression and delegate to the cons method
    (setf (channel-command channel method-name)
          `(lambda (channel class method &rest args)
             (declare (dynamic-extent args))
             (apply ,function channel class method args)))))
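
;;; A minimal sketch of the pattern behind (setf channel-command): handlers are
;;; methods on a generic function, which are found, removed and replaced at run time.
;;; It is an illustration under assumptions, not part of the library: the names
;;; demo-channel, demo-method, demo-handler, demo-find-handler, demo-add-handler
;;; and demo-remove-handler are hypothetical, the handler takes two required
;;; arguments rather than three, and an evaluated defmethod form stands in for
;;; c2mop:ensure-method.

(defclass demo-channel () ())
(defclass demo-method () ())

(defgeneric demo-handler (channel method &rest args)
  (:method ((channel demo-channel) (method t) &rest args)
    (declare (ignore args))
    ;; the default method returns nil
    nil))

(defun demo-find-handler (method-name)
  "Return the handler method specialized on the class named METHOD-NAME, or nil."
  (find-method #'demo-handler '()
               (mapcar #'find-class (list 'demo-channel method-name))
               nil))

(defun demo-remove-handler (method-name)
  "Remove the handler for METHOD-NAME, if present, and return it."
  (let ((old (demo-find-handler method-name)))
    (when old (remove-method #'demo-handler old))
    old))

(defun demo-add-handler (method-name function)
  "Install FUNCTION as the handler for METHOD-NAME; return the replaced method, if any."
  (let ((old (demo-remove-handler method-name)))
    ;; the function object is spliced into the method body, as in the
    ;; function-specialized method above
    (eval `(defmethod demo-handler ((channel demo-channel) (method ,method-name) &rest args)
             (apply ,function channel method args)))
    old))

;;; With these definitions, (demo-add-handler 'demo-method #'some-function) installs
;;; a handler for demo-method instances, and (demo-remove-handler 'demo-method)
;;; falls back to the default method, which returns nil.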

#|

;;; initial thoughts on how to process. for the most part adopted by the implementation...
;;;
;;; various methods can intervene in the request/response process. they differ
;;; according to the application architecture
;;;
;;; - a specialized protocol class can be used to extend the request-/respond-to
;;;   operators to implement synchronous message processing.
;;;   - it can be introduced by specializing the connection or the channel
;;;     classes and the ensure-object constructor
;;;   - it can be introduced by modifying the connection's type->class map
;;;     either statically or dynamically
;;; - custom behavior can be implemented strictly in command-case forms as a
;;;   static state machine.
;;; - custom behavior can be implemented by adding a handler to the channel
;;;   to apply arbitrary functions to frames universally or selectively.

handle-frame (frame) ->
  handle-class-and-method (class . command) ->
    loop handlers (funcall handler class command)
    ;; if none handles
    (apply-method method class)
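
;;; the dispatch order noted above could be sketched roughly as follows. this is
;;; an illustration only: demo-dispatch-command is a hypothetical name, and the
;;; sketch assumes that a handler signals "handled" by returning a non-nil value.

(defun demo-dispatch-command (handlers class command default)
  "Call each function in HANDLERS with CLASS and COMMAND until one returns
 a non-nil value. If none does, call DEFAULT with the same arguments."
  (or (some (lambda (handler) (funcall handler class command)) handlers)
      (funcall default class command)))
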

? using the (with- ) pattern: (consuming , or (with-consumer ?
analogous to clim accept contexts, it affects the (connection? channel? ) input within its dynamic context

define message processing protocol by specializing individual functions and providing a specialized protocol class instance,
that is, by implementing a protocol interface, or by providing a processing object which defines all the requisite methods
in terms of the data.

as clos is not a containment model for specialization, the operators would need an additional initial argument, even
though it is mostly redundant, since most commands apply to just one class.
it is more succinct to just spread the method instance's arguments in the function call and apply it, by name, to the class.

this leaves class specialization as the customization method.
if the method operators are statically defined, this is easier to comprehend and maintain, but harder to change the behaviour dynamically.
one must substitute a different "factory" class - connection, channel, etc. - in order to get specialized command method discriminators.

which makes it difficult to achieve (with- ) semantics


three separate issues
- protocol: which operations and which logic apply to which read/to-be-written commands.
- control: asynchronous/synchronous binding of control flow to the command.
- visibility: the scope and extent of the definition.

the protocol is specified by combining send-*/receive, command-case with
implicit encoding and decoding operations and logic which interprets/specifies
class and command fields and properties.

the control structure concerns how the stream of input frames is coordinated with
the stream of output frames and with other application processing. the input and
output streams are both queue mediated. each channel is associated with a single
process, but multiple channels share a single connection. This must allow for several variations:

          process   channel   connection  control
1-1-1-s   single    single    single      sync
1-1-1-a   single    single    single      async
1-*-1-s   single    multiple  single      sync
1-*-1-a   single    multiple  single      async
*-*-1-s   multiple  multiple  single      sync
*-*-1-a   multiple  multiple  single      async
1-*-*-s   single    multiple  multiple    sync
1-*-*-a   single    multiple  multiple    async
*-*-*-s   multiple  multiple  multiple    sync
*-*-*-a   multiple  multiple  multiple    async

the "single" process / "async" control does not necessarily imply an additional
"event" process, just that input processing can occur other than as a response
to an explicit read/poll.

the variations are implemented in an amqp:client instance, which also caches
properties which would be used for connection establishment. one would like to
be able to specify as little as possible in advance, but have the control adjust to
contingencies. thus the input/output queues are always present to arbitrate
access to a given connection.

(defclass amqp:client ()
  ((user)
   (password)))

with a 1-1-1-s, in synchronous mode, the process writes frames or reads
them as explicit operations. nothing is read implicitly when writing,
and writes block, so they leave nothing queued.

|#
