/[de-setf-amqp]/utilities.lisp
ViewVC logotype

Contents of /utilities.lisp

Parent Directory Parent Directory | Revision Log Revision Log


Revision 3 - (show annotations)
Tue Feb 23 09:05:39 2010 UTC (4 years, 1 month ago) by janderson
File size: 21628 byte(s)
Merge commit 'remotes/github/master' into remotes/git-svn
1 ;;; -*- Package: de.setf.amqp.implementation; -*-
2
3 (in-package :de.setf.amqp.implementation)
4
5 (document :file
6 (description "This file defines utility operators for the 'de.setf.amqp' library.")
7 (copyright
8 "Copyright 2010 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved"
9 "'de.setf.amqp' is free software: you can redistribute it and/or modify it under the terms of version 3
10 of the GNU Affero General Public License as published by the Free Software Foundation.
11
12 'setf.amqp' is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
13 implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
14 See the Affero General Public License for more details.
15
16 A copy of the GNU Affero General Public License should be included with 'de.setf.amqp' as `AMQP:agpl.txt`.
17 If not, see the GNU [site](http://www.gnu.org/licenses/)."))
18
19
20 ;;;
21 ;;; macros
22
23 (defmacro assert-condition (form &rest args)
24 (let ((format-string nil) (format-arguments nil) (operator nil))
25 (when (or (typep (first args) '(and symbol (not keyword)))
26 (and (consp (first args)) (eq (caar args) 'setf)))
27 (setf operator (pop args)))
28 (when (stringp (first args))
29 (setf format-string (pop args)
30 format-arguments (shiftf args nil)))
31 (destructuring-bind (&key (operator operator) (format-string format-string) (format-arguments format-arguments)
32 (type (if (and (consp form) (eq (first form) 'typep)) (third form) `(satisfies ,form))))
33 args
34 `(unless ,form
35 (error 'simple-type-error
36 :expected-type (quote ,type)
37 :format-string ,(format nil "~@[~a: ~]condition failed: ~s~:[.~; ~~@?~]"
38 operator form
39 format-string)
40 :format-arguments ,(when format-string `(list ,format-string ,@format-arguments)))))))
41
42 (defmacro def-delegate-slot ((class slot) &rest operators)
43 `(progn ,@(mapcar #'(lambda (op)
44 `(progn (defmethod ,op ((instance ,class)) (,op (slot-value instance ',slot)))
45 (defmethod (setf ,op) (value (instance ,class)) (setf (,op (slot-value instance ',slot)) value))))
46 operators)))
47
48 (defmacro assert-argument-type (operator variable type &optional (required-p t) (test `(typep ,variable ',type)))
49 (let ((form `(assert ,test ()
50 ,(format nil "~s: the ~:[(optional) ~;~] ~a argument must be of type ~a."
51 operator required-p variable type))))
52 (if required-p
53 form
54 `(when ,variable ,form))))
55
56 (defmacro assert-argument-types (operator &rest assertions)
57 `(progn ,@(loop for assertion in assertions
58 collect `(assert-argument-type ,operator ,@assertion))))
59
60 #+mcl
61 (setf (ccl:assq 'assert-argument-types ccl:*fred-special-indent-alist*) 1)
62
63
64 ;;; assorted
65 (unless (boundp 'directory-pathname-p)
66 (defun directory-pathname-p (path)
67 (let ((name (pathname-name path))(type (pathname-type path)))
68 (and (or (null name) (eq name :unspecific) (zerop (length name)))
69 (or (null type) (eq type :unspecific))))))
70
71 ;;; logging
72
73 (defparameter amqp:*log-level* :warn)
74
75 (defparameter *log-levels* '(:debug :verbose :warn :error))
76
77 (defparameter *log-stream* *trace-output*)
78
79 (defmacro amqp:log (criteria class &rest args)
80 (let ((log-op (gensym)))
81 `(flet ((,log-op (stream)
82 (format stream "~&[~/date::format-iso-time/] ~a ~a: ~@?"
83 (get-universal-time) ',criteria ,class ,@args)))
84 (declare (dynamic-extent (function ,log-op)))
85 (log-when ',criteria (function ,log-op)))))
86
87 (defmacro amqp:log* (criteria class &rest args)
88 (let ((log-op (gensym)))
89 `(flet ((,log-op (stream)
90 (apply #'format stream "~&[~/date::format-iso-time/] ~a ~a: ~@?"
91 (get-universal-time) ',criteria ,class ,@args)))
92 (declare (dynamic-extent (function ,log-op)))
93 (log-when ',criteria (function ,log-op)))))
94
95 (defun write-log-entry (op)
96 (let ((*print-readably* nil))
97 (when *log-stream* (funcall op *log-stream*))))
98
99 (defun log-when (criteria op)
100 (when (find criteria (member *log-level* *log-levels*))
101 (write-log-entry op)))
102
103 ;;;
104 ;;; instance tags are just the type and identity
105
106 (defun make-instance-tag (instance)
107 (with-output-to-string (stream)
108 (print-unreadable-object (instance stream :type t :identity t))))
109
110 ;;;
111 ;;; Version keywords reflect the version information contained in a protocol
112 ;;; header. The symbol name comprises the initial 4-byte protocol identifier
113 ;;; and the class, instance, and major/minor version numbers, each separated
114 ;;; by a '-'
115 ;;; nb. the structural relation between the parsed version id and the keyword
116 ;;; does not, in practice correspond to the relation between version keywords
117 ;;; and the connection protocol headers. (see )
118 ;;; this relation is recorded in amqp.u:*version-headers*
119
120 (defun version-protocol-header (version-keyword)
121 (rest (assoc version-keyword amqp.u:*version-headers* :test #'string-equal)))
122
123 (defun protocol-header-version (protocol-header)
124 (first (rassoc protocol-header amqp.u:*version-headers* :test #'equalp)))
125
126 (defun (setf version-protocol-header) (header version-keyword)
127 (assert (typep header '(vector t 8)) ()
128 "Invalid version header: ~s." header)
129 (let ((entry (assoc version-keyword amqp.u:*version-headers* :test #'string-equal)))
130 (cond (entry
131 (setf (rest entry) header))
132 (t
133 (setq amqp.u:*version-headers*
134 (acons version-keyword header amqp.u:*version-headers*))
135 header))))
136
137 (defun make-version-keyword (&key (name :amqp)
138 (class 1) (instance 1)
139 (major (error "major version number is required."))
140 (minor (error "minor version number is required."))
141 (revision 0))
142 "Generate the version keyword for the given combination of
143 CLASS, INSTANCE, and MAJOR, MINOR, and REVISION numbers.
144 By default class and instance default to '1', while the default revision
145 is '0'. Both major and minor version number are required."
146 (intern (format nil "~:@(~a~)-~d-~d-~d-~d-~d"
147 name
148 class instance major minor revision)
149 :keyword))
150
151 (defun parse-version-keyword (keyword)
152 (labels ((elements (string position)
153 (let ((next (position #\- string :start (1+ position))))
154 (cons (subseq string position next)
155 (when next (elements string (1+ next)))))))
156 (destructuring-bind (name &rest numbers) (elements (string keyword) 0)
157 (assert (and (every #'alpha-char-p name)
158 (= (length numbers) 5)))
159 (cons (intern name :keyword) (mapcar #'parse-integer numbers)))))
160
161
162 (defgeneric version-lessp (version1 version2)
163 (:documentation "Return TRUE iff VERSION1 is less than VERSION2.
164 Accepts version indicators as keywords and as parsed lists.
165 Requres the scheme to agree and ignores the protocol class and instance,
166 to compare just major, minor, and revision numbers.")
167
168 (:method ((version1 symbol) (version2 t))
169 (version-lessp (parse-version-keyword version1) version2))
170
171 (:method ((version1 t) (version2 symbol))
172 (version-lessp version1 (parse-version-keyword version2)))
173
174 (:method ((version1 cons) (version2 cons))
175 (assert (and (symbolp (first version1)) (eq (first version1) (first version2))) ()
176 "Invalid version comparison: ~s ~s" version1 version2)
177 (map nil #'(lambda (e1 e2)
178 (assert (and (numberp e1) (numberp e2)) ()
179 "Invalid version comparison: ~s ~s" version1 version2)
180 (cond ((< e1 e2)
181 (return-from version-lessp t))
182 ((> e1 e2)
183 (return-from version-lessp nil))))
184 (cdddr version1) (cdddr version2))))
185
186
187
188 (defun amqp:initialize (&key frame-size timeout force-p)
189 (assert-argument-types amqp:initialize
190 (frame-size integer nil)
191 (timeout integer nil))
192
193 (when (or force-p (null *connection-classes*))
194 (labels ((collect-subclasses (class)
195 (dolist (class (closer-mop:class-direct-subclasses class))
196 (when (null (closer-mop:class-direct-subclasses class))
197 (push class *connection-classes*)
198 (collect-subclasses class)))))
199
200 (when frame-size
201 (setq *frame-size* frame-size))
202 (when timeout
203 (setq *connection-timeout* timeout))
204 (setq *connection-classes* '())
205 (collect-subclasses (find-class 'amqp:object))
206 (setq *connection-classes* (sort *connection-classes* #'version-lessp
207 :key #'class-protocol-version)))))
208
209
210
211 ;;; queues
212 ;;; corrected and extended from rhode's paper
213
214 (defclass collection ()
215 ((if-empty :initform nil :initarg :if-empty :reader collection-if-empty)
216 (name :initform nil :initarg :name :reader collection-name)))
217
218 (defgeneric collection-empty-p (collection))
219
220 (defclass queue (collection)
221 ((header :accessor queue-header)
222 (pointer :accessor queue-pointer)
223 (cache :accessor queue-cache :initform nil)))
224
225 (defclass locked-queue (queue)
226 ((lock :reader queue-lock
227 :initform (bt:make-lock))
228 (processor :accessor queue-processor
229 :initform nil)))
230
231 (defclass stack (collection)
232 ((data :reader stack-data)))
233
234 (defclass locked-stack (stack)
235 ((lock :reader stack-lock
236 :initform (bt:make-lock))))
237
238
239 (defmethod initialize-instance ((instance collection) &rest args
240 &key (name (with-output-to-string (stream)
241 (print-unreadable-object (instance stream :identity t :type t)))))
242 (apply #'call-next-method instance
243 :name name
244 args))
245
246 (defmethod print-object ((instance collection) (stream t))
247 (print-unreadable-object (instance stream :identity t :type t)
248 (format stream "~@[~a~]" (collection-name instance))))
249
250 (defmethod initialize-instance :after ((o queue) &key)
251 (let ((head (list nil)))
252 (setf (queue-header o) head (queue-pointer o) head)))
253
254 (defmethod initialize-instance :after ((instance stack) &key)
255 (with-slots (data) instance
256 (setf data (make-array 32 :fill-pointer 0 :adjustable t))))
257
258 (defmethod collection-empty-p ((queue queue))
259 (eq (queue-header queue) (queue-pointer queue)))
260
261 (defmethod collection-empty-p ((stack stack))
262 (zerop (fill-pointer (stack-data stack))))
263
264 (defgeneric collection-content (collection)
265 (:method ((collection queue))
266 (rest (queue-header collection)))
267 (:method ((collection stack))
268 (stack-data collection)))
269
270 (defgeneric collection-size (collection)
271 (:method ((collection collection))
272 (length (collection-content collection))))
273
274 (defgeneric enqueue (data queue &key if-empty)
275 (declare (dynamic-extent if-empty))
276 (:argument-precedence-order queue data)
277
278 #+(or) ;; this version caches the released queue cells
279 (:method (data (o queue) &key if-empty)
280 (declare (dynamic-extent if-empty))
281 (if (and (eq (queue-pointer o) (queue-header o))
282 if-empty)
283 (funcall if-empty)
284 (let ((elt nil))
285 (cond ((setf elt nil) ;;(queue-cache o))
286 (setf (queue-cache o) (rest elt)
287 (car elt) data
288 (cdr elt) nil))
289 (t
290 (setf elt (list data))))
291 (setf (cdr (queue-pointer o)) elt
292 (queue-pointer o) elt)))
293 data)
294
295 (:method (data (o queue) &key if-empty)
296 (declare (dynamic-extent if-empty) (ignore if-empty))
297 (let ((elt (list data)))
298 (setf (cdr (queue-pointer o)) elt
299 (queue-pointer o) elt))
300 data)
301
302 (:method ((data t) (queue locked-queue) &key (if-empty (collection-if-empty queue)))
303 (declare (dynamic-extent if-empty))
304 (let ((lock (queue-lock queue))
305 (state :released))
306 (flet ((acquire-it ()
307 (setf state :acquiring)
308 (bt:acquire-lock lock)
309 (setf state :acquired))
310 (release-it ()
311 (setf state :releasing)
312 (bt:release-lock lock)
313 (setf state :released)))
314 (unwind-protect
315 (loop
316 (acquire-it)
317 (if (collection-empty-p queue)
318 ;; if there's no content, decided whether to prime the process
319 ;; or just add to the queue
320 (cond ((queue-processor queue)
321 ;; recursive call is allowed, simply enqueue
322 (return (call-next-method)))
323 ((null if-empty)
324 ;; simple enqueue
325 (return (call-next-method)))
326 (t
327 ;; w/o a processor, but with a if-empty continuation, use it
328 (assert-argument-type dequeue if-empty
329 (or function (and symbol (satisfies fboundp))))
330 (unwind-protect
331 (progn (setf (queue-processor queue) (bt:current-thread))
332 (call-next-method)
333 (release-it)
334 (return (values (funcall if-empty) t)))
335 (setf (queue-processor queue) nil))))
336 ;; if the collection already has content, just enqueue
337 (return (call-next-method))))
338 (ecase state
339 (:released )
340 (:acquired (bt:release-lock lock))
341 ((:acquiring :releasing) ; maybe or maybe not
342 (ignore-errors (bt:release-lock lock))))))))
343
344 (:method (data (stack stack) &key if-empty)
345 (declare (ignore if-empty))
346 (vector-push-extend data (stack-data stack))
347 data)
348
349 (:method ((data t) (stack locked-stack) &rest args)
350 (declare (dynamic-extent args) (ignore args))
351 (bt:with-lock-held ((stack-lock stack))
352 (call-next-method))))
353
354 (defgeneric dequeue (queue &key if-empty test)
355 (declare (dynamic-extent if-empty))
356
357 #+(or) ;; this version caches the released queue cells
358 (:method ((queue queue) &key test if-empty)
359 (declare (ignore if-empty))
360 (let ((head (queue-header queue)))
361 (cond ((eq head (queue-pointer queue))
362 (values nil nil))
363 (test
364 (assert-argument-type dequeue test
365 (or function (and symbol (satisfies fboundp))))
366 (do ((head head (cdr head))
367 (ptr (cdr head) (cdr ptr)))
368 ((null ptr) (values nil nil))
369 (when (funcall test (car ptr))
370 (unless (setf (cdr head) (cdr ptr))
371 (setf (queue-pointer queue) head))
372 (setf (cdr ptr) (queue-cache queue)
373 (queue-cache queue) ptr)
374 (return (values (shiftf (car ptr) nil) t)))))
375 (t
376 (let ((elt (cdr head)))
377 (unless (setf (cdr head) (cdr elt))
378 (setf (queue-pointer queue) head))
379 (setf (cdr elt) (queue-cache queue)
380 (queue-cache queue) elt)
381 (values (shiftf (car elt) nil) t))))))
382
383 (:method ((queue queue) &key test if-empty)
384 (declare (ignore if-empty))
385 (let ((head (queue-header queue)))
386 (cond ((eq head (queue-pointer queue))
387 (values nil nil))
388 (test
389 (assert-argument-type dequeue test
390 (or function (and symbol (satisfies fboundp))))
391 (do ((head head (cdr head))
392 (ptr (cdr head) (cdr ptr)))
393 ((null ptr) (values nil nil))
394 (when (funcall test (car ptr))
395 (unless (setf (cdr head) (cdr ptr))
396 (setf (queue-pointer queue) head))
397 (return (values (car ptr) t)))))
398 (t
399 (let ((value (cadr head)))
400 (unless (setf (cdr head) (cddr head))
401 (setf (queue-pointer queue) head))
402 (values value t))))))
403
404 (:method ((queue locked-queue) &key (if-empty (collection-if-empty queue)) test)
405 (declare (dynamic-extent if-empty)
406 (ignore test))
407 (let ((lock (queue-lock queue))
408 (state :released))
409 (flet ((acquire-it ()
410 (setf state :acquiring)
411 (bt:acquire-lock lock)
412 (setf state :acquired))
413 (release-it ()
414 (setf state :releasing)
415 (bt:release-lock lock)
416 (setf state :released)))
417 (unwind-protect
418 ;; attempt to dequeue a value. if that succeeds return it. otherwise,
419 ;; - if waiting, release the lock and yield in the hope one appears,
420 ;; and repeat the process upon resumption;
421 ;; - if suppressed, return nil
422 ;; - if provided a continuation, return its result
423 (loop
424 (acquire-it)
425 (multiple-value-bind (value value-p)
426 (call-next-method)
427 (if value-p
428 (return (values value t))
429 (case if-empty
430 ((nil)
431 (return (values nil nil)))
432 (:wait
433 (when (queue-processor queue)
434 ;; if there is a processor, wait
435 (assert (not (eq (queue-processor queue) (bt:current-thread))) ()
436 "Recursive dequeue: ~s" (queue-processor queue)))
437 (release-it)
438 (loop (if (collection-empty-p queue)
439 (bt:thread-yield)
440 (return))))
441 (t
442 (assert-argument-type dequeue if-empty
443 (or function (and symbol (satisfies fboundp))))
444 (unwind-protect
445 (progn (setf (queue-processor queue) (bt:current-thread))
446 (release-it)
447 (return (values (funcall if-empty) t)))
448 (setf (queue-processor queue) nil)))))))
449 (ecase state
450 (:released )
451 (:acquired (bt:release-lock lock))
452 ((:acquiring :releasing) ; maybe or maybe not
453 (ignore-errors (bt:release-lock lock))))))))
454
455 (:method ((stack stack) &key (if-empty (collection-if-empty stack)) test)
456 (declare (dynamic-extent if-empty)
457 (ignore test))
458 (let ((data (stack-data stack)))
459 (if (plusp (fill-pointer data))
460 (values (vector-pop data) t)
461 (case if-empty
462 ((nil)
463 (values nil nil))
464 (t
465 (assert-argument-type dequeue if-empty
466 (or function (and symbol (satisfies fboundp))))
467 (values (funcall if-empty) t))))))
468
469 (:method ((stack locked-stack) &rest args)
470 (declare (dynamic-extent args) (ignore args))
471 (bt:with-lock-held ((stack-lock stack))
472 (call-next-method))))
473
474
475
476 #+:de.setf.utility.test
477 (with-test-situation (:define)
478 (test:test parse-version-keyword/1 (parse-version-keyword :amqp-1-1-0-8-0)
479 '(:AMQP 1 1 0 8 0))
480
481 (test:test version-lessp/1 (version-lessp '(amqp 1 1 0 8 0) '(amqp 1 1 0 9 0)))
482
483 (test:test queue/1 (let ((q (make-instance 'queue)))
484 (list (enqueue 1 q)
485 (dequeue q)
486 (dequeue q)
487 (enqueue 2 q)
488 (dequeue q)))
489 '(1 1 NIL 2 2))
490 (test:test queue/1 (let ((q (make-instance 'queue)))
491 (list (enqueue 1 q)
492 (enqueue 2 q)
493 (enqueue 1 q)
494 (dequeue q :test 'evenp)
495 (dequeue q)
496 (dequeue q)
497 (dequeue q)))
498 '(1 2 1 2 1 1 nil))
499 (test:test queue/2 (let ((q (make-instance 'locked-queue :name "test")))
500 (list (enqueue 1 q)
501 (dequeue q)
502 (dequeue q)
503 (enqueue 2 q)
504 (dequeue q)))
505 '(1 1 NIL 2 2))
506 (test:test stack/1 (let ((q (make-instance 'stack :if-empty (let ((x 0)) #'(lambda () (incf x))))))
507 (list (enqueue 'a q)
508 (dequeue q)
509 (dequeue q)
510 (enqueue 'b q)
511 (dequeue q)
512 (dequeue q)
513 (dequeue q)))
514 '(A A 1 B B 2 3))
515 (when bt:*supports-threads-p*
516 (test:test queue/wait (let ((q (make-instance 'locked-queue :name "test")))
517 (list (enqueue 1 q)
518 (dequeue q)
519 (progn (bt:make-thread #'(lambda ()
520 (bt:thread-yield)
521 (enqueue :foreign q)))
522 (dequeue q :if-empty :wait))
523 (enqueue 2 q)
524 (dequeue q)))))
525
526 )
527
528

  ViewVC Help
Powered by ViewVC 1.1.5