diff --git a/thread-pooling.lisp b/thread-pooling.lisp index b191b33de74badcd3aadc8f6deea517b2bcd4f31..684b3945db9866f489c23f93e099eab5ae3c7bd1 100644 --- a/thread-pooling.lisp +++ b/thread-pooling.lisp @@ -13,6 +13,8 @@ ;; (defclass thread-pooling-taskmaster (multi-threaded-taskmaster) ((master-lock + ;; controls status, acceptor-process, and + ;; all slots that are valid while there is or may be no dispatcher-process to process messages. :initform (bt:make-lock "taskmaster-master") :reader taskmaster-master-lock :documentation @@ -22,11 +24,6 @@ :initform :stopped :documentation "The status of the taskmaster: :STOPPED, :RUNNING, :STOPPING.") - (acceptor-process ;; must hold the master lock to change - :accessor acceptor-process - :documentation - "A process that accepts incoming connections and hands them off to new processes - for request handling.") (dispatcher-process ;; must hold the master lock to change :accessor dispatcher-process :documentation @@ -56,7 +53,7 @@ :type (or integer null) :initarg :max-thread-count :initform nil - :accessor taskmaster-max-thread-count + :reader taskmaster-max-thread-count :documentation "The maximum number of request threads this taskmaster will simultaneously run before refusing or queueing new connections requests. If the value @@ -71,7 +68,7 @@ :type (or integer boolean) :initarg :max-accept-count :initform nil - :accessor taskmaster-max-accept-count + :reader taskmaster-max-accept-count :documentation "The maximum number of connections this taskmaster will accept before refusing new connections. If supplied and an integer, @@ -95,13 +92,13 @@ (:default-initargs :max-thread-count *default-max-thread-count* :max-accept-count *default-max-accept-count*) - (:documentation "A taskmaster that starts one thread for listening -to incoming requests and one new thread for each incoming connection. + (:documentation "A taskmaster that maintains a pool of worker threads +and a queue of accepted connections to be processed. -If MAX-THREAD-COUNT is null, a new thread will always be created for -each request. +If MAX-THREAD-COUNT is null, a new thread will always be created +when all existing workers are busy. -If MAX-THREAD-COUNT is supplied, the number of request threads is +If MAX-THREAD-COUNT is supplied, the number of worker threads is limited to that. Furthermore, if MAX-ACCEPT-COUNT is not supplied, an HTTP 503 will be sent if the thread limit is exceeded. Otherwise, if MAX-ACCEPT-COUNT is supplied, it must be greater than MAX-THREAD-COUNT; @@ -122,10 +119,7 @@ In an environment with a single Hunchentoot server, it's reasonable to provide both MAX-THREAD-COUNT and a somewhat larger value for MAX-ACCEPT-COUNT. This will cause a server that's almost out of resources to wait a bit; if the server is completely out of resources, -then the reply will be HTTP 503. - -This is the default taskmaster implementation for multi-threaded Lisp -implementations.")) +then the reply will be HTTP 503.")) (defmethod initialize-instance :after ((taskmaster thread-pooling-taskmaster) &rest init-args) "Ensure the if MAX-ACCEPT-COUNT is supplied, that it is greater than MAX-THREAD-COUNT." @@ -142,21 +136,23 @@ implementations.")) (defmethod hunchentoot::decrement-taskmaster-accept-count ((taskmaster thread-pooling-taskmaster)) ;; Compatibility function so we can reuse hunchentoot:send-service-unavailable-reply - (declare (ignorable taskmaster)) (values)) (defmethod shutdown ((taskmaster thread-pooling-taskmaster)) ;; just wait until all workers are done, send them all a shutdown message, then die. (bt:with-lock-held ((taskmaster-master-lock taskmaster)) - (when (eq (taskmaster-status taskmaster) :running) - (send taskmaster `(:shutdown) :blockp nil)))) + (ecase (taskmaster-status taskmaster) + ((:stopped :stopping)) ;; no active dispatcher to receive a message + ((:running) (dispatcher-send taskmaster `(:shutdown) :blockp t))))) ;; NB: by using the send and recv gf's, we provide a specialization point. -(defmethod send ((taskmaster thread-pooling-taskmaster) message &rest keys &key &allow-other-keys) - (apply 'send (dispatcher-channel taskmaster) message keys)) +(defgeneric dispatcher-send (taskmaster message &key &allow-other-keys) + (:method ((taskmaster thread-pooling-taskmaster) message &rest keys &key &allow-other-keys) + (apply 'send (dispatcher-channel taskmaster) message keys))) -(defmethod recv ((taskmaster thread-pooling-taskmaster) &rest keys &key &allow-other-keys) - (apply 'recv (dispatcher-channel taskmaster) keys)) +(defgeneric dispatcher-recv (taskmaster &key &allow-other-keys) + (:method ((taskmaster thread-pooling-taskmaster) &rest keys &key &allow-other-keys) + (apply 'recv (dispatcher-channel taskmaster) keys))) (defmethod execute-acceptor ((taskmaster thread-pooling-taskmaster)) (bt:with-lock-held ((taskmaster-master-lock taskmaster)) @@ -172,13 +168,13 @@ implementations.")) (setf (dispatcher-process taskmaster) (start-thread taskmaster - (lambda () (dispatch-work taskmaster)) + (lambda () (run-dispatcher-thread taskmaster)) :name (format nil "hunchentoot-dispatcher-~A:~A" (or (acceptor-address (taskmaster-acceptor taskmaster)) "*") (acceptor-port (taskmaster-acceptor taskmaster)))))))) (defmethod handle-incoming-connection ((taskmaster thread-pooling-taskmaster) connection) - (send taskmaster `(:process-connection ,connection))) + (dispatcher-send taskmaster `(:process-connection ,connection))) (defun mark-worker-ready (taskmaster worker-id chan) ;; POST: the worker has been removed from the busy-workers and pushed onto the available-workers @@ -196,23 +192,23 @@ implementations.")) (defun get-worker-busy-on-connection (taskmaster worker-id channel connection) ;; POST: the worker is added to the busy-workers - (send channel `(:process-connection ,connection) :blockp nil) + (send channel `(:process-connection ,connection) :blockp t) (mark-worker-busy taskmaster worker-id connection channel)) (defmethod too-many-taskmaster-requests ((taskmaster thread-pooling-taskmaster) connection) (acceptor-log-message (taskmaster-acceptor taskmaster) :warning "Can't handle a new request, too many request threads already")) -(defgeneric initialize-worker-thread (taskmaster worker-id channel)) -(defmethod initialize-worker-thread ((taskmaster thread-pooling-taskmaster) worker-id channel) +(defgeneric run-worker-thread (taskmaster worker-id channel)) +(defmethod run-worker-thread ((taskmaster thread-pooling-taskmaster) worker-id channel) ;; NB: define :before methods to initialize any application-specific worker thread state - (loop :for request = (recv channel :blockp t) - :do (ematch request - ((list :process-connection connection) - (process-connection (taskmaster-acceptor taskmaster) connection)) - ((list :shutdown) - (return))) - (send taskmaster `(:worker-ready ,worker-id ,channel) :blockp nil))) + (loop for request = (recv channel :blockp t) + do (ematch request + ((list :process-connection connection) + (process-connection (taskmaster-acceptor taskmaster) connection)) + ((list :shutdown) + (return))) + (dispatcher-send taskmaster `(:worker-ready ,worker-id ,channel) :blockp t))) (defgeneric next-worker-id (taskmaster)) (defmethod next-worker-id ((taskmaster thread-pooling-taskmaster)) @@ -232,7 +228,7 @@ implementations.")) (handler-case* (start-thread taskmaster - (lambda () (initialize-worker-thread taskmaster worker-id channel)) + (lambda () (run-worker-thread taskmaster worker-id channel)) :name (format nil (taskmaster-worker-thread-name-format taskmaster) worker-id)) (error (condition) (let ((*acceptor* (taskmaster-acceptor taskmaster))) @@ -240,9 +236,9 @@ implementations.")) "Error while creating worker thread: ~A" condition)))) (values worker-id channel))) -(defgeneric dispatch-work (taskmaster)) +(defgeneric run-dispatcher-thread (taskmaster)) -(defmethod dispatch-work ((taskmaster thread-pooling-taskmaster)) +(defmethod run-dispatcher-thread ((taskmaster thread-pooling-taskmaster)) ;; Here's the idea, with the stipulations given in THREAD-POOLING-TASKMASTER ;; - If MAX-THREAD-COUNT is null, just start a taskmaster ;; - If the connection count will exceed MAX-ACCEPT-COUNT or if MAX-ACCEPT-COUNT @@ -266,19 +262,15 @@ implementations.")) (assert (eq taskmaster-status :running)) (assert (eq dispatcher-process (bt:current-thread)))) (loop - (loop - :for wait = t :then nil - :for request = (recv taskmaster :blockp wait) - :while request - :do (ematch request - ((list :worker-ready worker-id chan) - (mark-worker-ready taskmaster worker-id chan)) - ((list :process-connection connection) - (enqueue pending-connections connection) - (incf accept-count)) - ((list :shutdown) - ;; TODO: do something to notify the acceptor to that it should die? - (setf taskmaster-status :stopping)))) + (ematch (dispatcher-recv taskmaster :blockp t) + ((list :worker-ready worker-id chan) + (mark-worker-ready taskmaster worker-id chan)) + ((list :process-connection connection) + (enqueue pending-connections connection) + (incf accept-count)) + ((list :shutdown) + ;; NB: hunchentoot is supposed to stop the acceptor before the taskmaster + (setf taskmaster-status :stopping))) (ecase taskmaster-status (:stopping (dolist (worker (dequeue-all available-workers))