;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
;;;; Manage tasks in the local Lisp image
;;;; Tasks can be scheduled on a queue,
;;;; to be processed either immediately or when a given start time arrives.

(in-package :philip-jose)

;;(declaim (optimize (speed 3) (safety 3) (debug 3)));DEBUG

;; All of this assumes single-threaded execution.
;; If you want concurrent execution, you need to either modify this code and add locking,
;; or to isolate instances that run this code in separate threads that each have a
;; thread-local binding for each of the special variables used.

;;; Tasks

;; A unit of work: CALLBACK is what gets called when the task runs,
;; NAME is an optional label (defaults to NIL).
(defclass local-task ()
  ((callback :accessor task-callback :initarg :callback)
   (name :accessor task-name :initarg :name :initform nil)))

;; A task that additionally carries the START-TIME at which it becomes due.
;; NOTE(review): a task stored with a NIL start time sorts first in the heap
;; (see TIME-SOONER-P) yet makes NEXT-TIMED-TASK-START-TIME return NIL, which
;; stops PROCESS-TIMED-TASK-HEAP from draining anything. The callers visible
;; in this file always supply a time -- confirm no external caller passes NIL.
(defclass timed-local-task (local-task)
  ((start-time :accessor task-start-time :initarg :start-time :initform nil)))

;; Bound by PROCESS-LOCAL-TASKS to the task whose callback is currently running.
(defvar *current-local-task* nil)


;;; Scheduled tasks (to be executed ASAP in FIFO order)

(defparameter *scheduled-local-tasks* (make-fifo)
  "tasks that need to be executed locally")

(defun enqueue-immediate-local-task (task)
  "Append TASK to the FIFO of tasks awaiting immediate execution."
  (fifo-enqueue task *scheduled-local-tasks*))

