Contents of /device-level.lisp

Revision 3
Tue Feb 23 09:05:39 2010 UTC (4 years, 1 month ago) by janderson
File size: 56135 byte(s)
Merge commit 'remotes/github/master' into remotes/git-svn
;;; -*- Package: de.setf.amqp.implementation; -*-

(in-package :de.setf.amqp.implementation)

(document :file
 (description "This file implements device-level support for streams based on AMQP connections as part of the
 'de.setf.amqp' library.")
 (copyright
  "Copyright 2010 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved"
  "'de.setf.amqp' is free software: you can redistribute it and/or modify it under the terms of version 3
 of the GNU Affero General Public License as published by the Free Software Foundation.

 'setf.amqp' is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
 implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 See the Affero General Public License for more details.

 A copy of the GNU Affero General Public License should be included with 'de.setf.amqp' as `AMQP:agpl.txt`.
 If not, see the GNU [site](http://www.gnu.org/licenses/).")

 (long-description "Device-level operations are managed by channel instances, which delegate in turn to
 connection-wrapped tcp sockets. The AMQP standards permit this combination to behave in either of (at
 least) two ways. On one hand, one can use all allowed protocol variability and treat the socket
 connection as a multiplexed multi-channel stream. In this mode each thread would instantiate its own
 channel on the shared connection. On the other hand, it is also possible to constrain the connection's use
 to dedicate it to a single channel - and thereby to a single thread.

 The multiplexed mechanism places a much lower demand on system resources, but increases the processing
 for a single channel and constrains the i/o operations to block mode. A dedicated mechanism requires
 more sockets, but can dedicate the buffering to a single thread, which permits more efficient i/o
 operations.

 When the socket is multiplexed, it is possible that content frames for one channel will be
 interleaved with frames for another channel, which means it is not possible to guarantee that a given
 socket read operation will contain an expected content frame. The multiplexing requires per-channel
 queuing and processing upon delivery, whereby the actual buffer cannot be predicted. When the socket
 is dedicated, read operations can target a given buffer, since the intra-channel ordering constraints
 require that content be delivered in-order and uninterrupted. The protocol meta-data can be read and
 processed in-line to parameterize the read and recognize exceptions, while data i/o targets given
 buffers. The 1.0pr document introduces additional session and context for frame processing, but those are
 not considered here. (see amqp0-8.pdf, p.56 and amqp0-9.pdf, p.35)

 In the implementation below, the two amqp classes, connection and channel, are each specialized from
 simple-stream and implement the standard interface. The required device operators are implemented for
 both amqp:channel and amqp:connection, even though stream operations are limited to channels while
 connections are to be used only to consolidate frame-based operations with the network stream. The
 principal operators are

    device-open (stream #-sbcl slots initargs)
    device-close (stream abort)
    device-read (stream buffer start end blocking)
    device-clear-input (stream buffer-only)
    device-write (stream buffer start end blocking)
    device-clear-output (stream)
    device-flush (device)
    device-read-content (device &rest content-arguments)
    device-write-content (device body &rest content-arguments)

 The device-open operator returns an instance which can be used directly. In the case of a channel, it
 connects to the respective connection and prepares any exchange/queue specified in the uri initialization
 argument. In that state it can be supplied to publish, consume, or get operations to initiate stream
 i/o, or to transfer isolated objects. In the case of a connection, a socket stream is opened to the
 designated server and the protocol handshake and open-connection phases are completed. (see
 open-connection and negotiate-client-connection) In that state, one can construct and use channels;
 a connection should be used only to make channels. Note that the connection class is adjusted to
 match that of the protocol version which it negotiates with the remote broker. That is, a

    (make-instance 'amqp:connection :uri \"amqp://guest:guest@localhost/\")

 will return an instance of a connection sub-class. This places constraints on the effective methods for
 `device-open` and the various constituents of the standard instantiation protocol for `simple-stream`.
 The respective effective methods for the base implementation look something like

    initialize-instance : (CONNECTION) (STANDARD-OBJECT)
    shared-initialize (in mcl) : (CONNECTION T) (AMQP-DEVICE T) :AFTER (SIMPLE-STREAM T) (STANDARD-OBJECT T)
    reinitialize-instance : (STANDARD-OBJECT)
    device-open (in sbcl) : (CONNECTION T) (AMQP-SOCKET-DEVICE T) (AMQP-DEVICE T) (SIMPLE-STREAM T) (SLOT-OBJECT T)

 The exact form depends on the run-time (cf. `standard-object` vs. `slot-object`), but all share the topology
 that no next method call follows the class change. Should protocol-specific specialization be necessary, any
 specialized operation subsequent to the change would need to be independent of these effective methods.

 The device-read and device-write operators are defined at the respective levels to delegate a channel's
 operations to the respective connection and to perform connection i/o through the socket stream.
 Channel operations with an explicit buffer are intended to implement stream-based data transfer
 for a channel which has already initiated a message exchange, and they observe the body size constraints
 specified for the message. Where the size is known at the outset, the body-size and body-position govern
 eof behaviour. Where the size is unknown, the channel implements chunked transfer in terms of maximal-sized
 messages, and relies on device-flush and device-clear-input to indicate and manage the effective
 eof.

 The state operators for position and length return meaningful information for fixed content sizes only
 and have no effect on channel state.

 For more information on simple streams, see Franz's documentation[3] and the sbcl implementation[4] of same,
 as well as the discussions of the alternative fu interface.[5]

 ---
 [1]: https://jira.amqp.org/confluence/download/attachments/720900/amqp0-8.pdf?version=1
 [2]: https://jira.amqp.org/confluence/download/attachments/720900/amqp0-9-1.pdf?version=1
 [3]: http://www.franz.com/support/documentation/current/doc/streams.htm
 [4]: http://sbcl.cvs.sourceforge.net/viewvc/sbcl/sbcl/contrib/sb-simple-streams/
 [5]: http://paste.lisp.org/display/65229"))


(defmacro assert-device-state (device state &optional op)
  (let ((state (or (find-symbol (string state) :amqp.s)
                   (error "invalid state indicator: ~s." state)))
        (device-state-var (gensym)))
    `(let ((,device-state-var (device-state ,device)))
       (assert (typep ,device-state-var ',state) ()
               "~@[~a: ~]Required device state ~a is not satisfied by ~a."
               ',op ',state ,device-state-var))))
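
#|
The macro above resolves the state name when it is expanded and then checks the
device's current state with `typep`, so that a more specific state satisfies a
requirement for a more general one. A minimal, self-contained sketch of that
pattern follows. The `sketch-` names and the small state class hierarchy are
hypothetical stand-ins for the library's `amqp.s` states; they are assumptions
made for illustration and are not part of 'de.setf.amqp'.

```lisp
;; Hypothetical state hierarchy: each more specific state is a subclass of
;; the more general one, so TYPEP accepts it wherever the general state
;; is required.
(defclass sketch-state () ())
(defclass sketch-use-channel (sketch-state) ())
(defclass sketch-use-channel.body (sketch-use-channel) ())
(defclass sketch-use-channel.body.input (sketch-use-channel.body) ())

;; Hypothetical device which holds just a state instance.
(defstruct sketch-device
  (state (make-instance 'sketch-use-channel)))

(defmacro sketch-assert-device-state (device state &optional op)
  "Signal an error unless DEVICE's state is of the type named by STATE."
  (let ((state-var (gensym "STATE")))
    `(let ((,state-var (sketch-device-state ,device)))
       (assert (typep ,state-var ',state) ()
               "~@[~a: ~]Required device state ~a is not satisfied by ~a."
               ',op ',state ,state-var))))

(defun sketch-state-ok-p (device)
  "Return :ok if DEVICE is in a body state, :rejected otherwise."
  (handler-case
      (progn
        (sketch-assert-device-state device sketch-use-channel.body sketch-read)
        :ok)
    (error () :rejected)))
```

A device in the `sketch-use-channel.body.input` state passes the check for
`sketch-use-channel.body`, while a device in the plain `sketch-use-channel`
state is rejected.
|#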


;;;
;;; channel operations


(defmethod device-open ((device amqp:channel) #-sbcl (slots t) initargs)
  "Prepare a channel by binding it locally to a connection and declaring its
 exchange and queue on the connection's server. The connection may be provided
 as an instance to be re-used, or as a server designator, in which case a
 new connection is opened."

  (etypecase (device-state device)
    (amqp.s:open-channel
     (if (or (stream-input-handle device)
             (stream-output-handle device))
         (call-next-method)
         (destructuring-bind (&key (connection (or (channel-connection device)
                                                   (error "connection required.")))
                                   (input-handle connection)
                                   (output-handle connection)
                                   &allow-other-keys)
             initargs
           (call-next-method)
           (setf (stream-input-handle device) input-handle)
           (setf (stream-output-handle device) output-handle)

           ;; once that has been done, adjust the channel class
           (let ((connection-channel-class (class-find-object-class connection 'amqp:channel)))
             (unless (typep device connection-channel-class)
               (change-class device connection-channel-class)))

           ;; bind the channel to the connection to obtain the queues and initialize buffers
           (connect-channel connection device)

           ;; unless it's the connection's own channel,
           ;; open the channel w/ the broker

           (unless (zerop (channel-number device))
             ;; resolve the channel's identifier relative to the connection - with
             ;; non-strict handling to allow a scheme.
             (let* ((uri (merge-uris (channel-uri device) (connection-uri connection) nil nil))
                    (host (uri-host uri)))
               (setf-device-uri uri device)

               ;; if the connection is 'for real', that is, if it specifies a host, open the channel
               (unless (zerop (length host))
                 (amqp:open device)
                 #+(or ) ;; don't do this here:
                 ;; the 0.8r0 channel specializes device-open to get a ticket, which would need to
                 ;; happen before this, as the ticket is an argument.
                 ;; could go in initialize instance
                 (let ((exchange (uri-exchange uri))
                       (queue (uri-queue uri))
                       (object-path (uri-path uri)))
                   (cond (exchange
                          (let ((queue (amqp:channel.queue device :queue queue))
                                (exchange (amqp:channel.exchange device :exchange exchange :type "direct")))
                            (amqp:declare queue)
                            (amqp:declare exchange)
                            (amqp:bind queue :exchange exchange :queue queue
                                       :routing-key object-path)))
                         (queue
                          ;; if there is no exchange, allow input only
                          (assert (eq (stream-direction device) :input) ()
                                  "An exchange must be provided for channel output.")
                          (amqp:declare (amqp:channel.queue device :queue queue))))))))
           (setf (device-state device) amqp.s:use-channel)
           device)))
    (amqp.s:use-channel
     (call-next-method))))
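
#|
The method above adjusts the channel's class to the one which matches its
connection before it binds the two together. The following is a minimal sketch
of that class adjustment with `change-class`. All `sketch-` classes and
functions are hypothetical; in particular `sketch-channel-class` only stands in
for the role which `class-find-object-class` plays above and does not reproduce
its interface.

```lisp
;; Hypothetical connection classes: an abstract one and a version-specific one.
(defclass sketch-connection () ())
(defclass sketch-connection-0-9 (sketch-connection) ())

;; Hypothetical channel classes with the analogous relation.
(defclass sketch-channel ()
  ((channel-number :initarg :number :reader sketch-channel-number)))
(defclass sketch-channel-0-9 (sketch-channel) ())

(defgeneric sketch-channel-class (connection)
  (:documentation "Name the channel class which matches CONNECTION's version.")
  (:method ((connection sketch-connection)) 'sketch-channel)
  (:method ((connection sketch-connection-0-9)) 'sketch-channel-0-9))

(defun sketch-adopt-channel (connection channel)
  "Change CHANNEL's class to agree with CONNECTION, unless it already does.
Slots which are present in both classes retain their values."
  (let ((class (sketch-channel-class connection)))
    (unless (typep channel class)
      (change-class channel class))
    channel))
```

A channel created as a `sketch-channel` and adopted by a
`sketch-connection-0-9` becomes a `sketch-channel-0-9` and keeps its number.
|#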


(defmethod device-close ((device amqp:channel) (abort t))
  "Remove the channel from the connection."
  (amqp:log :debug device "Close in state: ~s" (channel-state device))
  (when (open-stream-p device)
    (cond (abort
           (call-next-method))
          (t
           (let ((initial-state (shiftf (channel-state device) amqp.s:close-channel)))
             (typecase initial-state
               ;; if in use, send the close request, then flush it
               (amqp.s:use-channel
                (amqp:request-close device)
                ;; complete and flush the content.
                (device-flush device t)))
             (call-next-method))))
    ;; in any case disconnect
    (disconnect-channel (channel-connection device) device)))


(defmethod device-read ((device amqp:channel) buffer-arg start end blocking)
  "Channels read a frame at a time through a connection.
 The connection manages the actual stream and makes frames available as
 they appear. The specified 'blocking' mode determines whether to
 wait if there is nothing present."
  (assert-device-state device use-channel.body.input device-read)
  (with-slots (buffer buffpos buffer-ptr buf-len body-position body-length) device
    (cond ((< buffer-ptr 0)
           -1)
          ((eql start end)              ; interpret blocking
           (if blocking
               ;; nothing is read anyway
               0
               ;; iff not blocking, see if anything is present or in the read queue
               (if (or (< buffpos buffer-ptr) (not (collection-empty-p (device-read-frames device))))
                   -3
                   0)))
          ((>= body-position body-length)
           (typecase (device-state device)
             (amqp.s:use-channel.body.input.chunked
              ;; chunked => start the next message
              ;; - if the last was a deliver, wait for the next
              ;; - if it was a get-ok, ask for the next
              (command-case (device)
                ((or amqp:get-ok amqp:deliver) ((basic amqp:basic) &rest args)
                 (amqp:log :debug device "chunk continuation: (~s ~s) . ~s"
                           (type-of amqp:method) (type-of basic) args)
                 t)
                (t ((class t) &rest args)
                   (amqp:log :error device "Unexpected chunk continuation: (~s ~s) . ~s"
                             (type-of amqp:method) (type-of class) args)
                   t))
              (let ((result (device-read-content-header device)))
                (amqp:log :debug device "read chunk header: ~s" result))
              ;; if that succeeds, read the next chunk
              (if (< body-position body-length)
                  ;; just try again
                  (device-read device buffer-arg start end blocking)
                  -1))
             (t
              ;; not chunked => mark eof
              (setf buffer-ptr -1))))
          (buffer-arg
           ;; if a buffer is provided, use it+bounds together with the device's buffer+bounds
           ;; to iteratively fill the argument buffer. recurse for more input
           ;; maintain body-position since the buffer is accepting the content.
           (let ((total-count 0)
                 (last-device-read 0))
             (unless end (setf end (length buffer-arg)))
             (unless start (setf start 0))
             ;; continue only as long as there is room left in the argument buffer
             (loop (unless (< start end) (return))
                   (when (>= buffpos buffer-ptr)
                     (unless (and (< body-position body-length)
                                  (plusp (setf last-device-read (device-read device nil 0 nil blocking))))
                       (return)))
                   (let* ((count (min (- end start) (- buffer-ptr buffpos)))
                          (end1 (+ start count))
                          (end2 (+ buffpos count)))
                     (replace buffer-arg buffer :start1 start :end1 end1 :start2 buffpos :end2 end2)
                     (setf start end1
                           buffpos end2)
                     (incf total-count count)
                     (incf body-position count)))
             (if (zerop total-count)
                 last-device-read
                 total-count)))
          (t
           ;; otherwise read a frame buffer
           (assert (and (zerop start) (or (null end) (= end (length buffer)))) ()
                   "Frame buffer i/o permitted for entire buffers only.")
           (let* ((frame nil))
             (loop (setf frame (get-read-frame device :wait blocking))
                   ;; if non-blocking, maybe no frame
                   (if frame
                       (cond ((plusp (frame-size frame))
                              (let* ((data (frame-data frame))
                                     (length (frame-size frame)))
                                (rotatef buffer data)
                                (setf-frame-data data frame)
                                (release-frame frame)
                                (setf buffpos 0)
                                (setf buffer-ptr length)
                                (setf buf-len (length buffer)) ; could change iff possible to re-tune
                                (assert (<= buffer-ptr buf-len) ()
                                        "Invalid buffer sizes: ptr ~d, len ~d." buffer-ptr buf-len)
                                (return length)))
                             (t
                              ;; if the frame is a zero-length frame, always skip it
                              (release-frame frame)
                              ;; iff chunking, also end chunking, indicate end of the body and
                              ;; skip next padding frame as well.
                              (typecase (device-state device)
                                (amqp.s:use-channel.body.input.chunked
                                 (setf (device-state device) amqp.s:use-channel.body.input)
                                 (cond ((setf frame (get-read-frame device :wait t)) ; resolve it now
                                        (assert (and (eq (frame-type-class-name frame) 'amqp:body)
                                                     (= (frame-size frame) (- body-length body-position))) ()
                                                "Invalid pad frame: ~s." frame)
                                        (setf body-length body-position) ; 'truncate' the body
                                        (release-frame frame)
                                        (return (setf buffer-ptr -1)))
                                       (t
                                        (return (setf buffer-ptr -1))))))))
                       (return (when blocking (setf buffer-ptr -1))))))))))
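
#|
When a buffer argument is supplied, the method above copies octets from the
device's current frame buffer into it and fetches a further frame whenever the
frame buffer is exhausted, until the argument buffer is full or the body ends.
The following self-contained sketch shows that copy loop in isolation. It is a
simplification under stated assumptions: `sketch-fill-buffer` is a hypothetical
function, the frames are given as a plain list of vectors rather than read from
a connection, and -1 is used as the only end-of-input result.

```lisp
(defun sketch-fill-buffer (target frames body-length)
  "Copy octets from FRAMES, a list of vectors, into TARGET until TARGET is
full, FRAMES is exhausted, or BODY-LENGTH octets have been copied.
Return the number of octets copied, or -1 if none were available."
  (let ((start 0) (end (length target))
        (buffer #()) (buffpos 0) (buffer-ptr 0)
        (body-position 0) (total-count 0))
    (loop
      ;; stop when the target is full or the body is complete
      (unless (and (< start end) (< body-position body-length))
        (return))
      ;; when the current frame is used up, take the next one, if any
      (when (>= buffpos buffer-ptr)
        (when (null frames)
          (return))
        (setf buffer (pop frames)
              buffpos 0
              buffer-ptr (length buffer)))
      ;; copy as much as the target, the frame, and the body permit
      (let ((count (min (- end start)
                        (- buffer-ptr buffpos)
                        (- body-length body-position))))
        (replace target buffer
                 :start1 start :end1 (+ start count)
                 :start2 buffpos :end2 (+ buffpos count))
        (incf start count)
        (incf buffpos count)
        (incf total-count count)
        (incf body-position count)))
    (if (zerop total-count) -1 total-count)))
```

For example, filling a five-octet target from the frames `#(1 2 3)` and
`#(4 5 6)` with a body length of six copies five octets and leaves the sixth
unread.
|#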


(defmethod device-write ((device amqp:channel) buffer-arg start end (blocking t))
  "Channels write a frame at a time through a connection.
 The connection manages the actual stream and writes frames as required. The
 specified 'blocking' mode has no effect on output."
  (assert-device-state device use-channel.body device-write)
  (with-slots (out-buffer outpos max-out-pos) device
    (cond (buffer-arg
           ;; if a buffer is provided, use it+bounds together with the device's buffer+bounds
           ;; to iteratively empty the argument buffer. recurse for progressive output
           (let ((total-count 0))
             (loop (when (>= start end)
                     (return))
                   (let* ((count (min (- end start) (- max-out-pos outpos)))
                          (source-start start) (source-end (+ source-start count))
                          (buffer-start outpos) (buffer-end (+ outpos count)))
                     (setf start source-end
                           outpos buffer-end)
                     (when (> (+ (device-body-position device) count)
                              (device-body-length device))
                       (error "DEVICE-WRITE: output exceeds content length: ~s, ~s."
                              (+ (device-body-position device) count)
                              (device-body-length device)))
                     (replace out-buffer buffer-arg :start1 buffer-start :end1 buffer-end :start2 source-start :end2 source-end)
                     (incf total-count count)
                     (if (>= outpos max-out-pos)
                         ;; flush the buffer
                         (let ((result (device-write device nil 0 nil blocking)))
                           (when (minusp result)
                             (return-from device-write result)))
                         (incf (device-body-position device) count))))
             total-count))
          (t
           (assert (and (zerop start) (or (null end) (= end (length out-buffer)))) ()
                   "Frame buffer i/o permitted for entire buffers only.")
           (device-flush device)))))


(defmethod device-flush ((device amqp:channel) &optional (complete nil))
  "Push data to the channel's connection.
 DEVICE : amqp:channel : an open channel
 COMPLETE : boolean : iff true, an in-progress chunked body stream is closed.

 Given a buffer, its content is passed through the channel's
 frame buffers. Lacking a buffer, the existing frame buffer is sent. The effect is to support
 single-frame commands, sequence-based stream io, and buffer-based stream io. The device state is checked to
 confirm an operation makes sense.

 On framed content, streams, and chunking:
 There are three aspects:
 * is there buffered content: (zerop outpos) ?
 * is the content complete?
 * was the content length known ahead of time: .output or .output.chunked ?

 If there is content, wrap up the body frame and send it.
 Given the now empty buffer, if the content body is complete, then it matters whether the length was
 predetermined. If it was, that is, in state .output, since this is not an abort, sufficient
 frames must be sent to achieve the content length. If the length was not predetermined, then the
 state is .output.chunked and the end of the sequence of commands is indicated by sending a
 zero-length body frame and then padding to fill the content length. Should this have been an
 empty buffer, then the zero-length frame will be followed by a full length pad."

  (let ((result-length 0))
    (typecase (device-state device)
      (amqp.s:use-channel.body.output
       (with-slots (out-buffer outpos max-out-pos body-position body-length) device
         (flet ((flush-frame ()
                  (let* ((frame (claim-output-frame device))
                         (length outpos))
                    (rotatef out-buffer (frame-data frame))
                    (setf-frame-type-class-name 'amqp:body frame)
                    (setf-frame-cycle 0 frame)
                    (setf-frame-channel-number (channel-number device) frame)
                    (setf-frame-track-number (channel-track device) frame)
                    (setf-frame-size outpos frame)
                    (put-encoded-frame device frame)
                    (setf outpos 0)
                    (setf max-out-pos (length out-buffer))
                    length)))
           ;; check whether there is anything to flush
           (when (or (plusp outpos) (zerop body-length)) ; always at least one frame
             ;; if there is content, send the frame out.
             (setf result-length (flush-frame)))
           ;; check completion
           (cond (complete
                  (typecase (device-state device)
                    (amqp.s:use-channel.body.output.chunked
                     ;; if the content was chunked, send a zero-length frame and revert to .output
                     (amqp:log :debug device "Ending chunking. padding ~d bytes..."
                               (- body-length body-position))
                     (flush-frame)
                     (setf (device-state device) amqp.s:use-channel.body.output)))
                  ;; now send frames to fill the difference between content-position and
                  ;; content-length - as many as needed
                  (do ((count (min (- body-length body-position) max-out-pos)
                              (min (- body-length body-position) max-out-pos)))
                      ((<= count 0))
                    (fill out-buffer 0 :start 0 :end count)
                    (setf outpos count)
                    (flush-frame)
                    (incf body-position count))
                  (amqp:log :debug device "Ended padding."))
                 (t
                  (typecase (device-state device)
                    (amqp.s:use-channel.body.output.chunked
                     ;; if so, need to send a new command:
                     ;; send a new publish, reset the buffer positions, and continue streaming
                     (let ((basic (channel.basic device)))
                       (amqp:log :debug basic "Starting next chunk: publish...")
                       (amqp:send-publish basic :exchange (basic-exchange basic))
                       (setf outpos 0
                             max-out-pos (length out-buffer)
                             ;; start a new body
                             body-length (class-body-size basic)
                             body-position 0)
                       (amqp:log :debug basic "Starting next chunk: header...")
                       (send-header basic)
                       (amqp:log :debug basic "Starting next chunk: done."))))))
           result-length))))))
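
#|
When a body is completed, the method above fills the remaining content length
with zero-filled frames, each of at most `max-out-pos` octets. The arithmetic of
that padding loop can be sketched on its own as follows. The function
`sketch-pad-frame-sizes` is a hypothetical helper introduced for illustration;
it computes only the sequence of pad frame sizes and sends nothing.

```lisp
(defun sketch-pad-frame-sizes (body-length body-position max-out-pos)
  "Return the list of pad frame sizes needed to advance BODY-POSITION to
BODY-LENGTH when no single frame may carry more than MAX-OUT-POS octets."
  (loop for count = (min (- body-length body-position) max-out-pos)
        while (plusp count)
        collect count
        do (incf body-position count)))
```

With a declared body length of 1000 octets, 100 of which have been written, and
a frame buffer of 400 octets, `(sketch-pad-frame-sizes 1000 100 400)` yields
`(400 400 100)`. A body which is already complete needs no padding and yields
the empty list.
|#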


(defmethod device-clear-input ((device amqp:channel) buffer-only)
  ;;; clear a possible pushed character and "empty" buffer.
  ;;; unless buffer-only, also flush any not yet read frames until
  ;;; the end of the body is reached
  (with-slots (decoder buffer buffpos buffer-ptr body-position body-length) device
    (funcall decoder #'(lambda (s) (declare (ignore s)) 0) device)
    ;; skip over anything already in the buffer
    (setf body-position (+ body-position (- buffer-ptr buffpos)))
    (setf buffpos buffer-ptr)
    ;; optionally drain pending input
    (unless buffer-only
      ;; flush input
      (amqp:log :debug device "device-clear-input: drain expected frames: state: ~a, at ~s of ~s"
                (device-state device) body-position body-length)
      (loop (when (>= body-position body-length)
              (return))
            (unless (plusp (device-read device nil 0 nil t))
              (return))
            (incf body-position buffer-ptr)))
    nil))


(defmethod device-buffer-length ((device amqp:channel))
  (let ((connection (channel-connection device)))
    (if connection
        (device-buffer-length connection)
        0)))


(defmethod (setf device-file-position) (position (device amqp:channel))
  (declare (ignore position))
  (device-body-length device))


(defmethod device-file-length ((device amqp:channel))
  (device-body-length device))


(defmethod (setf device-file-length) (length (device amqp:channel))
  (declare (ignore length))
  nil)



;;;
;;; connection operations


(defmethod device-open ((device amqp:connection) #-sbcl (slots t) initargs)
  "Prepare a connection by opening a socket to the broker, negotiating the
 protocol parameters, and opening the virtual host."

  (etypecase (device-state device)
    (amqp.s:open-connection
     (if (or (stream-input-handle device)
             (stream-output-handle device))
         (call-next-method)
         (destructuring-bind (&key (uri (connection-uri device))
                                   (version (class-protocol-version device))
                                   (direction :io)
                                   &allow-other-keys)
             initargs
           ;; merge the host and port information from/to the uri
           (let ((remote-host (uri-host uri))
                 (remote-port (or (uri-port uri) *standard-port*)))

             (ecase direction
               (:probe (call-next-method device #-sbcl slots
                                         (list* :remote-host remote-host
                                                :remote-port remote-port
                                                :direction :probe
                                                initargs)))
               (:io (when (call-next-method device #-sbcl slots
                                            (list* :remote-host remote-host
                                                   :remote-port remote-port
                                                   :direction :io
                                                   initargs))
                      (when (open-connection device :version version)
                        ;; once the concrete class is fixed, initialize the buffer
                        (device-initialize-buffers device)
                        (negotiate-client-connection device)
                        t))))))))
    (amqp.s:use-connection
     (call-next-method))))


(defmethod device-close ((device amqp:connection) (abort t))
  (map nil #'(lambda (c) (when c (device-close c abort)))
       (get-connection-channels device))
  (if abort
      (call-next-method)
      (typecase (shiftf (device-state device) amqp.s:close-connection)
        ;; never succeeded to open
        (amqp.s:open-connection
         (call-next-method))
        ;; if it's in use, perform a protocol close, then flush that data
        ;; close the stream, and reset to the initial state
        (amqp.s:use-connection
         (amqp:request-close device)
         (device-flush device t)
         (multiple-value-prog1 (call-next-method)
           (setf (connection-state device) amqp.s:open-connection)))
        ;; otherwise, the protocol close is already in progress;
        ;; just flush and close the stream
        (t
         (device-flush device t)
         (call-next-method)))))


(defmethod device-read ((device amqp:connection) buffer-arg start end (blocking t))
  "Connections permit stream i/o only if there is just one channel.
 Read a frame at a time through the connection.
 The connection manages the actual stream and makes frames available as
 they appear. The specified 'blocking' mode determines whether to
 wait if there is nothing present."

  (assert-device-state device use-connection device-read)
  (with-slots (buffer buffpos buffer-ptr buf-len) device
    (if (or (< buffer-ptr 0)
            (>= (device-body-position device) (device-body-length device)))
        -1
        (cond (buffer-arg
               ;; if a buffer is provided, use it+bounds together with the device's buffer+bounds
               ;; to iteratively fill the argument buffer. recurse for more input
               (let ((total-count 0))
                 (loop (let* ((count (min (- end start) (- buffer-ptr buffpos)))
                              (start1 start) (end1 (+ start1 count))
                              (start2 buffpos) (end2 (+ buffpos count)))
                         (setf start end1
                               buffpos end2)
                         (replace buffer-arg buffer :start1 start1 :end1 end1 :start2 start2 :end2 end2)
                         (incf total-count count)
                         (when (>= start end)
                           (incf (device-body-position device) count)
                           (return))
                         ;; read more
                         (let ((result (device-read device nil 0 nil blocking)))
                           (when (minusp result)
                             (return-from device-read result)))))
                 total-count))
              (t
               ;; otherwise read a frame buffer
               (assert (and (zerop start) (null end)) ()
                       "Frame buffer i/o permitted for entire buffers only.")
               (loop (let ((frame (get-read-frame device :wait blocking)))
                       (if frame
                           ;; test the frame's payload size, as in the channel method
                           (when (plusp (frame-size frame))
                             (let* ((data (frame-data frame))
                                    (length (frame-size frame)))
                               (rotatef buffer data)
                               (setf-frame-data data frame)
                               (release-frame frame)
                               (setf buffpos 0)
                               (setf buffer-ptr length)
                               (setf buf-len (length buffer))
                               (assert (<= buffer-ptr buf-len) ()
                                       "Invalid buffer sizes: ptr ~d, len ~d." buffer-ptr buf-len)
                               (return length)))
                           (return (setf buffer-ptr -1))))))))))


;;; this duplicates most of the channel version, but for the buffer source
(defmethod device-write ((device amqp:connection) buffer-arg start end (blocking t))
  (assert-device-state device :use-connection device-write)
  (with-slots (out-buffer outpos max-out-pos) device
    (cond (buffer-arg
           ;; if a buffer is provided, use it+bounds together with the device's buffer+bounds
           ;; to iteratively empty the argument buffer. recurse for progressive output
           (let ((total-count 0))
             (loop (when (>= start end)
                     (return))
                   (let* ((count (min (- end start) (- max-out-pos outpos)))
                          (source-start start) (source-end (+ source-start count))
                          (buffer-start outpos) (buffer-end (+ outpos count)))
                     (setf start source-end
                           outpos buffer-end)
                     (when (> (+ (device-body-position device) count)
                              (device-body-length device))
                       (error "DEVICE-WRITE: output exceeds content length: ~s, ~s."
                              (+ (device-body-position device) count)
                              (device-body-length device)))
                     (replace out-buffer buffer-arg :start1 buffer-start :end1 buffer-end :start2 source-start :end2 source-end)
                     (incf total-count count)
                     (if (>= outpos max-out-pos)
                         ;; flush the buffer
                         (let ((result (device-write device nil 0 nil blocking)))
                           (when (minusp result)
                             (return-from device-write result)))
                         (incf (device-body-position device) count))))
             total-count))
          (t
           (assert (and (zerop start) (null end)) ()
                   "Frame buffer i/o permitted for entire buffers only.")
           (device-flush device)))))

(defmethod device-flush ((device amqp:connection) &optional complete-p)
  (declare (ignore complete-p))
  (typecase (device-state device)
    (amqp.s:use-connection
     (with-slots (out-buffer outpos max-out-pos) device
       (when (plusp outpos)
         (let* ((frame (claim-output-frame device))
                (length outpos))
           (rotatef out-buffer (frame-data frame))
           (setf-frame-type-class-name 'amqp:body frame)
           (setf-frame-cycle 0 frame)
           (setf-frame-channel-number (channel-number device) frame)
           (setf-frame-track-number (channel-track device) frame)
           (setf-frame-size outpos frame)
           (put-encoded-frame device frame)
           (setf outpos 0)
           (setf max-out-pos (length out-buffer))
           length))))))

(defmethod device-buffer-length ((device amqp:connection))
  "Until the connection has been specialized for the protocol, propose the full frame
 as the buffer size."
  (let ((frame-class (connection-input-frame-class device)))
    (- (connection-frame-size device)
       (if frame-class
           (frame-overhead (allocate-instance (find-class frame-class)))
           0))))

(defmethod device-file-position ((device amqp:connection))
  (device-body-position device))

(defmethod (setf device-file-position) (position (device amqp:connection))
  (declare (ignore position))
  nil)

(defmethod device-file-length ((device amqp:connection))
  (device-body-length device))

(defmethod (setf device-file-length) (length (device amqp:connection))
  (declare (ignore length))
  nil)
671    
672    
673    
674     (defmethod open-connection ((connection amqp:connection)
675     &key (version (class-protocol-version connection))
676     (attempt 1))
677     "Given a CONNECTION and an optional VERSION, open a socket to the respective
678     host/port, negotiate the version and process any initial frame(s).
679    
680     CONNECTION : amqp:connection : an initialized connection w/o a socket. May be
681     an abstract connection
682     :VERSION : amqp:version : Intended version
683     VALUES : (or stream null) : the open stream if succeeded
684     amqp:version : the negotiated version
685    
686     The socket is opened and an attempt is made to read a permitted version. The
687     criterion is whether a connection class is defined which supports the version.
688     If so, the given connection instance's class is changed to agree.
689     In addition, a possible initial frame is read and queued."
690    
691     (let ((buffer-out (make-array 8 :element-type 'unsigned-byte))
692     (buffer-in (make-array 8 :element-type 'unsigned-byte))
693     (version-received nil)
694     (byte-zero 0))
695     (labels ((negotiation-failed (&optional reason)
696     (error "Connection negotiation failed~@[ (~a)~]: requested ~s, class ~s, received ~s"
697     reason
698     version (class-protocol-version connection) version-received))
699     (update-connection-class (version-received)
700     (let ((new-class (amqp:find-protocol-class connection version-received)))
701     (unless new-class
702     (negotiation-failed "Unsupported protocol version"))
703     (cond ((eq new-class (class-of connection)))
704     ((eql attempt 1)
705     (change-class connection new-class)
706     (amqp:log :debug connection "open-connection: updated class to ~s."
707     (type-of connection)))
708     (t
709     (negotiation-failed "Re-negotiation failed to match")))))
710     (cycle-socket ()
711     (let* ((uri (device-uri connection))
712     (remote-host (uri-host uri))
713     (remote-port (or (uri-port uri) *standard-port*)))
714     (setf (device-socket connection)
715     (usocket:socket-connect remote-host remote-port
716     :element-type 'unsigned-byte)))))
717    
718     (setf (buffer-protocol-header-version buffer-out) version)
719     (amqp:log :debug connection "open-connection: requesting version: ~s/~s."
720     buffer-out version)
721     (write-sequence buffer-out (stream-output-handle connection))
722     (force-output (stream-output-handle connection))
723     (case (setf byte-zero (read-byte (stream-input-handle connection)))
724     ;; the later protocols reply with a version to confirm, but
725     ;; the early ones just send the start frame immediately
726     (#.(char-code #\A)
727     ;; if a protocol header is returned, if the version matches, ok
728     ;; otherwise close the connection and return the version
729     (setf (aref buffer-in 0) #.(char-code #\A))
730     (cond ((= 8 (read-sequence buffer-in (stream-input-handle connection) :start 1))
731     (setf version-received (buffer-protocol-header-version buffer-in nil))
732     (amqp:log :debug connection "open-connection: parsed version: ~s / ~s."
733     buffer-in version-received)
734     ;; negotiate the protocol
735     (cond (version-received
736     ;; use received versions to specialize the given instance
737     (update-connection-class version-received)
738     ;; cycle the socket and retry to connect
739     (cycle-socket)
740     (open-connection connection :attempt (1+ attempt) :version version-received))
741     (t
742     (negotiation-failed (format nil "Unsupported protocol header: ~a." buffer-in)))))
743     (t
744     (negotiation-failed "Incomplete protocol header"))))
745     (t
746     ;; version accepted w/ immediate connection-open
747     ;; still, update to change from abstract to concrete class
748     (amqp:log :debug connection "open-connection: byte-zero: ~s, connection class ~s."
749     byte-zero (type-of connection))
750     (setf version-received version)
751     (update-connection-class version)
752    
753     ;; this had to wait until the connection had been transformed into a concrete class
754     (let ((frame (claim-input-frame connection)))
755     (setf (aref (frame-header frame) 0) byte-zero)
756     ;; (print :no-header)
757     (read-frame connection frame :start 1)
758     ;; make channel-zero and prime it with the first frame
759     (amqp:connection.channel connection :number 0)
760     (put-read-frame connection frame)
761     (values version frame)))))))
762    
763    
764     (document (negotiate-client-connection open-connection)
765     "AMQP connection negotiation occurs in two steps. First, the peers agree on a protocol version. Second,
766     they exchange authentication and control information to set up the connection. The first step is
767     implemented by open-connection. It negotiates with the broker to agree on a supported protocol version
768     and changes the connection instance's class to that of the implementation class for that version. For
769     some protocol versions, it is also confronted with the initial frame, which it buffers for the
770     configuration step.
771    
772     The second step, authentication and configuration, is implemented in negotiate-client-connection. It
773     exchanges connection commands with the broker for authentication and tuning. The configured connection is
774     returned.")
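
;;; Sketch (not part of the library): the version step described above begins
;;; with an 8-octet protocol header. Assuming the AMQP 0-9-1 layout
;;; ("AMQP", 0, major, minor, revision), such a header could be built as
;;; below. Other protocol versions may lay out the last four octets
;;; differently. SKETCH-PROTOCOL-HEADER is a hypothetical name; the library
;;; itself encodes the header through (setf buffer-protocol-header-version).
(defun sketch-protocol-header (major minor revision)
  "Return a fresh 8-octet vector holding an AMQP 0-9-1 style protocol header."
  (let ((buffer (make-array 8 :element-type '(unsigned-byte 8) :initial-element 0)))
    ;; octets 0-3 are the literal characters A M Q P
    (replace buffer (map 'vector #'char-code "AMQP"))
    ;; octet 4 stays zero; octets 5-7 carry the version
    (setf (aref buffer 5) major
          (aref buffer 6) minor
          (aref buffer 7) revision)
    buffer))

;(sketch-protocol-header 0 9 1)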
775    
776    
777    
778     (defmethod negotiate-client-connection ((device amqp:connection) &key (retry-limit 2))
779     "Negotiate security and open a virtual host:
780     - construct a channel-zero instance.
781     - go through the handshake w/ start, secure, tune commands.
782     - open channel-zero for the connection's host. "
783    
784     (let ((channel (amqp:connection.channel device :number 0)))
785     (command-case (channel)
786     (amqp:start ((class amqp:connection) &rest args)
787     (setf (connection-state device) amqp.s:open-connection.start)
788     (apply #'channel-respond-to-start channel device args)
789     t)
790     (t ((class t) &rest args)
791     (error "Invalid negotiation command: ~s~{ ~s~}." class args)))
792    
793     (dotimes (x retry-limit (error "Security negotiation failed."))
794     (command-case (channel)
795     (amqp:secure ((class amqp:connection) &rest args)
796     (setf (connection-state device) amqp.s:open-connection.secure)
797     (apply #'channel-respond-to-secure channel class args)
798     t)
799     (amqp:tune ((class amqp:connection) &rest args)
800     (setf (connection-state device) amqp.s:open-connection.tune)
801     (apply #'channel-respond-to-tune channel class args)
802     (return))))
803    
804     (setf (connection-state device) amqp.s:open-connection.host)
805     (channel-request-open channel device :virtual-host (connection-virtual-host device))
806     (setf (connection-state device) amqp.s:use-connection)
807    
808     device))
809    
810    
811     (document (device-read-content device-write-content)
812     "The content processing interface comprises the two operators
813     * device-read-message (channel &rest)
814     * device-write-message (channel body &rest)
815    
816     Each accepts the keywords which apply to the respective protocol operation, that is, any method arguments and the class'
817     header properties. For reading this means the arguments for get and deliver, while for writing those for publish.
818     The interface supports two use patterns : body instances and continuation based. The decision is made by the writer
819     according to whether the body size is known at the outset. For fixed length vectors this is true. For everything else, it
820     is not. Where it is known, a single content-bearing command is sent. Where it is not known, a sequence of commands is
821     sent until the writer terminates the stream by indicating completion in a call to device-flush.
822    
823     Streams are broken into and reconstituted from the three frame constituents for a command : method, header, and body.
824     * a method frame : is emitted by a request operator and parsed and processed by the respond-to- operator to cache the
825     arguments in the channel. The protocol interface operator then invokes device-read/write-content.
826     * a content header : on input, is parsed in device-read-content/content-header to extract the properties and cache them
827     in the channel's basic class instance. On output, it is generated by device-write-content based on the channel's current
828     basic instance
829     * the content body : is parsed based on the channel's internal type combined with the content type. It is generated
830     based on the passed body instance in combination with the content type/encoding.
831    
832     [ channel-request-publish channel-respond-to-get-ok channel-respond-to-deliver ]
833     --> [ device-read-content device-write-content]
834     --> [ device-read-content-header device-write-content-header ]
835     --> [ device-read-content-body device-write-content-body ]
836     ")
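
;;; Sketch (not part of the library): the description above says a stream is
;;; broken into method, header, and body frames. Assuming each body frame
;;; carries at most MAX-PAYLOAD octets (an assumed limit; in this file it
;;; would presumably follow from device-buffer-length), a body of known
;;; length splits into frame payloads as below. SKETCH-BODY-FRAME-SIZES is a
;;; hypothetical helper used only for illustration.
(defun sketch-body-frame-sizes (body-length max-payload)
  "Return the list of payload sizes needed to carry BODY-LENGTH octets."
  (check-type body-length (integer 0))
  (check-type max-payload (integer 1))
  (loop with remaining = body-length
        while (plusp remaining)
        collect (let ((size (min remaining max-payload)))
                  ;; each frame takes as much as fits, the last takes the rest
                  (decf remaining size)
                  size)))

;(sketch-body-frame-sizes 10 4)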
837    
838     (defgeneric device-read-content (channel &key delivery-tag redelivered exchange routing-key
839     ;; from get-ok only
840     message-count
841     ;; from deliver only
842     consumer-tag
843     )
844     (:documentation "Given a channel which has received a Basic.Deliver or Basic.Get/Get-ok,
845     first, prepare the channel based on the content header properties, and read the content
846     according to the combined channel data type and content type. Combine the header's possibly
847     incomplete content type with the channel's to specify the effective decoding.")
848    
849     (:method ((channel amqp:channel) &key body delivery-tag redelivered exchange routing-key message-count consumer-tag)
850     (declare (ignore delivery-tag redelivered exchange routing-key message-count consumer-tag))
851     (setf (channel-state channel) amqp.s:use-channel.body.input)
852     (assert-argument-type device-read-content body (or null function))
853     (let* ((basic (device-read-content-header channel))
854     (headers (amqp:basic-headers basic))
855     (element-type (getf headers :element-type))
856     (package (getf headers :package))
857     (mime-type (class-mime-type basic)))
858     ;; element-type in the basic header combines the read values with the channel's content-type
859     (setf element-type (or (find-symbol element-type package)
860     (error "Invalid type x package combination: ~s, ~s."
861     element-type package)))
862     (amqp:log :debug channel "device-read-content: in (~s ~s) in state ~s x~s"
863     element-type mime-type (channel-state channel) (device-body-length channel))
864     (device-read-content-body channel (or body element-type) mime-type))))
865    
866    
867     (defgeneric device-read-content-header (channel )
868     (:method ((channel amqp:channel))
869     (command-loop (channel)
870     (amqp:header ((basic amqp:basic) &key frame &allow-other-keys)
871     (declare (ignore frame))
872     ;; merge the header's content type with the stream's
873     (let* ((body-size (class-body-size basic))
874     (headers (amqp:basic-headers basic))
875     (mime-type (mime-type basic)))
876     (assert-argument-type device-read-content-body body-size integer)
877    
878     (with-slots (buffer buffer-ptr body-length body-position) channel
879     (unless buffer
880     (device-initialize-input-buffer channel))
881     (setf body-length body-size
882     buffer-ptr 0
883     body-position 0))
884     (update-device-codecs channel mime-type)
885     (setf (channel-state channel)
886     (if (string-equal (getf headers :transfer-encoding) "chunked")
887     amqp.s:use-channel.body.input.chunked
888     amqp.s:use-channel.body.input))
889     (return-from command-loop basic))))))
890    
891    
892     (defgeneric device-read-content-body (device type content-type)
893    
894     (:method ((channel amqp:channel) (type (eql 'string)) (content-type mime:text/plain))
895     (let* ((body-length (device-body-length channel))
896     ;; construct a string with the message content
897     (body (make-string body-length)))
898     (read-sequence body channel :start 0 :end body-length)
899     body))
900    
901     (:method ((channel amqp:channel) (body-op function) (content-type mime:*/*))
902     "Given a function, call it with the channel as a stream to be read, then clear any remaining input."
903     (prog1 (funcall body-op channel content-type)
904     (device-clear-input channel nil)))
905    
906     (:method ((channel amqp:channel) (type (eql 'vector)) (content-type mime:application/octet-stream))
907     "Given the type 'vector, create one and copy the stream content into it."
908     (let* ((body-length (device-body-length channel))
909     (body (make-frame-buffer body-length)))
910     (device-read channel body 0 body-length nil)
911     body))
912    
913     (:method ((channel amqp:channel) (channel-type (eql 'list)) (content-type mime::application/sexp))
914     "Given an sexp mime type, then read the form."
915     (let* ((basic (amqp:channel.basic channel))
916     (headers (amqp:basic-headers basic))
917     (package (getf headers :package)))
918     (setf package (or (find-package package)
919     (amqp:syntax-error :channel channel
920     :class-code (amqp:class-id basic)
921     :message-string "Invalid package designator: ~s"
922     :message-arguments (list package)))) ;;;!!! need a text
923     (with-standard-io-syntax
924     (setq *read-eval* nil)
925     (let ((*package* package))
926     (prog1 (read channel)
927     (device-clear-input channel nil))))))
928    
929     (:method ((channel amqp:channel) (channel-type (eql 'standard-object)) (content-type mime::application/sexp))
930     (let* ((basic (amqp:channel.basic channel))
931     (headers (amqp:basic-headers basic))
932     (package (getf headers :package)))
933     (setf package (or (find-package package)
934     (amqp:syntax-error :channel channel
935     :class-code (amqp:class-id basic)
936     :message-string "Invalid package designator: ~s"
937     :message-arguments (list package))))
938     (with-standard-io-syntax
939     (let ((*package* package))
940     (setq *read-eval* nil)
941     (let ((form (read channel)))
942     (prog1 (reconstitute form)
943     (device-clear-input channel nil))))))))
944    
945     (defun reconstitute (channel-form)
946     "Interpret a received object representation as the two values generated by make-load-form.
947     This version expects a let expression, because that expresses the necessary circularity."
948     (let ((object nil))
949     (destructuring-bind (let ((marker creation)) initialization) channel-form
950     (declare (ignore let))
951     (labels ((simple-eval (form)
952     (typecase form
953     (cons (ecase (first form)
954     (quote (second form))
955     (progn (map nil #'simple-eval (rest form)))
956     ((make-instance initialize-instance allocate-instance reinitialize-instance
957     shared-initialize find-class)
958     (apply (first form) (mapcar #'simple-eval (rest form))))
959     (setf (loop for ((slot-value x name) value) on (rest form) by #'cddr
960     do (progn
961     (assert (eq slot-value 'slot-value) () "invalid form: ~s" form)
962     (setf (slot-value (simple-eval x) (simple-eval name))
963     (simple-eval value)))))))
964     (t form))))
965     (setf object (simple-eval creation))
966     (simple-eval (subst object marker initialization))
967     object))))
968    
969     ;(reconstitute '(let ((:m (allocate-instance (find-class 'tc)))) (setf (slot-value ':m 'a) '(as d f))))
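
;;; Sketch (not part of the library): the let expression which reconstitute
;;; expects can be pictured with the helper below. It assumes CREATION and
;;; INITIALIZATION are the two values of make-load-form for OBJECT, and
;;; replaces OBJECT in the initialization form with a fresh marker cons.
;;; Written with :circle t, the marker prints once with a label, so the
;;; reader restores a single shared cons for reconstitute to substitute.
;;; SKETCH-OBJECT-FORM is a hypothetical name.
(defun sketch-object-form (object creation initialization)
  "Return a form (let ((marker creation)) initialization*) for OBJECT."
  (let ((marker (list :object)))
    `(let ((,marker ,creation))
       ,(subst marker object initialization))))

;(sketch-object-form 'obj '(allocate-instance (find-class 'tc)) '(setf (slot-value obj 'a) 1))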
970    
971    
972    
973     (defgeneric device-write-content (channel body &rest args
974     &key class-id weight body-size
975     exchange routing-key mandatory immediate
976     content-type content-encoding headers delivery-mode
977     priority correlation-id reply-to expiration message-id timestamp
978     type user-id)
979     (:documentation "Given a channel which has sent a Basic.Publish,
980     first, write a content header based on the properties, then write the content
981     according to the combined channel data type and content type. Combine the header's possibly
982     incomplete content type with the channel's to specify the effective encoding.")
983     (declare (dynamic-extent args))
984    
985     (:method ((channel amqp:channel) body &rest args)
986     ;; configure the respective basic for the (content x element-type x content-type)
987     ;; combination. this resolves the body size, the transfer encoding, and the
988     ;; transfer element type
989     (let* ((basic (apply #'device-write-content-header channel body args)))
990    
991     (prog1 (apply #'device-write-content-body channel body (mime-type basic) args)
992     (device-flush channel t)))))
993    
994    
995     (defgeneric device-write-content-header (channel body
996     &key class-id weight body-size
997     exchange routing-key mandatory immediate
998     content-type content-encoding headers delivery-mode
999     priority correlation-id reply-to expiration message-id timestamp
1000     type user-id)
1001    
1002     (:method ((channel amqp:channel) (body t) &rest args)
1003     (let* ((basic (apply #'amqp:channel.basic channel :body body args))
1004     (body-size (class-body-size basic))
1005     (headers (amqp:basic-headers basic))
1006     (mime-type (mime-type basic)))
1007     (with-slots (out-buffer max-out-pos outpos body-length body-position) channel
1008     (unless out-buffer
1009     (device-initialize-output-buffer channel))
1010     (setf body-length body-size
1011     body-position 0))
1012     (update-device-codecs channel mime-type)
1013     (setf (channel-state channel)
1014     (if (string-equal (getf headers :transfer-encoding) "chunked")
1015     amqp.s:use-channel.body.output.chunked
1016     amqp.s:use-channel.body.output))
1017     (send-header basic)
1018     basic)))
1019    
1020    
1021     (defgeneric device-write-content-body (device body content-type
1022     &key
1023     body-size class-id consumer-tag content-type content-encoding correlation-id
1024     delivery-mode delivery-tag exchange expiration headers immediate
1025     mandatory message-count
1026     priority routing-key reply-to message-id
1027     redelivered timestamp type user-id weight)
1028    
1029     (:method ((channel amqp:channel) (body null) (content-type mime:*/*) &rest args)
1030     "Given a null body, write nothing here; the channel is left configured for
1031     the caller to write the message body to it as a stream."
1032     (declare (dynamic-extent args) (ignore args))
1033     nil)
1034    
1035     (:method ((channel amqp:channel) (body string) (content-type mime:text/plain) &rest args)
1036     (declare (dynamic-extent args) (ignore args))
1037    
1038     ;; write the content,
1039     (stream-write-string channel body 0 nil))
1040    
1041     (:method ((channel amqp:channel) (body-op function) (content-type mime:*/*) &rest args)
1042     (declare (dynamic-extent args) (ignore args))
1043    
1044     ;; call the function
1045     (funcall body-op channel content-type))
1046    
1047     (:method ((channel amqp:channel) (body vector) (content-type mime:application/octet-stream) &rest args)
1048     "Given a vector body, write its content to the channel."
1049     (declare (dynamic-extent args) (ignore args))
1050    
1051     (device-write channel body 0 (length body) t))
1052    
1053     (:method ((channel amqp:channel) (body cons) (content-type mime::application/sexp) &rest args)
1054     "Given an sexp mime type, set up the stream, then write the form."
1055     (declare (dynamic-extent args) (ignore args))
1056     (with-standard-io-syntax
1057     (write body :stream channel :circle t)))
1058    
1059     (:method ((channel amqp:channel) (body standard-object) (content-type mime::application/sexp) &rest args)
1060     "Given an sexp mime type, set up the stream, then write the object's load form."
1061     (declare (dynamic-extent args) (ignore args))
1062    
1063     (multiple-value-bind (creation initialization)
1064     (make-load-form body)
1065     (with-standard-io-syntax
1066     (let ((marker (list :object)))
1067     (let ((form `(let ((,marker ,creation))
1068     ,(subst marker body initialization))))
1069     (write form :stream channel :circle t)))))))
