;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
;;;; Trivial Job Management

(in-package :philip-jose)

#|
Advice from #lisp:
 maybe use cells or computed-class -- but not really
	http://common-lisp.net/project/cells/
	http://common-lisp.net/project/computed-class/
 lemonodor: Jim Firby's RAP (not free software, but a good read)
	http://people.cs.uchicago.edu/~firby/raps/index.shtml

Proper interface: a little language to dynamically build a dependency graph.
A node remembers those existing nodes that still need to be completed
before that node may be undertaken.
When a node is completed, its dependencies are propagated,
which may themselves generate more jobs...
And the language is Lisp, thanks to CALL/CC!
|#

;;; Global lock to protect state (not used at this time)
#|
(defparameter *tracker-lock* (make-lock)
  "global lock for multithreaded servers")

(defmacro with-tracker-lock (() &body body)
  `(with-lock-held (*tracker-lock*) ,@body))
|#

;;; Workers have an id, a known status
(defvar *current-worker* nil
  "the worker making current request")

(defparameter *registered-workers* (make-hash-table :test 'equal)
  "workers that have registered a job")

(defun register-worker (id &optional (status :working))
  "Record STATUS (default :WORKING) for worker ID in *REGISTERED-WORKERS*.
Returns STATUS."
  (setf (gethash id *registered-workers*) status))

(defun unregister-worker (id)
  "Remove worker ID from *REGISTERED-WORKERS*."
  (remhash id *registered-workers*))

;;; Your every worker job
(defclass worker-job (simple-print-object-mixin)
  ((id
    :accessor job-id :initarg :id
    :documentation "nonce to identify the job")
   (description
    :accessor job-description :initarg :description
    :documentation "description of the job, a SEXP to be transmitted to the worker")
   (status
    :accessor job-status :initarg :status :initform nil
    :documentation "Status of the job: NIL if to be dispatched,
a list (WORKER CLAIM-TIME) while claimed by a worker,
(:UNCLAIMED) once released, :COMPLETED once its completion has been notified")
   (validator
    :accessor job-validator :initarg :validator :initform t
    :documentation "thunk to validate whether a job is still needed
by the time a worker is ready to claim it.")
   (on-claim
    :accessor job-on-claim :initarg :on-claim :initform nil
    :documentation "hook thunk to call when a worker claims the job")
   ;; NIL by default, like ON-CLAIM, so that reading the slot of a job created
   ;; without :ON-COMPLETION does not signal UNBOUND-SLOT.
   (on-completion
    :accessor job-on-completion :initarg :on-completion :initform nil
    :documentation "continuation to call when a job returns")))

(defparameter *worker-job-counter* 0)

(defun make-worker-job-id ()
  "Return a fresh job id: the elements of *ID* followed by :WORKER-JOB
and the incremented value of *WORKER-JOB-COUNTER*."
  `(,@*id* :worker-job ,(incf *worker-job-counter*)))

(defvar *current-worker-job* nil
  "the worker job currently examined")

