Some cleanups after code review by Sergey Vasilyev
authorFrancois-Rene Rideau <tunes@google.com>
Sat, 30 Mar 2013 03:14:35 +0000 (23:14 -0400)
committerFrancois-Rene Rideau <tunes@google.com>
Sat, 30 Mar 2013 03:14:35 +0000 (23:14 -0400)
thread-pooling.lisp

index b191b33..684b394 100644 (file)
@@ -13,6 +13,8 @@
 ;;
 (defclass thread-pooling-taskmaster (multi-threaded-taskmaster)
   ((master-lock
+    ;; controls status, acceptor-process, and
+    ;; all slots that are valid while there is or may be no dispatcher-process to process messages.
     :initform (bt:make-lock "taskmaster-master")
     :reader taskmaster-master-lock
     :documentation
     :initform :stopped
     :documentation
     "The status of the taskmaster: :STOPPED, :RUNNING, :STOPPING.")
-   (acceptor-process ;; must hold the master lock to change
-    :accessor acceptor-process
-    :documentation
-    "A process that accepts incoming connections and hands them off to new processes
-     for request handling.")
    (dispatcher-process ;; must hold the master lock to change
     :accessor dispatcher-process
     :documentation
@@ -56,7 +53,7 @@
     :type (or integer null)
     :initarg :max-thread-count
     :initform nil
-    :accessor taskmaster-max-thread-count
+    :reader taskmaster-max-thread-count
     :documentation 
     "The maximum number of request threads this taskmaster will simultaneously
      run before refusing or queueing new connections requests.  If the value
@@ -71,7 +68,7 @@
     :type (or integer boolean)
     :initarg :max-accept-count
     :initform nil
-    :accessor taskmaster-max-accept-count
+    :reader taskmaster-max-accept-count
     :documentation
     "The maximum number of connections this taskmaster will accept
      before refusing new connections.  If supplied and an integer,
   (:default-initargs
    :max-thread-count *default-max-thread-count*
    :max-accept-count *default-max-accept-count*)
-  (:documentation "A taskmaster that starts one thread for listening
-to incoming requests and one new thread for each incoming connection.
+  (:documentation "A taskmaster that maintains a pool of worker threads
+and a queue of accepted connections to be processed.
 
-If MAX-THREAD-COUNT is null, a new thread will always be created for
-each request.
+If MAX-THREAD-COUNT is null, a new thread will always be created
+when all existing workers are busy.
 
-If MAX-THREAD-COUNT is supplied, the number of request threads is
+If MAX-THREAD-COUNT is supplied, the number of worker threads is
 limited to that.  Furthermore, if MAX-ACCEPT-COUNT is not supplied, an
 HTTP 503 will be sent if the thread limit is exceeded.  Otherwise, if
 MAX-ACCEPT-COUNT is supplied, it must be greater than MAX-THREAD-COUNT;
@@ -122,10 +119,7 @@ In an environment with a single Hunchentoot server, it's reasonable
 to provide both MAX-THREAD-COUNT and a somewhat larger value for
 MAX-ACCEPT-COUNT.  This will cause a server that's almost out of
 resources to wait a bit; if the server is completely out of resources,
-then the reply will be HTTP 503.
-
-This is the default taskmaster implementation for multi-threaded Lisp
-implementations."))
+then the reply will be HTTP 503."))
 
 (defmethod initialize-instance :after ((taskmaster thread-pooling-taskmaster) &rest init-args)
   "Ensure the if MAX-ACCEPT-COUNT is supplied, that it is greater than MAX-THREAD-COUNT."
@@ -142,21 +136,23 @@ implementations."))
 
 (defmethod hunchentoot::decrement-taskmaster-accept-count ((taskmaster thread-pooling-taskmaster))
   ;; Compatibility function so we can reuse hunchentoot:send-service-unavailable-reply
-  (declare (ignorable taskmaster))
   (values))
 
 (defmethod shutdown ((taskmaster thread-pooling-taskmaster))
   ;; just wait until all workers are done, send them all a shutdown message, then die.
   (bt:with-lock-held ((taskmaster-master-lock taskmaster))
-    (when (eq (taskmaster-status taskmaster) :running)
-      (send taskmaster `(:shutdown) :blockp nil))))
+    (ecase (taskmaster-status taskmaster)
+      ((:stopped :stopping)) ;; no active dispatcher to receive a message
+      ((:running) (dispatcher-send taskmaster `(:shutdown) :blockp t)))))
 
 ;; NB: by using the send and recv gf's, we provide a specialization point.
-(defmethod send ((taskmaster thread-pooling-taskmaster) message &rest keys &key &allow-other-keys)
-  (apply 'send (dispatcher-channel taskmaster) message keys))
+(defgeneric dispatcher-send (taskmaster message &key &allow-other-keys)
+  (:method ((taskmaster thread-pooling-taskmaster) message &rest keys &key &allow-other-keys)
+    (apply 'send (dispatcher-channel taskmaster) message keys)))
 
-(defmethod recv ((taskmaster thread-pooling-taskmaster) &rest keys &key &allow-other-keys)
-  (apply 'recv (dispatcher-channel taskmaster) keys))
+(defgeneric dispatcher-recv (taskmaster &key &allow-other-keys)
+  (:method ((taskmaster thread-pooling-taskmaster) &rest keys &key &allow-other-keys)
+    (apply 'recv (dispatcher-channel taskmaster) keys)))
 
 (defmethod execute-acceptor ((taskmaster thread-pooling-taskmaster))
   (bt:with-lock-held ((taskmaster-master-lock taskmaster))
@@ -172,13 +168,13 @@ implementations."))
       (setf (dispatcher-process taskmaster)
             (start-thread
              taskmaster
-             (lambda () (dispatch-work taskmaster))
+             (lambda () (run-dispatcher-thread taskmaster))
              :name (format nil "hunchentoot-dispatcher-~A:~A"
                            (or (acceptor-address (taskmaster-acceptor taskmaster)) "*")
                            (acceptor-port (taskmaster-acceptor taskmaster))))))))
 
 (defmethod handle-incoming-connection ((taskmaster thread-pooling-taskmaster) connection)
-  (send taskmaster `(:process-connection ,connection)))
+  (dispatcher-send taskmaster `(:process-connection ,connection)))
 
 (defun mark-worker-ready (taskmaster worker-id chan)
   ;; POST: the worker has been removed from the busy-workers and pushed onto the available-workers
@@ -196,23 +192,23 @@ implementations."))
 
 (defun get-worker-busy-on-connection (taskmaster worker-id channel connection)
   ;; POST: the worker is added to the busy-workers
-  (send channel `(:process-connection ,connection) :blockp nil)
+  (send channel `(:process-connection ,connection) :blockp t)
   (mark-worker-busy taskmaster worker-id connection channel))
 
 (defmethod too-many-taskmaster-requests ((taskmaster thread-pooling-taskmaster) connection)
   (acceptor-log-message (taskmaster-acceptor taskmaster)
                         :warning "Can't handle a new request, too many request threads already"))
 
-(defgeneric initialize-worker-thread (taskmaster worker-id channel))
-(defmethod initialize-worker-thread ((taskmaster thread-pooling-taskmaster) worker-id channel)
+(defgeneric run-worker-thread (taskmaster worker-id channel))
+(defmethod run-worker-thread ((taskmaster thread-pooling-taskmaster) worker-id channel)
   ;; NB: define :before methods to initialize any application-specific worker thread state
-  (loop :for request = (recv channel :blockp t)
-        :do (ematch request
-              ((list :process-connection connection)
-               (process-connection (taskmaster-acceptor taskmaster) connection))
-              ((list :shutdown)
-               (return)))
-            (send taskmaster `(:worker-ready ,worker-id ,channel) :blockp nil)))
+  (loop for request = (recv channel :blockp t)
+        do (ematch request
+             ((list :process-connection connection)
+              (process-connection (taskmaster-acceptor taskmaster) connection))
+             ((list :shutdown)
+              (return)))
+           (dispatcher-send taskmaster `(:worker-ready ,worker-id ,channel) :blockp t)))
 
 (defgeneric next-worker-id (taskmaster))
 (defmethod next-worker-id ((taskmaster thread-pooling-taskmaster))
@@ -232,7 +228,7 @@ implementations."))
     (handler-case*
      (start-thread
       taskmaster
-      (lambda () (initialize-worker-thread taskmaster worker-id channel))
+      (lambda () (run-worker-thread taskmaster worker-id channel))
       :name (format nil (taskmaster-worker-thread-name-format taskmaster) worker-id))
      (error (condition)
        (let ((*acceptor* (taskmaster-acceptor taskmaster)))
@@ -240,9 +236,9 @@ implementations."))
                        "Error while creating worker thread: ~A" condition))))
     (values worker-id channel)))
 
-(defgeneric dispatch-work (taskmaster))
+(defgeneric run-dispatcher-thread (taskmaster))
 
-(defmethod dispatch-work ((taskmaster thread-pooling-taskmaster))
+(defmethod run-dispatcher-thread ((taskmaster thread-pooling-taskmaster))
   ;; Here's the idea, with the stipulations given in THREAD-POOLING-TASKMASTER
   ;;  - If MAX-THREAD-COUNT is null, just start a taskmaster
   ;;  - If the connection count will exceed MAX-ACCEPT-COUNT or if MAX-ACCEPT-COUNT
@@ -266,19 +262,15 @@ implementations."))
       (assert (eq taskmaster-status :running))
       (assert (eq dispatcher-process (bt:current-thread))))
     (loop
-      (loop
-        :for wait = t :then nil
-        :for request = (recv taskmaster :blockp wait)
-        :while request
-        :do (ematch request
-              ((list :worker-ready worker-id chan)
-               (mark-worker-ready taskmaster worker-id chan))
-              ((list :process-connection connection)
-               (enqueue pending-connections connection)
-               (incf accept-count))
-              ((list :shutdown)
-               ;; TODO: do something to notify the acceptor to that it should die?
-               (setf taskmaster-status :stopping))))
+      (ematch (dispatcher-recv taskmaster :blockp t)
+        ((list :worker-ready worker-id chan)
+         (mark-worker-ready taskmaster worker-id chan))
+        ((list :process-connection connection)
+         (enqueue pending-connections connection)
+         (incf accept-count))
+        ((list :shutdown)
+         ;; NB: hunchentoot is supposed to stop the acceptor before the taskmaster
+         (setf taskmaster-status :stopping)))
       (ecase taskmaster-status
         (:stopping
          (dolist (worker (dequeue-all available-workers))