/[de-setf-amqp]/amqp-device.lisp
ViewVC logotype

Contents of /amqp-device.lisp

Parent Directory Parent Directory | Revision Log Revision Log


Revision 3 - (show annotations)
Tue Feb 23 09:05:39 2010 UTC (4 years, 1 month ago) by janderson
File size: 13926 byte(s)
Merge commit 'remotes/github/master' into remotes/git-svn
1 ;;; -*- Package: de.setf.amqp.implementation; -*-
2
3 (in-package :de.setf.amqp.implementation)
4
5
6 (document :file
7 (description "This file defines the `amqp-device` class to extend `simple-stream` devices as part of the
8 'de.setf.amqp' library.")
9 (copyright
10 "Copyright 2010 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved"
11 "'de.setf.amqp' is free software: you can redistribute it and/or modify it under the terms of version 3
12 of the GNU Affero General Public License as published by the Free Software Foundation.
13
14 'setf.amqp' is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
15 implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
16 See the Affero General Public License for more details.
17
18 A copy of the GNU Affero General Public License should be included with 'de.setf.amqp' as `AMQP:agpl.txt`.
19 If not, see the GNU [site](http://www.gnu.org/licenses/)."))
20
21
22
23
24 (defclass amqp-device (dual-channel-simple-stream)
25 ((uri
26 :initform (error "initialization argument required: uri")
27 :initarg :uri
28 :reader device-uri
29 :writer setf-device-uri)
30 (body-position
31 :initform 0
32 :accessor device-body-position)
33 (body-length
34 :initform 0
35 :accessor device-body-length)
36 (free-input-frames
37 :accessor device-free-input-frames
38 :type locked-stack
39 :documentation "This stack resources frames to be used to
40 read data from the device. It is shared directly by a connection and
41 all of its channels.")
42 (free-output-frames
43 :accessor device-free-output-frames
44 :type locked-stack
45 :documentation "This stack resources frames to be used to
46 write data to the device. It is shared directly by a connection and
47 all of its channels.")
48 (read-frames
49 :accessor device-read-frames
50 :type locked-queue
51 :documentation "This queue buffers frames which have been read from a
52 device, but not yet processed by the respective channel or (for channel 0)
53 by the connection.")
54 (encoded-frames
55 :accessor device-encoded-frames
56 :type locked-queue
57 :documentation "This stack resources frames to be used to
58 read data from the device. It is shared directly by a connection and
59 all of its channels.")
60 (content-type
61 :initarg :content-type
62 :reader device-content-type :writer setf-device-content-type
63 :type mime:*/*
64 :documentation "Specifies the encoding type for character-oriented stream
65 operations. The publish, deliver, and get operations consult it to determine
66 which encoding filter to install. (See (setf channel-content-type).)")
67 (element-type
68 :initarg :element-type
69 :accessor device-element-type
70 :documentation "A type specifier which indicates what objects may be read
71 from or written to the stream. The type determine how device-read-content-body
72 processes delivered content. In addition to character and integer, which
73 are treated as equivalent to string and vector with respect to a complete body,
74 the types stream and list return the stream and read an s-expression. any other
75 type is to be encoded as an instantiation form.")
76 (encoder
77 :type function
78 :reader device-encoder :writer setf-device-encoder
79 :documentation "Optionally binds a function which is then used to encode
80 character values for output to the device.")
81 (decoder
82 :type function
83 :reader device-decoder :writer setf-device-decoder
84 :documentation "Optionally binds a function which is then used to decode
85 character values from input from the device.")
86 (state
87 :initform nil
88 :accessor device-state)
89 (line-buffer
90 :initform (make-array 32 :element-type 'character :fill-pointer 0 :adjustable t)
91 :reader stream-line-buffer)
92 (eof-marker
93 :initform :eof :initarg :eof-marker
94 :accessor stream-eof-marker)
95 (eol-marker
96 :initform #\newline :initarg :eol-marker
97 :accessor stream-eol-marker))
98 (:default-initargs
99 :element-type 'character
100 :content-type mime:text/plain ))
101
102
103
104 (defclass amqp-connection-device (amqp-device)
105 ())
106
107 (defclass amqp-socket-device (amqp-device)
108 ((socket
109 :initform nil
110 :reader device-socket)))
111
112
113
114 (defmethod shared-initialize ((instance amqp-device) (slots t) &rest initargs
115 &key (content-type nil ct-s))
116 (declare (dynamic-extent initargs))
117
118 ;; coerce and set the type, and pass it to any other initializers
119 (if ct-s
120 (apply #'call-next-method instance slots
121 :content-type (setf (device-content-type instance) content-type)
122 initargs)
123 (call-next-method)))
124
125
126 (defgeneric (setf device-content-type) (type device)
127 (:method ((type mime:*/*) (device amqp-device))
128 (setf-device-content-type type device)
129 (slot-makunbound device 'decoder)
130 (slot-makunbound device 'encoder)
131 (update-device-codecs device type)
132 type)
133 (:method ((type t) (device amqp-device))
134 (setf (device-content-type device) (mime-type type))))
135
136
137
138 (defmethod update-device-codecs ((device amqp-device) (type mime:*/*))
139 (multiple-value-bind (decoder encoder)
140 (compute-charset-codecs (device-content-type device))
141 (unless (slot-boundp device 'decoder)
142 (setf-device-decoder decoder device))
143 (unless (slot-boundp device 'encoder)
144 (setf-device-encoder encoder device)))
145 type)
146
147
148
149 (defmethod mime-type-charset ((device amqp-device))
150 "Given a device, delegate to the content-type."
151 (mime-type-charset (device-content-type device)))
152
153
154 (defmethod mime:mime-type ((device amqp-device) &rest args)
155 (declare (dynamic-extent args) (ignore args))
156 (device-content-type device))
157
158
159 (defmethod device-initialize-input-buffer ((instance amqp-device))
160 (with-slots (buffer buffpos buffer-ptr buf-len) instance
161 ;; claim a frame to get the size right and confiscate it's buffer
162 (let ((frame (claim-input-frame instance)))
163 (setf buffer (frame-data frame) ; don't give it back
164 buffpos 0
165 buffer-ptr 0
166 buf-len (length buffer)))))
167
168 (defmethod device-initialize-output-buffer ((instance amqp-device))
169 (with-slots (out-buffer max-out-pos outpos) instance
170 ;; claim a frame to get the size right and confiscate it's buffer
171 (let ((frame (claim-output-frame instance)))
172 (setf out-buffer (frame-data frame) ; don't give it back
173 outpos 0
174 max-out-pos (length out-buffer)))))
175
176 (defmethod device-initialize-buffers ((instance amqp-device))
177 (device-initialize-output-buffer instance)
178 (device-initialize-input-buffer instance))
179
180
181 (defmethod device-reset-buffers ((device amqp-device))
182 (with-slots (buffer buffpos buffer-ptr buf-len
183 out-buffer max-out-pos outpos
184 position length pending) device
185 (setf buffer nil
186 buffpos 0
187 buffer-ptr 0
188 buf-len 0
189 pending nil
190 max-out-pos 0
191 out-buffer nil
192 outpos 0)))
193
194
195
196 (defmethod device-open ((device amqp-device) #-sbcl (slots t) (options t))
197
198 (device-reset-buffers device)
199 #+sbcl ;; set up the cached i/o functions
200 (with-stream-class (amqp-device device)
201 (add-stream-instance-flags device :dual :simple :input :output)
202 (install-amqp-device-strategy device))
203 t)
204
205
206 (defmethod device-close ((device amqp-device) (abort t))
207 (unless (eq (stream-external-format device) :void)
208 (ignore-errors (if abort
209 (clear-output device)
210 (finish-output device)))
211 (when (next-method-p) (call-next-method))
212 (setf (stream-external-format device) :void)))
213
214
215 (defmethod device-close ((device amqp-connection-device) (abort t))
216 (call-next-method)
217 (setf (stream-input-handle device) nil)
218 (setf (stream-output-handle device) nil))
219
220
221 (defgeneric device-listen (device)
222 (:method ((device amqp-connection-device))
223 (when (stream-input-handle device)
224 (device-listen (stream-input-handle device))))
225
226 (:method ((device amqp-socket-device))
227 (when (stream-input-handle device)
228 (listen (stream-input-handle device)))))
229
230 (defmethod device-open ((device amqp-socket-device) #-sbcl (slots t) options)
231 (if (or (stream-input-handle device)
232 (stream-output-handle device))
233 (call-next-method)
234 (flet ((argument-error ()
235 (error "device-open on ~S requires :remote-host and :remote-port arguments"
236 (type-of device))))
237 (destructuring-bind (&key (remote-host (argument-error))
238 (remote-port (argument-error))
239 (direction :io)
240 &allow-other-keys)
241 options
242 (assert (member direction '(:probe :io)) ()
243 "Invalid diection: ~s." direction)
244 (when (call-next-method)
245 (let ((opened-socket (usocket:socket-connect remote-host
246 remote-port
247 :element-type 'unsigned-byte)))
248 (ecase direction
249 (:io
250 (setf (device-socket device) opened-socket)
251 #+mcl
252 (ccl:terminate-when-unreachable device))
253 (:probe
254 (usocket:socket-close opened-socket))))
255 device)))))
256
257
258 (defmethod device-close ((device amqp-socket-device) (abort t))
259 (call-next-method)
260 (when (device-socket device)
261 (usocket:socket-close (device-socket device))
262 (setf (device-socket device) nil))
263 #+mcl
264 (ccl:cancel-terminate-when-unreachable device))
265
266
267 (defmethod (setf device-socket) ((socket null) (device amqp-socket-device))
268 (setf (slot-value device 'socket) nil)
269 (setf (stream-input-handle device) nil)
270 (setf (stream-output-handle device) nil))
271
272 (defmethod (setf device-socket) ((socket usocket:stream-usocket) (device amqp-socket-device))
273 (setf (slot-value device 'socket) socket)
274 (setf (stream-input-handle device) (usocket:socket-stream socket))
275 (setf (stream-output-handle device) (usocket:socket-stream socket)))
276
277
278 #+mcl
279 (defmethod terminate ((object simple-stream))
280 (device-close object t))
281
282 (when (fboundp 'stream-close)
283 (defmethod stream-close ((stream amqp-device))
284 (when (next-method-p) (call-next-method))
285 (device-close stream nil)))
286
287
288 ;;;
289 ;;; simple stream i/o strategies to be cached in the device instance
290
291 #+sbcl
292 (progn
293 (defun amqp-j-read-char (device eof-error-p eof-value blocking)
294 (when (or blocking (stream-listen device))
295 (with-slots (decoder last-char-read-size buffpos) device
296 (flet ((buffer-extract-byte (stream)
297 (with-slots (buffer buffpos buffer-ptr body-position body-length) stream
298 (when (or (< buffpos buffer-ptr)
299 (and (not (minusp buffer-ptr))
300 ; resets buff-pos / renews buffer content unless eof
301 (plusp (device-read stream nil 0 nil t))))
302 (incf body-position)
303 (prog1 (aref buffer buffpos)
304 (incf buffpos))))))
305 (let ((start-buffpos buffpos)
306 (char (funcall decoder #'buffer-extract-byte device)))
307 (cond (char
308 (setf last-char-read-size (- buffpos start-buffpos))
309 char)
310 (eof-error-p
311 (error 'end-of-file :stream device))
312 (t
313 eof-value)))))))
314
315 (defun amqp-j-read-chars (device sequence search start end blocking)
316 "like stream-read-sequence, but terminate on match, exclusive of match"
317 (when (or blocking (stream-listen device))
318 (setf end (or end (length sequence)))
319 (with-slots (decoder last-char-read-size buffpos) device
320 (flet ((buffer-extract-byte (stream)
321 (with-slots (buffer buffpos buffer-ptr body-position) stream
322 (when (or (< buffpos buffer-ptr)
323 (and (not (minusp buffer-ptr))
324 ; resets buff-pos / buffer content unless eof
325 (plusp (device-read stream nil 0 nil t))))
326 (prog1 (aref buffer buffpos)
327 (incf body-position)
328 (incf buffpos))))))
329 (if (> end start)
330 (let ((i start)
331 (start-buffpos buffpos)
332 (char #\null))
333 (loop (unless (setf char (funcall decoder #'buffer-extract-byte device))
334 (return (values (- i start) :eof)))
335 (setf last-char-read-size (- buffpos start-buffpos))
336 (setf start-buffpos buffpos)
337 (when (eql char search)
338 (return (values (- i start) t)))
339 (setf (char sequence i) char)
340 (incf i)))
341 0)))))
342
343 (defun amqp-j-write-char (character stream)
344 (amqp-stream-write-char stream character))
345
346 (defun amqp-j-write-chars (string stream start end)
347 (amql-stream-write-string stream string start end))
348
349 (defun amqp-j-listen (stream)
350 (stream-listen (stream-input-handle stream)))
351
352 (defun install-amqp-device-strategy (device)
353 ;; no attention to external format, as that is handled based on
354 ;; messag header and/or individual publish request
355 (sb-simple-streams::with-stream-class (amqp-device device)
356 (setf (sm sb-simple-streams::j-read-char device) #'amqp-j-read-char
357 (sm sb-simple-streams::j-read-chars device) #'amqp-j-read-chars
358 (sm sb-simple-streams::j-unread-char device) #'sb-simple-streams::%unread-char
359 (sm sb-simple-streams::j-write-char device) #'amqp-j-write-char
360 (sm sb-simple-streams::j-write-chars device) #'amqp-j-write-chars
361 (sm sb-simple-streams::j-listen device) #'amqp-j-listen)))
362 )

  ViewVC Help
Powered by ViewVC 1.1.5