/[de-setf-amqp]/amqp-device.lisp
ViewVC logotype

Contents of /amqp-device.lisp

Parent Directory Parent Directory | Revision Log Revision Log


Revision 3 - (hide annotations)
Tue Feb 23 09:05:39 2010 UTC (4 years, 1 month ago) by janderson
File size: 13926 byte(s)
Merge commit 'remotes/github/master' into remotes/git-svn
1 janderson 3 ;;; -*- Package: de.setf.amqp.implementation; -*-
2    
3     (in-package :de.setf.amqp.implementation)
4    
5    
6     (document :file
7     (description "This file defines the `amqp-device` class to extend `simple-stream` devices as part of the
8     'de.setf.amqp' library.")
9     (copyright
10     "Copyright 2010 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved"
11     "'de.setf.amqp' is free software: you can redistribute it and/or modify it under the terms of version 3
12     of the GNU Affero General Public License as published by the Free Software Foundation.
13    
14     'setf.amqp' is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
15     implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
16     See the Affero General Public License for more details.
17    
18     A copy of the GNU Affero General Public License should be included with 'de.setf.amqp' as `AMQP:agpl.txt`.
19     If not, see the GNU [site](http://www.gnu.org/licenses/)."))
20    
21    
22    
23    
24     (defclass amqp-device (dual-channel-simple-stream)
25     ((uri
26     :initform (error "initialization argument required: uri")
27     :initarg :uri
28     :reader device-uri
29     :writer setf-device-uri)
30     (body-position
31     :initform 0
32     :accessor device-body-position)
33     (body-length
34     :initform 0
35     :accessor device-body-length)
36     (free-input-frames
37     :accessor device-free-input-frames
38     :type locked-stack
39     :documentation "This stack resources frames to be used to
40     read data from the device. It is shared directly by a connection and
41     all of its channels.")
42     (free-output-frames
43     :accessor device-free-output-frames
44     :type locked-stack
45     :documentation "This stack resources frames to be used to
46     write data to the device. It is shared directly by a connection and
47     all of its channels.")
48     (read-frames
49     :accessor device-read-frames
50     :type locked-queue
51     :documentation "This queue buffers frames which have been read from a
52     device, but not yet processed by the respective channel or (for channel 0)
53     by the connection.")
54     (encoded-frames
55     :accessor device-encoded-frames
56     :type locked-queue
57     :documentation "This stack resources frames to be used to
58     read data from the device. It is shared directly by a connection and
59     all of its channels.")
60     (content-type
61     :initarg :content-type
62     :reader device-content-type :writer setf-device-content-type
63     :type mime:*/*
64     :documentation "Specifies the encoding type for character-oriented stream
65     operations. The publish, deliver, and get operations consult it to determine
66     which encoding filter to install. (See (setf channel-content-type).)")
67     (element-type
68     :initarg :element-type
69     :accessor device-element-type
70     :documentation "A type specifier which indicates what objects may be read
71     from or written to the stream. The type determine how device-read-content-body
72     processes delivered content. In addition to character and integer, which
73     are treated as equivalent to string and vector with respect to a complete body,
74     the types stream and list return the stream and read an s-expression. any other
75     type is to be encoded as an instantiation form.")
76     (encoder
77     :type function
78     :reader device-encoder :writer setf-device-encoder
79     :documentation "Optionally binds a function which is then used to encode
80     character values for output to the device.")
81     (decoder
82     :type function
83     :reader device-decoder :writer setf-device-decoder
84     :documentation "Optionally binds a function which is then used to decode
85     character values from input from the device.")
86     (state
87     :initform nil
88     :accessor device-state)
89     (line-buffer
90     :initform (make-array 32 :element-type 'character :fill-pointer 0 :adjustable t)
91     :reader stream-line-buffer)
92     (eof-marker
93     :initform :eof :initarg :eof-marker
94     :accessor stream-eof-marker)
95     (eol-marker
96     :initform #\newline :initarg :eol-marker
97     :accessor stream-eol-marker))
98     (:default-initargs
99     :element-type 'character
100     :content-type mime:text/plain ))
101    
102    
103    
104     (defclass amqp-connection-device (amqp-device)
105     ())
106    
107     (defclass amqp-socket-device (amqp-device)
108     ((socket
109     :initform nil
110     :reader device-socket)))
111    
112    
113    
114     (defmethod shared-initialize ((instance amqp-device) (slots t) &rest initargs
115     &key (content-type nil ct-s))
116     (declare (dynamic-extent initargs))
117    
118     ;; coerce and set the type, and pass it to any other initializers
119     (if ct-s
120     (apply #'call-next-method instance slots
121     :content-type (setf (device-content-type instance) content-type)
122     initargs)
123     (call-next-method)))
124    
125    
126     (defgeneric (setf device-content-type) (type device)
127     (:method ((type mime:*/*) (device amqp-device))
128     (setf-device-content-type type device)
129     (slot-makunbound device 'decoder)
130     (slot-makunbound device 'encoder)
131     (update-device-codecs device type)
132     type)
133     (:method ((type t) (device amqp-device))
134     (setf (device-content-type device) (mime-type type))))
135    
136    
137    
138     (defmethod update-device-codecs ((device amqp-device) (type mime:*/*))
139     (multiple-value-bind (decoder encoder)
140     (compute-charset-codecs (device-content-type device))
141     (unless (slot-boundp device 'decoder)
142     (setf-device-decoder decoder device))
143     (unless (slot-boundp device 'encoder)
144     (setf-device-encoder encoder device)))
145     type)
146    
147    
148    
149     (defmethod mime-type-charset ((device amqp-device))
150     "Given a device, delegate to the content-type."
151     (mime-type-charset (device-content-type device)))
152    
153    
154     (defmethod mime:mime-type ((device amqp-device) &rest args)
155     (declare (dynamic-extent args) (ignore args))
156     (device-content-type device))
157    
158    
159     (defmethod device-initialize-input-buffer ((instance amqp-device))
160     (with-slots (buffer buffpos buffer-ptr buf-len) instance
161     ;; claim a frame to get the size right and confiscate it's buffer
162     (let ((frame (claim-input-frame instance)))
163     (setf buffer (frame-data frame) ; don't give it back
164     buffpos 0
165     buffer-ptr 0
166     buf-len (length buffer)))))
167    
168     (defmethod device-initialize-output-buffer ((instance amqp-device))
169     (with-slots (out-buffer max-out-pos outpos) instance
170     ;; claim a frame to get the size right and confiscate it's buffer
171     (let ((frame (claim-output-frame instance)))
172     (setf out-buffer (frame-data frame) ; don't give it back
173     outpos 0
174     max-out-pos (length out-buffer)))))
175    
176     (defmethod device-initialize-buffers ((instance amqp-device))
177     (device-initialize-output-buffer instance)
178     (device-initialize-input-buffer instance))
179    
180    
181     (defmethod device-reset-buffers ((device amqp-device))
182     (with-slots (buffer buffpos buffer-ptr buf-len
183     out-buffer max-out-pos outpos
184     position length pending) device
185     (setf buffer nil
186     buffpos 0
187     buffer-ptr 0
188     buf-len 0
189     pending nil
190     max-out-pos 0
191     out-buffer nil
192     outpos 0)))
193    
194    
195    
196     (defmethod device-open ((device amqp-device) #-sbcl (slots t) (options t))
197    
198     (device-reset-buffers device)
199     #+sbcl ;; set up the cached i/o functions
200     (with-stream-class (amqp-device device)
201     (add-stream-instance-flags device :dual :simple :input :output)
202     (install-amqp-device-strategy device))
203     t)
204    
205    
206     (defmethod device-close ((device amqp-device) (abort t))
207     (unless (eq (stream-external-format device) :void)
208     (ignore-errors (if abort
209     (clear-output device)
210     (finish-output device)))
211     (when (next-method-p) (call-next-method))
212     (setf (stream-external-format device) :void)))
213    
214    
215     (defmethod device-close ((device amqp-connection-device) (abort t))
216     (call-next-method)
217     (setf (stream-input-handle device) nil)
218     (setf (stream-output-handle device) nil))
219    
220    
221     (defgeneric device-listen (device)
222     (:method ((device amqp-connection-device))
223     (when (stream-input-handle device)
224     (device-listen (stream-input-handle device))))
225    
226     (:method ((device amqp-socket-device))
227     (when (stream-input-handle device)
228     (listen (stream-input-handle device)))))
229    
230     (defmethod device-open ((device amqp-socket-device) #-sbcl (slots t) options)
231     (if (or (stream-input-handle device)
232     (stream-output-handle device))
233     (call-next-method)
234     (flet ((argument-error ()
235     (error "device-open on ~S requires :remote-host and :remote-port arguments"
236     (type-of device))))
237     (destructuring-bind (&key (remote-host (argument-error))
238     (remote-port (argument-error))
239     (direction :io)
240     &allow-other-keys)
241     options
242     (assert (member direction '(:probe :io)) ()
243     "Invalid diection: ~s." direction)
244     (when (call-next-method)
245     (let ((opened-socket (usocket:socket-connect remote-host
246     remote-port
247     :element-type 'unsigned-byte)))
248     (ecase direction
249     (:io
250     (setf (device-socket device) opened-socket)
251     #+mcl
252     (ccl:terminate-when-unreachable device))
253     (:probe
254     (usocket:socket-close opened-socket))))
255     device)))))
256    
257    
258     (defmethod device-close ((device amqp-socket-device) (abort t))
259     (call-next-method)
260     (when (device-socket device)
261     (usocket:socket-close (device-socket device))
262     (setf (device-socket device) nil))
263     #+mcl
264     (ccl:cancel-terminate-when-unreachable device))
265    
266    
267     (defmethod (setf device-socket) ((socket null) (device amqp-socket-device))
268     (setf (slot-value device 'socket) nil)
269     (setf (stream-input-handle device) nil)
270     (setf (stream-output-handle device) nil))
271    
272     (defmethod (setf device-socket) ((socket usocket:stream-usocket) (device amqp-socket-device))
273     (setf (slot-value device 'socket) socket)
274     (setf (stream-input-handle device) (usocket:socket-stream socket))
275     (setf (stream-output-handle device) (usocket:socket-stream socket)))
276    
277    
278     #+mcl
279     (defmethod terminate ((object simple-stream))
280     (device-close object t))
281    
282     (when (fboundp 'stream-close)
283     (defmethod stream-close ((stream amqp-device))
284     (when (next-method-p) (call-next-method))
285     (device-close stream nil)))
286    
287    
288     ;;;
289     ;;; simple stream i/o strategies to be cached in the device instance
290    
291     #+sbcl
292     (progn
293     (defun amqp-j-read-char (device eof-error-p eof-value blocking)
294     (when (or blocking (stream-listen device))
295     (with-slots (decoder last-char-read-size buffpos) device
296     (flet ((buffer-extract-byte (stream)
297     (with-slots (buffer buffpos buffer-ptr body-position body-length) stream
298     (when (or (< buffpos buffer-ptr)
299     (and (not (minusp buffer-ptr))
300     ; resets buff-pos / renews buffer content unless eof
301     (plusp (device-read stream nil 0 nil t))))
302     (incf body-position)
303     (prog1 (aref buffer buffpos)
304     (incf buffpos))))))
305     (let ((start-buffpos buffpos)
306     (char (funcall decoder #'buffer-extract-byte device)))
307     (cond (char
308     (setf last-char-read-size (- buffpos start-buffpos))
309     char)
310     (eof-error-p
311     (error 'end-of-file :stream device))
312     (t
313     eof-value)))))))
314    
315     (defun amqp-j-read-chars (device sequence search start end blocking)
316     "like stream-read-sequence, but terminate on match, exclusive of match"
317     (when (or blocking (stream-listen device))
318     (setf end (or end (length sequence)))
319     (with-slots (decoder last-char-read-size buffpos) device
320     (flet ((buffer-extract-byte (stream)
321     (with-slots (buffer buffpos buffer-ptr body-position) stream
322     (when (or (< buffpos buffer-ptr)
323     (and (not (minusp buffer-ptr))
324     ; resets buff-pos / buffer content unless eof
325     (plusp (device-read stream nil 0 nil t))))
326     (prog1 (aref buffer buffpos)
327     (incf body-position)
328     (incf buffpos))))))
329     (if (> end start)
330     (let ((i start)
331     (start-buffpos buffpos)
332     (char #\null))
333     (loop (unless (setf char (funcall decoder #'buffer-extract-byte device))
334     (return (values (- i start) :eof)))
335     (setf last-char-read-size (- buffpos start-buffpos))
336     (setf start-buffpos buffpos)
337     (when (eql char search)
338     (return (values (- i start) t)))
339     (setf (char sequence i) char)
340     (incf i)))
341     0)))))
342    
343     (defun amqp-j-write-char (character stream)
344     (amqp-stream-write-char stream character))
345    
346     (defun amqp-j-write-chars (string stream start end)
347     (amql-stream-write-string stream string start end))
348    
349     (defun amqp-j-listen (stream)
350     (stream-listen (stream-input-handle stream)))
351    
352     (defun install-amqp-device-strategy (device)
353     ;; no attention to external format, as that is handled based on
354     ;; messag header and/or individual publish request
355     (sb-simple-streams::with-stream-class (amqp-device device)
356     (setf (sm sb-simple-streams::j-read-char device) #'amqp-j-read-char
357     (sm sb-simple-streams::j-read-chars device) #'amqp-j-read-chars
358     (sm sb-simple-streams::j-unread-char device) #'sb-simple-streams::%unread-char
359     (sm sb-simple-streams::j-write-char device) #'amqp-j-write-char
360     (sm sb-simple-streams::j-write-chars device) #'amqp-j-write-chars
361     (sm sb-simple-streams::j-listen device) #'amqp-j-listen)))
362     )

  ViewVC Help
Powered by ViewVC 1.1.5