;;; Worker jobs can be scheduled or claimed
(defparameter *claimed-worker-jobs* (make-hash-table :test 'equal)
  "jobs claimed by some worker")

(defun claim-job (job &optional (worker *current-worker*))
  "Record JOB as claimed by WORKER: index it by its id in *CLAIMED-WORKER-JOBS*,
set its status to (WORKER CLAIM-TIME), then run its on-claim hook."
  (setf (gethash (job-id job) *claimed-worker-jobs*) job)
  (setf (job-status job) (list worker (get-real-time)))
  (notify-claimed-job job))

(defun notify-claimed-job (job)
  "Call the on-claim hook of JOB."
  ;; NOTE(review): the hook may be NIL (slot default); this relies on FUNKALL
  ;; accepting non-function designators -- confirm against its definition.
  (funkall (job-on-claim job)))

(defun notify-completed-job (job &rest results)
  "Mark JOB as :COMPLETED and pass RESULTS to its on-completion continuation."
  (setf (job-status job) :completed)
  (apply #'funkall (job-on-completion job) results))

(defun unclaim-job (job)
  "Set the status of JOB to (:UNCLAIMED) and remove it from
*CLAIMED-WORKER-JOBS*. Returns NIL."
  (setf (job-status job) '(:unclaimed))
  (remhash (job-id job) *claimed-worker-jobs*)
  nil)

(defparameter *scheduled-worker-jobs* (make-fifo)
  "jobs that need to be executed by some worker")

(defun format-job (stream job)
  "Format only the interesting slots of job to stream."
  ;; The control string previously contained no directives, so the id and
  ;; description arguments were silently dropped from the output.
  (format stream "#<job ~A ~S>" (job-id job) (job-description job)))

(defun enqueue-worker-job (job)
  "Log JOB, append it to *SCHEDULED-WORKER-JOBS*, and return it."
  (logger "~&Enqueueing ~A" (format-job nil job))
  (fifo-enqueue job *scheduled-worker-jobs*)
  job)

(defun issue-worker-job (description &rest keys
                         &key validator on-claim on-completion)
  "Create a WORKER-JOB with a fresh id and DESCRIPTION, forwarding the supplied
keyword arguments as initargs, then enqueue and return it."
  ;; The keywords are only declared to validate the call; they are forwarded
  ;; wholesale through KEYS.
  (declare (ignore validator on-claim on-completion))
  (enqueue-worker-job
   (apply #'make-instance 'worker-job
          :id (make-worker-job-id)
          :description description
          keys)))

(defun get-next-worker-job ()
  "Dequeue scheduled jobs until one passes VALIDATE-JOB and return it.
Jobs that fail validation are dropped. Returns NIL when the queue runs empty."
  (loop until (fifo-empty-p *scheduled-worker-jobs*) do
    (let ((job (fifo-dequeue *scheduled-worker-jobs*)))
      (when (validate-job job)
        (logger "~&Issuing ~A" (job-id job))
        (return job)))))

;;; Global defaults when looking for a job
(defun get-next-worker-job-specification ()
  "Claim the next valid scheduled job and return its specification,
or return the result of NO-JOB-TO-DO when there is none."
  (if-bind job (get-next-worker-job)
    (progn
      (claim-job job)
      (job-specification job))
    (no-job-to-do)))

(defun job-specification (job)
  "Return the list (:JOB :ID id :DESCRIPTION description) describing JOB."
  `(:job :id ,(job-id job) :description ,(job-description job)))

(defparameter *all-done* nil
  "when everything is said and done, we'll tell each worker to die in peace")

(defun no-job-to-do ()
  "Return a suicide order if *ALL-DONE* is true, otherwise a sleep order."
  (if *all-done*
      (suicide-job)
      (sleep-job)))

(defun suicide-job (&optional (worker *current-worker*))
  "Unregister WORKER and return the order (:DIE)."
  (unregister-worker worker)
  '(:die))

(defun sleep-job ()
  "Return the order (:SLEEP delay), where delay is *SLEEP-DELAY*."
  `(:sleep ,*sleep-delay*))

(defgeneric validate-job (job)
  (:documentation "Return true if JOB should still be handed to a worker.")
  (:method ((x t))
    t))

(defmethod validate-job ((job worker-job))
  ;; Call the job's validator, falling back to T when the slot holds NIL.
  ;; NOTE(review): relies on FUNKALL accepting T as a callable -- confirm.
  (funkall (or (job-validator job) t)))

(defun hand-job-over-to-worker (job &optional (worker *current-worker*))
  "Register WORKER with the id of JOB as its status, then have WORKER claim JOB."
  (register-worker worker (job-id job))
  ;; CLAIM-JOB takes the job object itself (it calls JOB-ID on it), not its id.
  (claim-job job worker))

;; Request handler: a worker reports in, optionally with the id of a COMPLETED
;; job and its RESULTS, or with an ERROR; it is answered with its next orders.
(defun-request-handler worker-request (id &rest keys &key completed results error)
  (logger "~&Got worker-request from~{ ~A~}, with keys ~S" id keys)
  (when error
    (logger "~&Error from worker~{ ~A~}: ~A" id error))
  (let ((*current-worker* id))
    (when completed
      (job-done completed id results))
    (reply (get-next-worker-job-specification))))

(defun job-done (job-id worker results)
  "Process WORKER's report that the job identified by JOB-ID completed with the
list RESULTS. If the job is currently claimed, log it, unclaim it, mark WORKER
as :IDLE and notify the job's completion; otherwise only log the anomaly."
  (if-bind job (gethash job-id *claimed-worker-jobs*)
    (destructuring-bind (claimant start-time) (job-status job)
      (if (equal worker claimant)
          (logger "~&Job ~D completed by worker ~S in ~D seconds"
                  (job-id job) worker (- (get-real-time) start-time))
          (logger "~&Worker ~S announced he completed job ~A, originally claimed by ~S"
                  worker (format-job nil job) claimant))
      (unclaim-job job)
      (register-worker worker :idle)
      (apply #'notify-completed-job job results))
    ;; No job object was found in this branch, so report the id we were given
    ;; rather than trying to format a job that does not exist.
    (logger "~&Worker ~S announced completion of job ~A, that doesn't exist (anymore?)"
            worker job-id)))

;; Request handler: reports *ALL-DONE* and the claimed jobs as an alist.
(defun-request-handler show-job-status ()
  (reply* :all-done *all-done*
          :claimed-worker-jobs (hash-table->alist *claimed-worker-jobs*)))

(defun notify-worker-timeout (worker &aux (pity nil))
  ;; In this Trotskyist/Guevarist approach to management, we simply kill those
  ;; counter-revolutionary saboteurs who are late at delivering the results
  ;; expected by the manager. Oh, of course, being good socialists, we never
  ;; kill any worker. Mind you, the miserable insect is unregistered first, so
  ;; that not being a worker anymore, it is killed without spilling sacred
  ;; proletarian blood. Insects are worthless, and multiply anyway.
  (unregister-worker worker)
  ;; WORKER is destructured as a three-element list; the first two elements
  ;; are passed to KILL-MACHINE-PROCESS and the first one to SPAWN-CLIENT.
  (destructuring-bind (insect without rights) worker
    (declare (ignore rights))
    (kill-machine-process insect without)
    pity
    (spawn-client insect)))


;;; Advanced Control Flow
(exporting-definitions

(defvar *retry-on-timeout* nil)

(defun/cc issue-worker-job/cc (description &key validator on-claim on-completion retry-on-timeout)
  "Issue one worker job.
Without RETRY-ON-TIMEOUT, simply issue the job. With RETRY-ON-TIMEOUT, reissue
the job in a loop: once claimed, each attempt competes against a SLEEP/CC of
RETRY-ON-TIMEOUT; on success ON-COMPLETION is applied to the results, otherwise
the claiming worker (if any) is timed out and the job is issued again."
  (if retry-on-timeout
      (loop for attempts from 1 do
        (let ((guard t)      ; cleared once this attempt is over, invalidating its job
              (worker nil))  ; the worker that claimed this attempt, if still pending
          ;; NOTE(review): &OPTIONAL in a MULTIPLE-VALUE-BIND variable list is
          ;; not standard CL; presumably meaningful under the CPS transformer
          ;; used by DEFUN/CC -- left untouched, confirm before changing.
          (multiple-value-bind (successp &optional results)
              (with-local-task-competition (c)
                (issue-worker-job
                 description
                 :validator (lambda () (and guard (funkall (or validator t))))
                 :on-claim (lambda ()
                             ;; Run the caller's ON-CLAIM hook at most once
                             ;; across retries, remember the claimant, and
                             ;; start the timeout competitor.
                             (funkall
                              (prog1 on-claim
                                (setf on-claim nil
                                      worker *current-worker*)
                                (local-task-competitor (c)
                                  (sleep/cc retry-on-timeout)
                                  nil))))
                 :on-completion (lambda (&rest results)
                                  (setf worker nil)
                                  (maybe-win-local-task-competition c t results))))
            ;;(DBG :gah guard successp results attempts)
            (setf guard nil)
            (if successp
                (return (apply on-completion results))
                (when worker
                  (logger "~&Timed out:~{ ~A~}" worker)
                  (notify-worker-timeout worker))))))
      (issue-worker-job description
                        :validator validator
                        :on-claim on-claim
                        :on-completion on-completion)))

(defun/cc issue-sequential-job (description &key retry-on-timeout validator on-claim on-completion)
  "Issue a job. When the job is completed,
return the values returned by the worker."
  (let/cc k
    (issue-worker-job/cc
     description
     :retry-on-timeout retry-on-timeout
     :validator validator
     :on-claim on-claim
     ;; Resume the captured continuation K with the worker's results, after
     ;; running the caller's ON-COMPLETION hook when one was given.
     :on-completion (if on-completion
                        (lambda (&rest results)
                          (apply #'funkall on-completion results)
                          (apply #'kall k results))
                        k))))

(defun/cc issue-sequential-job-with-timeout
    (description &key timeout retry-on-timeout validator on-claim on-completion)
  "Issue a job. If the job is completed before the timeout,
return T and the values returned by the worker, otherwise, return NIL"
  (let ((guard t)      ; cleared after the wait, invalidating a still-queued job
        (worker nil))  ; the worker that claimed the job, while it is pending
    ;; NOTE(review): &OPTIONAL in a MULTIPLE-VALUE-BIND variable list is not
    ;; standard CL; presumably meaningful under the CPS transformer -- confirm.
    (multiple-value-bind (successp &optional results)
        (with-local-timeout (timeout)
          (let/cc k
            (issue-worker-job/cc
             description
             :retry-on-timeout retry-on-timeout
             :validator (lambda () (and guard (funkall (or validator t))))
             :on-claim (lambda ()
                         (setf worker *current-worker*)
                         (funkall on-claim))
             :on-completion (lambda (&rest results)
                              (setf worker nil)
                              (apply #'funkall on-completion results)
                              (kall k t results)))))
      (setf guard nil)
      ;; A worker still recorded here claimed the job but never completed it.
      (when worker
        (notify-worker-timeout worker))
      (apply #'values successp results))))

;;; Jobs to be run in parallel of each other.
(defun/cc default-reducer (accumulator primary-value &rest other-values)
  "Push PRIMARY-VALUE onto the front of ACCUMULATOR, ignoring OTHER-VALUES."
  (declare (ignore other-values))
  (cons primary-value accumulator))

(defun/cc call-with-parallel-job-issuer
    (thunk &key (reducer #'default-reducer) (initial-value nil))
  "Call THUNK with a job-issuing function. Each job issued through it is counted
as pending; when it completes, REDUCER is applied to the accumulator (initially
INITIAL-VALUE) and the job's values. Once THUNK has returned and no job is
pending, the accumulator is passed to the continuation of this call."
  (let/cc k
    (let ((remaining-jobs 0)
          (accumulator initial-value)
          (thunk-done nil))
      (labels ((maybe-exit ()
                 (when (and thunk-done (zerop remaining-jobs))
                   (kall k accumulator)))
               (one-less (values)
                 (setf accumulator (apply reducer accumulator values))
                 (decf remaining-jobs)
                 (maybe-exit))
               (one-more (description &key retry-on-timeout on-claim on-completion)
                 ;; should we error out when thunk-done is true,
                 ;; or is it OK for jobs or escaped continuations
                 ;; to cause more jobs to be queued?
                 (let/cc k
                   (incf remaining-jobs)
                   (issue-worker-job/cc
                    description
                    :retry-on-timeout retry-on-timeout
                    ;; Resume the issuer's caller once the job is claimed.
                    :on-claim (lambda ()
                                (funkall on-claim)
                                (kall k))
                    :on-completion (lambda (&rest values)
                                     (apply #'funkall on-completion values)
                                     (one-less values))))))
        (funcall thunk #'one-more)
        (setf thunk-done t)
        (maybe-exit)))))

(defparameter *parallel-job-issuer* nil)

(defun/cc issue-parallel-job (description &key retry-on-timeout on-claim on-completion
                                          (issuer *parallel-job-issuer*))
  "Issue a job for DESCRIPTION through ISSUER (default *PARALLEL-JOB-ISSUER*).
Signals an error when no issuer is available."
  (unless issuer
    (error "No parallel job issuer for job ~S" description))
  (funcall issuer description
           :retry-on-timeout retry-on-timeout
           :on-claim on-claim
           :on-completion on-completion))

(defmacro with-parallel-jobs ((issuer &rest keys &key reducer initial-value) &body body)
  "BODY will be executed where ISSUER is lexically bound and fbound
 (or if it is NIL, *PARALLEL-JOB-ISSUER* dynamically bound)
to a function that will issue jobs to be run in parallel
then pause until the job is claimed by a worker.
When a worker returns, the REDUCER is applied to the accumulator
and the values provided by the worker, where the accumulator
is initialized with INITIAL-VALUE.
When the body is fully evaluated and all the jobs issued so far have returned,
then the value of the accumulator is returned.
The default INITIAL-VALUE is NIL, and the default REDUCER is DEFAULT-REDUCER,
which conses the primary value returned by each job
into a list ordered by last-returned first.
If you just want to drop those values, use (CONSTANTLY NIL) as the REDUCER."
  (declare (ignore reducer initial-value))
  (with-gensyms (args)
    `(call-with-parallel-job-issuer
      (lambda (,(or issuer '*parallel-job-issuer*))
        ;; The spliced value must be a LIST of bindings: one local function
        ;; named ISSUER that forwards its arguments to the issuer function
        ;; held in the variable ISSUER; no binding at all when ISSUER is NIL.
        (labels (,@(when issuer
                     `((,issuer (&rest ,args) (apply ,issuer ,args)))))
          ,@body))
      ,@keys)))

)