(defun schedule-immediate-local-task (callback &key name)
  "Wrap CALLBACK (optionally labelled NAME) in a LOCAL-TASK and enqueue it.
Always returns NIL."
  (enqueue-immediate-local-task
   (make-instance 'local-task :callback callback :name name))
  nil)

(defun process-local-tasks ()
  "Dequeue and run immediate tasks until the FIFO is empty.
The emptiness check is repeated on every iteration, so tasks enqueued by a
running callback are processed in the same call. *CURRENT-LOCAL-TASK* is
bound to each task for the duration of its callback."
  (loop until (fifo-empty-p *scheduled-local-tasks*)
        do (let ((*current-local-task* (fifo-dequeue *scheduled-local-tasks*)))
             (funkall (task-callback *current-local-task*)))))


;;; Timed tasks

(defun time-sooner-p (t1 t2)
  "Ordering predicate on start times where NIL is allowed.
Nothing is sooner than a NIL T2; otherwise a NIL T1 is sooner than any
non-NIL T2; otherwise compare numerically with <."
  (cond
    ((null t2) nil)
    ((null t1) t)
    (t (< t1 t2))))

(defun task-scheduled-sooner-p (task1 task2)
  "True when TASK1's start time is sooner than TASK2's, per TIME-SOONER-P."
  (time-sooner-p (task-start-time task1) (task-start-time task2)))

(defparameter *timed-local-tasks*
  (make-instance 'binary-heap :lessp #'task-scheduled-sooner-p)
  "tasks to be executed on farmer at a given time. e.g. timeout functions")

(defun schedule-timed-local-task (callback &key start-time name)
  "Insert a TIMED-LOCAL-TASK for CALLBACK, due at START-TIME, into the heap.
Always returns NIL."
  (insert-item!
   *timed-local-tasks*
   (make-instance 'timed-local-task
                  :callback callback :start-time start-time :name name))
  nil)

(defun next-timed-task-start-time ()
  "Start time of the least item in the timed-task heap, or NIL if the heap is empty."
  (when-bind task (unless (container-empty-p *timed-local-tasks*)
                    (least-item *timed-local-tasks*))
    (task-start-time task)))

(defun process-timed-task-heap (&optional (time (et:gettimeofday)))
  "Move every timed task whose start time is strictly earlier than TIME
from the heap to the immediate FIFO. Stops as soon as the next start time
is NIL or not earlier than TIME."
  (loop for next-start-time = (next-timed-task-start-time)
        while (and next-start-time (< next-start-time time))
        do (enqueue-immediate-local-task (pop-least-item! *timed-local-tasks*))))

(defun schedule-local-task (callback &key start-time name)
  "Schedule CALLBACK as a timed task when START-TIME is non-NIL,
otherwise as an immediate task. Always returns NIL."
  (if start-time
      (schedule-timed-local-task callback :start-time start-time :name name)
      (schedule-immediate-local-task callback :name name))
  nil)


;;; Event-Base

(defun process-events ()
  "Dispatch on *EVENT-BASE*: when it is empty fall back to PROCESS-NO-EVENTS,
otherwise run DO-PROCESS-EVENTS."
  (if (iomux::event-base-empty-p *event-base*)
      (process-no-events)
      (do-process-events)))

;;(defparameter *event-processing-task*
;;  (make-instance 'local-task
;;                 :name "Event processor"
;;                 :callback #'process-events))
;;
;;(defun reschedule-process-events ()
;;  (schedule-local-task *event-processing-task*))

(defun process-no-events ()
  "With no events registered: sleep until the next timed task is due and then
drain the timed-task heap; if there is no timed task either, call NOTHING-TO-DO."
  (if-bind time (next-timed-task-start-time)
    ;; The start time may already lie in the past by the time we get here,
    ;; so clamp the delay at 0 -- the same clamp DO-PROCESS-EVENTS applies
    ;; to its timeout -- rather than handing FSLEEP a negative duration.
    (let ((delay (max 0 (- time (get-real-time)))))
      (fsleep delay)
      (process-timed-task-heap))
    (nothing-to-do)))

(defun nothing-to-do ()
  "Log that no work remains and throw 0 to the :EXIT catch tag."
  ;; The line break inside the control string is part of the literal as found
  ;; in the source and is deliberately preserved.
  (logger "~&Nothing left to do. 
Quitting.")
  (throw :exit 0))

(defun do-process-events ()
  "Run one EVENT-DISPATCH pass on *EVENT-BASE*.
The timeout is the non-negative time remaining until the next timed task,
or NIL when no timed task is pending."
  (let* ((time (next-timed-task-start-time))
         (now (get-real-time))
         (timeout (when time (max 0 (- time now)))))
    (event-dispatch *event-base* :timeout timeout :only-once t)))


;;; General Loop

(defun task-step ()
  "One scheduler iteration: run immediate tasks, promote due timed tasks,
run immediate tasks again (including the ones just promoted), then process events."
  (process-local-tasks)
  (process-timed-task-heap)
  (process-local-tasks)
  (process-events))

;; Run TASK-STEP forever; the only way out is a non-local exit
;; (e.g. the THROW to :EXIT performed by NOTHING-TO-DO).
(def*fun task-loop ()
  (loop (task-step)))

(defun task-steps (&optional (repeat 1))
  "Run REPEAT scheduler steps (default 1) inside a CATCH of :EXIT.
When REPEAT is NIL, run TASK-LOOP instead, i.e. until something throws to :EXIT."
  (catch :exit
    (if repeat
        (loop :repeat repeat :do (task-step))
        (task-loop))))


;;; Local tasks as a cooperative threading mechanism

;; Hand the continuation K bound by LET/CC to the immediate queue, so the
;; rest of the caller's computation resumes when the scheduler reaches it.
(defun/cc yield-local-task ()
  (let/cc k
    (schedule-local-task k)))

;; Like YIELD-LOCAL-TASK, but the continuation is scheduled as a timed task
;; due DURATION after the current (GET-REAL-TIME).
(defun/cc sleep/cc (duration)
  (let/cc k
    (schedule-timed-local-task k :start-time (+ duration (get-real-time)))))

;; Schedule each of THINGS as an immediate task. Unless FIRST-ONE-LATER is
;; true, the first element is not scheduled but called right away, after
;; being passed through WRAP (defined elsewhere).
(defun/cc call-in-forks (things &key first-one-later)
  (loop for thing in (if first-one-later things (cdr things))
        do (schedule-local-task thing))
  (unless first-one-later
    (funcall (wrap (car things)))))

(defmacro do-in-forks (&body body)
  "Wrap each form of BODY in a zero-argument lambda and pass the list to CALL-IN-FORKS."
  `(call-in-forks (list ,@(mapcar (lambda (x) `(lambda () ,x)) body))))


;; Rendez-vous

(defparameter *local-task-rendez-vous* nil)

;; Call THUNK with a one-argument "register" function. Each thing registered
;; through it is scheduled as an immediate task and counted; the continuation
;; K of the CALL-WITH-RENDEZ-VOUS call is resumed (with no values) once the
;; count of outstanding tasks reaches zero.
(defun/cc call-with-rendez-vous (thunk)
  (let/cc k
    (let ((remaining-tasks 0))
      (labels ((maybe-exit ()
                 (when (zerop remaining-tasks)
                   (kall k)))
               (one-less ()
                 (decf remaining-tasks)
                 (maybe-exit))
               (one-more (thing)
                 (incf remaining-tasks)
                 (schedule-local-task
                  (lambda ()
                    (funkall thing)
                    (one-less)))))
        ;; ONE-MORE lives in the function namespace (it is a LABELS
        ;; binding), so it must be passed as #'ONE-MORE; a bare ONE-MORE
        ;; would be an unbound variable reference.
        (funcall thunk #'one-more)
        ;; Covers the case where THUNK registered nothing at all.
        (maybe-exit)))))

;; Register THUNK with RENDEZ-VOUS (default: *LOCAL-TASK-RENDEZ-VOUS*).
;; Signals an error when no rendez-vous is available.
(defun/cc call-with-rendez-vous-after (thunk &optional (rendez-vous *local-task-rendez-vous*))
  (unless rendez-vous
    (error "No rendez-vous."))
  (funcall rendez-vous thunk))

(defmacro with-rendez-vous-after ((&optional (rendez-vous '*local-task-rendez-vous*))
                                  &body body)
  "Register BODY, wrapped in a zero-argument lambda, with RENDEZ-VOUS."
  `(call-with-rendez-vous-after (lambda () ,@body) ,rendez-vous))

(defmacro with-rendez-vous ((&optional (rendez-vous '*local-task-rendez-vous*))
                            &body body)
  "Run BODY under CALL-WITH-RENDEZ-VOUS with the register function bound to
the variable named by RENDEZ-VOUS."
  `(call-with-rendez-vous (lambda (,rendez-vous) ,@body)))


;;; Do In Parallel (and join at the end)

;; Register every thunk of THUNKS with a fresh rendez-vous, so that the call
;; returns only after all of them have run.
(defun/cc call-in-parallel (thunks)
  (with-rendez-vous (done)
    (dolist (thunk thunks)
      (call-with-rendez-vous-after thunk done))))

(defmacro do-in-parallel (&body body)
  "Wrap each form of BODY in a zero-argument lambda and pass the list to CALL-IN-PARALLEL."
  `(call-in-parallel (list ,@(mapcar (lambda (x) `(lambda () ,x)) body))))


