/[de-setf-amqp]/utilities.lisp
ViewVC logotype

Contents of /utilities.lisp

Parent Directory Parent Directory | Revision Log Revision Log


Revision 3 - (hide annotations)
Tue Feb 23 09:05:39 2010 UTC (4 years, 1 month ago) by janderson
File size: 21628 byte(s)
Merge commit 'remotes/github/master' into remotes/git-svn
1 janderson 3 ;;; -*- Package: de.setf.amqp.implementation; -*-
2    
3     (in-package :de.setf.amqp.implementation)
4    
5     (document :file
6     (description "This file defines utility operators for the 'de.setf.amqp' library.")
7     (copyright
8     "Copyright 2010 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved"
9     "'de.setf.amqp' is free software: you can redistribute it and/or modify it under the terms of version 3
10     of the GNU Affero General Public License as published by the Free Software Foundation.
11    
12     'setf.amqp' is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
13     implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
14     See the Affero General Public License for more details.
15    
16     A copy of the GNU Affero General Public License should be included with 'de.setf.amqp' as `AMQP:agpl.txt`.
17     If not, see the GNU [site](http://www.gnu.org/licenses/)."))
18    
19    
20     ;;;
21     ;;; macros
22    
23     (defmacro assert-condition (form &rest args)
24     (let ((format-string nil) (format-arguments nil) (operator nil))
25     (when (or (typep (first args) '(and symbol (not keyword)))
26     (and (consp (first args)) (eq (caar args) 'setf)))
27     (setf operator (pop args)))
28     (when (stringp (first args))
29     (setf format-string (pop args)
30     format-arguments (shiftf args nil)))
31     (destructuring-bind (&key (operator operator) (format-string format-string) (format-arguments format-arguments)
32     (type (if (and (consp form) (eq (first form) 'typep)) (third form) `(satisfies ,form))))
33     args
34     `(unless ,form
35     (error 'simple-type-error
36     :expected-type (quote ,type)
37     :format-string ,(format nil "~@[~a: ~]condition failed: ~s~:[.~; ~~@?~]"
38     operator form
39     format-string)
40     :format-arguments ,(when format-string `(list ,format-string ,@format-arguments)))))))
41    
42     (defmacro def-delegate-slot ((class slot) &rest operators)
43     `(progn ,@(mapcar #'(lambda (op)
44     `(progn (defmethod ,op ((instance ,class)) (,op (slot-value instance ',slot)))
45     (defmethod (setf ,op) (value (instance ,class)) (setf (,op (slot-value instance ',slot)) value))))
46     operators)))
47    
48     (defmacro assert-argument-type (operator variable type &optional (required-p t) (test `(typep ,variable ',type)))
49     (let ((form `(assert ,test ()
50     ,(format nil "~s: the ~:[(optional) ~;~] ~a argument must be of type ~a."
51     operator required-p variable type))))
52     (if required-p
53     form
54     `(when ,variable ,form))))
55    
56     (defmacro assert-argument-types (operator &rest assertions)
57     `(progn ,@(loop for assertion in assertions
58     collect `(assert-argument-type ,operator ,@assertion))))
59    
60     #+mcl
61     (setf (ccl:assq 'assert-argument-types ccl:*fred-special-indent-alist*) 1)
62    
63    
64     ;;; assorted
65     (unless (boundp 'directory-pathname-p)
66     (defun directory-pathname-p (path)
67     (let ((name (pathname-name path))(type (pathname-type path)))
68     (and (or (null name) (eq name :unspecific) (zerop (length name)))
69     (or (null type) (eq type :unspecific))))))
70    
71     ;;; logging
72    
73     (defparameter amqp:*log-level* :warn)
74    
75     (defparameter *log-levels* '(:debug :verbose :warn :error))
76    
77     (defparameter *log-stream* *trace-output*)
78    
79     (defmacro amqp:log (criteria class &rest args)
80     (let ((log-op (gensym)))
81     `(flet ((,log-op (stream)
82     (format stream "~&[~/date::format-iso-time/] ~a ~a: ~@?"
83     (get-universal-time) ',criteria ,class ,@args)))
84     (declare (dynamic-extent (function ,log-op)))
85     (log-when ',criteria (function ,log-op)))))
86    
87     (defmacro amqp:log* (criteria class &rest args)
88     (let ((log-op (gensym)))
89     `(flet ((,log-op (stream)
90     (apply #'format stream "~&[~/date::format-iso-time/] ~a ~a: ~@?"
91     (get-universal-time) ',criteria ,class ,@args)))
92     (declare (dynamic-extent (function ,log-op)))
93     (log-when ',criteria (function ,log-op)))))
94    
95     (defun write-log-entry (op)
96     (let ((*print-readably* nil))
97     (when *log-stream* (funcall op *log-stream*))))
98    
99     (defun log-when (criteria op)
100     (when (find criteria (member *log-level* *log-levels*))
101     (write-log-entry op)))
102    
103     ;;;
104     ;;; instance tags are just the type and identity
105    
106     (defun make-instance-tag (instance)
107     (with-output-to-string (stream)
108     (print-unreadable-object (instance stream :type t :identity t))))
109    
110     ;;;
111     ;;; Version keywords reflect the version information contained in a protocol
112     ;;; header. The symbol name comprises the initial 4-byte protocol identifier
113     ;;; and the class, instance, and major/minor version numbers, each separated
114     ;;; by a '-'
115     ;;; nb. the structural relation between the parsed version id and the keyword
116     ;;; does not, in practice correspond to the relation between version keywords
117     ;;; and the connection protocol headers. (see )
118     ;;; this relation is recorded in amqp.u:*version-headers*
119    
120     (defun version-protocol-header (version-keyword)
121     (rest (assoc version-keyword amqp.u:*version-headers* :test #'string-equal)))
122    
123     (defun protocol-header-version (protocol-header)
124     (first (rassoc protocol-header amqp.u:*version-headers* :test #'equalp)))
125    
126     (defun (setf version-protocol-header) (header version-keyword)
127     (assert (typep header '(vector t 8)) ()
128     "Invalid version header: ~s." header)
129     (let ((entry (assoc version-keyword amqp.u:*version-headers* :test #'string-equal)))
130     (cond (entry
131     (setf (rest entry) header))
132     (t
133     (setq amqp.u:*version-headers*
134     (acons version-keyword header amqp.u:*version-headers*))
135     header))))
136    
137     (defun make-version-keyword (&key (name :amqp)
138     (class 1) (instance 1)
139     (major (error "major version number is required."))
140     (minor (error "minor version number is required."))
141     (revision 0))
142     "Generate the version keyword for the given combination of
143     CLASS, INSTANCE, and MAJOR, MINOR, and REVISION numbers.
144     By default class and instance default to '1', while the default revision
145     is '0'. Both major and minor version number are required."
146     (intern (format nil "~:@(~a~)-~d-~d-~d-~d-~d"
147     name
148     class instance major minor revision)
149     :keyword))
150    
151     (defun parse-version-keyword (keyword)
152     (labels ((elements (string position)
153     (let ((next (position #\- string :start (1+ position))))
154     (cons (subseq string position next)
155     (when next (elements string (1+ next)))))))
156     (destructuring-bind (name &rest numbers) (elements (string keyword) 0)
157     (assert (and (every #'alpha-char-p name)
158     (= (length numbers) 5)))
159     (cons (intern name :keyword) (mapcar #'parse-integer numbers)))))
160    
161    
162     (defgeneric version-lessp (version1 version2)
163     (:documentation "Return TRUE iff VERSION1 is less than VERSION2.
164     Accepts version indicators as keywords and as parsed lists.
165     Requres the scheme to agree and ignores the protocol class and instance,
166     to compare just major, minor, and revision numbers.")
167    
168     (:method ((version1 symbol) (version2 t))
169     (version-lessp (parse-version-keyword version1) version2))
170    
171     (:method ((version1 t) (version2 symbol))
172     (version-lessp version1 (parse-version-keyword version2)))
173    
174     (:method ((version1 cons) (version2 cons))
175     (assert (and (symbolp (first version1)) (eq (first version1) (first version2))) ()
176     "Invalid version comparison: ~s ~s" version1 version2)
177     (map nil #'(lambda (e1 e2)
178     (assert (and (numberp e1) (numberp e2)) ()
179     "Invalid version comparison: ~s ~s" version1 version2)
180     (cond ((< e1 e2)
181     (return-from version-lessp t))
182     ((> e1 e2)
183     (return-from version-lessp nil))))
184     (cdddr version1) (cdddr version2))))
185    
186    
187    
188     (defun amqp:initialize (&key frame-size timeout force-p)
189     (assert-argument-types amqp:initialize
190     (frame-size integer nil)
191     (timeout integer nil))
192    
193     (when (or force-p (null *connection-classes*))
194     (labels ((collect-subclasses (class)
195     (dolist (class (closer-mop:class-direct-subclasses class))
196     (when (null (closer-mop:class-direct-subclasses class))
197     (push class *connection-classes*)
198     (collect-subclasses class)))))
199    
200     (when frame-size
201     (setq *frame-size* frame-size))
202     (when timeout
203     (setq *connection-timeout* timeout))
204     (setq *connection-classes* '())
205     (collect-subclasses (find-class 'amqp:object))
206     (setq *connection-classes* (sort *connection-classes* #'version-lessp
207     :key #'class-protocol-version)))))
208    
209    
210    
211     ;;; queues
212     ;;; corrected and extended from rhode's paper
213    
214     (defclass collection ()
215     ((if-empty :initform nil :initarg :if-empty :reader collection-if-empty)
216     (name :initform nil :initarg :name :reader collection-name)))
217    
218     (defgeneric collection-empty-p (collection))
219    
220     (defclass queue (collection)
221     ((header :accessor queue-header)
222     (pointer :accessor queue-pointer)
223     (cache :accessor queue-cache :initform nil)))
224    
225     (defclass locked-queue (queue)
226     ((lock :reader queue-lock
227     :initform (bt:make-lock))
228     (processor :accessor queue-processor
229     :initform nil)))
230    
231     (defclass stack (collection)
232     ((data :reader stack-data)))
233    
234     (defclass locked-stack (stack)
235     ((lock :reader stack-lock
236     :initform (bt:make-lock))))
237    
238    
239     (defmethod initialize-instance ((instance collection) &rest args
240     &key (name (with-output-to-string (stream)
241     (print-unreadable-object (instance stream :identity t :type t)))))
242     (apply #'call-next-method instance
243     :name name
244     args))
245    
246     (defmethod print-object ((instance collection) (stream t))
247     (print-unreadable-object (instance stream :identity t :type t)
248     (format stream "~@[~a~]" (collection-name instance))))
249    
250     (defmethod initialize-instance :after ((o queue) &key)
251     (let ((head (list nil)))
252     (setf (queue-header o) head (queue-pointer o) head)))
253    
254     (defmethod initialize-instance :after ((instance stack) &key)
255     (with-slots (data) instance
256     (setf data (make-array 32 :fill-pointer 0 :adjustable t))))
257    
258     (defmethod collection-empty-p ((queue queue))
259     (eq (queue-header queue) (queue-pointer queue)))
260    
261     (defmethod collection-empty-p ((stack stack))
262     (zerop (fill-pointer (stack-data stack))))
263    
264     (defgeneric collection-content (collection)
265     (:method ((collection queue))
266     (rest (queue-header collection)))
267     (:method ((collection stack))
268     (stack-data collection)))
269    
270     (defgeneric collection-size (collection)
271     (:method ((collection collection))
272     (length (collection-content collection))))
273    
274     (defgeneric enqueue (data queue &key if-empty)
275     (declare (dynamic-extent if-empty))
276     (:argument-precedence-order queue data)
277    
278     #+(or) ;; this version caches the released queue cells
279     (:method (data (o queue) &key if-empty)
280     (declare (dynamic-extent if-empty))
281     (if (and (eq (queue-pointer o) (queue-header o))
282     if-empty)
283     (funcall if-empty)
284     (let ((elt nil))
285     (cond ((setf elt nil) ;;(queue-cache o))
286     (setf (queue-cache o) (rest elt)
287     (car elt) data
288     (cdr elt) nil))
289     (t
290     (setf elt (list data))))
291     (setf (cdr (queue-pointer o)) elt
292     (queue-pointer o) elt)))
293     data)
294    
295     (:method (data (o queue) &key if-empty)
296     (declare (dynamic-extent if-empty) (ignore if-empty))
297     (let ((elt (list data)))
298     (setf (cdr (queue-pointer o)) elt
299     (queue-pointer o) elt))
300     data)
301    
302     (:method ((data t) (queue locked-queue) &key (if-empty (collection-if-empty queue)))
303     (declare (dynamic-extent if-empty))
304     (let ((lock (queue-lock queue))
305     (state :released))
306     (flet ((acquire-it ()
307     (setf state :acquiring)
308     (bt:acquire-lock lock)
309     (setf state :acquired))
310     (release-it ()
311     (setf state :releasing)
312     (bt:release-lock lock)
313     (setf state :released)))
314     (unwind-protect
315     (loop
316     (acquire-it)
317     (if (collection-empty-p queue)
318     ;; if there's no content, decided whether to prime the process
319     ;; or just add to the queue
320     (cond ((queue-processor queue)
321     ;; recursive call is allowed, simply enqueue
322     (return (call-next-method)))
323     ((null if-empty)
324     ;; simple enqueue
325     (return (call-next-method)))
326     (t
327     ;; w/o a processor, but with a if-empty continuation, use it
328     (assert-argument-type dequeue if-empty
329     (or function (and symbol (satisfies fboundp))))
330     (unwind-protect
331     (progn (setf (queue-processor queue) (bt:current-thread))
332     (call-next-method)
333     (release-it)
334     (return (values (funcall if-empty) t)))
335     (setf (queue-processor queue) nil))))
336     ;; if the collection already has content, just enqueue
337     (return (call-next-method))))
338     (ecase state
339     (:released )
340     (:acquired (bt:release-lock lock))
341     ((:acquiring :releasing) ; maybe or maybe not
342     (ignore-errors (bt:release-lock lock))))))))
343    
344     (:method (data (stack stack) &key if-empty)
345     (declare (ignore if-empty))
346     (vector-push-extend data (stack-data stack))
347     data)
348    
349     (:method ((data t) (stack locked-stack) &rest args)
350     (declare (dynamic-extent args) (ignore args))
351     (bt:with-lock-held ((stack-lock stack))
352     (call-next-method))))
353    
354     (defgeneric dequeue (queue &key if-empty test)
355     (declare (dynamic-extent if-empty))
356    
357     #+(or) ;; this version caches the released queue cells
358     (:method ((queue queue) &key test if-empty)
359     (declare (ignore if-empty))
360     (let ((head (queue-header queue)))
361     (cond ((eq head (queue-pointer queue))
362     (values nil nil))
363     (test
364     (assert-argument-type dequeue test
365     (or function (and symbol (satisfies fboundp))))
366     (do ((head head (cdr head))
367     (ptr (cdr head) (cdr ptr)))
368     ((null ptr) (values nil nil))
369     (when (funcall test (car ptr))
370     (unless (setf (cdr head) (cdr ptr))
371     (setf (queue-pointer queue) head))
372     (setf (cdr ptr) (queue-cache queue)
373     (queue-cache queue) ptr)
374     (return (values (shiftf (car ptr) nil) t)))))
375     (t
376     (let ((elt (cdr head)))
377     (unless (setf (cdr head) (cdr elt))
378     (setf (queue-pointer queue) head))
379     (setf (cdr elt) (queue-cache queue)
380     (queue-cache queue) elt)
381     (values (shiftf (car elt) nil) t))))))
382    
383     (:method ((queue queue) &key test if-empty)
384     (declare (ignore if-empty))
385     (let ((head (queue-header queue)))
386     (cond ((eq head (queue-pointer queue))
387     (values nil nil))
388     (test
389     (assert-argument-type dequeue test
390     (or function (and symbol (satisfies fboundp))))
391     (do ((head head (cdr head))
392     (ptr (cdr head) (cdr ptr)))
393     ((null ptr) (values nil nil))
394     (when (funcall test (car ptr))
395     (unless (setf (cdr head) (cdr ptr))
396     (setf (queue-pointer queue) head))
397     (return (values (car ptr) t)))))
398     (t
399     (let ((value (cadr head)))
400     (unless (setf (cdr head) (cddr head))
401     (setf (queue-pointer queue) head))
402     (values value t))))))
403    
404     (:method ((queue locked-queue) &key (if-empty (collection-if-empty queue)) test)
405     (declare (dynamic-extent if-empty)
406     (ignore test))
407     (let ((lock (queue-lock queue))
408     (state :released))
409     (flet ((acquire-it ()
410     (setf state :acquiring)
411     (bt:acquire-lock lock)
412     (setf state :acquired))
413     (release-it ()
414     (setf state :releasing)
415     (bt:release-lock lock)
416     (setf state :released)))
417     (unwind-protect
418     ;; attempt to dequeue a value. if that succeeds return it. otherwise,
419     ;; - if waiting, release the lock and yield in the hope one appears,
420     ;; and repeat the process upon resumption;
421     ;; - if suppressed, return nil
422     ;; - if provided a continuation, return its result
423     (loop
424     (acquire-it)
425     (multiple-value-bind (value value-p)
426     (call-next-method)
427     (if value-p
428     (return (values value t))
429     (case if-empty
430     ((nil)
431     (return (values nil nil)))
432     (:wait
433     (when (queue-processor queue)
434     ;; if there is a processor, wait
435     (assert (not (eq (queue-processor queue) (bt:current-thread))) ()
436     "Recursive dequeue: ~s" (queue-processor queue)))
437     (release-it)
438     (loop (if (collection-empty-p queue)
439     (bt:thread-yield)
440     (return))))
441     (t
442     (assert-argument-type dequeue if-empty
443     (or function (and symbol (satisfies fboundp))))
444     (unwind-protect
445     (progn (setf (queue-processor queue) (bt:current-thread))
446     (release-it)
447     (return (values (funcall if-empty) t)))
448     (setf (queue-processor queue) nil)))))))
449     (ecase state
450     (:released )
451     (:acquired (bt:release-lock lock))
452     ((:acquiring :releasing) ; maybe or maybe not
453     (ignore-errors (bt:release-lock lock))))))))
454    
455     (:method ((stack stack) &key (if-empty (collection-if-empty stack)) test)
456     (declare (dynamic-extent if-empty)
457     (ignore test))
458     (let ((data (stack-data stack)))
459     (if (plusp (fill-pointer data))
460     (values (vector-pop data) t)
461     (case if-empty
462     ((nil)
463     (values nil nil))
464     (t
465     (assert-argument-type dequeue if-empty
466     (or function (and symbol (satisfies fboundp))))
467     (values (funcall if-empty) t))))))
468    
469     (:method ((stack locked-stack) &rest args)
470     (declare (dynamic-extent args) (ignore args))
471     (bt:with-lock-held ((stack-lock stack))
472     (call-next-method))))
473    
474    
475    
476     #+:de.setf.utility.test
477     (with-test-situation (:define)
478     (test:test parse-version-keyword/1 (parse-version-keyword :amqp-1-1-0-8-0)
479     '(:AMQP 1 1 0 8 0))
480    
481     (test:test version-lessp/1 (version-lessp '(amqp 1 1 0 8 0) '(amqp 1 1 0 9 0)))
482    
483     (test:test queue/1 (let ((q (make-instance 'queue)))
484     (list (enqueue 1 q)
485     (dequeue q)
486     (dequeue q)
487     (enqueue 2 q)
488     (dequeue q)))
489     '(1 1 NIL 2 2))
490     (test:test queue/1 (let ((q (make-instance 'queue)))
491     (list (enqueue 1 q)
492     (enqueue 2 q)
493     (enqueue 1 q)
494     (dequeue q :test 'evenp)
495     (dequeue q)
496     (dequeue q)
497     (dequeue q)))
498     '(1 2 1 2 1 1 nil))
499     (test:test queue/2 (let ((q (make-instance 'locked-queue :name "test")))
500     (list (enqueue 1 q)
501     (dequeue q)
502     (dequeue q)
503     (enqueue 2 q)
504     (dequeue q)))
505     '(1 1 NIL 2 2))
506     (test:test stack/1 (let ((q (make-instance 'stack :if-empty (let ((x 0)) #'(lambda () (incf x))))))
507     (list (enqueue 'a q)
508     (dequeue q)
509     (dequeue q)
510     (enqueue 'b q)
511     (dequeue q)
512     (dequeue q)
513     (dequeue q)))
514     '(A A 1 B B 2 3))
515     (when bt:*supports-threads-p*
516     (test:test queue/wait (let ((q (make-instance 'locked-queue :name "test")))
517     (list (enqueue 1 q)
518     (dequeue q)
519     (progn (bt:make-thread #'(lambda ()
520     (bt:thread-yield)
521     (enqueue :foreign q)))
522     (dequeue q :if-empty :wait))
523     (enqueue 2 q)
524     (dequeue q)))))
525    
526     )
527    
528    

  ViewVC Help
Powered by ViewVC 1.1.5