Cleanup worker initialization protocol. Add defgeneric's. Package tweak.
authorFrancois-Rene Rideau <tunes@google.com>
Sun, 17 Mar 2013 20:59:14 +0000 (16:59 -0400)
committerFrancois-Rene Rideau <tunes@google.com>
Sun, 17 Mar 2013 20:59:45 +0000 (16:59 -0400)
pkgdcl.lisp
thread-pooling.lisp

index 380c258..febff80 100644 (file)
@@ -1,8 +1,8 @@
 (asdf/package:define-package quux-hunchentoot
   (:use :common-lisp
    :hunchentoot
-   :bordeaux-threads
    :alexandria
+   :chanl ;; for exchanging messages
    :optima ;; for matching inter-thread messages
    :classy) ;; for its >simple-fifo-queue<
   (:import-from
index 33ee796..f26ebce 100644 (file)
@@ -2,7 +2,7 @@
 
 (defun make-channel ()
   "Create a communication channel between threads in our pool"
-  (make-instance 'chanl:unbounded-channel))
+  (make-instance 'unbounded-channel))
 
 ;; This taskmaster takes threads out of a worker thread pool.
 ;; Workers have to register themselves whenever they are ready
@@ -13,7 +13,7 @@
 ;;
 (defclass thread-pooling-taskmaster (taskmaster)
   ((master-lock
-    :initform (make-lock "taskmaster-master")
+    :initform (bt:make-lock "taskmaster-master")
     :reader taskmaster-master-lock
     :documentation
     "Thread-unsafe operations without a clear owner use this lock")
@@ -136,16 +136,24 @@ implementations."))
 
 (defmethod shutdown ((taskmaster thread-pooling-taskmaster))
   ;; just wait until all workers are done, send them all a shutdown message, then die.
-  (with-lock-held ((taskmaster-master-lock taskmaster))
+  (bt:with-lock-held ((taskmaster-master-lock taskmaster))
     (when (eq (taskmaster-status taskmaster) :running)
-      (chanl:send (dispatcher-channel taskmaster) `(:shutdown) :blockp nil))))
+      (send taskmaster `(:shutdown) :blockp nil))))
+
+(defgeneric start-thread (context thunk &key))
 
 (defmethod start-thread ((taskmaster thread-pooling-taskmaster) thunk &key name)
   (declare (ignorable taskmaster))
   (bt:make-thread thunk :name name))
 
+(defmethod send ((taskmaster thread-pooling-taskmaster) message &rest keys &key &allow-other-keys)
+  (apply 'send (dispatcher-channel taskmaster) message keys))
+
+(defmethod recv ((taskmaster thread-pooling-taskmaster) &rest keys &key &allow-other-keys)
+  (apply 'recv (dispatcher-channel taskmaster) keys))
+
 (defmethod execute-acceptor ((taskmaster thread-pooling-taskmaster))
-  (with-lock-held ((taskmaster-master-lock taskmaster))
+  (bt:with-lock-held ((taskmaster-master-lock taskmaster))
     (when (eq (taskmaster-status taskmaster) :stopped)
       (setf (taskmaster-status taskmaster) :running)
       (setf (acceptor-process taskmaster)
@@ -164,7 +172,7 @@ implementations."))
                            (acceptor-port (taskmaster-acceptor taskmaster))))))))
 
 (defmethod handle-incoming-connection ((taskmaster thread-pooling-taskmaster) connection)
-  (chanl:send (dispatcher-channel taskmaster) `(:process-connection ,connection)))
+  (send taskmaster `(:process-connection ,connection)))
 
 (defun mark-worker-ready (taskmaster worker-id chan)
   ;; POST: the worker has been removed from the busy-workers and pushed onto the available-workers
@@ -180,55 +188,53 @@ implementations."))
   (setf (gethash worker-id (taskmaster-busy-workers taskmaster)) (list connection chan))
   (values))
 
-(defun get-worker-busy-on-connection (taskmaster worker-id chan connection)
+(defun get-worker-busy-on-connection (taskmaster worker-id channel connection)
   ;; POST: the worker is added to the busy-workers
-  (chanl:send chan `(:process-connection ,connection) :blockp nil)
-  (mark-worker-busy taskmaster worker-id connection chan))
+  (send channel `(:process-connection ,connection) :blockp nil)
+  (mark-worker-busy taskmaster worker-id connection channel))
 
 (defmethod too-many-taskmaster-requests ((taskmaster thread-pooling-taskmaster) connection)
   (acceptor-log-message (taskmaster-acceptor taskmaster)
                         :warning "Can't handle a new request, too many request threads already"))
 
-(defgeneric initialize-worker-thread (taskmaster worker-id &optional connection))
-(defmethod initialize-worker-thread ((taskmaster thread-pooling-taskmaster) worker-id &optional connection)
+(defgeneric initialize-worker-thread (taskmaster worker-id channel))
+(defmethod initialize-worker-thread ((taskmaster thread-pooling-taskmaster) worker-id channel)
   ;; NB: define :before methods to initialize any application-specific worker thread state
-  (let ((channel (make-channel)))
-    (when connection
-      (process-connection (taskmaster-acceptor taskmaster) connection))
-    (loop :for request = (progn
-                           (chanl:send (dispatcher-channel taskmaster)
-                                       `(:worker-ready ,worker-id ,channel) :blockp nil)
-                           (chanl:recv channel :blockp t))
-          :do (ematch request
-                ((list :process-connection connection)
-                 (process-connection (taskmaster-acceptor taskmaster) connection))
-                ((list :shutdown)
-                 (return))))))
+  (loop :for request = (recv channel :blockp t)
+        :do (ematch request
+              ((list :process-connection connection)
+               (process-connection (taskmaster-acceptor taskmaster) connection))
+              ((list :shutdown)
+               (return)))
+            (send taskmaster `(:worker-ready ,worker-id ,channel) :blockp nil)))
 
 (defgeneric next-worker-id (taskmaster))
 (defmethod next-worker-id ((taskmaster thread-pooling-taskmaster))
   (incf (taskmaster-thread-count taskmaster)))
 
-(defmethod create-worker-thread ((taskmaster thread-pooling-taskmaster) connection)
+(defgeneric create-worker-thread (taskmaster))
+
+(defmethod create-worker-thread ((taskmaster thread-pooling-taskmaster))
   "Create a worker thread for handling requests"
   ;; we are handling all conditions here as we want to make sure that
   ;; the acceptor process never crashes while trying to create a
   ;; worker thread; one such problem exists in
   ;; GET-PEER-ADDRESS-AND-PORT which can signal socket conditions on
   ;; some platforms in certain situations.
-  (let ((worker-id (next-worker-id taskmaster)))
-    (when connection
-      (mark-worker-busy taskmaster worker-id connection nil))
+  (let ((worker-id (next-worker-id taskmaster))
+        (channel (make-channel)))
     (handler-case*
      (start-thread
       taskmaster
-      (lambda () (initialize-worker-thread taskmaster worker-id connection))
+      (lambda () (initialize-worker-thread taskmaster worker-id channel))
       :name (format nil (taskmaster-worker-thread-name-format taskmaster) worker-id))
      (error (condition)
        (let ((*acceptor* (taskmaster-acceptor taskmaster)))
          (log-message* *lisp-errors-log-level*
                        "Error while creating worker thread: ~A" condition))))))
 
+(defgeneric dispatch-work (taskmaster))
+
 (defmethod dispatch-work ((taskmaster thread-pooling-taskmaster))
   ;; Here's the idea, with the stipulations given in THREAD-POOLING-TASKMASTER
   ;;  - If MAX-THREAD-COUNT is null, just start a taskmaster
@@ -241,7 +247,6 @@ implementations."))
   (with-accessors ((master-lock taskmaster-master-lock)
                    (dispatcher-process dispatcher-process)
                    (taskmaster-status taskmaster-status)
-                   (channel dispatcher-channel)
                    (pending-connections taskmaster-pending-connections)
                    (available-workers taskmaster-available-workers)
                    (busy-workers taskmaster-busy-workers)
@@ -250,13 +255,13 @@ implementations."))
                    (max-accept-count taskmaster-max-accept-count)
                    (max-thread-count taskmaster-max-thread-count))
       taskmaster
-    (with-lock-held (master-lock)
+    (bt:with-lock-held (master-lock)
       (assert (eq taskmaster-status :running))
       (assert (eq dispatcher-process (bt:current-thread))))
     (loop
       (loop
         :for wait = t :then nil
-        :for request = (chanl:recv channel :blockp wait)
+        :for request = (recv taskmaster :blockp wait)
         :while request
         :do (ematch request
               ((list :worker-ready worker-id chan)
@@ -271,9 +276,9 @@ implementations."))
         (:stopping
          (dolist (worker (dequeue-all available-workers))
            (ematch worker
-             ((list :worker worker-id chan)
+             ((list :worker worker-id channel)
               (declare (ignorable worker-id))
-              (chanl:send chan '(:shutdown) :blockp nil))))
+              (send channel '(:shutdown) :blockp nil))))
          (dolist (connection (dequeue-all pending-connections))
            (too-many-taskmaster-requests taskmaster connection)
            (send-service-unavailable-reply taskmaster connection))
@@ -288,13 +293,16 @@ implementations."))
               (return))
              ;; worker available? give him the job!
              ((not (empty-p available-workers))
-              (let ((connection (dequeue pending-connections)))
-                (ematch (dequeue available-workers)
-                  ((list :worker worker-id chan)
-                   (get-worker-busy-on-connection taskmaster worker-id chan connection)))))
+              (ematch (dequeue available-workers)
+                ((list :worker worker-id channel)
+                 (get-worker-busy-on-connection
+                  taskmaster worker-id channel (dequeue pending-connections)))))
              ;; positions available for more workers? hire a new one for the job!
              ((or (null max-thread-count) (< thread-count max-thread-count))
-              (create-worker-thread taskmaster (dequeue pending-connections)))
+              (multiple-value-bind (worker-id channel)
+                  (create-worker-thread taskmaster)
+                (get-worker-busy-on-connection
+                 taskmaster worker-id channel (dequeue pending-connections))))
              ;; Already trying to handle too many connections? Deny request with 503.
              ((if max-accept-count
                   (> accept-count max-accept-count)