/[de-setf-amqp]/device-level.lisp
ViewVC logotype

Contents of /device-level.lisp

Parent Directory Parent Directory | Revision Log Revision Log


Revision 3 - (show annotations)
Tue Feb 23 09:05:39 2010 UTC (4 years, 1 month ago) by janderson
File size: 56135 byte(s)
Merge commit 'remotes/github/master' into remotes/git-svn
1 ;;; -*- Package: de.setf.amqp.implementation; -*-
2
3 (in-package :de.setf.amqp.implementation)
4
5 (document :file
6 (description "This file implements device-level support for streams based on AMQP connections as part of the
7 'de.setf.amqp' library.")
8 (copyright
9 "Copyright 2010 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved"
10 "'de.setf.amqp' is free software: you can redistribute it and/or modify it under the terms of version 3
11 of the GNU Affero General Public License as published by the Free Software Foundation.
12
13 'setf.amqp' is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
14 implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
15 See the Affero General Public License for more details.
16
17 A copy of the GNU Affero General Public License should be included with 'de.setf.amqp' as `AMQP:agpl.txt`.
18 If not, see the GNU [site](http://www.gnu.org/licenses/).")
19
20 (long-description "Device-level operations are managed by channel instances, which delegete in turn to
21 connection-wrapped tcp sockets. The AMQP standards permit this combination to behave in either of (at
22 least) two ways. On one hand, one can use all allowed protocoal variability and treat the socket
23 connection as a multiplexed multi-channel stream. In this mode each thread would instantiate its own
24 channel on the shared connection. On the other hand, is also possible to constrain the connection's use
25 to dedicate it to a single channel - and thereby to a single thread.
26
27 The multiplexed mechanism sets a much lower demand for system resources, but increases the processing
28 for a single channel and constrains the i/o operations to block mode. A dedicated mechanims requires
29 more sockets, but can dedicate the buffering to a single thread, which permits more efficient i/o
30 operations.
31
32 When the socket is multiplexed, then it is possible that content frames for one channel will be
33 interleaved with frames for another channel, which means it is not possible to guarantee that a given
34 socket read operation will contain an expected content frame. The multiplexing requires per-channel
35 queuing and processing upon delivery, whereby the actual buffer is not to be predicted. When the socket
36 is dedicated, read operatation can target a given buffer, since the intra-channel ordering constraints
37 require that content be delivered in-order and uninterrupted. The protocol meta-data can be read and
38 processed in-line ro parameterize the read and recognize exceptions, while data i/o targets given
39 buffers. The 1.0pr documentintroduces additional session and context for frame processing, but those are
40 not considered here. (see amqp0-8.pdf,p.56 amqp0-9.pdf,p.35)
41
42 In the implementation below, the two amqp classes - connection and channel, are each specialized from
43 simple-stream and implement the standard interface. The required device operators are implemented for
44 both amqp:channel and amqp:connection, despite that stream operations are limited to channels while
45 connections are to be used only to consolidate frame-based operations with the network stream. The
46 principle operators are
47
48 device-open (stream #-sbcl slots initargs)
49 device-close (stream abort)
50 device-read (stream buffer start end blocking)
51 device-clear-input (stream buffer-only)
52 device-write (stream buffer start end blocking)
53 device-clear-output (stream)
54 device-flush (device)
55 device-read-content (device &rest content-arguments)
56 device-write-content (device body &rest content-arguments)
57
58 The device-open operator returns an instance which can be used directly. In the case of a channel, it
59 connects to the respective connection and prepares any exchange/queue specified in uri initialization
60 argument. In that state it can be supplied to publish, consume, or get operations to initiate stream
61 i/o, or to transfer isolated objects. In the case of a connection, a socket stream is opened to the
62 designated server and the protocol handshake and open-connection phases are completed. (see
63 open-connection and negotiate-client-connection) In that state, one can construct and use channels. In the
64 case of a connection, it should be used only to make channels. Note that the connection class is adjusted to
65 match that of the protocol version which it negotiates with the remote broker. That is, a
66
67 (make-instance 'amqp:connection :uri \"amqp://guest:guest@localhost/\")
68
69 will return a connection sub-class. This places constraints on the effective methods for `device-open` and the
70 various constituents of the standard instantiation protocol for `simple-stream`.
71 The respective effective methods for the base implementation look something like
72
73 initialize-instance : (CONNECTION) (STANDARD-OBJECT)
74 shared-initialize (in mcl) : (CONNECTION T) (AMQP-DEVICE T) :AFTER (SIMPLE-STREAM T) (STANDARD-OBJECT T)
75 reinitialize-instance : (STANDARD-OBJECT)
76 device-open (in sbcl) : (CONNECTION T) (AMQP-SOCKET-DEVICE T) (AMQP-DEVICE T) (SIMPLE-STREAM T) (SLOT-OBJECT T)
77
78 The exact form depends on the run-time (cf. `standard-object` v/s `sloto-object`), but all share the topology,
79 that no next method call follows the class change. Should protocol-specific specialization be necessary, any
80 specialized operation subsequent to the change would need to be independent of these effective methods.
81
82 The device-read and device-write operators are defined at the respective levels to delegate a channel's
83 operations to the respective connection and to perform connection i/o through the socket stream. When
84 channel operations with an explicit buffer, are intended to implement data stream-based data transfer
85 for a channel which has already initiated a message exchange and observe the body size constraints
86 specified fro the message. Where the size is known at the outset, the body-size and body-position govern
87 eof behaviour. Where the size was unkown, the channel implemented chunked transfer in terms of maxmial
88 sized messages, and relies on device-flush and device-clear-input to indicate and manage the effective
89 eof.
90
91 The state operators for position and length return meaningful information for fixed content sizes only
92 and have no effect to modify channel state.
93
94 For more information on simple streams, see Franz's documentation[3] and the sbcl implementation[4] of same,
95 as well as the discussions of the the alternative fu interface.[5]
96
97 ---
98 [1]: https://jira.amqp.org/confluence/download/attachments/720900/amqp0-8.pdf?version=1
99 [2]: https://jira.amqp.org/confluence/download/attachments/720900/amqp0-9-1.pdf?version=1
100 [3]: http://www.franz.com/support/documentation/current/doc/streams.htm
101 [4]: http://sbcl.cvs.sourceforge.net/viewvc/sbcl/sbcl/contrib/sb-simple-streams/
102 [5]: http://paste.lisp.org/display/65229"))
103
104
105
106
107 (defmacro assert-device-state (device state &optional op)
108 (let ((state (or (find-symbol (string state) :amqp.s)
109 (error "invalid state indicator: ~s." state)))
110 (device-state-var (gensym)))
111 `(let ((,device-state-var (device-state ,device)))
112 (assert (typep ,device-state-var ',state) ()
113 "~@[~a: ~]Required device state ~a is not satisfied by ~a."
114 ',op ',state ,device-state-var))))
115
116
117 ;;;
118 ;;; channel operations
119
120
121 (defmethod device-open ((device amqp:channel) #-sbcl (slots t) initargs)
122 "Prepare a channel by binding it locally to a connection and declaring its
123 exchange and queue on the connection's server. The connection may be provided
124 as an instance to be re-used, or as a server designator, in which case a
125 news connection is opened."
126
127 (etypecase (device-state device)
128 (amqp.s:open-channel
129 (if (or (stream-input-handle device)
130 (stream-output-handle device))
131 (call-next-method)
132 (destructuring-bind (&key (connection (or (channel-connection device)
133 (error "connection required.")))
134 (input-handle connection)
135 (output-handle connection)
136 &allow-other-keys)
137 initargs
138 (call-next-method)
139 (setf (stream-input-handle device) input-handle)
140 (setf (stream-output-handle device) output-handle)
141
142 ;; once that has been done, adjust the channel class
143 (let ((connection-channel-class (class-find-object-class connection 'amqp:channel)))
144 (unless (typep device connection-channel-class)
145 (change-class device connection-channel-class)))
146
147 ;; bind the channel to the connection to obtain the queues and initialize buffers
148 (connect-channel connection device)
149
150 ;; unless it's the connection's own channel,
151 ;; open the channel w/ the broker
152
153 (unless (zerop (channel-number device))
154 ;; resolve the channel's identifer relative to the connection - with
155 ;; non-strict handling to allow a scheme.
156 (let* ((uri (merge-uris (channel-uri device) (connection-uri connection) nil nil))
157 (host (uri-host uri)))
158 (setf-device-uri uri device)
159
160 ;; if the connection is 'for real', that is, if it specifies a host, open the channel
161 (unless (zerop (length host))
162 (amqp:open device)
163 #+(or ) ;; don't do this here:
164 ;; the 0.8r0 channel specializes device-open to get a ticket, which would need to
165 ;; happen before this, as the ticket is an argument.
166 ;; could go in initialize instance
167 (let ((exchange (uri-exchange uri))
168 (queue (uri-queue uri))
169 (object-path (uri-path uri))
170 (cond (exchange
171 (let ((queue (amqp:channel.queue device :queue queue))
172 (exchange (amqp:channel.exchange device :exchange exchange :type "direct")))
173 (amqp:declare queue)
174 (amqp:declare exchange)
175 (amqp:bind queue :exchange exchange :queue queue
176 :routing-key object-path)))
177 (queue
178 ;; if there is no exchange, allow input only
179 (assert (eq (stream-direction device) :input) ()
180 "An exchange must be provided for channel output.")
181 (amqp:declare (amqp:channel.queue device :queue queue)))))))))
182 (setf (device-state device) amqp.s:use-channel)
183 device)))
184 (amqp.s:use-channel
185 (call-next-method))))
186
187
188 (defmethod device-close ((device amqp:channel) (abort t))
189 "remove the channel from the connection."
190 (amqp:log :debug device "Close in state: ~s" (channel-state device))
191 (when (open-stream-p device)
192 (cond (abort
193 (call-next-method))
194 (t
195 (let ((initial-state (shiftf (channel-state device) amqp.s:close-channel)))
196 (typecase initial-state
197 ;; if in use, send the close request, the flush it
198 (amqp.s:use-channel
199 (amqp:request-close device)
200 ;; complete and flush the content.
201 (device-flush device t)))
202 (call-next-method))))
203 ;; in any case disconnect
204 (disconnect-channel (channel-connection device) device)))
205
206
207 (defmethod device-read ((device amqp:channel) buffer-arg start end blocking)
208 "Channels read a frame at a time through a connection.
209 the connection manages the actual stream and makes frames available as
210 they appear. the specified 'blocking' mode determines whether to
211 wait if there is nothing present."
212 (assert-device-state device use-channel.body.input device-read)
213 (with-slots (buffer buffpos buffer-ptr buf-len body-position body-length) device
214 (cond ((< buffer-ptr 0)
215 -1)
216 ((eql start end) ; interpret blocking
217 (if blocking
218 ;; nothing is read anyway
219 0
220 ;; iff not blocking , see if anything is present or in the read queue
221 (if (or (< buffpos buffer-ptr) (not (collection-empty-p (device-read-frames device))))
222 -3
223 0)))
224 ((>= body-position body-length)
225 (typecase (device-state device)
226 (amqp.s:use-channel.body.input.chunked
227 ;; chunked => start the next message
228 ;; - if the last was a deliver, wait for the next
229 ;; - if it was a get-ok, ask for the next
230 (command-case (device)
231 ((or amqp:get-ok amqp:deliver) ((basic amqp:basic) &rest args)
232 (amqp:log :debug device "chunk continuation: (~s ~s) . ~s"
233 (type-of amqp:method) (type-of basic) args)
234 t)
235 (t ((class t) &rest args)
236 (amqp:log :error device "Unexpected chunk continuation: (~s ~s) . ~s"
237 (type-of amqp:method) (type-of class) args)
238 t))
239 (let ((result (device-read-content-header device)))
240 (amqp:log :debug device "read chunk header: ~s" result))
241 ;; if that succeeds, read the
242 (if (< body-position body-length)
243 ;; just try again
244 (device-read device buffer-arg start end blocking)
245 -1))
246 (t
247 ; not chunked => mark eof
248 (setf buffer-ptr -1))))
249 (buffer-arg
250 ;; if a buffer is provided, use it+bounds together with the devices buffer+bounds
251 ;; to iteratively fill the argument buffer. recurse for more input
252 ;; maintain body-position since the buffer is accepting the content.
253 (let ((total-count 0)
254 (last-device-read 0))
255 (unless end (setf end (length buffer-arg)))
256 (unless start (setf start 0))
257 (loop (unless (> start end) (return))
258 (when (>= buffpos buffer-ptr)
259 (unless (and (< body-position body-length)
260 (plusp (setf last-device-read (device-read device nil 0 nil blocking))))
261 (return)))
262 (let* ((count (min (- end start) (- buffer-ptr buffpos)))
263 (end1 (+ start count))
264 (end2 (+ buffpos count)))
265 (replace buffer-arg buffer :start1 start :end1 end1 :start2 buffpos :end2 end2)
266 (setf start end1
267 buffpos end2)
268 (incf total-count count)
269 (incf body-position count)))
270 (if (zerop total-count)
271 last-device-read
272 total-count)))
273 (t
274 ;; otherwise read a frame buffer
275 (assert (and (zerop start) (or (null end) (= end (length buffer)))) ()
276 "Frame buffer i/o permitted for entire buffers only.")
277 (let* ((frame nil))
278 (loop (setf frame (get-read-frame device :wait blocking))
279 ;; if non-blocking, maybe no frame
280 (if frame
281 (cond ((plusp (frame-size frame))
282 (let* ((data (frame-data frame))
283 (length (frame-size frame)))
284 (rotatef buffer data)
285 (setf-frame-data data frame)
286 (release-frame frame)
287 (setf buffpos 0)
288 (setf buffer-ptr length)
289 (setf buf-len (length buffer)) ; could change iff possible to re-tune
290 (assert (<= buffer-ptr buf-len) ()
291 "Invalid buffer sizes: ptr ~d, len ~d." buffer-ptr buf-len)
292 (return length)))
293 (t
294 ;; if the frame is a zero-length frame, always skip it
295 (release-frame frame)
296 ;; iff chunking, also end chunking, indicate end of the body and
297 ;; skip next padding frame as well.
298 (typecase (device-state device)
299 (amqp.s:use-channel.body.input.chunked
300 (setf (device-state device) amqp.s:use-channel.body.input)
301 (cond ((setf frame (get-read-frame device :wait t)) ; resolve it now
302 (assert (and (eq (frame-type-class-name frame) 'amqp:body)
303 (= (frame-size frame) (- body-length body-position))) ()
304 "Invalid pad frame: ~s." frame)
305 (setf body-length body-position) ; 'truncate' the body
306 (release-frame frame)
307 (return (setf buffer-ptr -1)))
308 (t
309 (return (setf buffer-ptr -1))))))))
310 (return (when blocking (setf buffer-ptr -1))))))))))
311
312
313 (defmethod device-write ((device amqp:channel) buffer-arg start end (blocking t))
314 "Channels read a frame at a time through a connection.
315 The connection manages the actual stream and writes frames as required. The
316 specified 'blocking' mode has no affect on output."
317 (assert-device-state device use-channel.body device-write)
318 (with-slots (out-buffer outpos max-out-pos) device
319 (cond (buffer-arg
320 ;; if a buffer is provided, use it+bounds together with the devices buffer+bounds
321 ;; to iteratively empty the argument buffer. recurse for progressive output
322 (let ((total-count 0))
323 (loop (when (>= start end)
324 (return))
325 (let* ((count (min (- end start) (- max-out-pos outpos)))
326 (source-start start) (source-end (+ source-start count))
327 (buffer-start outpos) (buffer-end (+ outpos count)))
328 (setf start source-end
329 outpos buffer-end)
330 (when (> (+ (device-body-position device) count)
331 (device-body-length device))
332 (error "DEVICE-WRITE: output exceeds content length: ~s, ~s."
333 (+ (device-body-position device) count)
334 (device-body-length device)))
335 (replace out-buffer buffer-arg :start1 buffer-start :end1 buffer-end :start2 source-start :end2 source-end)
336 (incf total-count count)
337 (if (>= outpos max-out-pos)
338 ;; flush the buffer
339 (let ((result (device-write device nil 0 nil blocking)))
340 (when (minusp result)
341 (return-from device-write result)))
342 (incf (device-body-position device) count))))
343 total-count))
344 (t
345 (assert (and (zerop start) (or (null end) (= end (length out-buffer)))) ()
346 "Frame buffer i/o permitted for entire buffers only.")
347 (device-flush device)))))
348
349
350
351 (defmethod device-flush ((device amqp:channel) &optional (complete nil))
352 "Push data to the channel'c connection.
353 DEVICE : amqp:channel : an open channel
354 COMPLETE : boolean : iff true, an in-progress chunked body stream is closed.
355
356 Given a buffer, its content is passed through the channel's
357 frame buffers. Lacking a buffer, the exisitng frame buffer is sent. The effect is to support
358 single-frame commands, sequence-based stream io, and buffer based stream io. The device state is checked to
359 confirm an operation makes sense.
360
361 On framed content, streams, and chunking:
362 there are three aspects:
363 * is there buffered content: (zerop outpos) ?
364 * is the content complete?
365 * was the content length known ahead of time: .output or .output.chunked ?
366
367 if there is content, wrap up the body frame and send it.
368 given the now empty buffer, if the content body complete, then it matters whether the length was
369 predetermined. if it was, that is, in state .output, since this is not an abort, sufficient
370 frames must be sent to achieve the content length. if the length was not predtermined, then the
371 state is .output.chunked and the end of the sequence of commands is indicated by sending a
372 zero-length body frame and then padding to fill the content length. shoudl this have been an
373 empty buffer, then the zero-length frame will be followed by a full length pad."
374
375 (let ((result-length 0))
376 (typecase (device-state device)
377 (amqp.s:use-channel.body.output
378 (with-slots (out-buffer outpos max-out-pos body-position body-length) device
379 (flet ((flush-frame ()
380 (let* ((frame (claim-output-frame device))
381 (length outpos))
382 (rotatef out-buffer (frame-data frame))
383 (setf-frame-type-class-name 'amqp:body frame)
384 (setf-frame-cycle 0 frame)
385 (setf-frame-channel-number (channel-number device) frame)
386 (setf-frame-track-number (channel-track device) frame)
387 (setf-frame-size outpos frame)
388 (put-encoded-frame device frame)
389 (setf outpos 0)
390 (setf max-out-pos (length out-buffer))
391 length)))
392 ;; check whether there is anything to flush
393 (when (or (plusp outpos) (zerop body-length)) ; always at least one frame
394 ;; if there is content, send the frame out.
395 (setf result-length (flush-frame)))
396 ;; check completion
397 (cond (complete
398 (typecase (device-state device)
399 (amqp.s:use-channel.body.output.chunked
400 ;; if the content was chunked, send a zero-length frame and refert to .output
401 (amqp:log :debug device "Ending chunking. padding ~d bytes..."
402 (- body-length body-position))
403 (flush-frame)
404 (setf (device-state device) amqp.s:use-channel.body.output)))
405 ;; now send frames to fill the difference between content-position and
406 ;; content-length - as many as needed
407 (do ((count (min (- body-length body-position) max-out-pos)
408 (min (- body-length body-position) max-out-pos)))
409 ((<= count 0))
410 (fill out-buffer 0 :start 0 :end count)
411 (setf outpos count)
412 (flush-frame)
413 (incf body-position count))
414 (amqp:log :debug device "Ended padding."))
415 (t
416 (typecase (device-state device)
417 (amqp.s:use-channel.body.output.chunked
418 ;; if so, need to send a new command:
419 ;; send a new publish, reset the buffer positions, and continue streaming
420 (let ((basic (channel.basic device)))
421 (amqp:log :debug basic "Starting next chunk: publish...")
422 (amqp:send-publish basic :exchange (basic-exchange basic))
423 (setf outpos 0
424 max-out-pos (length out-buffer)
425 ;; start a new body
426 body-length (class-body-size basic)
427 body-position 0)
428 (amqp:log :debug basic "Starting next chunk: header...")
429 (send-header basic)
430 (amqp:log :debug basic "Starting next chunk: done."))))))
431 result-length))))))
432
433
434 (defmethod device-clear-input ((device amqp:channel) buffer-only)
435 ;;; clear a possible pushed character and "empty" buffer.
436 ;;; unless buffer-only, also flush any not yet read frames until
437 ;;; the end of the body is reached
438 (with-slots (decoder buffer buffpos buffer-ptr body-position body-length) device
439 (funcall decoder #'(lambda (s) (declare (ignore s)) 0) device)
440 ;; skip over anything already in the buffer
441 (setf body-position (+ body-position (- buffer-ptr buffpos)))
442 (setf buffpos buffer-ptr)
443 ;; optionally drain pending input
444 (unless buffer-only
445 ;; flush input
446 (amqp:log :debug device "device-clear-input: drain expected frames: state: ~a, at ~s of ~s"
447 (device-state device) body-position body-length)
448 (loop (when (>= body-position body-length)
449 (return))
450 (unless (plusp (device-read device nil 0 nil t))
451 (return))
452 (incf body-position buffer-ptr)))
453 nil))
454
455
456 (defmethod device-buffer-length ((device amqp:channel))
457 (let ((connection (channel-connection device)))
458 (if connection
459 (device-buffer-length connection)
460 0)))
461
462
463 (defmethod (setf device-file-position) (position (device amqp:channel))
464 (declare (ignore position))
465 (device-body-length device))
466
467
468 (defmethod device-file-length ((device amqp:channel))
469 (device-body-length device))
470
471
472 (defmethod (setf device-file-length) (length (device amqp:channel))
473 (declare (ignore length))
474 nil)
475
476
477
478 ;;;
479 ;;; connection operations
480
481
482 (defmethod device-open ((device amqp:connection) #-sbcl (slots t) initargs)
483 "Prepare a connection by opening a socket to broker, negotiating the
484 protocol parameter, and opening the virutal host."
485
486 (etypecase (device-state device)
487 (amqp.s:open-connection
488 (if (or (stream-input-handle device)
489 (stream-output-handle device))
490 (call-next-method)
491 (destructuring-bind (&key (uri (connection-uri device))
492 (version (class-protocol-version device))
493 (direction :io)
494 &allow-other-keys)
495 initargs
496 ;; merge the host and port information from/to the uri
497 (let ((remote-host (uri-host uri))
498 (remote-port (or (uri-port uri) *standard-port*)))
499
500 (ecase direction
501 (:probe (call-next-method device #-sbcl slots
502 (list* :remote-host remote-host
503 :remote-port remote-port
504 :direction :probe
505 initargs)))
506 (:io (when (call-next-method device #-sbcl slots
507 (list* :remote-host remote-host
508 :remote-port remote-port
509 :direction :io
510 initargs))
511 (when (open-connection device :version version)
512 ;; once the concrete class is fixed, initialize the buffer
513 (device-initialize-buffers device)
514 (negotiate-client-connection device)
515 t))))))))
516 (amqp.s:use-connection
517 (call-next-method))))
518
519
520 (defmethod device-close ((device amqp:connection) (abort t))
521 (map nil #'(lambda (c) (when c (device-close c abort)))
522 (get-connection-channels device))
523 (if abort
524 (call-next-method)
525 (typecase (shiftf (device-state device) amqp.s:close-connection)
526 ;; never succeeded to open
527 (amqp.s:open-connection
528 (call-next-method))
529 ;; if it's in use, perform a protocol close, then flush that data
530 ;; close the stream, and reset to the initial state
531 (amqp.s:use-connection
532 (amqp:request-close device)
533 (device-flush device t)
534 (multiple-value-prog1 (call-next-method)
535 (setf (connection-state device) amqp.s:open-connection)))
536 ;; otherwise, the protocol close is already in progress;
537 ;; just flush and close the stream
538 (t
539 (device-flush device t)
540 (call-next-method)))))
541
542
543
544
545 (defmethod device-read ((device amqp:connection) buffer-arg start end (blocking t))
546 "Connections permit stream io only if there is just one channel read a frame at a time through a connection.
547 the connection manages the actual stream and makes frames available as
548 they appear. the specified 'blocking' mode determines whether to
549 wait of there is noting present."
550
551 (assert-device-state device use-connection amqp.s:use-channel.body.input)
552 (with-slots (buffer buffpos buffer-ptr buf-len) device
553 (if (or (< buffer-ptr 0)
554 (>= (device-body-position device) (device-body-length device)))
555 -1
556 (cond (buffer-arg
557 ;; if a buffer is provided, use it+bounds together with the devices buffer+bounds
558 ;; to iteratively fill the argument buffer. recurse for more input
559 (let ((total-count 0))
560 (loop (let* ((count (min (- end start) (- buffer-ptr buffpos)))
561 (start1 start) (end1 (+ start1 count))
562 (start2 buffpos) (end2 (+ buffpos count)))
563 (setf start end1
564 buffpos end2)
565 (replace buffer-arg buffer :start1 start1 :end1 end1 :start2 start2 :end2 end2)
566 (incf total-count count)
567 (when (>= start end)
568 (incf (device-body-position device) count)
569 (return))
570 ;; read more
571 (let ((result (device-read device nil 0 nil blocking)))
572 (when (minusp result)
573 (return-from device-read result)))))
574 total-count))
575 (t
576 ;; otherwise read a frame buffer
577 (assert (and (zerop start) (null end)) ()
578 "Frame buffer i/o permitted for entire buffers only.")
579 (loop (let ((frame (get-read-frame device :wait blocking)))
580 (if frame
581 (when (plusp (length frame))
582 (let* ((data (frame-data frame))
583 (length (frame-size frame)))
584 (rotatef buffer data)
585 (setf-frame-data data frame)
586 (release-frame frame)
587 (setf buffpos 0)
588 (setf buffer-ptr length)
589 (setf buf-len (length buffer))
590 (assert (<= buffer-ptr buf-len) ()
591 "Invalid buffer sizes: ptr ~d, len ~d." buffer-ptr buf-len)
592 (return length)))
593 (return (setf buffer-ptr -1))))))))))
594
595
596 ;;; this duplicates most of the channel version, but for the buffer source
597 (defmethod device-write ((device amqp:connection) buffer-arg start end (blocking t))
598 (assert-device-state device :use-connection device-write)
599 (with-slots (out-buffer outpos max-out-pos) device
600 (cond (buffer-arg
601 ;; if a buffer is provided, use it+bounds together with the devices buffer+bounds
602 ;; to iteratively empty the argument buffer. recurse for progressive output
603 (let ((total-count 0))
604 (loop (when (>= start end)
605 (return))
606 (let* ((count (min (- end start) (- max-out-pos outpos)))
607 (source-start start) (source-end (+ source-start count))
608 (buffer-start outpos) (buffer-end (+ outpos count)))
609 (setf start source-end
610 outpos buffer-end)
611 (when (> (+ (device-body-position device) count)
612 (device-body-length device))
613 (error "DEVICE-WRITE: output exceeds content length: ~s, ~s."
614 (+ (device-body-position device) count)
615 (device-body-length device)))
616 (replace out-buffer buffer-arg :start1 buffer-start :end1 buffer-end :start2 source-start :end2 source-end)
617 (incf total-count count)
618 (if (>= outpos max-out-pos)
619 ;; flush the buffer
620 (let ((result (device-write device nil 0 nil blocking)))
621 (when (minusp result)
622 (return-from device-write result)))
623 (incf (device-body-position device) count))))
624 total-count))
625 (t
626 (assert (and (zerop start) (null end)) ()
627 "Frame buffer i/o permitted for entire buffers only.")
628 (device-flush device)))))
629
630 (defmethod device-flush ((device amqp:connection) &optional complete-p)
631 (declare (ignore complete-p))
632 (typecase (device-state device)
633 (amqp.s:use-connection
634 (with-slots (out-buffer outpos max-out-pos) device
635 (when (plusp outpos)
636 (let* ((frame (claim-output-frame device))
637 (length outpos))
638 (rotatef out-buffer (frame-data frame))
639 (setf-frame-type-class-name 'amqp:body frame)
640 (setf-frame-cycle 0 frame)
641 (setf-frame-channel-number (channel-number device) frame)
642 (setf-frame-track-number (channel-track device) frame)
643 (setf-frame-size outpos frame)
644 (put-encoded-frame device frame)
645 (setf outpos 0)
646 (setf max-out-pos (length out-buffer))
647 length))))))
648
649 (defmethod device-buffer-length ((device amqp:connection))
650 "Until the connection has been specialized for the protocol, propose the full frame
651 as the buffer size."
652 (let ((frame-class (connection-input-frame-class device)))
653 (- (connection-frame-size device)
654 (if frame-class
655 (frame-overhead (allocate-instance (find-class frame-class)))
656 0))))
657
658 (defmethod device-file-position ((device amqp:connection))
659 (device-body-position device))
660
661 (defmethod (setf device-file-position) (position (device amqp:connection))
662 (declare (ignore position))
663 nil)
664
665 (defmethod device-file-length ((device amqp:connection))
666 (device-body-length device))
667
668 (defmethod (setf device-file-length) (length (device amqp:connection))
669 (declare (ignore length))
670 nil)
671
672
673
674 (defmethod open-connection ((connection amqp:connection)
675 &key (version (class-protocol-version connection))
676 (attempt 1))
677 "Given a CONNECTION and an optional VERSION, open a socket to the respective
678 host/port, negotiate the version and process any initial frame(s).
679
680 CONNECTION : amqp:connection : an initialized connection w/o a socket. May be
681 an abstract connection
682 :VERSION : amqp:version : Intended version
683 VALUES : (or stream null) : the open stream if succeeded
684 amqp:version : the negotiated version
685
686 The socket is opened and an attempt is made to read a permitted version. The
687 criteria is, whether a connection class is defined which supports the version.
688 If so, the given connection instance's class is changed to agree.
689 In addition, a possible initial frame is read and queued."
690
691 (let ((buffer-out (make-array 8 :element-type 'unsigned-byte))
692 (buffer-in (make-array 8 :element-type 'unsigned-byte))
693 (version-received nil)
694 (byte-zero 0))
695 (labels ((negotiation-failed (&optional reason)
696 (error "Connection negotiation failed~@[ (~a)~]: requested ~s, class ~s, received ~s"
697 reason
698 version (class-protocol-version connection) version-received))
699 (update-connection-class (version-received)
700 (let ((new-class (amqp:find-protocol-class connection version-received)))
701 (unless new-class
702 (negotiation-failed "Unsupported protocol version"))
703 (cond ((eq new-class (class-of connection)))
704 ((eql attempt 1)
705 (change-class connection new-class)
706 (amqp:log :debug connection "open-connection: updated class to ~s."
707 (type-of connection)))
708 (t
709 (negotiation-failed "Re-negotiation failed to match")))))
710 (cycle-socket ()
711 (let* ((uri (device-uri connection))
712 (remote-host (uri-host uri))
713 (remote-port (or (uri-port uri) *standard-port*)))
714 (setf (device-socket connection)
715 (usocket:socket-connect remote-host remote-port
716 :element-type 'unsigned-byte)))))
717
718 (setf (buffer-protocol-header-version buffer-out) version)
719 (amqp:log :debug connection "open-connection: requesting version: ~s/~s."
720 buffer-out version)
721 (write-sequence buffer-out (stream-output-handle connection))
722 (force-output (stream-output-handle connection))
723 (case (setf byte-zero (read-byte (stream-input-handle connection)))
724 ;; the later protocols reply with a version to confirm, but
725 ;; the early ones just send the start frame immediately
726 (#.(char-code #\A)
727 ;; if a protocol header is returned, if the version matches, ok
728 ;; otherwise close the connection and return the version
729 (setf (aref buffer-in 0) #.(char-code #\A))
730 (cond ((= 8 (read-sequence buffer-in (stream-input-handle connection) :start 1))
731 (setf version-received (buffer-protocol-header-version buffer-in nil))
732 (amqp:log :debug connection "open-connection: parsed version: ~s / ~s."
733 buffer-in version-received)
734 ;; negotiate the protocol
735 (cond (version-received
736 ;; use received versions to specialize the given instance
737 (update-connection-class version-received)
738 ;; cycle the socket and retry to connect
739 (cycle-socket)
740 (open-connection connection :attempt (1+ attempt) :version version-received))
741 (t
742 (negotiation-failed (format nil "Unsupported protocol header: ~a." buffer-in)))))
743 (t
744 (negotiation-failed "Incomplete protocol header"))))
745 (t
746 ;; version accepted w/ immediate connection-open
747 ;; still, update to change from abstract to concrete class
748 (amqp:log :debug connection "open-connection: byte-zero: ~s, connection class ~s."
749 byte-zero (type-of connection))
750 (setf version-received version)
751 (update-connection-class version)
752
753 ;; this had to wait until the connection had been transformed in to a concrete class
754 (let ((frame (claim-input-frame connection)))
755 (setf (aref (frame-header frame) 0) byte-zero)
756 ;; (print :no-header)
757 (read-frame connection frame :start 1)
758 ;; make channel-zero and prime it with the first frame
759 (amqp:connection.channel connection :number 0)
760 (put-read-frame connection frame)
761 (values version frame)))))))
762
763
764 (document (negotiate-client-connection open-connection)
765 "AMQP connection negotiation occurs in two steps. First, the peers agree on a protocol version. Second
766 they exchange authentication and control information to set up the connection. The first step is
767 implemented by open-connection. It negotiates with the broker to agree on a supported protocol version
768 and changes the connection instance's class to that of the implementation class for that version. For
769 some protocol versions, it is also confronted with the initial frame, which it buffers for the
770 configuration step.
771
772 The second step, authentication and configuration, is implemented in negotiate-client-connection. It
773 exchanges connection commands with the broker for authentication and tuning. The configured connection is
774 returned.")
775
776
777
778 (defmethod negotiate-client-connection ((device amqp:connection) &key (retry-limit 2))
779 "Negotiate security and open a virtual host:
780 - construct a channel-zero instance.
781 - go through the handshake w/ start, secure, tune comamnds.
782 - open channel-zero for the connection's host. "
783
784 (let ((channel (amqp:connection.channel device :number 0)))
785 (command-case (channel)
786 (amqp:start ((class amqp:connection) &rest args)
787 (setf (connection-state device) amqp.s:open-connection.start)
788 (apply #'channel-respond-to-start channel device args)
789 t)
790 (t ((class t) &rest args)
791 (error "Invalid negotiation command: ~s~{ ~s~}." class args)))
792
793 (dotimes (x retry-limit (error "Security negotiation failed."))
794 (command-case (channel)
795 (amqp:secure ((class amqp:connection) &rest args)
796 (setf (connection-state device) amqp.s:open-connection.secure)
797 (apply #'channel-respond-to-secure channel class args)
798 t)
799 (amqp:tune ((class amqp:connection) &rest args)
800 (setf (connection-state device) amqp.s:open-connection.tune)
801 (apply #'channel-respond-to-tune channel class args)
802 (return))))
803
804 (setf (connection-state device) amqp.s:open-connection.host)
805 (channel-request-open channel device :virtual-host (connection-virtual-host device))
806 (setf (connection-state device) amqp.s:use-connection)
807
808 device))
809
810
811 (document (device-read-content device-write-content)
812 "The content processing interface comprises the two operators
813 * device-read-message (channel &rest)
814 * device-write-message (channel body &rest)
815
816 Each accepts the keywords which apply to the respctive protocol operation, that is, any method arguments and the class'
817 header properties. for reading this means the arguments for get and deliver, while for writing those for publish.
818 The interface supports two use patterns : body instances and continuation based. The decision is made by the writer
819 according to whether the body size is known at the outset. For fixed length vectors this is true. For aeverything else, it
820 is not. Where it is known, a single content-bearing command is sent. Where is it not known, A sequence of commands is
821 sent until the writer terminates the stream by indicating completion in a call to device-flush.
822
823 Streams are broken into and reconstituted from the three frame constituents for a command : method, header, and body.
824 * a method frame : is emitted by a request operator and parsed and processed by the respond-to- operator to cache the
825 arguments in the channel. The protocol interface operator then invokes device-read/write-content.
826 * a content header : ion inpt, s parsed in device-read-content/content-header to extract the properties and cache them
827 in the channel's basic class instance. On output, it is generated by device-write-content based on the channel's current
828 basic instance
829 * the content body : is parsed based on the channel's internal type combined with the content type. It is generated
830 based on the bassed body instance in combination with the content type /encoding.
831
832 [ channel-request-publish channel-respond-to-get-ok channel-respond-to-deliver ]
833 --> [ device-read-content device-write-content]
834 --> [ device-read-content-header device-write-content-header ]
835 --> [ device-read-content-body device-write-content-body ]
836 ")
837
838 (defgeneric device-read-content (channel &key delivery-tag redelivered exchange routing-key
839 ;; from get-ok only
840 message-count
841 ;; from deliver only
842 consumer-tag
843 )
844 (:documentation "Given a channel which has received a Basic.Deliver or Basic.Get/Get-ok,
845 firat, prepare the channel based on the content header properties, and read the content
846 according to the combined channel data type and content type. Combine the header's possibly
847 incomplete content type with the channel's to specify the effective decoding.")
848
849 (:method ((channel amqp:channel) &key body delivery-tag redelivered exchange routing-key message-count consumer-tag)
850 (declare (ignore delivery-tag redelivered exchange routing-key message-count consumer-tag))
851 (setf (channel-state channel) amqp.s:use-channel.body.input)
852 (assert-argument-type device-read-content body (or null function))
853 (let* ((basic (device-read-content-header channel))
854 (headers (amqp:basic-headers basic))
855 (element-type (getf headers :element-type))
856 (package (getf headers :package))
857 (mime-type (class-mime-type basic)))
858 ;; element-type in the basic header combines the read values with the channel's content-type
859 (setf element-type (or (find-symbol element-type package)
860 (error "Invalid type x package combination: ~s, ~s."
861 element-type package)))
862 (amqp:log :debug channel "device-read-content: in (~s ~s) in state ~s x~s"
863 element-type mime-type (channel-state channel) (device-body-length channel))
864 (device-read-content-body channel (or body element-type) mime-type))))
865
866
867 (defgeneric device-read-content-header (channel )
868 (:method ((channel amqp:channel))
869 (command-loop (channel)
870 (amqp:header ((basic amqp:basic) &key frame &allow-other-keys)
871 (declare (ignore frame))
872 ;; merge the header's content type with the stream's
873 (let* ((body-size (class-body-size basic))
874 (headers (amqp:basic-headers basic))
875 (mime-type (mime-type basic)))
876 (assert-argument-type device-read-content-body body-size integer)
877
878 (with-slots (buffer buffer-ptr body-length body-position) channel
879 (unless buffer
880 (device-initialize-input-buffer channel))
881 (setf body-length body-size
882 buffer-ptr 0
883 body-position 0))
884 (update-device-codecs channel mime-type)
885 (setf (channel-state channel)
886 (if (string-equal (getf headers :transfer-encoding) "chunked")
887 amqp.s:use-channel.body.input.chunked
888 amqp.s:use-channel.body.input))
889 (return-from command-loop basic))))))
890
891
892 (defgeneric device-read-content-body (device type content-type)
893
894 (:method ((channel amqp:channel) (type (eql 'string)) (content-type mime:text/plain))
895 (let* ((body-length (device-body-length channel))
896 ;; construct a string with the message content
897 (body (make-string body-length)))
898 (read-sequence body channel :start 0 :end body-length)
899 body))
900
901 (:method ((channel amqp:channel) (body-op function) (content-type mime:*/*))
902 "Given a the null type, just return the channel as a stream to be read."
903 (prog1 (funcall body-op channel content-type)
904 (device-clear-input channel nil)))
905
906 (:method ((channel amqp:channel) (type (eql 'vector)) (content-type mime:application/octet-stream))
907 "Given a the type 'vector, create one and copy the stream content into it."
908 (let* ((body-length (device-body-length channel))
909 (body (make-frame-buffer body-length)))
910 (device-read channel body 0 body-length nil)
911 body))
912
913 (:method ((channel amqp:channel) (channel-type (eql 'list)) (content-type mime::application/sexp))
914 "Given an sexp mime type, then read the form."
915 (let* ((basic (amqp:channel.basic channel))
916 (headers (amqp:basic-headers basic))
917 (package (getf headers :package)))
918 (setf package (or (find-package package)
919 (amqp:syntax-error :channel channel
920 :class-code (amqp:class-id basic)
921 :message-string "Invalid package designator: ~s"
922 :message-arguments (list package)))) ;;;!!! need a text
923 (with-standard-io-syntax
924 (setq *read-eval* nil)
925 (let ((*package* package))
926 (prog1 (read channel)
927 (device-clear-input channel nil))))))
928
929 (:method ((channel amqp:channel) (channel-type (eql 'standard-object)) (content-type mime::application/sexp))
930 (let* ((basic (amqp:channel.basic channel))
931 (headers (amqp:basic-headers basic))
932 (package (getf headers :package)))
933 (setf package (or (find-package package)
934 (amqp:syntax-error :channel channel
935 :class-code (amqp:class-id basic)
936 :message-string "Invalid package designator: ~s"
937 :message-arguments (list package))))
938 (with-standard-io-syntax
939 (let ((*package* package))
940 (setq *read-eval* nil)
941 (let ((form (read channel)))
942 (prog1 (reconstitute form)
943 (device-clear-input channel nil))))))))
944
945 (defun reconstitute (channel-form)
946 "Interpret a received object representation as the two values generated by make-load-form.
947 This version expects a let expression, because that expresses the necessary circularity."
948 (let ((object nil))
949 (destructuring-bind (let ((marker creation)) initialization) channel-form
950 (declare (ignore let))
951 (labels ((simple-eval (form)
952 (typecase form
953 (cons (ecase (first form)
954 (quote (second form))
955 (progn (map nil #'simple-eval (rest form)))
956 ((make-instance initialize-instance allocate-instance reinitialize-instance
957 shared-initialize find-class)
958 (apply (first form) (mapcar #'simple-eval (rest form))))
959 (setf (loop for ((slot-value x name) value) on (rest form) by #'cddr
960 do (progn
961 (assert (eq slot-value 'slot-value) () "invalid form: ~s" form)
962 (setf (slot-value (simple-eval x) (simple-eval name))
963 (simple-eval value)))))))
964 (t form))))
965 (setf object (simple-eval creation))
966 (simple-eval (subst object marker initialization))
967 object))))
968
969 ;(reconstitute '(let ((:m (allocate-instance (find-class 'tc)))) (setf (slot-value ':m 'a) '(as d f))))
970
971
972
973 (defgeneric device-write-content (channel body &rest args
974 &key class-id weight body-size
975 exchange routing-key mandatory immediate
976 content-type content-encoding headers delivery-mode
977 priority correlation-id reply-to expiration message-id timestamp
978 type user-id)
979 (:documentation "Given a channel which has sent a Basic.Publish,
980 firat, write a content header based on the properties, then write the content
981 according to the combined channel data type and content type. Combine the header's possibly
982 incomplete content type with the channel's to specify the effective encoding.")
983 (declare (dynamic-extent args))
984
985 (:method ((channel amqp:channel) body &rest args)
986 ;; configure the respective basic for the (content x element-type x content-type)
987 ;; combination. this resolve the body size, the transfer encoding, and the
988 ;; transfer element type
989 (let* ((basic (apply #'device-write-content-header channel body args)))
990
991 (prog1 (apply #'device-write-content-body channel body (mime-type basic) args)
992 (device-flush channel t)))))
993
994
995 (defgeneric device-write-content-header (channel body
996 &key class-id weight body-size
997 exchange routing-key mandatory immediate
998 content-type content-encoding headers delivery-mode
999 priority correlation-id reply-to expiration message-id timestamp
1000 type user-id)
1001
1002 (:method ((channel amqp:channel) (body t) &rest args)
1003 (let* ((basic (apply #'amqp:channel.basic channel :body body args))
1004 (body-size (class-body-size basic))
1005 (headers (amqp:basic-headers basic))
1006 (mime-type (mime-type basic)))
1007 (with-slots (out-buffer max-out-pos outpos body-length body-position) channel
1008 (unless out-buffer
1009 (device-initialize-output-buffer channel))
1010 (setf body-length body-size
1011 body-position 0))
1012 (update-device-codecs channel mime-type)
1013 (setf (channel-state channel)
1014 (if (string-equal (getf headers :transfer-encoding) "chunked")
1015 amqp.s:use-channel.body.output.chunked
1016 amqp.s:use-channel.body.output))
1017 (send-header basic)
1018 basic)))
1019
1020
1021 (defgeneric device-write-content-body (device body content-type
1022 &key
1023 body-size class-id consumer-tag content-type content-encoding correlation-id
1024 delivery-mode delivery-tag exchange expiration headers immediate
1025 mandatory message-count
1026 priority routing-key reply-to message-id
1027 redelivered timestamp type user-id weight)
1028
1029 (:method ((channel amqp:channel) (body null) (content-type mime:*/*) &rest args)
1030 "Given a null body , configure the channel to write the message body
1031 and return the stream."
1032 (declare (dynamic-extent args) (ignore args))
1033 nil)
1034
1035 (:method ((channel amqp:channel) (body string) (content-type mime:text/plain) &rest args)
1036 (declare (dynamic-extent args) (ignore args))
1037
1038 ;; write the content,
1039 (stream-write-string channel body 0 nil))
1040
1041 (:method ((channel amqp:channel) (body-op function) (content-type mime:*/*) &rest args)
1042 (declare (dynamic-extent args) (ignore args))
1043
1044 ;; call the function
1045 (funcall body-op channel content-type))
1046
1047 (:method ((channel amqp:channel) (body vector) (content-type mime:application/octet-stream) &rest args)
1048 "Given a the type 'vector, create one and copy the stream content into it."
1049 (declare (dynamic-extent args) (ignore args))
1050
1051 (device-write channel body 0 (length body) t))
1052
1053 (:method ((channel amqp:channel) (body cons) (content-type mime::application/sexp) &rest args)
1054 "Given an sexp mime type, set up the stream, then read the form."
1055 (declare (dynamic-extent args) (ignore args))
1056 (with-standard-io-syntax
1057 (write body :stream channel :circle t)))
1058
1059 (:method ((channel amqp:channel) (body standard-object) (content-type mime::application/sexp) &rest args)
1060 "Given an sexp mime type, set up the stream, then read the form."
1061 (declare (dynamic-extent args) (ignore args))
1062
1063 (multiple-value-bind (creation initialization)
1064 (make-load-form body)
1065 (with-standard-io-syntax
1066 (let ((marker (list :object)))
1067 (let ((form `(let ((,marker ,creation))
1068 ,(subst marker body initialization))))
1069 (write form :stream channel :circle t)))))))

  ViewVC Help
Powered by ViewVC 1.1.5