/[de-setf-amqp]/device-stream.lisp
ViewVC logotype

Contents of /device-stream.lisp

Parent Directory Parent Directory | Revision Log Revision Log


Revision 3 - (show annotations)
Tue Feb 23 09:05:39 2010 UTC (4 years, 1 month ago) by janderson
File size: 4491 byte(s)
Merge commit 'remotes/github/master' into remotes/git-svn
1 ;;; -*- Package: de.setf.amqp.implementation; -*-
2
3 (in-package :de.setf.amqp.implementation)
4
5 (document :file
6 (description "This file defines a stream interface for AMQP channel and connection instances for the
7 'de.setf.amqp' library.")
8 (copyright
9 "Copyright 2010 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved"
10 "'de.setf.amqp' is free software: you can redistribute it and/or modify it under the terms of version 3
11 of the GNU Affero General Public License as published by the Free Software Foundation.
12
13 'setf.amqp' is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
14 implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
15 See the Affero General Public License for more details.
16
17 A copy of the GNU Affero General Public License should be included with 'de.setf.amqp' as `AMQP:agpl.txt`.
18 If not, see the GNU [site](http://www.gnu.org/licenses/).")
19
20 (long-description
21 "Stream-based operations are available through the `device-read-content` and `device-write-content` operators
22 in the event that the size of the body datum is indeterminate. One specialization accepts a function object as
23 the body. This is used to implement `with-open-channel` by passing the form body as the continuation."))
24
25 (defun call-with-channel-input-stream (operator channel &key
26 (direction :input)
27 (queue (error "queue argument required.")))
28 (flet ((content-body-operator (channel content-type)
29 (declare (ignore content-type))
30 (funcall operator channel)))
31 (declare (dynamic-extent #'content-body-operator))
32 (assert (eq direction :input) () "Invalid direction for input: ~s." direction)
33 (setf queue (etypecase queue
34 (amqp:queue queue)
35 (string (amqp:channel.queue channel :queue queue))
36 (cons (apply #'amqp:channel.queue channel queue))))
37 (amqp:declare queue)
38 (amqp:request-get channel :queue queue :body #'content-body-operator)))
39
40 (defun call-with-channel-output-stream (operator channel &key
41 (direction :output)
42 (queue (error "queue argument required."))
43 (exchange (error "exchange argument required."))
44 (type "direct")
45 (routing-key "/"))
46 (flet ((content-body-operator (channel content-type)
47 (declare (ignore content-type))
48 (funcall operator channel)))
49 (declare (dynamic-extent #'content-body-operator))
50 (assert (eq direction :output) () "Invalid direction for output: ~s." direction)
51 (setf queue (etypecase queue
52 (amqp:queue queue)
53 (string (amqp:channel.queue channel :queue queue))
54 (cons (apply #'amqp:channel.queue channel queue))))
55 (setf exchange (etypecase exchange
56 (amqp:exchange queue)
57 (string (amqp:channel.exchange channel :exchange exchange :type type))
58 (cons (apply #'amqp:channel.exchange channel exchange))))
59 (amqp:declare exchange)
60 (amqp:declare queue)
61 (amqp:bind queue :exchange exchange :queue queue :routing-key routing-key)
62 (amqp:request-publish (amqp:basic channel) :exchange exchange :body #'content-body-operator
63 :routing-key routing-key)))
64
65 (defgeneric call-with-open-channel-stream (operator channel &rest options)
66 (:method (operator (channel amqp:channel) &rest options)
67 (unwind-protect
68 (multiple-value-prog1 (if (or (getf options :exchange)
69 (eq (getf options :direction) :output))
70 (apply #'call-with-channel-output-stream operator channel options)
71 (apply #'call-with-channel-input-stream operator channel options))
72 (when (open-stream-p channel)
73 (close channel))
74 (setf channel nil))
75 (when channel (close channel :abort t)))))
76
77
78 (defmacro amqp:with-open-channel ((channel-var connection &rest options) &rest body)
79 `(flet ((_::with-open-channel-body (,channel-var)
80 ,@body))
81 (declare (dynamic-extent #'_::with-open-channel-body))
82 (call-with-open-channel-stream #'_::with-open-channel-body (amqp:connection.channel ,connection :number t) ,@options)))

  ViewVC Help
Powered by ViewVC 1.1.5