/[de-setf-amqp]/stream.lisp
ViewVC logotype

Contents of /stream.lisp

Parent Directory Parent Directory | Revision Log Revision Log


Revision 3 - (show annotations)
Tue Feb 23 09:05:39 2010 UTC (4 years, 1 month ago) by janderson
File size: 18029 byte(s)
Merge commit 'remotes/github/master' into remotes/git-svn
1 ;;; -*- Package: de.setf.amqp.implementation; -*-
2
3 (in-package :de.setf.amqp.implementation)
4
5 (document :file
6 (description "This file defines a stream interface for AMQP channel and connection instances for the
7 'de.setf.amqp' library.")
8 (copyright
9 "Copyright 2010 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved"
10 "'de.setf.amqp' is free software: you can redistribute it and/or modify it under the terms of version 3
11 of the GNU Affero General Public License as published by the Free Software Foundation.
12
13 'setf.amqp' is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
14 implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
15 See the Affero General Public License for more details.
16
17 A copy of the GNU Affero General Public License should be included with 'de.setf.amqp' as `AMQP:agpl.txt`.
18 If not, see the GNU [site](http://www.gnu.org/licenses/).")
19
20 (long-description "The goal here is to hook the channel/connection stream operations into standard
21 stream operations. this file implements
22
23 - gray stream compatibility
24 - stream-ty* support
25
26 the original "gray streams" proposal source[1] has not been forthcoming, but there is a copy on the clozure
27 ftp site.[2] kmp has notes[3], corman lisp publishes an implementation[4], and clim discusses in interim gray
28 streams implementation[5], to be used in the interim, until a real implementation is available.
29
30
31 the stream uses a channel's the dual-channel-simple-stream device implementation to manage body length,
32 position and buffering together with the its operators for character coding. the interface
33 supports (vector (unsigned byte 8)) and string element types and (vector (unsigned byte 8))
34 i/o buffers. codecs are always present - even for ascii, and are used to implement unread-char.
35
36 the protocol is
37 on input:
38
39 buffer is the buffer
40 buf-len is the full buffer size
41 buffer-ptr is the length of read data, which is given by the frame payload size
42 buffpos is the position from which to take the next byte
43 body-position is that position wrt the entire body
44 body-length is the length of the entire body
45
46 the decoders maintain buffpos and body-position, and use the relation between buffpos
47 and buffer-ptr to refresh the buffer iff needed. if buff-pr is negative, that indicates that eof
48 has already been reached. the relation between body-position and body-size can indicate
49 the end of the current content, depends on whether the channel expects chunked content.
50 (see device-read)
51
52 on output:
53
54 out-buffer is the buffer
55 max-out-pos is the full buffer size
56 outpos is the position at which to put the next byte
57 body-position is that position wrt the entire body
58 body-length is the length of the entire body
59
60 the encoders maintian outpos and body-position and use the relation between outpos and
61 max-out-pos to flush the buffer iff needed. the relation between body-position and
62 body-size indicates the end of intended content and is used to effect chunking. (see device-write)
63
64 body-position and body-length can be multiplexed between input and output so long as the channels
65 remain half-duplex. the application must adhere to the single-channel per process constraint and
66 limit stream i/o to device-read/write-content or the respetive request-publish/get. should interleaved
67 input and output be required, the channel will need distinct state for each direction.
68
69 ---
70 [1]: ftp://parcftp.xerox.com/pub/cl/cleanup/mail/stream-definition-by-user.mail
71 [2]: ftp://ftp.clozure.com/stream-definition-by-user.mail
72 [3]:http://www.nhplace.com/kent/CL/Issues/stream-definition-by-user-notes.html
73 [4]: http://www.grumblesmurf.org/corman-patches/modules/gray-streams.lisp
74 [5]: http://www.cs.cmu.edu/afs/cs/project/clisp/hackers/phg/clim/src/utils/cl-streams.lisp"))
75
76
77 ;;;
78 ;;; input
79
80 (defmethod stream-read-char ((stream amqp-device))
81 (with-slots (decoder) stream
82 (flet ((buffer-extract-byte (stream)
83 (with-slots (buffer buffpos buffer-ptr body-position body-length) stream
84 (when (or (< buffpos buffer-ptr)
85 (and (not (minusp buffer-ptr))
86 ; resets buff-pos / renews buffer content unless eof
87 (plusp (device-read stream nil 0 nil t))))
88 (incf body-position)
89 (prog1 (aref buffer buffpos)
90 (incf buffpos))))))
91 (or (funcall decoder #'buffer-extract-byte stream) (stream-eof-marker stream)))))
92
93 ;;;!!! for multi-byte encodings the content-encoding needs to supply this
94 ;;;!!! in order that the position is properly maintained.
95 ;;;!!! it's also not clear what happens when this backs up over a frame boundary
96 (defmethod stream-unread-char ((stream amqp-device) char)
97 (with-slots (buffpos body-position) stream
98 (let ((old-decoder (device-decoder stream)))
99 (flet ((unread-decoder (byte-decoder stream)
100 (declare (ignore byte-decoder))
101 (with-slots (buffpos body-position) stream
102 (setf-device-decoder old-decoder stream)
103 (incf body-position)
104 (incf buffpos)
105 char)))
106 (decf body-position)
107 (decf buffpos)
108 (setf-device-decoder #'unread-decoder stream)
109 nil))))
110
111
112 (defmethod stream-read-char-no-hang ((stream amqp-device))
113 (with-slots (body-position body-length) stream
114 (if (< body-position body-length)
115 (when (stream-listen (stream-input-handle stream))
116 (stream-read-char stream))
117 (stream-eof-marker stream))))
118
119
120 (defmethod stream-peek-char ((stream amqp-device))
121 (with-slots (decoder) stream
122 ;; cannot do this manipulating buffer position, as a multi-byte encoding
123 ;; might cross a frame boundary
124 (flet ((buffer-extract-byte (stream)
125 (with-slots (buffer buffpos buffer-ptr body-position body-length) stream
126 (when (or (< buffpos buffer-ptr)
127 (and (not (minusp buffer-ptr))
128 ; resets buff-pos / renews buffer content unless eof
129 (plusp (device-read stream nil 0 nil t))))
130 (incf body-position)
131 (prog1 (aref buffer buffpos)
132 (incf buffpos))))))
133 (let ((char (funcall decoder #'buffer-extract-byte stream)))
134 (cond (char
135 (stream-unread-char stream char)
136 char)
137 (t
138 (stream-eof-marker stream)))))))
139
140
141 (defmethod stream-listen ((stream amqp-device))
142 ;; sbcl has no stream-listen for the input stream
143 (device-listen stream))
144
145
146 (defmethod stream-read-line ((stream amqp-device))
147 (with-slots (decoder) stream
148 (let ((eol-char (stream-eol-marker stream))
149 (line (stream-line-buffer stream)))
150 (setf (fill-pointer line) 0)
151 (flet ((buffer-extract-byte (stream)
152 (with-slots (buffer buffpos buffer-ptr body-position body-length) stream
153 (when (or (< buffpos buffer-ptr)
154 (and (not (minusp buffer-ptr))
155 ; resets buff-pos / renews buffer content unless eof
156 (plusp (device-read stream nil 0 nil t))))
157 (incf body-position)
158 (prog1 (aref buffer buffpos)
159 (incf buffpos))))))
160 (loop (let ((char (funcall decoder #'buffer-extract-byte stream)))
161 (cond ((eql char eol-char)
162 (return (copy-seq line)))
163 (char
164 (vector-push-extend char line))
165 (t
166 (return (values (copy-seq line) (stream-eof-marker stream)))))))))))
167
168
169 (defmethod stream-clear-input ((stream amqp-device))
170 (device-clear-input stream t))
171
172
173 (defmethod stream-line-column ((stream amqp-device))
174 nil)
175
176
177 (defmethod stream-start-line-p ((stream amqp-device))
178 nil)
179
180
181 (defun amqp-stream-write-char (stream character)
182 (with-slots (encoder) stream
183 (flet ((buffer-insert-byte (stream byte)
184 (with-slots (out-buffer outpos max-out-pos body-position body-length encoder) stream
185 (setf (aref out-buffer outpos) byte)
186 (incf body-position)
187 (incf outpos)
188 (when (>= outpos max-out-pos)
189 ;; resets outpos / flushes out-buffer content
190 (device-write stream nil 0 nil t)))))
191 (funcall encoder character #'buffer-insert-byte stream))
192 character))
193
194 (defmethod stream-write-char ((stream amqp-device) character)
195 (amqp-stream-write-char stream character))
196
197
198 (defun amql-stream-write-string (stream string start end)
199 (with-slots (encoder) stream
200 (flet ((buffer-insert-byte (stream byte)
201 (with-slots (out-buffer outpos max-out-pos body-position body-length) stream
202 (setf (aref out-buffer outpos) byte)
203 (incf body-position)
204 (incf outpos)
205 (when (>= outpos max-out-pos)
206 (device-write stream nil 0 nil t)))))
207 (unless start (setf start 0))
208 (unless end (setf end (length string)))
209 (do ((i start (1+ i)))
210 ((>= i end))
211 (funcall encoder (char string i) #'buffer-insert-byte stream)))
212 string))
213
214 (defmethod stream-write-string ((stream amqp-device) string #-mcl &optional start end)
215 (amql-stream-write-string stream string start end))
216
217
218 (defmethod stream-terpri ((stream amqp-device))
219 (stream-write-char stream #\newline))
220
221
222 ;; stream-fresh-line :
223 ;; default suffices
224
225
226 (defmethod stream-finish-output ((stream amqp-device))
227 "Force output and delegate the finsh to the output stream."
228 (stream-force-output stream)
229 (stream-finish-output (stream-output-handle stream)))
230
231
232 (defmethod stream-force-output ((stream amqp-device))
233 "Flush the current buffer, w/o checking for content. (see device-flush)"
234 (device-flush stream))
235
236
237 (defmethod stream-clear-output ((stream amqp-device))
238 "Eliminate nything in the present buffer."
239 (with-slots (outpos body-position) stream
240 (when (plusp outpos)
241 ;; back up in the global stream
242 (decf body-position outpos)
243 ;; reset the current frame
244 (setf outpos 0))))
245
246
247 (defmethod stream-advance-to-column ((stream amqp-device))
248 nil)
249
250
251 ;;; close :
252 ;;; see extremely-simple-streams.lisp
253
254
255 #-mcl
256 (defmethod open-stream-p ((stream amqp-device))
257 "This replicates the mcl definition, so that s single stream-direction suffices."
258
259 (not (eql (stream-direction stream) :closed)))
260
261 (defmethod stream-direction ((stream amqp-device))
262 "A device's direction depends on the state of its handles."
263 (if (stream-input-handle stream)
264 (if (stream-output-handle stream)
265 :io
266 :input)
267 (if (stream-output-handle stream)
268 :output
269 :closed)))
270
271
272 #+ccl
273 (defmethod stream-eofp ((stream amqp-device))
274 (with-slots (body-length body-position) stream
275 (>= body-position body-length)))
276
277 ;; input-stream-p :
278 ;; mcl version is not generic, it is based on direction, which suffices
279
280 ;; output-stream-p :
281 ;; mcl version is not generic, it is based on direction, which suffices
282
283 (defmethod stream-element-type ((stream amqp-device))
284 (device-element-type stream))
285
286
287 ;; pathname : NYI
288
289
290 ;; truename : NYI
291
292
293 (defmethod stream-read-byte ((stream amqp-device))
294 (with-slots (buffer buffpos buffer-ptr body-position) stream
295 (if (or (< buffpos buffer-ptr)
296 (and (not (minusp buffer-ptr))
297 ; resets buff-pos / buffer content unless eof
298 (plusp (device-read stream nil 0 nil t))))
299 (prog1 (aref buffer buffpos)
300 (incf buffpos)
301 (incf body-position))
302 (stream-eof-marker stream))))
303
304
305 (defmethod stream-write-byte ((stream amqp-device) byte)
306 "Add the byte at the current buffer position. If either the buffer is
307 full, or the stream length is reached, write the buffer."
308 (with-slots (out-buffer outpos max-out-pos body-position body-length) stream
309 (setf (aref out-buffer outpos) byte)
310 (incf body-position)
311 (incf outpos)
312 (when (>= outpos max-out-pos)
313 (device-write stream nil 0 nil t))
314 byte))
315
316
317
318 (defmethod stream-read-sequence ((stream amqp-device) (sequence vector)
319 #+ccl &key #-ccl &optional
320 (start 0) end)
321 (setf end (or end (length sequence)))
322 (let ((count (device-read stream sequence start end nil)))
323 (when (plusp count) (+ start count))))
324
325
326
327
328 (defmethod stream-read-sequence ((stream amqp-device) (sequence string)
329 #+ccl &key #-ccl &optional
330 (start 0) end)
331 "Arrange to read bytes from the stream buffer, construct characters, and
332 return the the next position Iff the first byte read shows eof, return nil."
333 (setf end (or end (length sequence)))
334 (with-slots (decoder) stream
335 (flet ((buffer-extract-byte (stream)
336 (with-slots (buffer buffpos buffer-ptr body-position) stream
337 (when (or (< buffpos buffer-ptr)
338 (and (not (minusp buffer-ptr))
339 ; resets buff-pos / buffer content unless eof
340 (plusp (device-read stream nil 0 nil t))))
341 (prog1 (aref buffer buffpos)
342 (incf body-position)
343 (incf buffpos))))))
344 (if (> end start)
345 (let ((char (funcall decoder #'buffer-extract-byte stream)))
346 (when char
347 (setf (char sequence start) char)
348 (do ((i (1+ start) (1+ i)))
349 ((>= i end) end)
350 (if (setf char (funcall decoder #'buffer-extract-byte stream))
351 (setf (char sequence i) char)
352 (return i)))))
353 end))))
354
355
356
357 (defmethod stream-write-sequence ((stream amqp-device) (sequence vector)
358 #+ccl &key #-ccl &optional
359 (start 0) end)
360 (setf end (or end (length sequence)))
361 (device-write stream sequence start end 0))
362
363
364 (defmethod stream-write-sequence ((stream amqp-device) (sequence string)
365 #+ccl &key #-ccl &optional
366 (start 0) end)
367 (setf end (or end (length sequence)))
368 (stream-write-string stream sequence start end))
369
370
371
372 #+clozure
373 (progn
374 (defmethod ccl:stream-read-vector ((stream amqp-device) (sequence vector) start end)
375 (let ((count (device-read stream sequence (or start 0) (or end (length sequence)) nil)))
376 (when (plusp count) (+ start count))))
377
378 (defmethod ccl:stream-read-vector ((stream amqp-device) (sequence string) start end)
379 (stream-read-sequence stream sequence :start (or start 0) :end (or end (length sequence))))
380
381 (defmethod ccl:stream-write-vector ((stream amqp-device) (sequence vector) start end)
382 (device-write stream sequence (or start 0) (or end (length sequence)) t))
383
384 (defmethod ccl:stream-write-vector ((stream amqp-device) (sequence string) start end)
385 (stream-write-sequence stream sequence :start (or start 0) :end (or end (length sequence)))))
386
387
388 (defmethod stream-tyo ((stream amqp-device) character)
389 ;; (stream-tyo *trace-output* character)
390 (stream-write-char stream character))
391
392 (defmethod stream-tyi ((stream amqp-device))
393 (let ((char (stream-read-char stream)))
394 (typecase char
395 (character char)
396 (t nil))))
397
398 (defmethod stream-untyi ((stream amqp-device) character)
399 (stream-unread-char stream character))
400
401
402
403 ;;;
404 ;;; fu interface
405
406 (defmethod device-allocate-buffer ((stream amqp-device) &key
407 (length (device-buffer-length stream))
408 (initial-contents nil))
409 ;; the description does not make this obvious, but it
410 ;; makes sense for thi initial contents to be in the application domain
411 ;; - that is, decoded
412 (let ((new-buffer (make-frame-buffer length)))
413 (when initial-contents
414 (assert (= (length initial-contents) length) ()
415 "Inconsistent lengths: ~d, ~d" length (length initial-contents))
416 (with-slots (out-buffer outpos max-out-pos body-length body-position) stream
417 (let ((.out-buffer out-buffer)
418 (.outpos outpos)
419 (.max-out-pos max-out-pos)
420 (.body-length body-length)
421 (.body-position body-position))
422 (unwind-protect
423 (progn (setf out-buffer new-buffer
424 outpos 0
425 max-out-pos (1+ length) ; prevent writing
426 body-length (1+ length)
427 body-position 0)
428 (stream-write-sequence stream initial-contents))
429 (setf out-buffer .out-buffer
430 outpos .outpos
431 max-out-pos .max-out-pos
432 body-length .body-length
433 body-position .body-position)))))
434 new-buffer))
435
436
437 (defmethod device-input-element-type ((stream amqp-device))
438 ;; one only
439 (device-element-type stream))
440
441
442 (defmethod device-output-element-type ((stream amqp-device))
443 ;; one only
444 (device-element-type stream))
445
446 (defmethod device-encoded-length ((stream amqp-device) buffer &optional start end)
447 (declare (ignore buffer start end))
448 nil)
449
450 ;;;
451 ;;; the description does not make this obvious, but it
452 ;;; [http://paste.lisp.org/display/65229], and the names do imply that
453 ;;; the buffers are in the device domain - that is the content is encoded
454 ;;; thus device-write rather than stream-write-sequence
455
456 (defmethod device-write-buffers ((stream amqp-device) &rest buffer-specs)
457 (loop for (buffer start end) in buffer-specs by #'cddr
458 do (device-write stream buffer start end t)))
459
460 (defmethod device-read-buffers ((stream amqp-device) &rest buffer-specs)
461 (loop for (buffer start end) in buffer-specs by #'cddr
462 do (device-read stream buffer start end t)))
463
464

  ViewVC Help
Powered by ViewVC 1.1.5