Skip to content
local-tasks.lisp 9.16 KiB
Newer Older
;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
;;;; Manage tasks in the local Lisp image
;;;; Tasks can be scheduled on a queue,
;;;; to be processed either immediately or when a given start time arrives.
(in-package :philip-jose)

;;(declaim (optimize (speed 3) (safety 3) (debug 3)));DEBUG

;; All of this assumes single-threaded execution.
;; If you want concurrent execution, you need to either modify this code and add locking,
;; Or to isolate instances that run this code in separate threads that each have a
;; thread-local binding for each of the special variables used.


;;; Tasks
(defclass local-task ()
  ((callback :accessor task-callback :initarg :callback)
   (name :accessor task-name :initarg :name :initform nil)))

(defclass timed-local-task (local-task)
  ((start-time :accessor task-start-time :initarg :start-time :initform nil)))

(defvar *current-local-task* nil)


;;; Scheduled tasks (to executed ASAP in FIFO order)

(defparameter *scheduled-local-tasks*
  (make-fifo)
  "tasks that need to be executed locally")

(defun enqueue-immediate-local-task (task)
  (fifo-enqueue task *scheduled-local-tasks*))

(defun schedule-immediate-local-task (callback &key name)
  (enqueue-immediate-local-task
   (make-instance 'local-task
     :callback callback :name name))
  nil)

(defun process-local-tasks ()
  (loop until (fifo-empty-p *scheduled-local-tasks*) do
    (let ((*current-local-task* (fifo-dequeue *scheduled-local-tasks*)))
      (funkall (task-callback *current-local-task*)))))


;;; Timed tasks

(defun time-sooner-p (t1 t2)
  (cond
    ((null t2) nil)
    ((null t1) t)
    (t (< t1 t2))))

(defun task-scheduled-sooner-p (task1 task2)
  (time-sooner-p (task-start-time task1) (task-start-time task2)))

(defparameter *timed-local-tasks*
  (make-instance 'binary-heap :lessp #'task-scheduled-sooner-p)
  "tasks to be executed on farmer at a given time. e.g. timeout functions")

(defun schedule-timed-local-task (callback &key start-time name)
  (insert-item!
   *timed-local-tasks*
   (make-instance 'timed-local-task
     :callback callback :start-time start-time :name name))
  nil)

(defun next-timed-task-start-time ()
  (when-bind task
      (unless (container-empty-p *timed-local-tasks*)
        (least-item *timed-local-tasks*))
    (task-start-time task)))

(defun process-timed-task-heap (&optional (time (et:gettimeofday)))
  (loop for next-start-time = (next-timed-task-start-time)
    while (and next-start-time (< next-start-time time)) do
    (enqueue-immediate-local-task (pop-least-item! *timed-local-tasks*))))

(defun schedule-local-task (callback &key start-time name)
  (if start-time
      (schedule-timed-local-task callback :start-time start-time :name name)
      (schedule-immediate-local-task callback :name name))
  nil)


;;; Event-Base

(defun process-events ()
  (if (iomux::event-base-empty-p *event-base*)
      (process-no-events)
      (do-process-events)))

;;(defparameter *event-processing-task*
;;  (make-instance 'local-task
;;    :name "Event processor"
;;    :callback #'process-events))
;;
;;(defun reschedule-process-events ()
;;  (schedule-local-task *event-processing-task*))

(defun process-no-events ()
  (if-bind time (next-timed-task-start-time)
    (let ((delay (- time (get-real-time))))
      (fsleep delay)
      (process-timed-task-heap))
    (nothing-to-do)))

(defun nothing-to-do ()
  (logger "~&Nothing left to do. Quitting.")
  (throw :exit 0))

(defun do-process-events ()
  (let* ((time (next-timed-task-start-time))
         (now (get-real-time))
         (timeout (when time (max 0 (- time now)))))
    (event-dispatch *event-base* :timeout timeout :only-once t)))


;;; General Loop

(defun task-step ()
  (process-local-tasks)
  (process-timed-task-heap)
  (process-local-tasks)
  (process-events))

(def*fun task-loop ()
  (loop (task-step)))

(defun task-steps (&optional (repeat 1))
  (catch :exit
    (if repeat
        (loop :repeat repeat
          :do (task-step))
        (task-loop))))

;;; Local tasks as a cooperative threading mechanism

(defun/cc yield-local-task ()
  (let/cc k (schedule-local-task k)))

(defun/cc sleep/cc (duration)
  (let/cc k (schedule-timed-local-task k :start-time (+ duration (get-real-time)))))


(defun/cc call-in-forks (things &key first-one-later)
  (loop for thing in (if first-one-later things (cdr things))
        do (schedule-local-task thing))
  (unless first-one-later
    (funcall (wrap (car things)))))

(defmacro do-in-forks (&body body)
  `(call-in-forks (list ,@(mapcar (lambda (x) `(lambda () ,x)) body))))


