/[de-setf-amqp]/classes.lisp
ViewVC logotype

Contents of /classes.lisp

Parent Directory Parent Directory | Revision Log Revision Log


Revision 3 - (show annotations)
Tue Feb 23 09:05:39 2010 UTC (4 years, 1 month ago) by janderson
File size: 54770 byte(s)
Merge commit 'remotes/github/master' into remotes/git-svn
1 ;;; -*- Package: de.setf.amqp.implementation; -*-
2
3 (in-package :de.setf.amqp.implementation)
4
5 (document :file
6 (description "This file defines the CLOS model for AMQP `object` and `method` entities for the
7 'de.setf.amqp' library.")
8 (copyright
9 "Copyright 2010 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved"
10 "'de.setf.amqp' is free software: you can redistribute it and/or modify it under the terms of version 3
11 of the GNU Affero General Public License as published by the Free Software Foundation.
12
13 'setf.amqp' is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
14 implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
15 See the Affero General Public License for more details.
16
17 A copy of the GNU Affero General Public License should be included with 'de.setf.amqp' as `AMQP:agpl.txt`.
18 If not, see the GNU [site](http://www.gnu.org/licenses/).")
19
20 (long-description
21 "Each AMQP protocol version presents a particular model for message processing. At
22 the same time, despite the variations, all message-level models distinguish
23 between 'class' and 'operator' entities, and the transport-level models all
24 concern exchanges of 'framed' data through 'connections'. At the message
25 level the classes exhibit various relations - either connection/channel/class
26 or connection/session/channel/links/class, while operators are termed variously
27 'methods', 'commands', and 'controls'. at the same time, their specialized roles do not affect
28 their basic representation. All are 'classes' on an abstract level. The distinction
29 is of a containment hierarchy.
30
31 At the transport level, transported data is broken into various comstiteunts -
32 either 'content' and 'frames' or 'assemblies', 'segments' and 'frames', but
33 this does not affect the respective representation either.
34
35 This implementation reflects the follwoing general order.
36
37 1. The primary interface classes are amqp:object and amqp:method. These are specialized in terms
38 of protocol version, and according to the respetive protocol's functional model. In terms of the
39 protocols' taxonometric class hierarchy - eg, for 0.8r0, 0.9r1, this manifests in the class definitions
40 and their relations:
41 connection (1)--
42 -- (*) channel (1) ---
43 --(*) access basic cluster dtx exchange file link queue stream tx
44 with each class associated with the respective methods, eg.
45 CHANNEL : OPEN OPEN-OK FLOW FLOW-OK CLOSE CLOSE-OK
46 EXCHANGE : DECLARE DECLARE-OK DELETE DELETE-OK
47 (See the version's respective classes.lisp)
48
49 2. At the transport level, each protocol's model is reflected in the
50 data containers - eg channel, content, assembly, segment, link, and frame.
51
52 3. At the wire level each protocol version specifies encoding rules and
53 how to map its defined data types to lisp data.
54
55 This file defines the abstract classes, the 'commands' file implements the
56 protocol logic in terms of them, and the 'data-wire-encoding' file defines the
57 general data codecs. Each protocol is implemented in three files in the
58 eponymous directory:
59 'data-wire-encoding' : defines codecs in terms of the protocol's type domain names
60 which are used in the protocol operators. Compound codecs - eg, map,
61 array, and list, require version-specific type codes.
62 'abstract-classes' : defines the protocol-specific abstract classes.
63 'classes' : defines codecs for the concrete classes and methods."))
64
65
66
67 (defun amqp-version-p (x) (assoc x amqp.u:*version-headers*))
68
69 (deftype amqp:version () '(satisfies amqp-version-p))
70
71
72 ;;;
73 ;;; frame decoders
74
75 (defclass frame-decoder ()
76 ((context
77 :initform nil :initarg :context
78 :accessor frame-context
79 :type (or null amqp:object)
80 :documentation "Records the frames context. If the frame is an autonomous prototype, no context is
81 present, as the same instance is reused to specialize initial processing. Once an object is associated
82 with the frame, a context-specific decocder (most likely the respective method) is interposed."))
83 (:documentation "The abstract root class for all parsed representations for frames. This includes the
84 command methods as well as content haeaders and bodies. Each frame encodes a command. In order to perform if
85 the frame is decomposed into two constituents: and object and an operation. The decomposition step first
86 associates a decoder with the frame and then delegates to parsing functions to extract the constituent
87 details. The initial association is with abstract singletons which belong to htbased on frame type
88 singletons based on the frame class and method codes. Once parsing proceeds, methods
89 are decimaed into keywords and the class is replaced with a channel-specific instance
90 which can cache the of the frame content. In the case of content bodies, however, no replacement
91 is necessary, as the body passes through the processing tree opaquely."))
92
93 (defclass supported-frame-decoder (frame-decoder) ())
94 (defclass unsupported-frame-decoder (frame-decoder) ())
95 (defclass deferrable-frame-decoder (supported-frame-decoder) ())
96 (defclass immediate-frame-decoder (supported-frame-decoder) ())
97 (defclass content-frame-decoder (deferrable-frame-decoder) ())
98
99 (defclass amqp:body (content-frame-decoder) ())
100
101 (defclass amqp:header (content-frame-decoder) ())
102
103 (defclass amqp:heartbeat (immediate-frame-decoder) ())
104
105 (defclass amqp:oob-method (unsupported-frame-decoder) ())
106
107 (defclass amqp:oob-header (unsupported-frame-decoder) ())
108
109 (defclass amqp:oob-body (unsupported-frame-decoder) ())
110
111 (defclass amqp:trace (unsupported-frame-decoder) ())
112
113 #+(or ) ;; this inteferes with the access.request method class
114 (defclass amqp:request (unsupported-frame-decoder) ())
115
116 (defclass amqp:response (unsupported-frame-decoder) ())
117
118 (defmethod shared-initialize ((instance frame-decoder) (slots t) &rest initargs
119 &key context (channel nil channel-s) (connection nil connection-s))
120 (when (and (not channel-s) (amqp:channel-p context))
121 (setf channel context))
122 (when (and (not connection-s) (amqp:connection-p context))
123 (setf channel context))
124 (apply #'call-next-method instance slots
125 :channel channel
126 :connection connection
127 initargs))
128
129 ;;;
130 ;;; the abstract class of amqp protocol objects
131
132 (defclass amqp:object ()
133 ((id :reader amqp:class-id)
134 (protocol-version
135 :initform *default-version* :allocation :class
136 :reader class-protocol-version)
137 #+(or ) ; supplanted by specific slots
138 (protocol-instances
139 :initform (make-hash-table :test 'equal)
140 :reader class-protocol-instances
141 :documentation "Caches methods for uses as protocol elements.")
142 (state
143 :initform nil
144 :accessor class-state)
145 (frame
146 :initform nil
147 :accessor object-frame
148 :documentation "Caches the current frame while the class is processed.")
149 (property-slot-names
150 :reader class-property-slot-names
151 :documentation "A list of slot names for those slots which correspond to
152 protocol class properties.")
153 (argument-slot-names
154 :reader class-argument-slot-names
155 :documentation "A list of slot names for those slots which correspond to
156 protocol method arguments.")
157 (method-names
158 :initform nil
159 :reader class-method-names
160 :allocation :class)
161 (context
162 :initform nil :initarg :context
163 :accessor object-context
164 :type (or null amqp:object)))
165 (:documentation "The abstract root class for all protocol classes."))
166
167
168 (defclass amqp-connected-object (amqp:object)
169 ((context
170 :initform (error "connection required")
171 :initarg :connection
172 :reader object-connection))
173 (:documentation "Specified for all classes other than the connection itself in
174 order to delegate to it."))
175
176 (defclass amqp-channeled-object (amqp-connected-object)
177 ((context
178 :initarg :channel
179 :reader object-channel))
180 (:documentation "Specified for all classes other than the channel itself in
181 order to delegate to it."))
182
183 (defclass amqp-content-object (amqp:object)
184 ((weight
185 :initform 0 :initarg :weight
186 :accessor class-weight)
187 (body-size
188 :initform 0 :initarg :body-size
189 :accessor class-body-size))
190 (:documentation "Mixed into classes which are associated with content, to provide
191 slots for the data which is carried directly in the header, as the protocol objects does not include those
192 fields."))
193
194
195
196 ;;;
197 ;;; the abstract method class is not differentiated client/server as operations
198 ;;; are specialized by connection
199
200
201 (defclass amqp:method (deferrable-frame-decoder)
202 ((id :reader amqp:method-id)
203 (context :reader method-object)
204 (name
205 :initform (error "name required.")
206 :reader amqp:method-name
207 :documentation "The protocol's version-independent name for this method.
208 Defined per abstract method class.")
209 (request-function
210 :reader method-request-function)
211 (response-function
212 :reader method-response-function)
213 (argument-slot-names
214 :reader method-argument-slot-names
215 :documentation "A list of slot names for those slots which correspond to
216 protocol arguments.")))
217
218 (defclass amqp:message ()
219 ((channel :initform nil :initarg :channel)
220 (class :initform nil :initarg :class)
221 (weight :initform nil :initarg :weight)
222 (content :initform nil :initarg :content)
223 (flags :initform nil :initarg :flags)
224 (properties :initform nil :initarg :properties)))
225
226
227
228 (defgeneric object-connection (class)
229 )
230
231 (defgeneric object-channel (class)
232 )
233
234
235
236 ;;;
237 ;;; abstract class classes
238
239
240 (def-amqp-abstract-class amqp:access (amqp-channeled-object)
241 ())
242
243
244 (def-amqp-abstract-class amqp:basic (amqp-channeled-object amqp-content-object)
245 ((context
246 :reader basic-channel)
247 (exchange-instance
248 :initform nil
249 :accessor basic-exchange
250 :type (or string null)
251 :documentation "Caches the exchange from the most recent publish for re-use in chunked content.")
252 (mime-type
253 :initform nil :initarg :mime-type
254 :accessor class-mime-type)))
255
256
257 (def-amqp-abstract-class amqp:channel (amqp-connected-object amqp-connection-device)
258 ((context
259 :reader channel-connection)
260 (uri
261 :reader channel-uri)
262 (name
263 :initform (gensym "channel")
264 :reader channel-name)
265 (number
266 :initform (error "number required") :initarg :number
267 :reader channel-number :writer setf-channel-number
268 :type fixnum)
269 (track
270 :initform 0 :initarg :track
271 :reader channel-track)
272 (state
273 :initform amqp.s:open-channel
274 :type amqp.s:channel-state)
275 (thread
276 :initform (bt:current-thread)
277 :reader channel-thread
278 :documentation "The thread which processes this channel.
279 Likely, the thread which created it.")
280 (conditions
281 :initform nil
282 :accessor channel-conditions
283 :documentation "Caches conditions raised on the channel for re-use.")
284 (command-handler
285 :initform nil
286 :documentation "Binds a handler function, with the signature (class method &rest arguments),
287 which are applied when handle-channel-methods.")
288 #+(or) ;; needs to be specific to basic instance
289 (exchange-instance
290 :initform nil
291 :reader channel-exchange :writer setf-channel-exchange)
292 (realm
293 :initform nil :initarg :realm
294 :reader amqp.u:channel-realm
295 :documentation "Should the protocol support/reauire realms, the channel negotiates access
296 as a late step in the device-open procedure and bind both the realm and the tick for future use.
297 The channel's realm comprises the realm proper +value exclusive passive active write read")
298 (ticket
299 :initform nil :initarg :ticket
300 :accessor amqp.u:channel-ticket
301 :documentation "Should the protocol support/reauire realms, the channel negotiates access
302 as a late step in the device-open procedure and bind both the realm and the tick for future use.")
303 (content-object
304 :initform nil
305 :accessor channel-content-object
306 :documentation "The most recent class which contained (the current) content.
307 Updated by respond to typed-frame, but not cleared.")
308 (acknowledge-messages
309 :initform nil
310 :accessor channel-acknowledge-messages
311 :documentation "Indicates whether no-ack was selected when a a message was
312 requested via Consume or Get. If selected, then each receipt is automatically
313 acknowledged as the last step of the respond-to operation.")
314 ;; caches for protocol instances
315 (amqp-basic
316 :initform nil
317 :reader get-channel-basic :writer setf-channel-basic
318 :type (or null amqp:basic)
319 :documentation "Caches the channel basic instance.")
320 (amqp-body
321 :initform nil
322 :reader get-channel-body :writer setf-channel-body
323 :type (or null amqp:body)
324 :documentation "Caches the channel body instance.")
325 (amqp-exchanges
326 :initform nil
327 :reader get-channel-exchanges :writer setf-channel-exchanges
328 :type list
329 :documentation "Caches the channel exchange instances by exchange name.")
330 (amqp-file
331 :initform nil
332 :reader get-channel-file :writer setf-channel-file
333 :type (or null amqp:file)
334 :documentation "Caches the channel file instance.")
335 (amqp-header
336 :initform nil
337 :reader get-channel-header :writer setf-channel-header
338 :type (or null amqp:header)
339 :documentation "Caches the channel header instance.")
340 (amqp-queues
341 :initform nil
342 :reader get-channel-queues :writer setf-channel-queues
343 :type list
344 :documentation "Caches channel queues according to queue name.")
345 (amqp-stream
346 :initform nil
347 :reader get-channel-stream :writer setf-channel-stream
348 :type (or null amqp:stream)
349 :documentation "Caches the channel stream instance.")
350 (amqp-tx
351 :initform nil
352 :reader get-channel-tx :writer setf-channel-tx
353 :type (or null amqp:tx)
354 :documentation "Caches the channel tx instance.")))
355
356 (def-amqp-abstract-class amqp:cluster (amqp-channeled-object)
357 ())
358
359 (def-amqp-abstract-class amqp:connection (amqp:object amqp-socket-device)
360 ((uri
361 :reader connection-uri)
362 (frame-size :type number
363 :initform *frame-size*
364 :initarg :frame-size
365 :reader connection-frame-max :reader connection-frame-size
366 :writer set-connection-frame-max)
367 (amqp:heartbeat
368 :initform 0 :initarg :heartbeat
369 :accessor connection-heartbeat
370 :type (unsigned-byte 16))
371 (mode
372 :initform :simplex :initarg :mode
373 :reader connection-mode
374 :type (member :multiplex :simplex))
375 (amqp:mechanism
376 :initform *default-mechanism* :initarg :mechanism
377 :reader connection-mechanism)
378 (amqp:locale
379 :initform *default-locale* :initarg :locale
380 :reader connection-locale
381 :documentation "Specifies the connection's locale. If set as an initializati on argument, this
382 constrains the connection negotiation. If not set, the first of the server's suggestions is
383 adopted.")
384 (lock
385 :reader connection-lock)
386 (state
387 :initform amqp.s:open-connection
388 :type amqp.s:connection-state)
389 (protocol-version
390 :reader connection-protocol-version
391 :documentation "Provide a connection- accessor and also a default version for the
392 abstract class, for use to start the negotiation process.")
393 (input-frame-class
394 :initarg :input-frame-class
395 :reader connection-input-frame-class
396 :type symbol
397 :documentation "Specifies the class to use to decode wire-level frames.
398 The default value is specific per protocol version. each is wrapped around
399 an input data buffer and header to control the decoding process.")
400 (output-frame-class
401 :initarg :output-frame-class
402 :reader connection-output-frame-class
403 :type symbol
404 :documentation "Specifies the class to use to encode wire-level frames.
405 The default value is specific per protocol version. These are wrapped around
406 and output data buffer and header to control the encoding process.")
407 (amqp::server-properties
408 :initform nil
409 :accessor amqp:connection-server-properties)
410 (amqp::client-properties
411 :initform nil :initarg :client-properties
412 :accessor amqp:connection-client-properties
413 :documentation "The properties sent to the server in as start-ok response.")
414 (protocol-instances
415 :reader connection-protocol-instances
416 :documentation "Caches classes for uses as protocol elements.
417 Ideintical with the method cache, but they concern different namespaces")
418 (thread
419 :initform nil
420 :reader connection-thread
421 :documentation "If null, then no asynchronous processing occurs.
422 Otherwise, it is the thread which reads/writes the connection's
423 socket, dispatches read frames to responds finctions, and generates
424 heartbeats. The initial value is nil, a true :threaded initialization
425 argument cause creation and activation of a new thread.")
426 (read-frame-timestamp
427 :initform 0
428 :accessor connection-read-frame-timestamp)
429 (write-frame-timestamp
430 :initform 0
431 :accessor connection-write-frame-timestamp)
432 ;; protocol instance cache slots
433 (amqp-channels
434 :initform nil
435 :reader get-connection-channels :writer setf-connection-channels
436 :type (or null vector)
437 :documentation "Caches the connection channel instances by number.")
438 (amqp-heartbeat
439 :initform nil
440 :reader get-connection-heartbeat :writer setf-connection-heartbeat
441 :type (or null amqp:heartbeat)
442 :documentation "Caches the connection heartbeat instance.")))
443
444 (def-amqp-abstract-class amqp:dtx (amqp-channeled-object)
445 ((context
446 :reader dtx-channel)))
447
448 (def-amqp-abstract-class amqp:exchange (amqp-channeled-object)
449 ((context
450 :reader exchange-channel)
451 (amqp::type
452 :initform "direct"
453 :initarg :type
454 :type string
455 :reader amqp::exchange-type)))
456
457 (def-amqp-abstract-class amqp:file (amqp-channeled-object)
458 ((context
459 :reader file-channel)))
460
461 (def-amqp-abstract-class amqp:link (amqp-channeled-object)
462 ((context
463 :reader link-channel)))
464
465 (def-amqp-abstract-class amqp:queue (amqp-channeled-object)
466 ((context
467 :reader queue-channel)))
468
469 (def-amqp-abstract-class amqp:stream (amqp-channeled-object)
470 ((context
471 :reader stream-channel)))
472
473 (def-amqp-abstract-class amqp:session (amqp-channeled-object)
474 ((context
475 :reader session-channel)))
476
477 (def-amqp-abstract-class amqp:tx (amqp-channeled-object)
478 ((context
479 :reader tx-channel)))
480
481 (def-amqp-abstract-class amqp:test (amqp-channeled-object)
482 ((context
483 :reader test-connection))) ; ??
484
485 (def-amqp-abstract-class amqp:tunnel (amqp-channeled-object)
486 ((context
487 :reader tunnel-connection))) ; ??
488
489
490 ;;;
491 ;;; constructors - class and connection relative, to map abstract types
492 ;;; to version-specific classes. the primary operators (ensure-method ensure-object)
493 ;;; combine a context and a designator - either a code when parsing, or an abstract
494 ;;; name in processing functions, and produce an instance of the concrete
495 ;;; versioned class. in the case of methods, the instance never changes state, while
496 ;;; in the case of classes, each is reinitialized if supplied initargs.
497 ;;; the primary operators rely on versiour resolution operators which map between
498 ;;; class/method names and codes for the given version.
499
500 (defgeneric connection-class-code-class-name (connection class-code)
501 (:method ((connection amqp:connection) (code (eql 0)))
502 nil))
503
504 (defgeneric connection-class-name-class-code (connection class-name)
505 )
506
507 (defgeneric class-method-code-method-name (class method-code)
508 )
509
510 (defgeneric class-method-name-method-code (class method-name)
511 )
512
513 (defgeneric connection-method-code-method-name (connection class method-code)
514 (:method ((connection amqp:connection) (class null) (code (eql 0)))
515 nil)
516 (:method ((connection amqp:connection) (class-code integer) (method-name t))
517 (connection-method-code-method-name connection
518 (connection-class-code-class-name connection class-code)
519 method-name)))
520
521 (defgeneric connection-method-name-method-code (connection class method-name)
522 (:method ((connection amqp:connection) (class-code integer) method)
523 (connection-method-name-method-code connection
524 (connection-class-code-class-name connection class-code)
525 method)))
526
527
528 (defgeneric class-find-object-class (class class-class-designator)
529 (:method ((connection amqp:channel) (designator (eql 'amqp:header)))
530 (find-class 'amqp:header))
531 (:method ((connection amqp:channel) (designator (eql 'amqp:body)))
532 (find-class 'amqp:body))
533 (:method ((connection amqp:connection) (designator (eql 'amqp:heartbeat)))
534 (find-class 'amqp:heartbeat)))
535
536 (defgeneric class-find-method-class (class method-class-designator)
537 )
538
539
540 (document (amqp:ensure-method amqp:ensure-object)
541 "Each class combines with its operators to perform commands. In addition each channel is associated with
542 class.command instances which apply to it and a connection is assocaiated with it channels. In order that
543 subsequent operations reflect previous settings, each context caches constituents. In the case of the
544 (connection x channel) relation the channel number is the designator. For (channel x (exchange + queue))
545 relations it is the respective name. For anonymous entities, the type suffices.")
546
547
548 (defgeneric amqp:ensure-method (class designator &rest initargs)
549 (:documentation "Retrieve or create a version-specific method instance given the a class instance and a
550 method designator. As designator, accept either a wire code or an abstract method name. Concrete method
551 names should not be specified. The name is used as a cache key to treat the methon as a singleton with
552 respect to the class. If none exists, a new instance is cached and returned.")
553 (declare (dynamic-extent initargs))
554
555 )
556
557
558 (defgeneric amqp:ensure-object (context class-designator &rest initargs)
559 (:documentation "Construct a new class instance respective the given context.
560 CONTEXT : (or connection channel) : the context for the class. channels are relative
561 to a connection, all others relative to a channel.
562 CLASS-NAME : symbol : the abstract protocol class name
563 . INITARGS : list : initialization arguments supplied to create a new instance or reinitialize a cache done.
564
565 A connection allows channels only. A channel treats the channel and connection
566 types as designating the respective instances and everything else as a
567 channel-relative singleton. All other conntected contexts delegate to their channel.")
568 (declare (dynamic-extent initargs))
569
570 )
571
572
573
574
575
576
577
578 ;;;
579 ;;; class methods
580
581 ;;; on input, methods act as markers to permit filtering rather than calling a static function
582 ;;; (even dynamically rebound). the arguments are passed on the stack, but also cached for future reference
583 ;;; ?in the method, the class or the channel?
584 ;;; channel, no - since things like queue, exchange, realm input is specific to that class
585 ;;; class, no -a queue.bind can specify more than one exchange and a channel.publish takes exchange, and
586 ;;; routing.
587 ;;; method-scoped binding is required.
588 ;;; exceptions can be implemented for specific things, like basic's content type, as additional methods.
589 ;;;
590 ;;; operators
591 ;;; (method-name class . args) : perform the command request. this is a useful shorthand, but
592 ;;; !! is not sufficient for method re-use. publish, for example, allows as arguments exchange and
593 ;;; !! routing-key, which are multiple-per-channel. one needs to cache them in the publish instance
594 ;;; !! and apply them to the channel & explicit args to generate the effective request. to accomplish this,
595 ;;; !! the elementary method-name operator delegates to the request- operator, which takes explicit
596 ;;; !! arguments or defaults them from the class. if applied to a method instance, the defaults come
597 ;;; !! first from the method, which then delegates to the class.
598 ;;; -> (REQUEST-method-name class . args) [explicitly coded]
599 ;;; -> (send-method-name class . args)
600 ;;; (class-name.class-name class . args) : make a class-scoped class [explicitly coded according to dependency]
601 ;;; (class-name.method-name class . args) : make a class-scoped method [in def-amqp-method]
602 ;;; (SEND-METHOD method class . args) : encode and send request w/ default arguments from the method
603 ;;; (SEND-method-name class . args) :
604 ;;; -> (SEND-METHOD (class-name.method-name class . designator-arg) class . args)
605
606 ;;; eg.
607 ;;;
608 ;;; (defmethod amqp:send-publish ((class amqp:basic) &rest arguments)
609 ;;; (declare (dynamic-extent arguments))
610 ;;; (apply #'send-method (amqp:basic.publish class :exchange (getf arguments :exchange)) class arguments))
611
612 ;;;
613 ;;; amqp:access
614
615 #+(or )
616 (progn ;; this conflicts with the class for request methods.
617 ;; ?? change method classes to *-method?
618 (def-ensure-method (amqp:access amqp:request) )
619 (def-ensure-method (amqp:access amqp:request-ok)))
620
621
622 ;;; basic
623
624 (def-ensure-method (amqp:basic amqp:qos) )
625 (def-ensure-method (amqp:basic amqp:qos-ok) )
626 (def-ensure-method (amqp:basic amqp:consume) )
627 (def-ensure-method (amqp:basic amqp:consume-ok) )
628 (def-ensure-method (amqp:basic amqp:cancel) )
629 (def-ensure-method (amqp:basic amqp:cancel-ok) )
630 (def-ensure-method (amqp:basic amqp:publish) )
631 (def-ensure-method (amqp:basic amqp:return) )
632 (def-ensure-method (amqp:basic amqp:deliver) )
633 (def-ensure-method (amqp:basic amqp:get) )
634 (def-ensure-method (amqp:basic amqp:get-ok) )
635 (def-ensure-method (amqp:basic amqp:get-empty) )
636 (def-ensure-method (amqp:basic amqp:ack) )
637 (def-ensure-method (amqp:basic amqp:reject) )
638 (def-ensure-method (amqp:basic amqp:recover-async) )
639 (def-ensure-method (amqp:basic amqp:recover) )
640 (def-ensure-method (amqp:basic amqp:recover-ok) )
641
642 (defmethod shared-initialize ((instance amqp:basic) (slots t) &rest args
643 &key content-type content-encoding
644 (context (bound-slot-value instance 'context))
645 (channel context)
646 (mime-type (when channel (device-content-type channel)))
647 (body nil body-s) (body-size nil) headers
648 (package *package*))
649 "Initialize a basic class by augmenting the content type/encoding from the
650 respective channel's values and coercing them to their respective
651 specifications. the effect is that - even after defaulting from the channel they must both
652 be strings"
653
654 (declare (dynamic-extent args))
655 (assert-argument-types shared-initialize
656 (channel amqp:channel)
657 (content-type (or null string))
658 (content-encoding (or null string)))
659 (unless content-type
660 (setf content-type (if channel
661 (string (type-of (channel-content-type channel)))
662 "")))
663 (unless content-encoding
664 (setf content-encoding (if channel
665 (string (or (mime-type-charset (channel-content-type channel)) ""))
666 "")))
667 (setf mime-type (mime-type content-type))
668 (unless (string-equal content-encoding (mime-type-charset mime-type))
669 (setf mime-type (clone-instance mime-type :charset content-encoding)))
670
671 ;; if given a body, but no body size, try to figure it out.
672 ;; if that's not possible, indicate continued in the header
673 (when body-s
674 (assert (typep mime-type 'mime:*/*) ()
675 "Supplied body requires a content type.")
676 (unless body-size
677 ;; try to determine the size
678 (setf body-size (channel-compute-body-size channel body mime-type))
679 (etypecase body-size
680 (null
681 (setf (getf headers :transfer-encoding) "chunked")
682 (setf body-size (device-buffer-length channel)))
683 (integer
684 (remf headers :transfer-encoding))))
685 (unless (getf headers :package)
686 (setf (getf headers :package) (package-name *package*)))
687 (unless (getf headers :element-type)
688 (multiple-value-bind (concrete effective-type match-p)
689 (canonical-element-type channel body (device-element-type channel))
690 (declare (ignore concrete))
691 (assert match-p ()
692 "Supplied body type is invalid for channel: ~s, ~s"
693 (type-of body) (device-element-type channel))
694 (assert (eq (find-symbol (string effective-type) package)
695 effective-type) ()
696 "Invalid type x package combination: ~s, ~s."
697 effective-type package)
698 (setf (getf headers :element-type) (string effective-type)))))
699
700 (apply #'call-next-method instance slots
701 :mime-type mime-type ; always reset this upon initialization
702 :content-type content-type
703 :content-encoding content-encoding
704 :body-size body-size
705 :headers headers
706 args))
707
708
709 (defgeneric channel-compute-body-size (channel object encoding)
710 (:method ((channel amqp:channel) (object null) (encoding mime:*/*))
711 0)
712 (:method ((channel amqp:channel) (object function) (encoding mime:*/*))
713 nil)
714 (:method ((channel amqp:channel) (object list) (encoding mime:*/*))
715 nil)
716
717 (:method ((channel amqp:channel) (object string) (encoding mime:text/*))
718 (multiple-value-bind (decoder encoder)
719 (compute-charset-codecs encoding)
720 (declare (ignore decoder))
721 (let ((bytes 0))
722 ;; use the prospective encoder to count bytes
723 (flet ((count-byte (stream byte)
724 (declare (ignore stream byte))
725 (incf bytes)))
726 (declare (dynamic-extent #'count-byte))
727 (do ((i 0 (1+ i)))
728 ((>= i (length object)))
729 (funcall encoder (char object i) #'count-byte nil)))
730 bytes)))
731
732 (:method ((channel amqp:channel) (object vector) (encoding mime:application/*))
733 ;;!!! todo: binary codecs:
734 ;; this supports multi-byte binary vectors even though the i/o operator do not
735 (let* ((type (array-element-type object))
736 (element-size (typecase type
737 (symbol )
738 (cons (case (first type)
739 ((signed-byte unsigned-byte)
740 (ceiling (second type) 8)))))))
741 (when element-size (* (length object) element-size))))
742
743 (:method ((channel amqp:channel) (object standard-object) (encoding mime:application/*))
744 nil))
745
746
747 (defmethod mime:mime-type ((basic amqp:basic) &rest args)
748 (declare (ignore args))
749 (class-mime-type basic))
750
751
752 (defgeneric canonical-element-type (channel concrete-type abstract-typ)
753
754 (:method ((channel t) (object symbol) (element-type symbol))
755 (cond ((subtypep object element-type)
756 ;; ok, if there is a subtype relation
757 (values object element-type t))
758 ((and (eq object element-type) (fboundp element-type))
759 (values element-type (fdefinition element-type) t))))
760
761 (:method ((channel t) (object symbol) (element-type (eql 'character)))
762 (canonical-element-type channel object 'string))
763
764 (:method ((channel t) (object (eql 'cons)) (element-type (eql 'list)))
765 (values 'list 'list t))
766
767 (:method ((channel t) (object (eql 'vector)) (element-type (eql 'integer)))
768 (canonical-element-type channel object 'vector))
769
770 (:method ((channel t) (object cons) (element-type t))
771 (if (symbolp (first object))
772 (case (first object)
773 ((array simple-array)
774 (case (second object)
775 (character (canonical-element-type channel 'string element-type))
776 (t (canonical-element-type channel 'vector element-type))))
777 (t
778 (canonical-element-type channel (first object) element-type)))
779 (canonical-element-type channel 'list element-type)))
780
781 (:method ((channel t) (object t) (element-type t))
782 (canonical-element-type channel (type-of object) element-type))
783
784 (:method ((channel t) (operator function) (element-type t))
785 (values operator element-type t))
786
787 (:method ((channel t) (object symbol) (element-type t))
788 nil))
789
790 ;;; (canonical-element-type nil 'ctring 'character)
791 ;;; (canonical-element-type nil "string" 'character)
792
793 (defgeneric basic-headers (basic)
794 (:documentation "Returns the basic instance's headers."))
795
796 (defgeneric basic-no-ack (basic)
797 (:documentation "Returns the basic instance's acknowledgement setting"))
798
799 #+(or ) ;; mcl's clos implements method dispatch by hand
800 (progn
801 (defclass subtyping-generic-function (standard-generic-function)
802 (:metaclass c2mop:funcallable-standard-class))
803
804
805 (defgeneric canonical-element-type (channel concrete-type abstract-type)
806 (:documentation "Recognizes combinations based on subtype behavior and
807 return the maximal permited type to encode in the message. If if null is
808 returned, the type is not supported.")
809 (:generic-function-class subtyping-generic-function)
810
811 (:method ((channel t) (concrete character) (abstract string))
812 'string)
813 (:method ((channel t) (concrete string) (abstract string))
814 'string)
815
816 (:method ((channel t) (concrete integer) (abstract vector))
817 'vector)
818 (:method ((channel t) (concrete string) (abstract string))
819 'vector)
820
821 (:method ((channel t) (concrete standard-object) (abstract standard-object))
822 concrete)
823 (:argument-precedence-order abstract-type concrete-type channel))
824
825 (defmethod compute-applicable-methods ((function subtyping-generic-function) arguments)
826 (let ((methods (c2mop:generic-function-methods function)))
827 (print arguments)
828 (print methods)
829 (flet ((matches (method)
830 (let ((specializers (c2mop:method-specializers method)))
831 (and (typep (first arguments) (first specializers))
832 (every #'(lambda (atype mtype) (subtypep atype mtype))
833 (rest arguments)
834 (rest specializers)))))
835 (preceeds (s1 s2)
836 (subtypep s1 s2)))
837 (let ((length (length arguments)))
838 (setf methods (remove-if-not #'matches methods)) (print methods)
839 (dotimes (i length)
840 (setf methods (sort methods #'preceeds
841 :key #'(lambda (method)
842 (elt (c2mop:method-specializers method)
843 (- length i)))))))
844 methods))))
845
846 ;;; (compute-applicable-methods #'canonical-element-type (list nil 'character 'string))
847 ;;; in mcl, this works, but it's never called
848
849 ;;;
850 ;;; channel
851
852 ;;; interface classes
853 (def-ensure-object (amqp:channel amqp:basic) ())
854 (def-ensure-object (amqp:channel amqp:exchange) (exchange))
855 (def-ensure-object (amqp:channel amqp:file) ())
856 (def-ensure-object (amqp:channel amqp:queue) (queue))
857 (def-ensure-object (amqp:channel amqp:stream) ())
858 (def-ensure-object (amqp:channel amqp:tx) ())
859 ;;; internal classes
860 (def-ensure-object (amqp:channel amqp:body) ())
861 (def-ensure-object (amqp:channel amqp:header) ())
862 ;;; it's own connection (for channel-zero)
863 (defmethod amqp:ensure-object ((channel amqp:channel) (type (eql 'amqp:connection)) &key)
864 (channel-connection channel))
865 (defmethod amqp:channel.connection ((channel amqp:channel) )
866 (channel-connection channel))
867 ;;; itself
868 (defmethod amqp:ensure-object ((channel amqp:channel) (type (eql 'amqp:channel)) &key)
869 channel)
870
871 (def-ensure-method (amqp:channel amqp:open))
872 (def-ensure-method (amqp:channel amqp:open-ok))
873 (def-ensure-method (amqp:channel amqp:flow))
874 (def-ensure-method (amqp:channel amqp:flow-ok))
875 (def-ensure-method (amqp:channel amqp:alert))
876 (def-ensure-method (amqp:channel amqp:close))
877 (def-ensure-method (amqp:channel amqp:close-ok))
878
879 (defmethod initialize-instance ((instance amqp:channel) &rest initargs
880 &key
881 context (connection context) uri
882 exchange queue path)
883 (declare (dynamic-extent initargs))
884 (assert-argument-type initilialize-instance connection amqp:connection)
885
886 (apply #'call-next-method instance
887 :channel instance
888 :context connection
889 :connection connection
890 :uri (if uri
891 (merge-uris uri (connection-uri connection))
892 (merge-uris (uri (list :plist (list :exchange exchange :queue queue :path path)))
893 (connection-uri connection)))
894 initargs))
895
896 (defmethod print-object ((instance amqp:channel) stream)
897 (with-slots (uri number) instance
898 (print-unreadable-object (instance stream :identity t :type t)
899 (write-char #\[ stream)
900 (print-object uri stream)
901 (format stream "].~d" number))))
902
903
904 (defmethod object-channel ((channel amqp:channel))
905 channel)
906
907 (defgeneric channel-state (channel)
908 (:documentation "The channel-specific state accessors delegate
909 to the class methods. These esist as an alternative prrotocol hook.")
910
911 (:method ((channel amqp:channel))
912 (class-state channel)))
913
914 (defgeneric (setf channel-state) (state channel)
915 (:documentation "The channel-specific state accessors delegate
916 to the class methods. These esist as an alternative prrotocol hook.")
917
918 (:method (state (channel amqp:channel))
919 (setf (class-state channel) state)))
920
921 (defmethod channel-content-type ((channel amqp:channel))
922 (device-content-type channel))
923
924 (defmethod (setf channel-content-type) (type (channel amqp:channel))
925 (setf (device-content-type channel) type))
926
927
928 ;;; nb. the channel number is one-based! so channel[0] is always the connection's.
929
930 (defmethod connection-frame-max ((channel amqp:channel))
931 (with-slots (connection) channel
932 (if connection
933 (connection-frame-max connection)
934 0)))
935
936
937 (defmethod channel-condition (channel type)
938 (or (assoc type (channel-conditions channel))
939 (first (push (make-condition type :channel channel)
940 (channel-conditions channel)))))
941
942
943 (defgeneric connected-channel-p (channel)
944 (:documentation "Indicate whether the channel is still bound to a
945 connection.")
946
947 (:method ((channel amqp:channel))
948 (let ((connection (channel-connection channel)))
949 (and connection
950 (integerp (channel-number channel))
951 (eq (connection.channel connection :number (channel-number channel))
952 channel)))))
953
954 (defgeneric channel-track (channel)
955 (:documentation "Defines a default method for protocols which have none.")
956
957 (:method ((class t))
958 0))
959
960 (defgeneric channel-number (channel)
961 (:documentation "Definee a method to delegate from channeled classes to their channel.")
962
963 (:method ((class amqp-channeled-object))
964 (let ((channel (object-channel class)))
965 (if channel
966 (channel-number channel)
967 (error "Class has no channel: ~s." class)))))
968
969
970 ;;;
971 ;;; amqp:cluster - nothing yet
972
973 ;;;
974 ;;; amqp:connection
975
976 (def-ensure-object (amqp:connection amqp:channel) ((number fixnum))
977 (:class.object ((connection amqp:connection) &rest initargs &key number &allow-other-keys)
978 "Ensure a (connection channel), cached by number."
979 (declare (dynamic-extent initargs))
980 (etypecase number
981 ((member t nil)
982 (bt:with-lock-held ((connection-lock connection))
983 (apply #'amqp::connection.channel connection
984 :number (or (position nil (get-connection-channels connection))
985 (error "Connection full."))
986 initargs)))
987 (fixnum
988 (assert-condition (< number (length (get-connection-channels connection)))
989 connection.channel)
990 (let* ((instance (connection.channel connection :number number)))
991 (if instance
992 (if (cddr initargs)
993 (apply #'reinitialize-instance instance initargs)
994 instance)
995 (setf (connection.channel connection :number number)
996 (apply #'make-instance (class-find-object-class connection 'amqp:channel)
997 :context connection
998 initargs))))))))
999
1000
1001 (def-ensure-object (amqp:connection amqp:heartbeat) ())
1002
1003 (def-ensure-method (amqp:connection amqp:start))
1004 (def-ensure-method (amqp:connection amqp:start-ok))
1005 (def-ensure-method (amqp:connection amqp:secure))
1006 (def-ensure-method (amqp:connection amqp:secure-ok))
1007 (def-ensure-method (amqp:connection amqp:tune))
1008 (def-ensure-method (amqp:connection amqp:tune-ok))
1009 (def-ensure-method (amqp:connection amqp:open))
1010 (def-ensure-method (amqp:connection amqp:open-ok))
1011 (def-ensure-method (amqp:connection amqp:redirect))
1012 (def-ensure-method (amqp:connection amqp:close))
1013 (def-ensure-method (amqp:connection amqp:close-ok))
1014
1015
1016 (defmethod shared-initialize ((instance amqp:connection) (slots t) &key
1017 (threaded nil))
1018 (with-slots (lock thread amqp-channels) instance
1019 (unless (slot-boundp instance 'lock)
1020 (setf lock (bt:make-lock (make-instance-tag instance))))
1021 (unless (and (slot-boundp instance 'amqp-channels) amqp-channels)
1022 (setf-connection-channels (make-array (1+ *max-channels*) :initial-element nil) instance))
1023 (unless (slot-boundp instance 'thread) ; happens if reinitializing
1024 (setf thread
1025 (ecase threaded
1026 ((nil) )
1027 ((t)
1028 ;; find the current socket io handler create a new thread
1029 (setf thread (make-connection-thread instance))))))
1030
1031 ;; nb. may not yet be able to create channel-0, since the class may be abstract
1032 (call-next-method)))
1033
1034 (defmethod initialize-instance ((instance amqp:connection) &rest initargs &key
1035 uri
1036 (remote-host (unless uri (error "uri or remote-host required")))
1037 (remote-port *standard-port*))
1038 (declare (dynamic-extent initargs))
1039 (flet ((make-name (tag)
1040 (with-output-to-string (ss)
1041 (print-unreadable-object (instance ss :identity t :type t) (princ tag ss))))
1042 (generate-output-frame () (make-output-frame instance))
1043 (generate-input-frame () (make-input-frame instance)))
1044 (with-slots (free-input-frames free-output-frames read-frames encoded-frames)
1045 instance
1046 (setf free-input-frames
1047 (make-instance 'locked-stack :name (make-name :free-input-frames)
1048 :if-empty #'generate-input-frame))
1049 (setf read-frames
1050 (make-instance 'locked-queue :name (make-name :read-frames)))
1051 (setf free-output-frames
1052 (make-instance 'locked-stack :name (make-name :free-output-frames)
1053 :if-empty #'generate-output-frame))
1054 (setf encoded-frames
1055 (make-instance 'locked-queue :name (make-name :encoded-frames)))
1056
1057 (apply #'call-next-method instance
1058 :uri (if uri
1059 (uri uri)
1060 (uri 'amqp :scheme :amqp :host remote-host :port remote-port))
1061 initargs))))
1062
1063
1064 (defmethod object-connection ((connection amqp:connection))
1065 connection)
1066
1067
1068 (defmethod object-channel ((connection amqp:connection))
1069 "Any connection commands use channel 0"
1070 (connection.channel connection :number 0))
1071
1072
1073 (defgeneric connection-state (connection)
1074 (:documentation "The connection-specific state accessors delegate
1075 to the class methods. These esist as an alternative prrotocol hook.")
1076
1077 (:method ((connection amqp:connection))
1078 (class-state connection)))
1079
1080 (defgeneric (setf connection-state) (state connection)
1081 (:documentation "The connection-specific state accessors delegate
1082 to the class methods. These exist as an alternative protocol hook.")
1083
1084 (:method (state (connection amqp:connection))
1085 (setf (class-state connection) state)))
1086
1087
1088 (defmethod channel-number ((class amqp:connection))
1089 0)
1090
1091 (defmethod connection-host ((connection amqp:connection))
1092 (uri-host (connection-uri connection)))
1093
1094 (defmethod connection-port ((connection amqp:connection))
1095 (uri-port (connection-uri connection)))
1096
1097 (defmethod connection-virtual-host ((connection amqp:connection))
1098 (uri-virtual-host (connection-uri connection)))
1099
1100
1101 (defgeneric connect-channel (connection channel)
1102 (:method ((connection t) (channel t))
1103 channel)
1104
1105 (:method ((connection amqp:connection) (channel amqp:channel))
1106 (with-slots (free-input-frames free-output-frames read-frames encoded-frames)
1107 channel
1108
1109 (call-next-method)
1110
1111 ;; share queues
1112 (setf free-input-frames (device-free-input-frames connection)
1113 free-output-frames (device-free-output-frames connection)
1114 read-frames (device-read-frames connection)
1115 encoded-frames (device-encoded-frames connection))
1116 ;; flush anything from an earlier iaancarnation
1117 (loop (when (null (get-read-frame channel :wait nil)) (return)))
1118 ;; and initialize buffers
1119 (device-initialize-buffers channel)
1120 channel)))
1121
1122
1123 (defgeneric disconnect-channel (connection channel)
1124 (:method ((connection null) (channel amqp:channel))
1125 nil)
1126 (:method ((connection amqp:connection) (channel amqp:channel))
1127 (slot-makunbound channel 'free-input-frames)
1128 (slot-makunbound channel 'free-output-frames)
1129 (slot-makunbound channel 'read-frames)
1130 (slot-makunbound channel 'encoded-frames)
1131 (when (connected-channel-p channel)
1132 (setf (connection.channel connection :number (channel-number channel)) nil))
1133 (setf-channel-number 0 channel)
1134 (setf (object-context channel) nil)))
1135
1136
1137
1138 ;;;
1139 ;;; amqp:dtx
1140
1141 ;;; class: dtx [id method-names]
1142 ;;; dtx.select
1143 ;;; dtx.select-ok
1144 ;;; dtx.start [dtx-identifier]
1145 ;;; dtx.start-ok
1146
1147
1148 ;;;
1149 ;;; amqp:exchange
1150
1151 (def-ensure-method (amqp:exchange amqp:declare))
1152 (def-ensure-method (amqp:exchange amqp:declare-ok))
1153 (def-ensure-method (amqp:exchange amqp:delete))
1154 (def-ensure-method (amqp:exchange amqp:delete-ok))
1155
1156
1157 (defgeneric amqp:exchange-exchange (object)
1158 (:documentation "The exchange name accessor is extended with a string method to allow to
1159 coerce arguments to a string value in request/response operators.")
1160
1161 (:method ((exchange-name string))
1162 exchange-name))
1163
1164
1165 ;;;
1166 ;;; amqp:file
1167
1168 (def-ensure-method (amqp:file amqp:qos))
1169 (def-ensure-method (amqp:file amqp:qos-ok))
1170 (def-ensure-method (amqp:file amqp:consume))
1171 (def-ensure-method (amqp:file amqp:consume-ok))
1172 (def-ensure-method (amqp:file amqp:cancel))
1173 (def-ensure-method (amqp:file amqp:cancel-ok))
1174 (def-ensure-method (amqp:file amqp:open))
1175 (def-ensure-method (amqp:file amqp:open-ok))
1176 (def-ensure-method (amqp:file amqp:stage))
1177 (def-ensure-method (amqp:file amqp:publish))
1178 (def-ensure-method (amqp:file amqp:return))
1179 (def-ensure-method (amqp:file amqp:deliver))
1180 (def-ensure-method (amqp:file amqp:ack))
1181 (def-ensure-method (amqp:file amqp:reject))
1182
1183
1184 ;;;
1185 ;;; amqp:link - nyi
1186
1187
1188 ;;;
1189 ;;; amqp:object
1190
1191 (defmethod reinitialize-instance :before ((instance amqp-connected-object) &key class)
1192 "When reinitialized from a frame, check that the class is correct."
1193 (when (and class (not (typep instance class)))
1194 (amqp:frame-error :channel (object-channel instance)
1195 :message-string "Attempt to reinitialize from a different frame class: ~s, ~s."
1196 :message-arguments (list instance class))))
1197
1198 (defmethod connection-frame-size ((object amqp-connected-object))
1199 (connection-frame-size (object-connection object)))
1200
1201
1202
1203 ;;;
1204 ;;; amqp:queue
1205
1206 (def-ensure-method (amqp:queue amqp:declare))
1207 (def-ensure-method (amqp:queue amqp:declare-ok))
1208 (def-ensure-method (amqp:queue amqp:bind))
1209 (def-ensure-method (amqp:queue amqp:bind-ok))
1210 (def-ensure-method (amqp:queue amqp:unbind))
1211 (def-ensure-method (amqp:queue amqp:unbind-ok))
1212 (def-ensure-method (amqp:queue amqp:purge))
1213 (def-ensure-method (amqp:queue amqp:purge-ok))
1214 (def-ensure-method (amqp:queue amqp:delete))
1215 (def-ensure-method (amqp:queue amqp:delete-ok))
1216
1217 (defgeneric amqp:queue-queue (object)
1218 (:documentation "The queue name accessor is extended with a string method to allow to
1219 coerce arguments to a string value in request/response operators.")
1220 (:method ((queue-name string))
1221 queue-name))
1222
1223
1224 ;;;
1225 ;;; amqp:session - nyi
1226
1227 ;;;
1228 ;;; amqp:stream
1229
1230 (def-ensure-method (amqp:stream amqp:qos))
1231 (def-ensure-method (amqp:stream amqp:qos-ok))
1232 (def-ensure-method (amqp:stream amqp:consume))
1233 (def-ensure-method (amqp:stream amqp:consume-ok))
1234 (def-ensure-method (amqp:stream amqp:cancel))
1235 (def-ensure-method (amqp:stream amqp:cancel-ok))
1236 (def-ensure-method (amqp:stream amqp:publish))
1237 (def-ensure-method (amqp:stream amqp:return))
1238 (def-ensure-method (amqp:stream amqp:deliver))
1239
1240
1241 ;;;
1242 ;;; amqp:tx
1243
1244 (def-ensure-method (amqp:tx amqp:select))
1245 (def-ensure-method (amqp:tx amqp:select-ok))
1246 (def-ensure-method (amqp:tx amqp:commit))
1247 (def-ensure-method (amqp:tx amqp:commit-ok))
1248 (def-ensure-method (amqp:tx amqp:rollback))
1249 (def-ensure-method (amqp:tx amqp:rollback-ok))
1250
1251
1252 ;;;
1253
1254 ;;; class: test [id method-names]
1255 ;;; test.integer [integer-1 integer-2 integer-3 integer-4 operation]
1256 ;;; test.integer-ok [result]
1257 ;;; test.string [string-1 string-2 operation]
1258 ;;; test.string-ok [result]
1259 ;;; test.table [table integer-op string-op]
1260 ;;; test.table-ok [integer-result string-result]
1261 ;;; test.content
1262 ;;; test.content-ok [content-checksum]
1263
1264 ;;;
1265 ;;; amqp:tunnel
1266
1267 ;;; tunnel.request [meta-data]
1268
1269
1270
1271 ;;;
1272 ;;; decoding definition operators
1273
1274
1275 (defgeneric method-argument-offset (method)
1276 (:documentation "Contributes to codecs for version-specific methods,
1277 as the method body layout varies."))
1278
1279 (defgeneric class-property-offset (class)
1280 (:documentation "Contributes to codecs for version-specific classes,
1281 if the class content header layout varies.")
1282
1283 (:method ((class amqp:object))
1284 12))
1285
1286
1287
1288
1289
1290
1291 ;;; frame operations:
1292 ;;; allocate, resource-manage, read, and write immediately through
1293 ;;; a connection or indirectly through a channel.
1294
1295
1296 (defgeneric claim-input-frame (device)
1297 (:documentation "Returns a free input frame or creates a new one.")
1298
1299 (:method ((channel amqp:channel))
1300 (let ((frame (dequeue (device-free-input-frames channel))))
1301 (setf (frame-channel-number frame) (channel-number channel))
1302 frame))
1303
1304 (:method ((connection amqp:connection))
1305 (let ((frame (dequeue (device-free-input-frames connection))))
1306 (setf (frame-channel-number frame) 0)
1307 frame)))
1308
1309
1310
1311 (defgeneric claim-output-frame (connection)
1312 (:documentation "Returns a free input frame or creates a new one.")
1313
1314 (:method ((channel amqp:channel))
1315 (flet ((make-channel-frame ()
1316 (make-output-frame channel)))
1317 (declare (dynamic-extent #'make-channel-frame))
1318 (let ((frame (dequeue (device-free-output-frames channel)
1319 :if-empty #'make-channel-frame)))
1320 (setf (frame-channel-number frame) (channel-number channel))
1321 frame)))
1322
1323 (:method ((class amqp-channeled-object))
1324 (claim-output-frame (object-channel class)))
1325
1326 (:method ((connection amqp:connection))
1327 (flet ((make-connection-frame ()
1328 (make-output-frame connection)))
1329 (declare (dynamic-extent #'make-connection-frame))
1330 (let ((frame (dequeue (device-free-output-frames connection)
1331 :if-empty #'make-connection-frame)))
1332 (setf (frame-channel-number frame) 0)
1333 frame))))
1334
1335
1336
1337 ;;;
1338 ;;; resolving an abstract class to the version specific one
1339
1340 (defgeneric amqp:find-protocol-class (abstract-class version &key if-does-not-exist)
1341 (:documentation "GIven an abstract protocol class and a version,
1342 examine the desigated package for a class with the same name.")
1343
1344 (:method ((instance amqp:object) version &rest args)
1345 (apply #'amqp:find-protocol-class (type-of instance) version args))
1346
1347 (:method ((designator t) (version symbol) &rest args)
1348 (apply #'amqp:find-protocol-class designator
1349 (or (find-package version)
1350 (error "no protocol implementation for version: ~s" version))
1351 args))
1352
1353 (:method ((class-name symbol) (package package) &key (if-does-not-exist :error))
1354 (let ((found
1355 (or (if (eq package (symbol-package class-name))
1356 (find-class class-name if-does-not-exist)
1357 (let ((symbol (find-symbol (symbol-name class-name) package)))
1358 (and symbol (find-class symbol nil))))
1359 (ecase if-does-not-exist
1360 ((nil) nil)
1361 (:error (error "no protocol implementation for class in version: ~s, ~s"
1362 class-name (package-name package)))))))
1363 (when found
1364 (c2mop:finalize-inheritance found)
1365 (assert (find 'amqp:object (closer-mop:class-precedence-list found) :key #'class-name) ()
1366 "Class is not a protocol class: ~s." class-name)
1367 found))))
1368
1369
1370 #+(or ) ;; a version which worked by searching class specializations...
1371 (defgeneric amqp:find-protocol-class (abstract-class version &key if-does-not-exist)
1372 (:documentation "GIven an abstract protocol class and a version,
1373 return the most specialized class with the highest matching version.")
1374
1375 (:method ((abstract-class symbol) version &rest args)
1376 (apply #'amqp:find-protocol-class (find-class abstract-class) version args))
1377
1378 (:method ((instance amqp:object) version &rest args)
1379 (apply #'amqp:find-protocol-class (class-of instance) version args))
1380
1381 (:method ((abstract-class class) version &key (if-does-not-exist :error))
1382 (let ((found nil))
1383 (labels ((walk-subclasses (class)
1384 (when (and (typep class 'amqp:class-class)
1385 (null (closer-mop:class-direct-subclasses class)))
1386 (unless (version-lessp version (class-protocol-version class))
1387 (cond ((null found)
1388 (setf found class))
1389 ((equalp (class-protocol-version found)
1390 (class-protocol-version class))
1391 ;; replace the more abstract with the more specific
1392 (unless (subtypep (class-name class) (class-name found))
1393 (warn "duplicate protocol implementations for version: ~s"
1394 (class-protocol-version found)))
1395 (setf found class))
1396 ((version-lessp (class-protocol-version found)
1397 (class-protocol-version class))
1398 (setf found class)))))
1399 (map nil #'walk-subclasses (closer-mop:class-direct-subclasses class))))
1400 (walk-subclasses abstract-class)
1401 (or found
1402 (ecase if-does-not-exist
1403 ((nil) nil)
1404 (:error (error "no protocol implementation for version: ~s" version))))))))
1405

  ViewVC Help
Powered by ViewVC 1.1.5