/[de-setf-amqp]/test/test.lisp
ViewVC logotype

Contents of /test/test.lisp

Parent Directory Parent Directory | Revision Log Revision Log


Revision 3 - (show annotations)
Tue Feb 23 09:05:39 2010 UTC (4 years, 1 month ago) by janderson
File size: 33141 byte(s)
Merge commit 'remotes/github/master' into remotes/git-svn
1 ;;; -*- Package: de.setf.amqp.implementation; -*-
2
3 (in-package :de.setf.amqp.implementation)
4
5 (document :file
6 (description "This file defines test utilities for the 'de.setf.amqp' library.")
7 (copyright
8 "Copyright 2010 [james anderson](mailto:james.anderson@setf.de)"
9 "'de.setf.amqp' is free software: you can redistribute it and/or modify it under the terms of version 3
10 of the GNU Affero General Public License as published by the Free Software Foundation.
11
12 'setf.amqp' is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
13 implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
14 See the Affero General Public License for more details.
15
16 You should have received a copy of the GNU Affero General Public License along with 'de.setf.amqp'.
17 If not, see the GNU [site](http://www.gnu.org/licenses/)."))
18
19
20 ;;;
21 ;;; set up a delegating stream to act as a handle for test conection instances
22
23 (defclass binary-trace-stream ( #+ALLEGRO excl::fundamental-binary-output-stream
24 #+clisp gray:fundamental-binary-output-stream
25 #+CMU extensions:fundamental-binary-output-stream
26 #+CormanLisp stream
27 #+LispWorks stream:fundamental-stream
28 #+(and MCL digitool) ccl::output-binary-stream
29 #+clozure-common-lisp ccl:fundamental-binary-output-stream
30 #+sbcl sb-gray:fundamental-binary-output-stream
31 #+scl ext:binary-output-stream)
32 ((stream :initform *trace-output* :initarg :stream
33 :reader stream-stream))
34 #+CormanLisp
35 (:default-initargs :element-type '(unsigned-byte 8)))
36
37 (defMethod stream-element-type ((stream binary-trace-stream))
38 '(unsigned-byte 8))
39
40 (defmethod stream-direction ((stream binary-trace-stream))
41 :output)
42
43 (defmethod stream-tyo ((stream binary-trace-stream) (datum integer))
44 (format (stream-stream stream) " 0x~2,'0x" datum))
45
46 (defmethod stream-write-sequence ((vector vector) (stream binary-trace-stream) &key (start 0) (end nil))
47 (setf end (or end (length vector)))
48 (do ((I start (1+ i)))
49 ((>= i end))
50 (declare (fixnum i))
51 (format (stream-stream stream) " 0x~2,'0x" (aref vector i)))
52 (- end start))
53
54
55 (defparameter *binary-trace-output* (make-instance 'binary-trace-stream))
56
57
58 (defmacro with-test-connection ((name &rest initargs) &rest body)
59 `(let ((,name (amqp:connection ,@(or initargs `(t))
60 :uri #u"amqp:///"
61 :input-handle *standard-input*
62 :output-handle *binary-trace-output*)))
63 ,@body))
64
65 (defmacro with-test-channel ((name connection &rest initargs) &rest body)
66 `(let ((,name (amqp:connection.channel ,connection
67 ,@initargs
68 :number t
69 :uri #u"amqp:///"
70 :input-handle *standard-input*
71 :output-handle *binary-trace-output*)))
72 ;; as the input handles have been provided to suppress
73 ;; real protocol negotiation, need to connect it by-hand
74 (connect-channel ,connection ,name)
75 ,@body))
76
77 (defmacro with-test-channels (bindings &rest body)
78 (labels ((wrap-with-binding (bindings)
79 (if bindings
80 `(with-test-channel ,(first bindings)
81 ,(wrap-with-binding (rest bindings)))
82 `(progn ,@body))))
83 (wrap-with-binding bindings)))
84
85 (defgeneric test-class-method-codecs (object arguments &key verbose-p)
86 (:documentation "Test the codec operators for a given object/method combination.
87
88 object : (designator amqp:object)
89 method : (designator amqp:method)
90
91 Resolve the object and method to instances, handling the context requirements at each level. Then encode and
92 decode the arguments and test equivalence. Decoded reserved values are ignored.")
93
94 (:method ((spec t) arguments &rest options)
95 (let ((class (etypecase spec (cons (first spec)) (symbol spec)))
96 (initargs (etypecase spec (cons (rest spec)) (symbol nil))))
97 (cond ((subtypep class 'amqp:connection)
98 (let ((amqp:*class.connection* class))
99 (with-test-connection (connection)
100 (when initargs (apply #'reinitialize-instance connection initargs))
101 (apply #'test-class-method-codecs connection arguments options))))
102 ((subtypep class 'amqp:channel)
103 (with-test-connection (connection)
104 (apply #'test-class-method-codecs (apply #'amqp:ensure-object connection class initargs)
105 arguments options)))
106 (t
107 (with-test-connection (connection)
108 (with-test-channel (channel connection)
109 (apply #'test-class-method-codecs (apply #'amqp:ensure-object channel class initargs)
110 arguments options)))))))
111
112 (:method ((object amqp:object) arguments &rest options
113 &key (verbose-p (eq test:*test-unit-mode* :verbose)))
114 (every #'(lambda (method-name)
115 (let ((method-entry (assoc method-name arguments)))
116 (cond (method-entry
117 (apply #'test-class-method-codecs (amqp:ensure-method object method-name)
118 (rest method-entry)
119 options))
120 (t
121 (when verbose-p
122 (format *trace-output* "No test entry for class method ~s." method-name))
123 nil))))
124 (class-method-names object)))
125
126 (:method ((method amqp:method) arguments &key (verbose-p (eq test:*test-unit-mode* :verbose)))
127 (when (eq verbose-p :break) (break "test codec for method: ~s." method))
128 (apply 'call-with-encoded-arguments
129 #'(lambda (frame object method)
130 (call-with-decoded-arguments #'(lambda (object method &rest decoded-arguments)
131 (when verbose-p
132 (format *trace-output* "~&tcmc: ~a.~a: original ~s~%~7tdecoded ~s"
133 (type-of object)
134 (type-of method)
135 arguments
136 decoded-arguments))
137 (equal (loop for (key value) on decoded-arguments by #'cddr
138 unless (search "reserved" (string key) :test #'char-equal)
139 nconc (list key value))
140 arguments))
141 object
142 method
143 frame))
144 (method-object method) method
145 arguments)))
146
147
148 ;;; support for loopback testing
149 ;;; define a channel which puts output frames direly into input
150 ;;; and arrange for it to be used by the connection
151
152 (defclass loopback-connection (amqp:connection)
153 ((frame-map
154 :initform nil :initarg :frame-map
155 :accessor connection-frame-map)
156 (loopback-enabled
157 :initform t
158 :accessor connection-loopback-enabled))
159 (:documentation "A Specialized connection class which simulates broker interaction by mirroring
160 each frames back into transformed input frames."))
161
162 (defgeneric print-queues (connection stream)
163 (:method ((connection amqp:connection) stream)
164 (format stream "~& [queues in[ ")
165 (flet ((do-frame (frame) (format-frame-header frame stream) (write-char #\space stream)))
166 (map nil #'do-frame (collection-content (device-read-frames connection)))
167 (format stream "] out[ ")
168 (map nil #'do-frame (collection-content (device-encoded-frames connection)))
169 (format stream "]]"))
170 (format stream "]")))
171
172 (defgeneric loopback-frame (output-frame)
173 (:method ((output-frame output-frame))
174 (let* ((connection (frame-connection output-frame))
175 (input-frame (claim-input-frame connection)))
176 (rotatef (slot-value input-frame 'header) (slot-value output-frame 'header))
177 (rotatef (slot-value input-frame 'data) (slot-value output-frame 'data))
178 (release-frame output-frame)
179 input-frame)))
180
181
182 (defmethod put-encoded-frame ((connection loopback-connection) output-frame)
183 (when (connection-loopback-enabled connection)
184 (amqp:log :debug output-frame "loopback output")
185
186 (let ((frame-map (connection-frame-map connection)))
187 (cond (frame-map
188 (dolist (map-operator frame-map
189 (amqp:log :warn output-frame "no mapping."))
190 (multiple-value-bind (mapped handled)
191 (funcall map-operator output-frame)
192 (when handled
193 (typecase mapped
194 (null
195 (amqp:log :debug nil "loopback suppressed"))
196 (amqp:frame
197 (amqp:log :debug mapped "loopback as mapped input")
198 (put-read-frame connection mapped))
199 (sequence
200 (dotimes (i (length mapped))
201 (let ((mapped (elt mapped i)))
202 (amqp:log :debug mapped "loopback as mapped input")
203 (put-read-frame connection mapped)))))
204 (return)))))
205 (t
206 (let ((input-frame (loopback-frame output-frame)))
207 (amqp:log :debug input-frame "default loopback as input")
208 (put-read-frame connection input-frame)))))
209
210 (flet ((log-queues (stream)
211 (print-queues connection stream)))
212 (declare (dynamic-extent #'log-queues))
213 (log-when :debug #'log-queues))))
214
215 ;;; this has no effect as the channels pass through the connection queue in their
216 ;;; own specialized method
217 (defmethod get-read-frame ((connection loopback-connection) &key (wait nil))
218 (flet ((frame-matches-connection-p (frame)
219 (eql (frame-channel-number frame) 0)))
220 (declare (dynamic-extent #'frame-matches-connection-p))
221 (or (dequeue (device-read-frames connection)
222 :test #'frame-matches-connection-p
223 :if-empty wait)
224 (error "empty loopback queue."))))
225
226
227 ;;;
228 ;;; frequent loopback opreators
229
230 (defun make-publish-to-deliver-loopback (from-channel to-channel &key
231 (redelivered nil)
232 (delivery-tag 0)
233 (message-count 0))
234 (flet ((publish-to-deliver-loopback (frame)
235 (when (and (eql (frame-channel-number frame) (channel-number from-channel))
236 (eq (frame-type-class-name frame) 'amqp:method)
237 (eq (frame-method-name frame) 'amqp:publish))
238 (let ((to-basic (amqp:basic to-channel))
239 (from-basic (amqp:basic from-channel)))
240 (labels ((encode-get-ok (class method &key routing-key exchange &allow-other-keys)
241 (declare (ignore class method))
242 (release-frame frame)
243 (call-with-encoded-arguments #'return-get-ok to-basic 'amqp:get-ok
244 :delivery-tag (incf delivery-tag)
245 :redelivered redelivered
246 :exchange exchange
247 :routing-key routing-key
248 :message-count (incf message-count)))
249 (return-get-ok (frame class method)
250 (declare (ignore class method))
251 (return-from publish-to-deliver-loopback
252 (values (loopback-frame frame) t))))
253 (declare (dynamic-extent #'encode-get-ok #'return-get-ok))
254 (call-with-decoded-arguments #'encode-get-ok from-basic 'amqp:publish frame))))))
255 #'publish-to-deliver-loopback))
256
257 (defun make-content-loopback (from-channel to-channel)
258 (flet ((content-loopback (frame)
259 (when (and (eql (frame-channel-number frame) (channel-number from-channel))
260 (member (frame-type-class-name frame) '(amqp:header amqp:body)))
261 (let ((input-frame (loopback-frame frame)))
262 (setf (frame-channel-number input-frame)
263 (channel-number to-channel))
264 (values input-frame t)))))
265 #'content-loopback))
266
267 (defun make-declare-queue-ok-loopback (channel &key
268 (consumer-count 0)
269 (message-count 0))
270 (flet ((queue-ok-loopback (frame)
271 (when (and (eql (frame-channel-number frame) (channel-number channel))
272 (eq (frame-type-class-name frame) 'amqp:method)
273 (eq (frame-method-name frame) 'amqp:declare)
274 (eq (frame-class-name frame) 'amqp:queue))
275 (let ((queue (amqp:queue channel))
276 (ok-frame nil))
277 (flet ((encode-ok (class method &key queue &allow-other-keys)
278 (declare (ignore class method))
279 (setf ok-frame
280 (encode-method 'amqp:declare-ok (amqp:queue channel)
281 :queue queue
282 :message-count (incf message-count)
283 :consumer-count (incf consumer-count)))))
284 (declare (dynamic-extent #'encode-ok))
285 (call-with-decoded-arguments #'encode-ok queue (amqp:ensure-method queue 'amqp:declare) frame)
286 (release-frame frame)
287 (values (loopback-frame ok-frame) t))))))
288 #'queue-ok-loopback))
289
290 (defun make-declare-exchange-ok-loopback (channel)
291 (flet ((exchange-ok-loopback (frame)
292 (when (and (eq (frame-type-class-name frame) 'amqp:method)
293 (eq (frame-method-name frame) 'amqp:declare)
294 (eq (frame-class-name frame) 'amqp:exchange))
295 (let* ((exchange (amqp:exchange channel))
296 (ok-frame (encode-method 'amqp:declare-ok exchange)))
297 (release-frame frame)
298 (values (loopback-frame ok-frame) t)))))
299 #'exchange-ok-loopback))
300
301 (defun make-consume-ok-loopback (channel)
302 (flet ((consume-ok-loopback (frame)
303 (when (and (eql (frame-channel-number frame) (channel-number channel))
304 (eq (frame-type-class-name frame) 'amqp:method)
305 (eq (frame-method-name frame) 'amqp:consume)
306 (eq (frame-class-name frame) 'amqp:basic))
307 (let* ((exchange (amqp:exchange channel))
308 (ok-frame (encode-method 'amqp:declare-ok exchange)))
309 (release-frame frame)
310 (values (loopback-frame ok-frame) t)))))
311 #'consume-ok-loopback))
312
313 (defun make-bind-ok-loopback (channel)
314 (flet ((bind-ok-loopback (frame)
315 (when (and (eql (frame-channel-number frame) (channel-number channel))
316 (eq (frame-type-class-name frame) 'amqp:method)
317 (eq (frame-method-name frame) 'amqp:bind)
318 (eq (frame-class-name frame) 'amqp:queue))
319 (let* ((queue (amqp:queue channel))
320 (ok-frame (encode-method 'amqp:bind-ok queue)))
321 (release-frame frame)
322 (values (loopback-frame ok-frame) t)))))
323 #'bind-ok-loopback))
324
325 (defun make-command-sink (method-name)
326 (flet ((method-sink (frame)
327 (when (and (eq (frame-type-class-name frame) 'amqp:method)
328 (eq (frame-method-name frame) method-name))
329 (release-frame frame)
330 (values nil t))))
331 #'method-sink))
332
333 (defun make-ok-reflector (channel class out-method in-method)
334 (flet ((ok-reflector (frame)
335 (when (and (eql (frame-channel-number frame) (channel-number channel))
336 (eq (frame-type-class-name frame) 'amqp:method)
337 (eq (frame-method-name frame) out-method)
338 (eq (frame-class-name frame) class))
339 (let* ((queue (amqp:ensure-object channel class))
340 (ok-frame (encode-method in-method queue)))
341 (release-frame frame)
342 (values (loopback-frame ok-frame) t)))))
343 #'ok-reflector))
344
345 ;;;
346 ;;;
347
348
349 (defgeneric test-class-method-loopback (output-script input-constraints &key connection-class verbose-p)
350 (:documentation "Test the output against the processed values
351
352 OUTPUT-SCRIPT : a list of forms, each (request-operator . args)
353 INPUT-CONSTRAINTS : a list of forms, each (method-type class-type . args)
354
355 the request is written to a loopback channel and the result is received until none are left.
356 each is compared by type and argument list to the contraint form.")
357
358 (:method ((output list) (input list) &key (connection-class (error "connection-class required."))
359 (verbose-p nil))
360
361 (let ((amqp:*class.connection* connection-class))
362 (with-test-connection (connection)
363 (with-test-channel (channel connection :number 1)
364 (dolist (request output)
365 (destructuring-bind (op class . args) request
366 (setf class (amqp:ensure-object channel class))
367 (apply op class args)))
368 (let ((failure nil))
369 (command-loop (channel :wait nil)
370 (t ((class t) &rest args)
371 (let ((test-response (pop input)))
372 (unless test-response
373 (setf failure `(:input ,amqp::method ,class ,args))
374 (return-from command-loop nil))
375 (destructuring-bind (method-type class-type . test-args) test-response
376 (when verbose-p
377 (format *trace-output* "~&~%received: (~s ~s)~%~8t. ~s~%constraint: (~s ~s) . ~s"
378 amqp::method class args method-type class-type test-args))
379 (or (and (typep amqp::method method-type) (typep class class-type)
380 (loop for (key value) on test-args by #'cddr
381 unless (equalp (getf args key) value)
382 return nil
383 finally (return t)))
384 (return-from command-loop
385 (progn (setf failure `(:input ,amqp::method ,class ,args)) nil)))))))
386 (and (null failure) (null input))))))))
387
388 ;;;
389 ;;; test timing:
390 ;;; time processing to send and receive a command. in order to test the pipeline per-se,
391 ;;; create all markers by running one loop. then time repeated passes. the result indicates
392 ;;; the total duration and memory usage and an average per echoed command. the minimum memory
393 ;;; usage is that requird to service the queues.
394 ;;;
395 ;;; mcl5.2 g5-1.8ghz is .4ms per loopback
396 ;;; (:loops 4000 :SECONDS 1.683 :BYTES 60416 :SECONDS/LOOPBACK 4.2075E-4 :BYTES/LOOPBACK 15.104)
397
398 (defun call-with-loopback (op &key (class 'amqp-1-1-0-9-1-loopback-connection))
399 (let ((amqp:*class.connection* class)
400 (command-count 0))
401 (with-test-connection (connection)
402 (with-test-channel (channel connection :number 1)
403 (let ((basic (amqp:basic channel ))
404 (exchange (amqp:exchange channel :exchange "" :type "direct"))
405 (queue (amqp:queue channel :queue "" :message-count 1 :consumer-count 0))
406 (tx (amqp:tx channel)))
407
408 (setf (connection-frame-map connection)
409 (list (make-publish-to-deliver-loopback channel channel)
410 (make-content-loopback channel channel)
411 (make-declare-queue-ok-loopback channel)
412 (make-declare-exchange-ok-loopback channel)
413 (make-bind-ok-loopback channel)
414 (make-consume-ok-loopback channel)
415 (make-command-sink 'amqp:get)
416 (make-ok-reflector channel 'amqp:tx 'amqp:select 'amqp:select-ok)
417 (make-command-sink 'amqp:ack)))
418 (flet ((basic.publish ()
419 ;; there is no intrinsic response
420 (amqp:publish basic :exchange exchange)
421 ;; clear the respose: get-ok,header.body
422 (setf (device-state channel) amqp.s:use-channel.body.input)
423 (dotimes (i 3) ; command, header, body
424 (command-case (channel)
425 (t ((class t) &rest args)
426 (declare (dynamic-extent args))
427 (amqp:log :debug class "cwl in: ~a : ~s" amqp::method args)
428 t))))
429 (exchange.declare ()
430 ;; request op handles response
431 (amqp:declare exchange))
432 (queue.declare ()
433 ;; request op handles response
434 (amqp:declare queue))
435 (tx.select ()
436 ;; request op handles response
437 (amqp:select tx)))
438 (flet ((run (&rest operations)
439 (declare (dynamic-extent operations))
440 (dolist (op (or operations '(basic.publish exchange.declare queue.declare tx.select)))
441 (ecase op
442 (basic.publish (basic.publish))
443 (exchange.declare (exchange.declare))
444 (queue.declare (queue.declare))
445 (tx.select (tx.select)))
446 (incf command-count))))
447 (funcall op #'run))))))
448 command-count))
449
450
451 (defun run-loopback (&key (passes 1) (operations nil) ((:log-level *log-level*) *log-level*)
452 (class 'amqp-1-1-0-9-1-loopback-connection))
453 (labels ((run-loopback (run)
454 (dotimes (i passes)
455 (apply run operations))))
456
457 (call-with-loopback #'run-loopback :class class)))
458
459
460 ;;; about a 20% difference with the cache enabled
461 #+(or ) (time-loopback :passes 1000 :operations '() :cache-p t) ; :log-level :warn)
462 #+(or ) (time-loopback :passes 1000 :operations '() :cache-p nil) ; :log-level :warn)
463
464 (defun loopback-class-methods (combinations &key (class 'amqp-1-1-0-9-1-loopback-connection)
465 ((:log-level *log-level*) *log-level*))
466 (let ((amqp:*class.connection* class)
467 (count 0))
468 (with-test-connection (connection)
469 (with-test-channel (channel connection :number 1)
470 (loop for (class-name . methods) in combinations
471 do (let ((class (amqp:ensure-object channel (cons-symbol :amqp class-name))))
472 (dolist (method methods)
473 (send-method (cons-symbol :amqp method) class)
474 (command-case (channel)
475 (t ((class t) &rest args)
476 (declare (dynamic-extent args))
477 (amqp:log :debug class "~a : ~s" amqp::method args)
478 (incf count))))))))
479 count))
480
481 #+(or )
482 (loopback-class-methods '((basic publish) (exchange declare) (queue declare) (tx select))
483 :log-level :debug)
484
485
486
487 (defun loopback-objects (content &key (class 'amqp-1-1-0-9-1-loopback-connection)
488 (content-type mime:application/sexp)
489 (element-type 'standard-object)
490 ((:log-level *log-level*) *log-level*))
491 (let ((amqp:*class.connection* class))
492 (with-test-connection (connection)
493 (with-test-channels ((channel-publish connection :number 1 :element-type element-type :content-type content-type)
494 (channel-consume connection :number 2 :element-type element-type :content-type content-type))
495 (let ((basic-publish (amqp:basic channel-publish))
496 (basic-consume (amqp:basic channel-consume))
497 (exchange-publish (amqp:exchange channel-publish :exchange "ex" :type "direct"))
498 (queue-publish (amqp:queue channel-publish :queue "q1" :message-count 0 :consumer-count 0))
499 (queue-consume (amqp:queue channel-consume :queue "q1" :message-count 0 :consumer-count 0)))
500
501 (setf (connection-frame-map connection)
502 (list (make-publish-to-deliver-loopback channel-publish channel-consume)
503 (make-content-loopback channel-publish channel-consume)
504 (make-declare-queue-ok-loopback channel-publish)
505 (make-declare-exchange-ok-loopback channel-publish)
506 (make-bind-ok-loopback channel-publish)
507 (make-declare-queue-ok-loopback channel-consume)
508 (make-consume-ok-loopback channel-consume)
509 (make-command-sink 'amqp:get)
510 (make-command-sink 'amqp:ack)))
511
512 ;; declare and bind the queues
513 (amqp:request-declare queue-publish)
514 (amqp:request-declare exchange-publish)
515 (amqp:request-bind queue-publish :exchange exchange-publish
516 :queue queue-publish
517 :routing-key "/")
518
519 (mapcar #'(lambda (content)
520 (amqp:request-publish basic-publish :body content :exchange exchange-publish)
521 (amqp:request-get basic-consume :queue queue-consume))
522 content))))))
523
524 ;;;
525 ;;; simple class to use for object message tests
526
527 (defclass test-object ()
528 ((number :initform 1 :initarg :number)
529 (character :initform #\return :initarg :character)
530 (string :initform "two" :initarg :string)
531 (vector :initform #(1 2 3) :initarg :vector)))
532
533 (defmethod print-object ((object test-object) (stream t))
534 (print-unreadable-object (object stream :identity t :type t)
535 (dolist (sd (c2mop:class-direct-slots (class-of object)))
536 (let ((name (c2mop:slot-definition-name sd)))
537 (format stream " [~a ~s]"
538 name (bound-slot-value object name '|#<unbound>|))))))
539
540 (defmethod make-load-form ((object test-object) &optional environment)
541 (declare (ignore environment))
542 (values `(allocate-instance (find-class ',(type-of object)))
543 `(progn ,@(remove nil
544 (mapcar #'(lambda (sd)
545 (let ((name (c2mop:slot-definition-name sd)))
546 (when (slot-boundp object name)
547 `(setf (slot-value ,object ',name)
548 ',(slot-value object name)))))
549 (c2mop:class-direct-slots (class-of object)))))))
550
551
552 (defun test-live-objects (content &key (content-type mime:application/sexp)
553 (verbose-p nil) (delay-p nil)
554 (element-type 'standard-object)
555 (uri "amqp://guest:guest@192.168.1.25/")
556 ((:log-level *log-level*) *log-level*)
557 (count 1) (no-ack t))
558 (let ((connection (make-instance 'amqp:connection :uri uri)))
559 (let ((channel-publish (amqp:connection.channel connection :uri #u"/?exchange=e1&queue=q1"
560 :element-type element-type
561 :content-type content-type))
562 (channel-consume (amqp:connection.channel connection :direction :input :uri #u"/?queue=q1"
563 :element-type element-type
564 :content-type content-type)))
565 (let ((basic-publish (amqp:channel.basic channel-publish))
566 (basic-consume (amqp:channel.basic channel-consume :no-ack no-ack))
567 (exchange-publish (amqp:channel.exchange channel-publish :exchange "ex" :type "direct"))
568 (queue-publish (amqp:channel.queue channel-publish :queue "q1" :message-count 0 :consumer-count 0))
569 (queue-consume (amqp:channel.queue channel-consume :queue "q1" :message-count 0 :consumer-count 0)))
570
571 ;; declare and bind the queues
572 (amqp:declare queue-publish)
573 (amqp:request-declare exchange-publish)
574 (amqp:request-bind queue-publish :exchange exchange-publish
575 :queue queue-publish
576 :routing-key "my key")
577 (let ((start (get-internal-run-time))
578 (get-count 0)
579 (publish-count 0)
580 (result nil))
581 (values
582 (dotimes (i count result)
583 (mapcar #'(lambda (content)
584 (amqp:request-publish basic-publish :body content :exchange exchange-publish
585 :routing-key "my key")
586 (incf publish-count)
587 (when delay-p (sleep (random 1.0)))
588 (setf result (amqp:request-get basic-consume :queue queue-consume))
589 (incf get-count)
590 (when verbose-p (print result)))
591 content))
592 (- (get-internal-run-time) start)
593 (+ publish-count get-count)))))))
594
595 ;;;
596 ;;; utilities
597
598 (defmethod drain-connection ((data vector) (stream stream) &key (start 0) (end (length data)))
599 "read an return all bytes available on the stream."
600 (multiple-value-bind (null error)
601 (ignore-errors
602 (do ((i start (1+ i))
603 (byte (read-byte stream)
604 (read-byte stream)))
605 ((or (>= i end) (null byte) (< byte 0))
606 (subseq data 0 i))
607 (format *trace-output* " ~2,'0d" byte)
608 (force-output *trace-output*)
609 (setf (aref data i) byte)))
610 (cond ((null null)
611 (princ error)
612 (values data error))
613 (t
614 data))))
615
616 (defun probe-connection (&key (host "127.0.0.1") (port *standard-port*) (repeat 0))
617 ;; open, write protocol token, read frame, write static frame, read complete to eof
618 (let* ((socket (usocket:socket-connect host port :element-type 'unsigned-byte))
619 (stream (usocket:socket-stream socket))
620 (data (make-frame-buffer 1024))
621 (token (make-frame-buffer 8))
622 (byte 0))
623 (unwind-protect
624 (progn
625 (setf (buffer-protocol-header token) *default-version*)
626 (write-sequence token stream)
627 (dotimes (i repeat)
628 (write-sequence (map 'vector #'char-code #(#\return #\linefeed))
629 stream))
630 (force-output stream)
631
632 ;; read header
633 (case (setf byte (read-byte stream))
634 ;; the later protocols reply with a version to confirm, but
635 ;; the early ones just send the start frame immediately
636 (#.(char-code #\A)
637 (setf (aref data 0) byte)
638 (unless (= 8 (read-sequence token stream :start 1))
639 (error "protocol token failed to read."))
640 (buffer-protocol-header token))
641 (t
642 (setf (aref data 0) byte)
643 (drain-connection data stream :start 1))))
644 (when socket (usocket:socket-close socket)))))
645
646
647 #|
648 ;;; generic dispatch and keyword processing overhead
649
650 (defgeneric test-keywords (context1 context2 &rest args &key key1 key2 key3 key4 key5)
651 (declare (dynamic-extent args))
652 (:method ((context1 t) (context2 null) &rest args)
653 (declare (dynamic-extent args))
654 (every #'identity args))
655 (:method ((context1 fixnum) (context2 cons) &key key1 key2 key3 key4 key5)
656 (test-keywords (1+ context1) (rest context2)
657 :key1 key1
658 :key2 key2
659 :key3 key3
660 :key4 key4
661 :key5 key5)))
662
663 (defun test-required (context1 context2 key1 key2 key3 key4 key5)
664 (if context2
665 (test-required (1+ context1) (rest context2) key1 key2 key3 key4 key5)
666 (and key1 key2 key3 key4 key5)))
667
668 (defun test-calls (&key (count 1024) (data (make-list 1024)))
669 (time (dotimes (x count)
670 (test-keywords 0 data :key1 1 :key2 'two :key3 "three" :key4 4.0 :key5 #(5))))
671 (time (dotimes (x count)
672 (test-required 0 data 1 'two "three" 4.0 #(5)))))
673
674
675 ;;; (test-calls :count (* 100 1024):data (make-list 10))
676
677 |#

  ViewVC Help
Powered by ViewVC 1.1.5