/[de-setf-amqp]/processing.lisp
ViewVC logotype

Contents of /processing.lisp

Parent Directory Parent Directory | Revision Log Revision Log


Revision 3 - (show annotations)
Tue Feb 23 09:05:39 2010 UTC (4 years, 1 month ago) by janderson
File size: 39083 byte(s)
Merge commit 'remotes/github/master' into remotes/git-svn
1 ;;; -*- Package: de.setf.amqp.implementation; -*-
2
3 (in-package :de.setf.amqp.implementation)
4
5 (document :file
6 (description "This file defines the AMQP input processing pipeline for the 'de.setf.amqp' library.")
7 (copyright
8 "Copyright 2010 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved"
9 "'de.setf.amqp' is free software: you can redistribute it and/or modify it under the terms of version 3
10 of the GNU Affero General Public License as published by the Free Software Foundation.
11
12 'setf.amqp' is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
13 implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
14 See the Affero General Public License for more details.
15
16 A copy of the GNU Affero General Public License should be included with 'de.setf.amqp' as `AMQP:agpl.txt`.
17 If not, see the GNU [site](http://www.gnu.org/licenses/).")
18
19 (long-description "The reponse to connected input is computed through a sequence of function calls
20 to decode, dispatch, compute, encode and finally send the response. The default path devolves to the static
21 operators defined in `def-amqp-command` forms. (see `commands.lisp`.) The combination of stages supports
22 sufficient specialization, to permit an application to bind additional mechanisms into the process with both
23 static and dynamic extent, based on both lexical and dynamic definitions, and to achieve both synchronous and
24 asynchronous processing.
25
26 The principal distinction is between read- and event-mode processing. The
27 operators
28
29 - `amqp:command-case`
30 - `amqp:command-loop`
31 - `amqp:with-commands`
32
33 initiate explicit processing of read frames for a given channel. Each frame
34 is tested against clauses and processed by the first which matches. If none
35 matches, the first form, by default, initiates standard processing for the
36 connection - which has the effect of an event dispatch. Optionally, the
37 form can suppress processing, delegate to some other operator, or signal an
38 error. The -ecase version signals an error.
39
40 The alternative path initiates the standard processing for a connection
41 based on read frames either as part of a thread's explicit input processing
42 loop, or when interrupted by a connection's event process in a
43 multi-threaded configuration.
44
45 -`process-frame (connection frame)`
46 decodes the frame into frame class. delegates to process-typed-frame
47 with process-decoded-frame as the continuation.
48 - `process-typed-frame (connection channel type frame)`
49 if the frame is a method, applies process-command via call-with-decoded-arguments.
50 otherwise calls it directly with the class respective the frame type and
51 the frame itself
52 -`respond-to-command (connection channel class method-or-frame &rest args)`
53 computes the class.method function, and applies it in a dynamic context
54 augmented by the channel's response functions. If the command target is
55 a class, that is passed, otherwise - for headers and body, the frame
56 itself is passed.
57
58 For an other takes on processing patterns see alternative implementations:
59 RabbitMQ's java library interposes the AMPCommand [1] class on the channel, which
60 acts as a state machine to filter the incoming frames. It releases composed
61 commands which combine the operator/arguments, an envelope with content header
62 properties, and any content body in on entity or passes it through an event-invocation
63 interface. The filtreing means that the library can impose state constraints - eg,
64 that all message content is the correct length, w/o interleaving that with application
65 processing. On the other hand, it impedes streaming.
66
67
68 [1]: http://www.rabbitmq.com/releases/rabbitmq-java-client/v1.7.0/rabbitmq-java-client-javadoc-1.7.0/com/rabbitmq/client/impl/AMQCommand.html"))
69
70
71 (defgeneric process-connection-loop (connection)
72 (:documentation "As run in the connection thread loop:
73 - write any pending output frames.
74 - read any pending input frames and dispatch then through process frame
75 - at the outset and before i/o, check that the connection is still open.
76 if not return nil. if io fails - an interrupt or network failure closed
77 the connection, also return nil.")
78
79 (:method ((connection amqp:connection))
80 (let ((in 0) (out 0) (deadline nil))
81 (loop
82 (flet ((ensure-open (&optional (open-p (open-stream-p connection)))
83 (unless open-p
84 (return-from process-connection-loop (values in out))))
85 (heartbeat-needed ()
86 (and deadline
87 (>= (get-universal-time) deadline)))
88 (update-heartbeat ()
89 (let ((heartbeat (connection-heartbeat connection)))
90 (setf deadline
91 (when (plusp heartbeat)
92 (+ (get-universal-time) heartbeat))))))
93 (loop (ensure-open)
94 (let ((frame (get-encoded-frame connection)))
95 (unless frame (return))
96 (unless (write-frame connection frame)
97 (ensure-open nil))
98 (incf out)
99 (release-frame frame)
100 (update-heartbeat)))
101 (when (heartbeat-needed)
102 (send-heartbeat connection))
103 (loop (ensure-open)
104 (unless (stream-listen connection) (return))
105 (let ((frame (claim-input-frame connection)))
106 (unless (read-frame connection frame)
107 (ensure-open nil))
108 (process-frame connection frame)
109 (incf in)))
110 ;; once all io is finished, step back...
111 (bt:thread-yield))))))
112
113
114 (defun connection-toplevel-loop (&optional (*connection* *connection*))
115 (loop
116 (handling-connection-errors
117 (handling-channel-errors
118 (process-connection-loop *connection*)))))
119
120
121 (defgeneric make-connection-thread (connection)
122 (:method ((connection amqp:connection))
123 (bt:make-thread 'connection-toplevel-loop
124 :name "Socket i/o"
125 :initial-bindings `((*connection . ,connection)))))
126
127
128 (defgeneric process-channel-command (channel &key wait)
129 (:method ((channel amqp:channel) &key (wait t))
130 "Read successive frames from the channel, interpret and respond to them."
131 (unless (open-stream-p channel)
132 (amqp:channel-error :channel channel :message-string "process-channel-command on a closed stream."))
133 (let ((frame (get-read-frame channel :wait wait)))
134 (when frame
135 (process-frame channel frame))))
136 (:method ((connection amqp:connection) &key (wait t))
137 (process-channel-command (connection.channel connection :number 0) :wait wait))
138 (:method ((class amqp-connected-object) &key (wait t))
139 (process-channel-command (object-channel class) :wait wait)))
140
141 (defgeneric process-channel-loop (channel &key wait)
142 (:method ((class amqp:object) &key (wait t))
143 "Read successive frames from the context, interpret and respond to them."
144 (loop (unless (process-channel-command class :wait wait)
145 (return)))))
146
147
148 (defun channel-toplevel-loop (&optional (*channel* *channel*))
149 (loop
150 (handling-channel-errors
151 (process-channel-loop *channel* :wait t))))
152
153
154 (defgeneric process-frame (context frame)
155 (:documentation "Given a frame as read from the connection socket, extract the channel and type
156 information an process accordingly. This step happens either in the socket i/o process for asynchronous
157 configurations, or in synchronous configurations, in a thread which reads a frame for its own channel.
158 In the asynchronous case, the aim is to classify the frame, do any immediate and connection/channel-zero
159 processing, and queue the remainder for the respective channel's process. In the synchronous case, where
160 a process has read-through a get-read-frame call, immediate/c-0 processing is also unconditional, and
161 frames for the respective process' channel are returned to the caller. In addition, any frames for channels
162 which the process owns, are processed in the standard pipeline, while those for for other processes, are
163 queued for later processing, for the respective channel's thread to process on its own.")
164
165 (:method ((connection amqp:connection) (frame input-frame))
166 "Determine the channel - which must exist for read framed, and
167 the frame class, the dispatch for those as well as the connection."
168
169 ;; number and channel are extracted from the frame header.
170 ;; iff the frame is a method, the type will be an instance of the specialized method class,
171 ;; as decoded from the frame data. otherwise - for content header and body and for heartbeat,
172 ;; the type determined by the frame itself.
173 (let* ((number (frame-channel-number frame))
174 (channel (connection.channel connection :number number))
175 (decoder (frame-decoder frame)))
176 (process-typed-frame channel decoder frame)))
177
178 (:method ((channel amqp:channel) (frame input-frame))
179 ;; use an autonomous decoder to specialize initial processing
180 (let* ((decoder (frame-decoder frame)))
181 (process-typed-frame channel decoder frame))))
182
183
184 (defgeneric process-typed-frame (channel decoder frame)
185 (:documentation "Process frames specific to their type,
186 CHANNEL : amqp:channel
187 TYPE : amqp:frame-type
188 FRAME : amqp:frame
189
190 The supported frame types (as of 0.9r1) are
191 METHOD : is decoded and processed as a command
192 HEADER : is expected only when a channel is open and receiving a body;
193 pass it through unparsed to the current response function
194 BODY : is expected when a channel is open and receiving;
195 pass it through unparsed to the current response function
196 HEARTBEAT : generate a response hearbeat in non-threaded connections
197 Others are logged and ignored. In general, the frames' basic attributes are
198 decoded for a type specific action. Where the current process is the channel
199 owner, this happens directly. Otherwise the frame is queued. Whereby, if there
200 is no input processing at the moment, the owner process is interrupted
201 asynchronously to handle the frame.
202 Iff the frame is processed, release it.")
203
204 (:method :around ((*channel* amqp:channel) (type deferrable-frame-decoder) (frame amqp:frame))
205 "The :around method compares the channel's thread with the active one, to decide
206 whether to process immediately, or to delegate the procesing to the other thread."
207 (let ((channel-thread (channel-thread *channel*)))
208 (cond ((eq (bt:current-thread) channel-thread)
209 (call-next-method))
210 ((channel-thread *channel*)
211 ;; interlock with the respective thread process and interrupt it if necessary
212 (enqueue frame (device-read-frames *channel*)
213 :if-empty #'(lambda () (bt:interrupt-thread channel-thread #'process-typed-frame
214 *channel* type frame))))
215 (t
216 (amqp:log :warn *channel* "process-typed-frame: disowned channel.")
217 (release-frame frame)))))
218
219 (:method ((channel amqp:channel) (method amqp:method) (frame amqp:frame))
220 "Determine the particular (class x method) combination - first the class, then the
221 respective method. This combination will be without any keys to designate a specific cache entry,
222 so no history will be apperent in them. They control the method argument decoding, and determine the
223 response function and/or intermediate filtering. Processing code must refer to its own instances in order
224 to access past properties."
225 (let* ((class (amqp:ensure-object channel frame))
226 (method (amqp:ensure-method class frame)))
227 (setf (channel-content-object channel) nil) ; not in content
228 (flet ((call-process-command (class method &rest args)
229 (declare (dynamic-extent args))
230 (when args
231 (apply #'reinitialize-instance method args))
232 (apply #'process-command channel class method args)))
233 (declare (dynamic-extent #'call-process-command))
234 (unwind-protect (call-with-decoded-arguments #'call-process-command class method frame)
235 (release-frame frame)))))
236
237 (:method ((channel amqp:channel) (header amqp:header) (frame amqp:frame))
238 "An header frame is received to initiate a message. The channel state should be open in input mode.
239 Interpose the implicit basic target class and treat the header type as the method."
240 (typecase (channel-state channel)
241 (amqp.s:use-channel)
242 (t
243 (amqp:log :warn channel "~a frame, for inconsistent channel state: ~a, ~a"
244 header (channel-state channel) frame)))
245 ;; the content components are singletons, so this will be the last one used
246 (let ((object (amqp:ensure-object channel (content-header-class-name frame))))
247 (setf (channel-content-object channel) object)
248 (flet ((call-process-command (class &rest args)
249 (declare (dynamic-extent args))
250 (when args
251 (apply #'reinitialize-instance class args))
252 (apply #'process-command channel class header :frame frame args)))
253 (declare (dynamic-extent #'call-process-command))
254 (unwind-protect (call-with-decoded-properties #'call-process-command object frame)
255 (release-frame frame)))))
256
257 (:method ((channel amqp:channel) (body amqp:body) (frame amqp:frame))
258 "A body frame is received as part of a message. The channel state should be open in body input mode.
259 Interpose the implicit basic target object and treat the body type as the method.
260 In this case, there are no properties to decode, just manage the frame cache in the object and take
261 the next process step."
262 (typecase (channel-state channel)
263 (amqp.s:use-channel.body.input)
264 (t
265 (amqp:log :warn channel "~a frame, for inconsistent channel state: ~a, ~a"
266 body (channel-state channel) frame)))
267 (let ((object (channel-content-object channel)))
268 (unless object
269 (amqp:unexpected-frame-error :channel channel
270 :frame frame
271 :message-string "Body frame w/o header: ~s."
272 :message-arguments (list frame)))
273 (setf (object-frame object) frame)
274 (unwind-protect (process-command channel object body :frame frame)
275 (setf (object-frame object) nil)
276 (release-frame frame))))
277
278 (:method ((channel amqp:channel) (type amqp:heartbeat) (frame amqp:frame))
279 "Received heartbeats cause read-frame updates the last frame read
280 timestamp. If there is a connection thread, nothing more needs
281 to be done. If there is none, then just echo the heartbeat.
282 (See [http://qpid.apache.org/configure-broker-and-client-heartbeating.html].)"
283 (let ((connection (channel-connection channel)))
284 (unless (connection-thread connection)
285 (send-heartbeat connection))
286 (amqp:log :debug channel "~a frame: ~a" type frame)))
287
288 (:method ((channel amqp:channel) (type unsupported-frame-decoder) (frame amqp:frame))
289 "Unsupported frames are logged and ignored."
290 (amqp:log :warn channel "Unsupported ~a frame: ~a" type frame))
291
292 (:method ((channel amqp:channel) (type t) (frame amqp:frame))
293 "Unknown frames are logged and ignored."
294 (amqp:log :error channel "Unknown ~a frame: ~a" type frame)))
295
296
297
298
299 ;;;
300 ;;; decoding utilities
301
302 (defgeneric call-with-decoded-arguments (operator class method buffer &rest args)
303 (:method ((operator function) class method (frame amqp:frame) &rest args)
304 "Given a frame, extract the drame's data buffer and continue."
305 (declare (dynamic-extent args))
306 (setf (object-frame class) frame)
307 (unwind-protect
308 (apply #'call-with-decoded-arguments operator class method (frame-data frame)
309 args)
310 (setf (object-frame class) nil)))
311 (:method ((operator function) class (method symbol) (frame t) &rest args)
312 (declare (dynamic-extent args))
313 (apply #'call-with-decoded-arguments operator class (amqp:ensure-method class method) frame
314 args)))
315
316 (defgeneric call-with-encoded-arguments (operator class method &rest args)
317 (declare (dynamic-extent args))
318 (:method (op (class amqp:object) (method symbol) &rest args)
319 (declare (dynamic-extent args))
320 (apply #'call-with-encoded-arguments op class (amqp:ensure-method class method)
321 args))
322 (:method (op (class amqp:object) (method fixnum) &rest args)
323 (declare (dynamic-extent args))
324 (apply #'call-with-encoded-arguments op class (amqp:ensure-method class method)
325 args)))
326
327
328 (defgeneric call-with-decoded-properties (operator class frame &rest args)
329 (declare (dynamic-extent args))
330 (:method ((operator function) class (frame amqp:frame) &rest args)
331 "Given a frame, decode the properties from the data buffer and continue."
332 (declare (dynamic-extent args))
333 (setf (object-frame class) frame)
334 (unwind-protect
335 (apply #'call-with-decoded-properties operator class (frame-data frame)
336 args)
337 (setf (object-frame class) nil))))
338
339
340 (defgeneric call-with-encoded-properties (operator class &rest args)
341 (declare (dynamic-extent args)))
342
343
344 #+(or ) ;; this is ambiguous: is it the response or request side?
345 (defgeneric apply-method (method class &rest argument-list)
346 (:documentation "Apply the function specific to the combination of
347 (class x method) to the class and the given argument list.
348
349 CONNECTION : amqp:connection : the distinction client v/s server connection
350 determines the concrete behaviour
351 METHOD : amqp:object : the version-specific method instance
352 CLASS : amqp:object : the version-specific class instance
353 ARGUMENT-LIST : list : the argument list.")
354 (declare (dynamic-extent argument-list))
355
356 (:method ((method t) (class t) &rest args)
357 (declare (dynamic-extent args))
358 (apply (compute-class-method class method) class args))
359
360 (:method ((method amqp:method) (class amqp:object) &rest args)
361 (declare (dynamic-extent args))
362 (let ((handler (channel-command-handler class)))
363 (or (and handler (apply handler class method args))
364 (apply (method-request-function method) (object-channel class) class
365 (apply #'list* args))))))
366
367
368
369 (defmethod send-method ((method t) (class amqp:object) &rest args)
370 (declare (dynamic-extent args))
371 (flet ((write-encoded-method (frame class method)
372 (amqp:log :debug class "send-method: ~a ~a" method frame)
373 ;; nb. this places the constraint on connections, that they have their channel-0 bound
374 ;; before they send any command to the broker.
375 (put-encoded-frame (object-channel class) frame)))
376 (declare (dynamic-extent #'write-encoded-method))
377 (amqp:log :debug class "send-method: ~a . ~s" method args)
378 (apply #'call-with-encoded-arguments
379 #'write-encoded-method class method
380 args)))
381
382 (defmethod encode-method ((method t) (class amqp:object) &rest args)
383 (declare (dynamic-extent args))
384 (flet ((return-encoded-method (frame class method)
385 (declare (ignore class method))
386 (return-from encode-method frame)))
387 (declare (dynamic-extent #'return-encoded-method))
388 (apply #'call-with-encoded-arguments
389 #'return-encoded-method class method
390 args)))
391
392 (defmethod decode-class-properties ((class amqp:object))
393 (reduce #'nconc
394 (mapcar #'(lambda (name)
395 (list (cons-symbol :keyword name)
396 (slot-value class name)))
397 (class-property-slot-names class))))
398
399 (defmethod decode-method-arguments ((method amqp:method))
400 (reduce #'nconc
401 (mapcar #'(lambda (name)
402 (list (cons-symbol :keyword name)
403 (slot-value method name)))
404 (method-argument-slot-names method))))
405
406 (defun amqp.u:class-properties (class)
407 (decode-class-properties class))
408
409 (defun amqp.u:method-arguments (class)
410 (decode-method-arguments class))
411
412
413 (defun send-method* (method class &rest args)
414 (declare (dynamic-extent args))
415 (apply #'send-method method class (apply #'list* args)))
416
417 (defgeneric send-heartbeat (connection)
418 (:method ((connection amqp:connection))
419 "Encode and enqueue an heartbeat frame. Enqueue it just in case, something
420 else happened between the deadline and now. Most likely passes through and is written directly.
421 managing deadlines is left to the caller(s)."
422 (let ((frame (claim-output-frame connection)))
423 (setf-frame-type-class-name 'amqp:heartbeat frame)
424 (setf-frame-channel-number 0 frame)
425 (setf-frame-track-number 0 frame)
426 (setf-frame-size 0 frame)
427 (write-frame connection frame)
428 (put-encoded-frame connection frame))))
429
430 (defmethod send-header ((class amqp:object) &rest args)
431 (declare (dynamic-extent args))
432 (flet ((write-encoded-properties (frame class)
433 (setf-frame-type-class-name 'amqp:header frame)
434 (put-encoded-frame (object-channel class) frame)))
435 (declare (dynamic-extent #'write-encoded-properties))
436 (apply #'call-with-encoded-properties
437 #'write-encoded-properties class
438 args)))
439
440
441
442 (document (process-command dynamic-process-command)
443 "Interface Operators :
444
445
446 static / dynamic / lexical / instantial command handler binding
447
448 Each read frame, once typed and decoded, must be "handled". This
449 can occur in a dynamic context where a frame appears immediately
450 subsequent, and in response to a command on the same channel, or it can be
451 a frame which has appeared asynchronously on a channel for which
452 the most recent request was to subscribe to a queue, or it could be
453 autononomously sent from the broker - eg, to close a queue or
454 return a message.
455
456 The processing mechanism must permit both forms of processing. In
457 particular, because a given channel may recieve a frame in either
458 category at a given time. Rather than configure a channel for this,
459 the mechanism should interpret respective context and operate accordingly.
460 To achieve this, it integrates the following definition forms:
461
462 with-commands (clauses . body)
463 defines a command processor in the current lexical context and
464 binds it dynamically, such that it takes first precedence.
465
466 (setf channel-command) (channel method &optional class) (function)
467 binds a handler function for that (method x class) for frames to be processed
468 in that channel. any extant binding is returned.
469
470 dynamic-process-command (handler channel class type . args)
471 is implemented to apply the hander iff it is a function, otherwise to
472 apply a channel handler iff it is a function, and otherwise to
473 apply the method's respective response function.
474
475 The standard definitions arrange that dynamic-process-command traces
476 back through dynamically bound handlers until either one handles the frame -
477 in that it returns true, or the binding is null - which is the global default.
478 At that point it tries the channel's handle. If there is none, or it declines,
479 then the method's own response function is used.")
480
481 (defparameter *channel-command-handler* nil
482 "When this is bound, it interposes operators before the channel's bound
483 command handlers.")
484
485 (defgeneric process-command (channel class operation &rest args)
486 (:documentation "Process a decoded, read frame as a 'command'. This applies to
487 both method and content frames. A method is dispatched to the respective
488 respond-to operator.")
489 (declare (dynamic-extent args))
490
491 (:method ((channel amqp:channel) (class amqp:object) (operation frame-decoder) &rest args)
492 ;; at this point, the frame object has been reified, and any operation arguments or properties
493 ;; have been decoded. the frame-decoder either indicated the method, the content element.
494 ;; try the available response functions and apply the first found to the
495 ;; connection, target class, and the argument list
496 (declare (dynamic-extent args))
497 (amqp:log :debug channel "process-command: ~a ~a . ~s" class operation args)
498 (apply #'dynamic-process-command *channel-command-handler*
499 channel class operation args)))
500
501 (defgeneric dynamic-process-command (dynamic-handler channel class type &rest args)
502 (declare (dynamic-extent args))
503
504 (:method ((operator t) (channel t) (class t) (method amqp:method) &rest args)
505 "Given an unhandled method, apply its own operator, for which the initial definition
506 is the static response operator."
507 (declare (dynamic-extent args))
508 (apply (method-response-function method) channel class args))
509
510 (:method ((operator null) (channel amqp:channel) (class t) (type t) &rest args)
511 "Given a channel, if it has a handler try it. If none is present, or it declines
512 continue with the next method, to apply the method's own operator."
513 (declare (dynamic-extent args))
514 (let ((channel-operator (channel-command-handler channel)))
515 (etypecase channel-operator
516 (null (call-next-method))
517 (function (or (apply channel-operator channel class type args)
518 (call-next-method))))))
519
520 (:method ((operator function) (channel t) (class t) (type t) &rest args)
521 (declare (dynamic-extent args))
522 (apply operator channel class type args))
523
524 (:method ((operator t) (channel t) (class t) (frame frame-decoder) &rest args)
525 ;; this shouldn't happen - it means that a content frame arrived outside
526 ;; of a dynamic context which should be integrating it in an earlier operation
527 (declare (dynamic-extent args) (ignore args))
528 (amqp:unexpected-frame-error :connection (when (typep channel 'amqp:channel) (channel-connection channel))
529 :channel channel
530 :frame frame
531 :message-string "[process-command] frame not processed: ~s, ~s."
532 :message-arguments (list class frame))))
533
534
535
536
537 ;;; It would be possible to implement these as methods, install and remove
538 ;;; for the dynamic extent, but that would mean that the effective
539 ;;; function is continuously recomputed.
540 ;;; The implemented mechanism binds a current handler to a dynamic variable
541 ;;; and retains a previous version in a lexical binding, which it calls
542 ;;; should the new handler not 'accept' the combined (class x method). Should
543 ;;; there be no previous version, it calls the respective channel's command
544 ;;; handler. from which, by default, the static function is invoked as the
545 ;;; default method.
546
547 (defun compute-command-clauses (class-var0 method-var0 args-var clauses)
548 "Transform the command clauses, each of the form
549 (method-name ((class type) . args) . body)
550 into cond clauses for inclusion in a command processing operator.
551 This does _not_ add any defaults, it just sorts the clauses by subtype.
552 The with-commands operator intends to pass anything which falls through to the
553 next handler on the dynamic stack."
554 (flet ((rewrite-clause (clause)
555 (destructuring-bind (method-type (class &rest arglist) &rest body)
556 clause
557 (let ((class-var (if (consp class) (first class) class))
558 (class-type (if (consp class) (second class) t))
559 (method-var (gensym (string :method-))))
560
561 `((and (typep ,class-var0 ',class-type) (typep ,method-var0 ',method-type))
562 (let ((,class-var ,class-var0) (,method-var ,method-var0))
563 (declare (ignorable ,class-var ,method-var))
564 (destructuring-bind ,arglist ,args-var
565 ,@body)))))))
566
567 ;; sort by method, then class, then channel
568 (setf clauses (sort (copy-list clauses)
569 #'(lambda (c1 c2)
570 (let* ((m1 (first c1))
571 (m2 (first c2))
572 (c1 (first (second c1)))
573 (c2 (first (second c2))))
574 (setf c1 (if (consp c1) (second c1) t))
575 (setf c2 (if (consp c2) (second c2) t))
576 (if (eql c1 c2)
577 (subtypep m1 m2)
578 (subtypep c1 c2))))))
579 (mapcar #'rewrite-clause clauses)))
580
581
582 (defmacro amqp:with-commands (command-clauses &rest body)
583 "Defines a lexically scoped command handler, which is to take precedence over any extand
584 dynamic handlers. Each clause has the form
585 (method-type (class . arguments) . body)
586 these are collected into an operator which is shadows the existing global
587 definition during the dynamic extend of the forms body. If no form matches a to-be-processed
588 frame, the next handler in the dynamic chain is invoked. (See dynamic-process-command)"
589
590 (let ((handlers-op (gensym (string :command-handler-)))
591 (body-op (gensym (string :command-body-)))
592 (channel-var (gensym (string :channel-)))
593 (class-var (gensym (string :class-)))
594 (type-var 'amqp:method)
595 (args-var (gensym (string :arglist-))))
596 `(flet ((,handlers-op (,channel-var ,class-var ,type-var &rest ,args-var)
597 (declare (dynamic-extent ,args-var)
598 ;; this is present to keep the interface uniform. in the
599 ;; dynamic context, the channel instance is known, so there's
600 ;; no reason for it to appear in the clauses
601 (ignore ,channel-var))
602 (cond ,@(compute-command-clauses class-var type-var args-var command-clauses)))
603 (,body-op () ,@body))
604 (declare (dynamic-extent #',handlers-op #',body-op))
605 (call-with-command-handlers #',body-op #',handlers-op))))
606
607 (defun call-with-command-handlers (op commands-op)
608 (declare (dynamic-extent op commands-op))
609 (let ((previous-handler *channel-command-handler*))
610 (flet ((apply-command-handler (channel class type &rest args)
611 (declare (dynamic-extent args))
612 (or (apply commands-op channel class type args)
613 (apply #'dynamic-process-command previous-handler
614 channel class type args))))
615 (declare (dynamic-extent #'apply-command-handler))
616 (let ((*channel-command-handler* #'apply-command-handler))
617 (funcall op)))))
618
619
620
621 (defmacro command-loop ((channel &key (wait nil wait-s)) &rest command-clauses)
622 `(block command-loop
623 (amqp:with-commands ,command-clauses (process-channel-loop ,channel ,@(when wait-s `(:wait ,wait))))))
624
625 (defmacro command-case ((channel &key (wait nil wait-s)) &rest command-clauses)
626 `(block command-case
627 (amqp:with-commands ,command-clauses (process-channel-command ,channel ,@(when wait-s `(:wait ,wait))))))
628
629 #+mcl
630 (setf (ccl:assq 'command-loop ccl::*fred-special-indent-alist*) 1)
631 #+mcl
632 (setf (ccl:assq 'command-case ccl::*fred-special-indent-alist*) 1)
633
634
635 (document (compute-channel-command-handler channel-command-handler channel-command)
636 "Instance-scoped Commands :
637
638 Instance-scoped commands are integrated into a channel's command-handler function.
639 Each is a implemented in a method, specialized on the method's class
640 the operator is defined on demand and the handlers are added/removed
641 with the (setf channel-command-handler) operator.")
642
643 (defun compute-channel-command-handler (channel)
644 (let* ((name (channel-name channel))
645 (function (ensure-generic-function name
646 :lambda-list '(channel class method &rest args)
647 :declare '((dynamic-extent args)))))
648 (c2mop:ensure-method function
649 '(lambda (channel class method &rest args)
650 (declare (ignore channel class method args)
651 (dynamic-extent args))
652 ;; the default method returns nil
653 nil)
654 :specializers (mapcar #'find-class '(amqp:channel t t)))
655 function))
656
657 (defmethod channel-command-handler ((channel amqp:channel))
658 (with-slots (command-handler) channel
659 (or command-handler
660 (setf command-handler (compute-channel-command-handler channel)))))
661
662 (defmethod (setf channel-command-handler) (function (channel amqp:channel))
663 (with-slots (command-handler) channel
664 (when (typep command-handler 'generic-function)
665 (dolist (method (c2mop:generic-function-methods command-handler))
666 (remove-method method command-handler)))
667 (setf command-handler function)))
668
669 (defgeneric (setf channel-command) (method-function channel method-name)
670 (:documentation "Binds the respective command in the channel's instance
671 context to the given function. If the function is null, then removes the
672 command handler. Returns the existing handler.
673 nb. uses ensure-method, which requires a lambda expression in order to
674 wrap it is a method function. which means function literals are wrapped.")
675
676 (:method :before ((function t) (channel t) (method-name symbol))
677 (assert (subtypep method-name 'amqp:method) ()
678 "Invalid method name: ~s." method-name))
679
680 (:method ((function null) (channel amqp:channel) (method-name symbol))
681 (let* ((operator (channel-command-handler channel))
682 (method (when operator (find-method operator nil (list 'amqp:channel 'amqp:object method-name)
683 nil))))
684 (when method
685 (remove-method operator method)
686 method)))
687
688 (:method ((lambda cons) (channel amqp:channel) (method-name symbol))
689 (let* ((operator (channel-command-handler channel))
690 (method (when operator (find-method operator nil (list 'amqp:channel 'amqp:object method-name)
691 nil))))
692 (when method (remove-method operator method))
693 (c2mop:ensure-method operator lambda
694 :qualifiers nil
695 :specializers (mapcar #'find-class `(amqp:channel amqp:object ,method-name))
696 :lambda-list '(channel class method &rest args))
697 method))
698
699 (:method ((function function) (channel amqp:channel) (method-name symbol))
700 (setf (channel-command channel method-name)
701 `(lambda (channel class method &rest args)
702 (declare (dynamic-extent args))
703 (apply ,function channel class method args)))))
704
705
706 #|
707
708 ;;; initial thoughts on how to process. for the most part adopted by the implementation...
709 ;;;
710 ;;; various methods can intervene in the request/response process. they differ
711 ;;; according to the application architecture
712 ;;;
713 ;;; - a specialized protocol class can be used to extend the request-/respond-to
714 ;;; operators to implement synchronous message processing.
715 ;;; - it can be introduced by specializing the connection or the channel
716 ;;; classes and the ensure-object constructor
717 ;;; - it can be introduced by modifying the connection's type->class map
718 ;;; either statically or dynamically
719 ;;; - custom behavior can be implemented strictly in command-case forms as a
720 ;;; static state machine.
721 ;;; - custom behavior can be implemented by adding a handler to the channel
722 ;;; to apply arbitrary functions to frames universally or selectively.
723
724
725 handle-frame (frame) ->
726 handle-class-and-method (class . command) ->
727 loop handlers (funcall handler class command)
728 ;; if none handles
729 (apply-method method class)
730
731 ? using the (with- ) pattern: (consuming , or (with-comsumer ?
732 analogous to clim accept contexts, it affects the (connection? channel? )input within its dynamic context
733
734 define message processing protocol by specializing individual functions and providing a specialized protocol class instance,
735 that is, by implementing a protocol interface, or by providing a processing object which defines all the requisite methods
736 in terms of the data.
737
738 as clos is not a containment model for specialization, the operators would need an additional initial arguement, event
739 though it is mostly redundant, since most commands apply to just one class.
740 it is more succinct to juet spread the method instance's arguments in the function call and apply it, by name to the class.
741
742 this leaves class specialization as the customization method.
743 if the method operators are statially defined, this s easier to comprehend and maintain, but harder to change the behaviour dynamically.
744 one must substitute a different "factory" class - connection, channel, etc in order to get specialized command method discriminators.
745
746 which makes it difficult to achieve (with semantics
747
748
749 three separate issues
750 - protocol: which operations and which logic apply to which read/to-be-written commands.
751 - control : asynchronous/synchronous binding of control flow to the command.
752 - visibility : the scope and extent of the definition.
753
754 the protocol is specified by combining send-*/receive, command-case with
755 implicit encoding and decoding operations and logic which interprets/specifies
756 class and command fields and properties.
757
758 the control structure concerns how the stream of input frames is coordinated with
759 the stream of output frames and with other application processing. the input and
760 output streams are both queue mediated. each channel is associated with a single
761 process, but multiple channels share a single connection. This must allow for several variations:
762
763 process channel connection control
764 1-1-1-s single single single sync
765 1-1-1-a single single single async
766 1-*-1-s single multiple single sync
767 1-*-1-a single multiple single async
768 *-*-1-s multiple multiple single sync
769 *-*-1-a multiple multiple single async
770 1-*-*-s single multiple multiple sync
771 1-*-*-a single multiple multiple async
772 *-*-*-s multiple multiple multiple sync
773 *-*-*-s multiple multiple multiple async
774
775 the "single" process / "async" control does not necessarily imply an additional
776 "event" process, just that input processong can occur other than as a response
777 to an explicit read/poll
778
779 the variations are implemented in an amqp:client instance, which also caches
780 properties which would be used for connection establishment. one would like to
781 be able to specify as little as possible in advance, but the control adjust to
782 contingencies. thus the input/output queues are always present to arbitrate
783 access to a given connection.
784
785 (defclass amqp:client ()
786 (user)
787 (password))
788
789 with a 1-1-1-S, in synchronous mode, the process writes frames or reads
790 them as explicit operations. nothing happens to read implicitly when writing
791 and write block, so they leave nothing queued.
792
793
794 |#

  ViewVC Help
Powered by ViewVC 1.1.5