diff --git a/pkgdcl.lisp b/pkgdcl.lisp index 380c2584857899ab0e0519dc49b864542ba15943..febff802c279778b4847159b0676cf1146e24216 100644 --- a/pkgdcl.lisp +++ b/pkgdcl.lisp @@ -1,8 +1,8 @@ (asdf/package:define-package quux-hunchentoot (:use :common-lisp :hunchentoot - :bordeaux-threads :alexandria + :chanl ;; for exchanging messages :optima ;; for matching inter-thread messages :classy) ;; for its >simple-fifo-queue< (:import-from diff --git a/thread-pooling.lisp b/thread-pooling.lisp index 33ee79697278e472df9e95a847811448f33722ef..f26ebce45875f7ed32b285632e4f7b2bb2701e3a 100644 --- a/thread-pooling.lisp +++ b/thread-pooling.lisp @@ -2,7 +2,7 @@ (defun make-channel () "Create a communication channel between threads in our pool" - (make-instance 'chanl:unbounded-channel)) + (make-instance 'unbounded-channel)) ;; This taskmaster takes threads out of a worker thread pool. ;; Workers have to register themselves whenever they are ready @@ -13,7 +13,7 @@ ;; (defclass thread-pooling-taskmaster (taskmaster) ((master-lock - :initform (make-lock "taskmaster-master") + :initform (bt:make-lock "taskmaster-master") :reader taskmaster-master-lock :documentation "Thread-unsafe operations without a clear owner use this lock") @@ -136,16 +136,24 @@ implementations.")) (defmethod shutdown ((taskmaster thread-pooling-taskmaster)) ;; just wait until all workers are done, send them all a shutdown message, then die. - (with-lock-held ((taskmaster-master-lock taskmaster)) + (bt:with-lock-held ((taskmaster-master-lock taskmaster)) (when (eq (taskmaster-status taskmaster) :running) - (chanl:send (dispatcher-channel taskmaster) `(:shutdown) :blockp nil)))) + (send taskmaster `(:shutdown) :blockp nil)))) + +(defgeneric start-thread (context thunk &key)) (defmethod start-thread ((taskmaster thread-pooling-taskmaster) thunk &key name) (declare (ignorable taskmaster)) (bt:make-thread thunk :name name)) +(defmethod send ((taskmaster thread-pooling-taskmaster) message &rest keys &key &allow-other-keys) + (apply 'send (dispatcher-channel taskmaster) message keys)) + +(defmethod recv ((taskmaster thread-pooling-taskmaster) &rest keys &key &allow-other-keys) + (apply 'recv (dispatcher-channel taskmaster) keys)) + (defmethod execute-acceptor ((taskmaster thread-pooling-taskmaster)) - (with-lock-held ((taskmaster-master-lock taskmaster)) + (bt:with-lock-held ((taskmaster-master-lock taskmaster)) (when (eq (taskmaster-status taskmaster) :stopped) (setf (taskmaster-status taskmaster) :running) (setf (acceptor-process taskmaster) @@ -164,7 +172,7 @@ implementations.")) (acceptor-port (taskmaster-acceptor taskmaster)))))))) (defmethod handle-incoming-connection ((taskmaster thread-pooling-taskmaster) connection) - (chanl:send (dispatcher-channel taskmaster) `(:process-connection ,connection))) + (send taskmaster `(:process-connection ,connection))) (defun mark-worker-ready (taskmaster worker-id chan) ;; POST: the worker has been removed from the busy-workers and pushed onto the available-workers @@ -180,55 +188,53 @@ implementations.")) (setf (gethash worker-id (taskmaster-busy-workers taskmaster)) (list connection chan)) (values)) -(defun get-worker-busy-on-connection (taskmaster worker-id chan connection) +(defun get-worker-busy-on-connection (taskmaster worker-id channel connection) ;; POST: the worker is added to the busy-workers - (chanl:send chan `(:process-connection ,connection) :blockp nil) - (mark-worker-busy taskmaster worker-id connection chan)) + (send channel `(:process-connection ,connection) :blockp nil) + (mark-worker-busy taskmaster worker-id connection channel)) (defmethod too-many-taskmaster-requests ((taskmaster thread-pooling-taskmaster) connection) (acceptor-log-message (taskmaster-acceptor taskmaster) :warning "Can't handle a new request, too many request threads already")) -(defgeneric initialize-worker-thread (taskmaster worker-id &optional connection)) -(defmethod initialize-worker-thread ((taskmaster thread-pooling-taskmaster) worker-id &optional connection) +(defgeneric initialize-worker-thread (taskmaster worker-id channel)) +(defmethod initialize-worker-thread ((taskmaster thread-pooling-taskmaster) worker-id channel) ;; NB: define :before methods to initialize any application-specific worker thread state - (let ((channel (make-channel))) - (when connection - (process-connection (taskmaster-acceptor taskmaster) connection)) - (loop :for request = (progn - (chanl:send (dispatcher-channel taskmaster) - `(:worker-ready ,worker-id ,channel) :blockp nil) - (chanl:recv channel :blockp t)) - :do (ematch request - ((list :process-connection connection) - (process-connection (taskmaster-acceptor taskmaster) connection)) - ((list :shutdown) - (return)))))) + (loop :for request = (recv channel :blockp t) + :do (ematch request + ((list :process-connection connection) + (process-connection (taskmaster-acceptor taskmaster) connection)) + ((list :shutdown) + (return))) + (send taskmaster `(:worker-ready ,worker-id ,channel) :blockp nil))) (defgeneric next-worker-id (taskmaster)) (defmethod next-worker-id ((taskmaster thread-pooling-taskmaster)) (incf (taskmaster-thread-count taskmaster))) -(defmethod create-worker-thread ((taskmaster thread-pooling-taskmaster) connection) +(defgeneric create-worker-thread (taskmaster)) + +(defmethod create-worker-thread ((taskmaster thread-pooling-taskmaster)) "Create a worker thread for handling requests" ;; we are handling all conditions here as we want to make sure that ;; the acceptor process never crashes while trying to create a ;; worker thread; one such problem exists in ;; GET-PEER-ADDRESS-AND-PORT which can signal socket conditions on ;; some platforms in certain situations. - (let ((worker-id (next-worker-id taskmaster))) - (when connection - (mark-worker-busy taskmaster worker-id connection nil)) + (let ((worker-id (next-worker-id taskmaster)) + (channel (make-channel))) (handler-case* (start-thread taskmaster - (lambda () (initialize-worker-thread taskmaster worker-id connection)) + (lambda () (initialize-worker-thread taskmaster worker-id channel)) :name (format nil (taskmaster-worker-thread-name-format taskmaster) worker-id)) (error (condition) (let ((*acceptor* (taskmaster-acceptor taskmaster))) (log-message* *lisp-errors-log-level* "Error while creating worker thread: ~A" condition)))))) +(defgeneric dispatch-work (taskmaster)) + (defmethod dispatch-work ((taskmaster thread-pooling-taskmaster)) ;; Here's the idea, with the stipulations given in THREAD-POOLING-TASKMASTER ;; - If MAX-THREAD-COUNT is null, just start a taskmaster @@ -241,7 +247,6 @@ implementations.")) (with-accessors ((master-lock taskmaster-master-lock) (dispatcher-process dispatcher-process) (taskmaster-status taskmaster-status) - (channel dispatcher-channel) (pending-connections taskmaster-pending-connections) (available-workers taskmaster-available-workers) (busy-workers taskmaster-busy-workers) @@ -250,13 +255,13 @@ implementations.")) (max-accept-count taskmaster-max-accept-count) (max-thread-count taskmaster-max-thread-count)) taskmaster - (with-lock-held (master-lock) + (bt:with-lock-held (master-lock) (assert (eq taskmaster-status :running)) (assert (eq dispatcher-process (bt:current-thread)))) (loop (loop :for wait = t :then nil - :for request = (chanl:recv channel :blockp wait) + :for request = (recv taskmaster :blockp wait) :while request :do (ematch request ((list :worker-ready worker-id chan) @@ -271,9 +276,9 @@ implementations.")) (:stopping (dolist (worker (dequeue-all available-workers)) (ematch worker - ((list :worker worker-id chan) + ((list :worker worker-id channel) (declare (ignorable worker-id)) - (chanl:send chan '(:shutdown) :blockp nil)))) + (send channel '(:shutdown) :blockp nil)))) (dolist (connection (dequeue-all pending-connections)) (too-many-taskmaster-requests taskmaster connection) (send-service-unavailable-reply taskmaster connection)) @@ -288,13 +293,16 @@ implementations.")) (return)) ;; worker available? give him the job! ((not (empty-p available-workers)) - (let ((connection (dequeue pending-connections))) - (ematch (dequeue available-workers) - ((list :worker worker-id chan) - (get-worker-busy-on-connection taskmaster worker-id chan connection))))) + (ematch (dequeue available-workers) + ((list :worker worker-id channel) + (get-worker-busy-on-connection + taskmaster worker-id channel (dequeue pending-connections))))) ;; positions available for more workers? hire a new one for the job! ((or (null max-thread-count) (< thread-count max-thread-count)) - (create-worker-thread taskmaster (dequeue pending-connections))) + (multiple-value-bind (worker-id channel) + (create-worker-thread taskmaster) + (get-worker-busy-on-connection + taskmaster worker-id channel (dequeue pending-connections)))) ;; Already trying to handle too many connections? Deny request with 503. ((if max-accept-count (> accept-count max-accept-count)