;;; Map In Parallel (return a list with results in first-terminates-last order)

;; Build one thunk per tuple of elements taken from LISTS; each thunk pushes
;; (apply FUNCTION args) onto RESULTS, hence the first-terminates-last order.
(defun/cc map-in-parallel (function &rest lists)
  (let ((results ()))
    (call-in-parallel
     ;; MAPCAR must be passed as a function object: a bare MAPCAR would be
     ;; read as an (unbound) variable.
     (apply #'mapcar
            #'(lambda (&rest args)
                #'(lambda ()
                    (push (apply function args) results)))
            lists))
    results))


;; First to finish wins

;; Bundle of the three closures created by CALL-WITH-LOCAL-TASK-COMPETITION.
(defstruct (local-task-competition
            (:conc-name local-task-competition-)
            (:constructor make-local-task-competition (compete maybe-lose maybe-win)))
  (compete)
  (maybe-lose)
  (maybe-win))

;; Call THUNK with a LOCAL-TASK-COMPETITION object whose closures share one
;; COMPETITION-OVER-P flag:
;;  - MAYBE-WIN: unless the competition is over, mark it over and resume the
;;    continuation K of this call with the given values;
;;  - MAYBE-LOSE: continue only while the competition is not over (otherwise
;;    the captured continuation is simply never called);
;;  - COMPETE: after a MAYBE-LOSE check, schedule a task that runs the given
;;    thunk and passes all of its values to MAYBE-WIN.
(defun/cc call-with-local-task-competition (thunk)
  (let/cc k
    (let ((competition-over-p nil))
      (labels ((maybe-win (&rest r)
                 (unless competition-over-p
                   (setf competition-over-p t)
                   (apply #'kall k r)))
               (maybe-lose ()
                 (let/cc k
                   (unless competition-over-p
                     (kall k))))
               (compete (thunk)
                 (maybe-lose)
                 (schedule-local-task
                  (lambda ()
                    (multiple-value-call #'maybe-win (funcall thunk))))))
        (funcall thunk
                 (make-local-task-competition
                  #'compete #'maybe-lose #'maybe-win))))))

(defparameter *local-task-competition* nil)

;; Enter THUNK as a competitor in COMPETITION (default:
;; *LOCAL-TASK-COMPETITION*). Signals an error when there is no competition.
(defun/cc call-as-local-task-competitor (thunk &optional (competition *local-task-competition*))
  (unless competition
    (error "No local task competition."))
  (funcall (local-task-competition-compete competition) thunk))

(defmacro local-task-competitor ((&optional (competition '*local-task-competition*))
                                 &body body)
  "Enter BODY, wrapped in a zero-argument lambda, as a competitor in COMPETITION."
  `(call-as-local-task-competitor (lambda () ,@body) ,competition))

;; Invoke COMPETITION's MAYBE-LOSE closure.
;; Signals an error when there is no competition.
(defun/cc maybe-lose-local-task-competition (&optional (competition *local-task-competition*))
  (unless competition
    (error "No local task competition."))
  (funcall (local-task-competition-maybe-lose competition)))

;; Apply COMPETITION's MAYBE-WIN closure to the remaining arguments R.
;; Signals an error when there is no competition.
(defun/cc maybe-win-local-task-competition (&optional (competition *local-task-competition*)
                                            &rest r)
  (unless competition
    (error "No local task competition."))
  (apply (local-task-competition-maybe-win competition) r))

(defmacro with-local-task-competition ((&optional (competition '*local-task-competition*))
                                       &body body)
  "Run BODY under CALL-WITH-LOCAL-TASK-COMPETITION with the competition object
bound to the variable named by COMPETITION."
  `(call-with-local-task-competition (lambda (,competition) ,@body)))

#|;Working test:
(defun/cc foo (nrepeat char maybe-lose)
  (loop repeat nrepeat do
    (yield-local-task)
    (funcall maybe-lose)
    (write-char char))
  char)
(schedule-local-task
 (lambda ()
   (with-call/cc
     (DBG
      (with-local-task-competition (competition)
        (loop for c across "ABCDEFGHI" do
          (let ((x c))
            (local-task-competitor (competition)
              (foo (+ 2 (random 100)) x
                   (lambda () (maybe-lose-local-task-competition competition)))))))))))
(process-local-tasks)
|#


;; timeout

;; Run THUNK (called with the competition object) in a competition against a
;; timer: when TIMEOUT is non-NIL, a competitor that merely sleeps for
;; TIMEOUT is entered first. Whichever finishes first resumes the caller;
;; the primary value of THUNK is what gets passed to MAYBE-WIN.
(defun/cc call-with-local-timeout (timeout thunk)
  (with-local-task-competition (speed-competition)
    (when timeout
      (local-task-competitor (speed-competition)
        (sleep/cc timeout)))
    (maybe-lose-local-task-competition speed-competition)
    (maybe-win-local-task-competition
     speed-competition
     (funcall thunk speed-competition))))

(defmacro with-local-timeout ((timeout &optional (competition (gensym))) &body body)
  "Run BODY under CALL-WITH-LOCAL-TIMEOUT with TIMEOUT; the competition object
is bound to COMPETITION (a fresh gensym by default) and may go unused."
  `(call-with-local-timeout
    ,timeout
    (lambda (,competition)
      (declare (ignorable ,competition))
      ,@body)))