;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
;;;; Trivial Job Management
(in-package :philip-jose)

;;; Advice from #lisp: maybe use cells or computed-class -- but not really.
;;; lemonodor: Jim Firby's RAP (not free software, but a good read).

;;; Proper interface:
;;; a little language to dynamically build a dependency graph.
;;; A node remembers those existing nodes that still need to be completed
;;; before that node may be undertaken.
;;; When a node is completed, its dependencies are propagated,
;;; which may themselves generate more jobs...

;;; And the language is Lisp, thanks to CALL/CC!


;;; One lock shared by the whole tracker to guard its state
;;; (not used at this time)
(defparameter *tracker-lock*
  (make-lock)
  "global lock for multithreaded servers")

(defmacro with-tracker-lock (() &body body)
  "Evaluate BODY while holding *TRACKER-LOCK*.
The first argument is an empty list: the macro takes no options."
  ;; BODY must be spliced inside the WITH-LOCK-HELD form so that it is
  ;; the code actually run under the lock.
  `(with-lock-held (*tracker-lock*)
     ,@body))

;;; Workers have an id, a known status

;; DEFVAR takes the initial value before the docstring; without an
;; explicit value the docstring itself would become the variable's value.
(defvar *current-worker* nil
  "the worker making current request")

;; Maps a worker id to the status last recorded by REGISTER-WORKER.
;; EQUAL hashing is used for the keys.
(defparameter *registered-workers* (make-hash-table :test #'equal)
  "workers that have registered a job")

(defun register-worker (id &optional (status :working))
  "Record STATUS (default :WORKING) for worker ID in *REGISTERED-WORKERS*.
Returns STATUS."
  (let ((registry *registered-workers*))
    (setf (gethash id registry) status)))

(defun unregister-worker (id)
  "Drop worker ID from *REGISTERED-WORKERS*.
Returns true if there was an entry to remove, false otherwise."
  (let ((registry *registered-workers*))
    (remhash id registry)))

;;; Your every worker job

;; A unit of work handed to a remote worker.  The STATUS documentation
;; below lists the values that the functions of this file store in it.
(defclass worker-job (simple-print-object-mixin)
  ((id :accessor job-id :initarg :id :documentation
       "nonce to identify the job")
   (description :accessor job-description :initarg :description :documentation
                "description of the job, a SEXP to be transmitted to the worker")
   (status :accessor job-status :initarg :status :initform nil :documentation
           "Status of the job: NIL if never claimed, a list (WORKER CLAIM-TIME) while claimed, (:UNCLAIMED) after UNCLAIM-JOB, :COMPLETED once NOTIFY-COMPLETED-JOB has run")
   (validator :accessor job-validator :initarg :validator :initform t :documentation
              "thunk to validate whether a job is still needed by the time a worker is ready to claim it.")
   (on-claim :accessor job-on-claim :initarg :on-claim :initform nil :documentation
             "hook thunk to call when a worker claims the job")
   (on-completion :accessor job-on-completion :initarg :on-completion :documentation
                  "continuation to call when a job returns")))

(defparameter *worker-job-counter* 0
  "Counter incremented by MAKE-WORKER-JOB-ID for each new job id.")

(defun make-worker-job-id ()
  "Return a fresh job id: the elements of *ID* followed by :WORKER-JOB
and the next value of *WORKER-JOB-COUNTER* (which is incremented)."
  (append *id*
          (list :worker-job (incf *worker-job-counter*))))

;; DEFVAR takes the initial value before the docstring; without an
;; explicit value the docstring itself would become the variable's value.
(defvar *current-worker-job* nil
  "the worker job currently examined")

;;; Worker jobs can be scheduled or claimed

;; Maps a job id to the job object, for every job CLAIM-JOB has stored
;; and UNCLAIM-JOB has not yet removed.  EQUAL hashing is used for the keys.
(defparameter *claimed-worker-jobs* (make-hash-table :test #'equal)
  "jobs claimed by some worker")

(defun claim-job (job &optional (worker *current-worker*))
  "Mark JOB as claimed by WORKER (default: *CURRENT-WORKER*).
JOB is stored in *CLAIMED-WORKER-JOBS* under its id, its status becomes
the list (WORKER CLAIM-TIME), and its on-claim hook is then run through
NOTIFY-CLAIMED-JOB."
  (let ((key (job-id job)))
    (setf (gethash key *claimed-worker-jobs*) job
          (job-status job) (list worker (get-real-time))))
  (notify-claimed-job job))

(defun notify-claimed-job (job)
  "Invoke JOB's on-claim hook with no arguments, through FUNKALL."
  (let ((hook (job-on-claim job)))
    (funkall hook)))

(defun notify-completed-job (job &rest results)
  "Set JOB's status to :COMPLETED, then pass RESULTS to its on-completion
continuation through FUNKALL."
  (setf (job-status job) :completed)
  (let ((continuation (job-on-completion job)))
    (apply #'funkall continuation results)))

(defun unclaim-job (job)
  "Release JOB: set its status to (:UNCLAIMED) and remove it from
*CLAIMED-WORKER-JOBS*.  Returns the value of REMHASH, i.e. true if the
job was present in the table."
  (setf (job-status job) '(:unclaimed))
  (remhash (job-id job) *claimed-worker-jobs*))

;; The queue must be a FIFO object, since it is handed to FIFO-ENQUEUE,
;; FIFO-DEQUEUE and FIFO-EMPTY-P; a docstring in the value position would
;; make it a string instead.
;; NOTE(review): assumes MAKE-FIFO is the constructor exported by the
;; library that provides the FIFO-* functions -- confirm.
(defparameter *scheduled-worker-jobs*
  (make-fifo)
  "jobs that need to be executed by some worker")

(defun format-job (stream job)
  "Write a short rendering of JOB, its id and its description, to STREAM.
STREAM is passed to FORMAT as the destination."
  (let ((id (job-id job))
        (description (job-description job)))
    (format stream "#<job ~d ~A>" id description)))

(defun enqueue-worker-job (job)
  "Log JOB, then add it to *SCHEDULED-WORKER-JOBS*.
Returns whatever FIFO-ENQUEUE returns."
  (logger "~&Enqueueing ~A" (format-job nil job))
  (fifo-enqueue job *scheduled-worker-jobs*))

(defun issue-worker-job (description &rest keys
                        &key validator on-claim on-completion)
  "Create a WORKER-JOB for DESCRIPTION with a fresh id and enqueue it.
VALIDATOR, ON-CLAIM and ON-COMPLETION are forwarded unchanged, through
KEYS, as initargs of the new job.  Returns the value of
ENQUEUE-WORKER-JOB."
  ;; The keywords are named in the lambda list only to restrict what
  ;; callers may pass; the values travel through KEYS.
  (declare (ignore validator on-claim on-completion))
  (enqueue-worker-job
   (apply #'make-instance 'worker-job
          :id (make-worker-job-id)
          :description description
          keys)))

(defun get-next-worker-job ()
  "Dequeue jobs from *SCHEDULED-WORKER-JOBS* until one passes VALIDATE-JOB;
log it and return it.  Jobs that fail validation are dropped.
Returns NIL once the queue is empty."
  (loop
    (when (fifo-empty-p *scheduled-worker-jobs*)
      (return nil))
    (let ((candidate (fifo-dequeue *scheduled-worker-jobs*)))
      (when (validate-job candidate)
        (logger "~&Issuing ~A" (job-id candidate))
        (return candidate)))))

;;; Global defaults when looking for a job

(defun get-next-worker-job-specification ()
  "Return the specification to send to the worker asking for work.
If a valid scheduled job exists, claim it and return its
JOB-SPECIFICATION; otherwise return the value of NO-JOB-TO-DO."
  (if-bind job (get-next-worker-job)
      ;; Claiming and describing the job both belong to the success
      ;; branch, hence the PROGN.
      (progn
        (claim-job job)
        (job-specification job))
      ;; NOTE(review): fallback reconstructed from the NO-JOB-TO-DO
      ;; definition that follows in this section -- confirm.
      (no-job-to-do)))

(defun job-specification (job)
  "Return the list (:JOB :ID <id> :DESCRIPTION <description>) describing JOB."
  (list :job
        :id (job-id job)
        :description (job-description job)))

;; The flag starts false.  A docstring in the value position would be a
;; non-NIL value, i.e. the flag would read as true from the start.
(defparameter *all-done* nil
  "when everything is said and done, we'll tell each worker to die in peace")

(defun no-job-to-do ()
  "Return what to tell a worker when no job is available:
the value of SUICIDE-JOB once *ALL-DONE* is true, else of SLEEP-JOB."
  ;; NOTE(review): both branches are reconstructed from the two
  ;; definitions that follow this one -- confirm.
  (if *all-done*
      (suicide-job)
      (sleep-job)))

(defun suicide-job (&optional (worker *current-worker*))
  "Unregister WORKER (default: *CURRENT-WORKER*)."
  ;; NOTE(review): only the unregistration is visible here.  If callers
  ;; send this function's value to the worker, a specification form
  ;; (compare SLEEP-JOB) presumably belongs after this call -- confirm.
  (unregister-worker worker))

(defun sleep-job ()
  "Return the list (:SLEEP <delay>), where <delay> is the current value
of *SLEEP-DELAY*."
  (list :sleep *sleep-delay*))

(defgeneric validate-job (job)
  (:documentation
   "Return true if JOB should still be handed to a worker.")
  ;; Default method for objects with no more specific method.
  ;; NOTE(review): accepting everything by default is an assumption
  ;; (it mirrors the T initform of the VALIDATOR slot) -- confirm.
  (:method ((x t))
    t))

(defmethod validate-job ((job worker-job))
  "Call JOB's validator through FUNKALL; T stands in when the slot is NIL."
  (let ((validator (or (job-validator job) t)))
    (funkall validator)))

(defun hand-job-over-to-worker (job &optional (worker *current-worker*))
  "Register WORKER with JOB's id as its status, then have WORKER claim JOB."
  ;; WITH-ACCESSORS needs JOB as its instance form.  Only the id is
  ;; used here, so the other accessor bindings are not needed.
  (with-accessors ((id job-id)) job
    (register-worker worker id)
    ;; CLAIM-JOB reads JOB-ID and sets JOB-STATUS on its first argument,
    ;; so it must receive the job object, not the id.
    (claim-job job worker)))

;; Request handler invoked with a worker ID and optional keys.
;; It logs the request, logs ERROR if one was reported, records the
;; completion of job COMPLETED (with RESULTS) if given, and replies with
;; the next job specification.
;; ID is printed with ~{ ~A~}, so it is expected to be a list.
(defun-request-handler worker-request (id &rest keys &key completed results error)
  (logger "~&Got worker-request from~{ ~A~}, with keys ~S" id keys)
  (when error
    (logger "~&Error from worker~{ ~A~}: ~A" id error))
  ;; Everything below runs with the requester as the current worker, which
  ;; is the default WORKER argument of CLAIM-JOB and SUICIDE-JOB.
  (let ((*current-worker* id))
    (when completed
      ;; COMPLETED is the id of the finished job, looked up by JOB-DONE.
      (job-done completed id results))
    (reply (get-next-worker-job-specification))))

(defun job-done (job-id worker results)
  "Handle WORKER's report that the job identified by JOB-ID is complete.
If the job is found in *CLAIMED-WORKER-JOBS*, log who completed it,
unclaim it, mark WORKER :IDLE and pass the list RESULTS to the job's
completion continuation.  This happens even when WORKER is not the
recorded claimant; that case is only logged differently.
If no such claimed job exists, only log the fact."
  (if-bind job (gethash job-id *claimed-worker-jobs*)
    ;; The status of a claimed job is the list (CLAIMANT START-TIME)
    ;; stored by CLAIM-JOB.
    (destructuring-bind (claimant start-time) (job-status job)
      (if (equal worker claimant)
          (logger "~&Job ~D completed by worker ~S in ~D seconds"
                  (job-id job) worker
                  (- (get-real-time) start-time))
          (logger "~&Worker ~S announced he completed job ~A, originally claimed by ~S"
                  worker (format-job nil job) claimant))
      (unclaim-job job)
      (register-worker worker :idle)
      (apply #'notify-completed-job job results))
    ;; No job object was found on this branch, so FORMAT-JOB cannot be
    ;; used; log the id the worker announced instead.
    (logger "~&Worker ~S announced completion of job ~A, that doesn't exist (anymore?)"
            worker job-id)))

;; Request handler taking no arguments: replies with the current value of
;; *ALL-DONE* and the contents of *CLAIMED-WORKER-JOBS* converted by
;; HASH-TABLE->ALIST.
(defun-request-handler show-job-status ()
  (reply* :all-done *all-done* :claimed-worker-jobs (hash-table->alist *claimed-worker-jobs*)))

;; React to WORKER being late: unregister it, kill its process and spawn
;; a replacement client.
;; WORKER must be a list of exactly three elements, since it is taken
;; apart by DESTRUCTURING-BIND below; the third element is ignored.
;; NOTE(review): the meaning of the first two elements is whatever
;; KILL-MACHINE-PROCESS and SPAWN-CLIENT expect -- confirm against them.
(defun notify-worker-timeout (worker &aux (pity nil))
  ;; In this Trotskyist/Guevarist approach to management, we simply kill those
  ;; counter-revolutionary saboteurs who are late at delivering the results
  ;; expected by the manager. Oh, of course, being good socialists, we never
  ;; kill any worker. Mind you, the miserable insect is unregistered first, so
  ;; that not being a worker anymore, it is killed without spilling sacred
  ;; proletarian blood. Insects are worthless, and multiply anyway.
  (unregister-worker worker)
  (destructuring-bind (insect without rights) worker
    (declare (ignore rights))
    ;; PITY is an &AUX variable bound to NIL; evaluating it here has no
    ;; effect.  The variable names only serve the joke in the comment above.
    (kill-machine-process insect without) pity
    (spawn-client insect)))

;;; Advanced Control Flow


;; NOTE(review): presumably the default for the RETRY-ON-TIMEOUT
;; parameter of the job-issuing functions below; no visible line reads
;; this variable -- confirm.
(defvar *retry-on-timeout* nil)

;; NOTE(review): the RETRY-ON-TIMEOUT parameter and its default, the LOOP
;; head, the inner ISSUE-WORKER-JOB call and the FUNKALL around PROG1 are
;; reconstructed from the indentation and paren structure of the rest of
;; the body -- confirm against the upstream source.
(defun/cc issue-worker-job/cc (description &key validator on-claim on-completion
                                           (retry-on-timeout *retry-on-timeout*))
  "Issue one worker job"
  (if retry-on-timeout
      ;; Retrying mode: issue the job again on every iteration until one
      ;; attempt reports success.
      (loop
        for attempts from 1 do
        (let ((guard t) (worker nil))
          (multiple-value-bind (successp &optional results)
              (with-local-task-competition (c)
                (issue-worker-job
                 description
                 ;; GUARD is cleared below once this attempt is over, so
                 ;; a stale copy of the job fails validation.
                 :validator (lambda ()
                              (and guard (funkall (or validator t))))
                 :on-claim (lambda ()
                             ;; ON-CLAIM is handed to FUNKALL as it was
                             ;; on entry and then reset to NIL; the
                             ;; claiming worker is remembered and a
                             ;; competitor sleeping RETRY-ON-TIMEOUT is
                             ;; started.
                             (funkall
                              (prog1 on-claim
                                (setf on-claim nil
                                      worker *current-worker*)
                                (local-task-competitor (c) (sleep/cc retry-on-timeout) nil))))
                 :on-completion (lambda (&rest results)
                                  (setf worker nil)
                                  (maybe-win-local-task-competition c t results))))
            ;;(DBG :gah guard successp results attempts)
            (setf guard nil)
            (if successp
                (return (apply on-completion results))
                ;; WORKER is still set only when a worker claimed the
                ;; job and never completed it.
                (when worker
                  (logger "~&Timed out:~{ ~A~}" worker)
                  (notify-worker-timeout worker))))))
      ;; No retry requested: issue the job once, hooks passed unchanged.
      (issue-worker-job description
                        :validator validator
                        :on-claim on-claim
                        :on-completion on-completion)))

;; NOTE(review): the ISSUE-WORKER-JOB/CC call and the branch used when
;; ON-COMPLETION is NIL are reconstructed from the sibling definition
;; ISSUE-SEQUENTIAL-JOB-WITH-TIMEOUT -- confirm against the upstream source.
(defun/cc issue-sequential-job (description &key retry-on-timeout validator on-claim on-completion)
  "Issue a job. When the job is completed, return the values returned by the worker."
  ;; K is the continuation of this call; it is resumed with the worker's
  ;; results from inside the job's completion hook.
  (let/cc k
    (issue-worker-job/cc
     description
     :retry-on-timeout retry-on-timeout
     :validator validator
     :on-claim on-claim
     :on-completion (if on-completion
                        (lambda (&rest results)
                          (apply #'funkall on-completion results)
                          (apply #'kall k results))
                        ;; No user hook: just resume the caller.
                        (lambda (&rest results)
                          (apply #'kall k results))))))

;; NOTE(review): the ISSUE-WORKER-JOB/CC call that receives the keyword
;; arguments is reconstructed; the closing parens of the body account
;; for exactly one such enclosing call -- confirm the function name
;; against the upstream source.
(defun/cc issue-sequential-job-with-timeout
    (description &key timeout retry-on-timeout validator on-claim on-completion)
  "Issue a job. If the job is completed before the timeout, return T and the values returned by the worker,
otherwise, return NIL"
  (let ((guard t)     ; cleared once waiting is over; the validator then fails
        (worker nil)) ; worker that claimed the job and has not completed it
    (multiple-value-bind (successp &optional results)
        (with-local-timeout (timeout)
          (let/cc k
            (issue-worker-job/cc
             description
             :retry-on-timeout retry-on-timeout
             :validator (lambda () (and guard (funkall (or validator t))))
             :on-claim (lambda () (setf worker *current-worker*) (funkall on-claim))
             :on-completion (lambda (&rest results)
                              (setf worker nil)
                              (apply #'funkall on-completion results)
                              (kall k t results)))))
      (setf guard nil)
      ;; WORKER is still set only when the job was claimed and never
      ;; completed.
      (when worker
        (notify-worker-timeout worker))
      (apply #'values successp results))))

;;; Jobs to be run in parallel of each other.

;; Default reducer for parallel jobs: pushes the primary value of a job
;; onto the front of ACCUMULATOR; any further values are dropped.
(defun/cc default-reducer (accumulator primary-value &rest other-values)
  (declare (ignore other-values))
  (let ((extended (cons primary-value accumulator)))
    extended))

;; Call THUNK with an issuer function that issues jobs to run in parallel.
;; Each job's completion values are folded into an accumulator (starting
;; at INITIAL-VALUE) with REDUCER.  Once THUNK has returned and every
;; issued job has completed, the continuation of this call is resumed
;; with the accumulator.
;; NOTE(review): the MAYBE-EXIT calls, the ISSUE-WORKER-JOB/CC call inside
;; ONE-MORE and the closing of ONE-LESS and of the body are reconstructed
;; from the paren structure and from the otherwise unused MAYBE-EXIT --
;; confirm against the upstream source.
(defun/cc call-with-parallel-job-issuer (thunk &key
                                               (reducer #'default-reducer)
                                               (initial-value nil))
  (let/cc k
    (let ((remaining-jobs 0)
          (accumulator initial-value)
          (thunk-done nil))
      (labels ((maybe-exit ()
                 ;; Resume the caller only when nothing is left to wait for.
                 (when (and thunk-done (zerop remaining-jobs))
                   (kall k accumulator)))
               (one-less (values)
                 ;; VALUES is the list of values returned by one job.
                 (setf accumulator (apply reducer accumulator values))
                 (decf remaining-jobs)
                 (maybe-exit))
               (one-more (description &key retry-on-timeout on-claim on-completion)
                 ;; should we error out when thunk-done is true,
                 ;; or is it OK for jobs or escaped continuations
                 ;; to cause more jobs to be queued?
                 ;; This inner K is the continuation of the issuer call;
                 ;; it is resumed from the job's on-claim hook.
                 (let/cc k
                   (incf remaining-jobs)
                   (issue-worker-job/cc
                    description
                    :retry-on-timeout retry-on-timeout
                    :on-claim (lambda ()
                                (funkall on-claim)
                                (kall k))
                    :on-completion (lambda (&rest values)
                                     (apply #'funkall on-completion values)
                                     (one-less values))))))
        (funcall thunk #'one-more)
        (setf thunk-done t)
        ;; All jobs may already be complete by the time THUNK returns.
        (maybe-exit)))))

(defparameter *parallel-job-issuer* nil
  "Default ISSUER used by ISSUE-PARALLEL-JOB; NIL means there is none.")

(defun/cc issue-parallel-job (description
                              &key retry-on-timeout on-claim on-completion
                              (issuer *parallel-job-issuer*))
  "Issue the job DESCRIPTION through ISSUER (default: *PARALLEL-JOB-ISSUER*),
forwarding RETRY-ON-TIMEOUT, ON-CLAIM and ON-COMPLETION.
Signals an error when ISSUER is NIL."
  (if issuer
      (funcall issuer description
               :retry-on-timeout retry-on-timeout
               :on-claim on-claim
               :on-completion on-completion)
      (error "No parallel job issuer for job ~S" description)))

(defmacro with-parallel-jobs ((issuer &rest keys &key reducer initial-value) &body body)
  "BODY will be executed where ISSUER is lexically bound and fbound
(or if it is NIL, *PARALLEL-JOB-ISSUER* dynamically bound) to a function that will
issue jobs to be run in parallel then pause until the job is claimed by a worker.

When a worker returns, the REDUCER is applied to the accumulator and the values
provided by the worker, where the accumulator is initialized with INITIAL-VALUE.
When the body is fully evaluated and all the jobs issued so far have returned,
then the value of the accumulator is returned.

The default INITIAL-VALUE is NIL, and the default REDUCER is DEFAULT-REDUCER, which
conses the primary value returned by each job into a list ordered by last-returned first.
If you just want to drop those values, use (CONSTANTLY NIL) as the REDUCER."
  (declare (ignore reducer initial-value))
  (with-gensyms (args)
      (lambda (,(or issuer '*parallel-job-issuer*))
        (labels (,@(when issuer
                    `(,issuer (&rest ,args)
                      (apply ,issuer ,args))))