;; Rendez-vous
(defparameter *local-task-rendez-vous* nil)

(defun/cc call-with-rendez-vous (thunk)
  (let/cc k
    (let ((remaining-tasks 0))
      (labels ((maybe-exit ()
                 (when (zerop remaining-tasks)
                   (kall k)))
               (one-less ()
                 (decf remaining-tasks)
                 (maybe-exit))
               (one-more (thing)
                 (incf remaining-tasks)
                 (schedule-local-task
                  (lambda () (funkall thing) (one-less)))))
        (funcall thunk one-more)
        (maybe-exit)))))

(defun/cc call-with-rendez-vous-after (thunk &optional (rendez-vous *local-task-rendez-vous*))
  (unless rendez-vous
    (error "No rendez-vous."))
  (funcall rendez-vous thunk))

(defmacro with-rendez-vous-after ((&optional (rendez-vous '*local-task-rendez-vous*)) &body body)
  `(call-with-rendez-vous-after
    (lambda () ,@body)
    ,rendez-vous))

(defmacro with-rendez-vous ((&optional (rendez-vous '*local-task-rendez-vous*)) &body body)
  `(call-with-rendez-vous
    (lambda (,rendez-vous) ,@body)))


;;; Do In Parallel (and join at the end)
(defun/cc call-in-parallel (thunks)
  (with-rendez-vous (done)
    (dolist (thunk thunks)
      (call-with-rendez-vous-after thunk done))))

(defmacro do-in-parallel (&body body)
  `(call-in-parallel (list ,@(mapcar (lambda (x) `(lambda () ,x)) body))))


;;; Map In Parallel (return a list with results in first-terminates-last order)
(defun/cc map-in-parallel (function &rest lists)
  (let ((results ()))
    (call-in-parallel
     (apply mapcar
            #'(lambda (&rest args)
                #'(lambda () (push (apply function args) results)))
            lists))
    results))


;; First to finish wins
(defstruct (local-task-competition
             (:conc-name local-task-competition-)
             (:constructor make-local-task-competition (compete maybe-lose maybe-win)))
  (compete)
  (maybe-lose)
  (maybe-win))

(defun/cc call-with-local-task-competition (thunk)
  (let/cc k
    (let ((competition-over-p nil))
      (labels ((maybe-win (&rest r)
                 (unless competition-over-p
                   (setf competition-over-p t)
                   (apply #'kall k r)))
               (maybe-lose ()
                 (let/cc k
                   (unless competition-over-p
                     (kall k))))
               (compete (thunk)
                 (maybe-lose)
                 (schedule-local-task
                  (lambda () (multiple-value-call #'maybe-win (funcall thunk))))))
        (funcall thunk (make-local-task-competition #'compete #'maybe-lose #'maybe-win))))))

(defparameter *local-task-competition* nil)

(defun/cc call-as-local-task-competitor (thunk &optional (competition *local-task-competition*))
  (unless competition
    (error "No local task competition."))
  (funcall (local-task-competition-compete competition) thunk))

(defmacro local-task-competitor
    ((&optional (competition '*local-task-competition*)) &body body)
  `(call-as-local-task-competitor (lambda () ,@body) ,competition))

(defun/cc maybe-lose-local-task-competition (&optional (competition *local-task-competition*))
  (unless competition
    (error "No local task competition."))
  (funcall (local-task-competition-maybe-lose competition)))

(defun/cc maybe-win-local-task-competition
    (&optional (competition *local-task-competition*) &rest r)
  (unless competition
    (error "No local task competition."))
  (apply (local-task-competition-maybe-win competition) r))

(defmacro with-local-task-competition
    ((&optional (competition '*local-task-competition*)) &body body)
  `(call-with-local-task-competition
    (lambda (,competition) ,@body)))

#|;Working test:
(defun/cc foo (nrepeat char maybe-lose)
  (loop repeat nrepeat do (yield-local-task) (funcall maybe-lose) (write-char char)) char)
(schedule-local-task
 (lambda ()
   (with-call/cc
     (DBG (with-local-task-competition (competition)
            (loop for c across "ABCDEFGHI" do
              (let ((x c))
                (local-task-competitor (competition)
                  (foo (+ 2 (random 100)) x
                       (lambda () (maybe-lose-local-task-competition competition)))))))))))
(process-local-tasks)
|#

;; timeout
(defun/cc call-with-local-timeout (timeout thunk)
  (with-local-task-competition (speed-competition)
    (when timeout
      (local-task-competitor (speed-competition) (sleep/cc timeout)))
    (maybe-lose-local-task-competition speed-competition)
    (maybe-win-local-task-competition
     speed-competition
     (funcall thunk speed-competition))))

(defmacro with-local-timeout ((timeout &optional (competition (gensym))) &body body)
  `(call-with-local-timeout ,timeout (lambda (,competition) (declare (ignorable ,competition)) ,@body)))