--- /dev/null
+FARMER_PATH=/home/fare/off-hours-computations/lisp/
+LISP_SYSTEMS=/home/fare/lib/lisp-systems/
+SYSTEM=philip-jose
+LISP=sbcl
+
+farmer.sh: setup.lisp
+ cl-launch -p ${LISP_SYSTEMS} -p ${FARMER_PATH} -l ${LISP} -f $< -s ${SYSTEM} -r philip-jose::farmer-start -o $@
--- /dev/null
+==========================================================
+PHILIP-JOSE, a farmer for distributed computations in Lisp
+==========================================================
+
+Summary
+=======
+
+This package implements a "farmer": a program to control a farm of computers
+and coordinate them into achieving a large computation.
+
+The basic usage model is that all the flow control happens in the farmer,
+whereas all the intensive computation happens in worker processes
+each of which needs only know how to do a simple part of the whole.
+The overall computation may have many components and sub-components
+that include both massive parallelism and sequential dependencies,
+where the control-flow that may depend on the intermediary results,
+where failure of components can happen, be detected and acted upon, etc.
+
+
+License
+=======
+
+This software is released under the bugroff license. Use at your own risk.
+
+ http://www.geocities.com/SoHo/Cafe/5947/bugroff.html
+
+At the insistence of several hackers, I hereby state what is obvious to me,
+that they can reuse any software released under the bugroff license and
+publish it as part or totality of packages under any other license they see fit
+if it really matters to them, including a BSD-style license or a MIT license.
+Yes they can. Of course, if they choose a proprietary software license,
+they only deserve scorn. But even that, they may do!
+
+
+Communication
+=============
+
+Communication happens through a simple request/response protocol (like HTTP).
+The current protocol is designed to make it very easy to pass around
+Lisp data as the arguments and results of computations. It does that well,
+but its current incarnation has many limitations. Most importantly,
+the current system is intrinsically unsecure. DO NOT USE IT ON THE INTERNET.
+Use it only within a trusted network, or wrapped over encrypted lines.
+Also, the protocol is NOT designed for transmitting large amounts of data.
+If workers need share data with the farmer and/or with each other,
+they should access it through independent means, such as a file server.
+The farmer as it currently is is only designed to handle
+the general flow control of the computation.
+
+Workers are spawned by the farmer itself, using ssh or fork+exec,
+from a farm of registered machines.
+A simple mechanism to register and unregister machines is provided.
+A validation may be run on each worker machine
+to qualify it or not for the current computation.
+Because simple forking (without exec) is not (currently) supported
+(and because it is not usually available for distributed computations),
+an independent means of storing and retrieving data
+is once again needed if workers are to share it.
+
+
+Flow Control
+============
+
+On the plus side, the flow-control of the computation
+can be written in a very natural way.
+The farmer supports for sequential, parallel and first-wins subcomputations.
+Moreover, it provides a universal primitive (i.e. delimited continuations)
+on top of of which you can build your own control structures
+if you ever need something more elaborate.
+And indeed, the previous high-level control structures
+were built in terms of these primitives.
+
+I am told the implementation of continuations I am using (arnesi)
+allows continuations to be serialized, so in theory,
+you could build upon this to achieve mobile code between several farmers
+(as in the Tube), persistent threads, load-balancing, etc.
+
+Note that arnesi's call/cc and kall are more like shift and reset
+than like Scheme's call/cc. I admit I haven't tried nesting them
+syntactically and see if it still does the right thing, and even less to
+see if it handles exceptions properly.
+I'd have to compare the results to those of the examples given
+in the following documents:
+
+ * http://okmij.org/ftp/Computation/Continuations.html
+ * http://mumble.net/~campbell/scheme/bshift.scm
+ * http://calculist.blogspot.com/2007/01/non-native-shiftreset-with-exceptions.html
+
+
+Threading
+=========
+
+The farmer is written with the assumption that it will work
+on a sequential Lisp implementation, or
+in a single thread of a multithreaded Lisp implementation.
+IT WILL NOT WORK CORRECTLY IN A MULTITHREADED ENVIRONMENT.
+Its concurrent programming activities are built out of
+a home-grown green thread mechanism on top of arnesi's call/cc.
+
+Philip-jose provides green threads intended to be used on the farmer.
+Because of their execution model, all computations between two calls
+to some threading or I/O primitives are done in a same atomic transaction.
+
+The upside is that you can get a lot done without ever having to use locks
+or any other explicit mutual exclusion mechanism --
+just avoid calling any computation with complex control flow between two
+operations the results of which should be seen atomically by other threads.
+Many places in the existing code indeed assume this sequential execution
+to access to various shared data-structures in atomic transactions
+without the explicit use of locks.
+
+The downside is that when you really need such mutual exclusion mechanism,
+it is not provided (yet), and other parts of the code haven't been made
+to use any such mechanism (yet). Writing and providing such mechanisms
+should be pretty easy however, and are let as an exercise to the astute
+contributor.
+
+Though a skeleton of multithreaded server is provided,
+it is not tested, probably buggy, lacking in features, and
+will not work in conjunction with other code in philip-jose.
+I'm told that the author of IOLib is building
+a better, more featureful, more stable, infrastructure for I/O in Lisp.
+Philip-Jose is no such infrastructure.
+
+
+TO-DO List
+==========
+
+ 1- package the system into something usable. Provide documentation, etc.
+
+ 2- use non-blocking IO to provide for better networking in farmers:
+ the ability to be both a server and client, etc.
+
+ 3- find how to serialize continuations to provide richer capabilities:
+ persistent (more robust) threads, and mobile code (including load balancing).
+
+ 4- Implement fork() as a more efficient (though trickier) alternative to
+ fork+exec() for spawning new clients that share initial state.
+
+ 5- provide a faster alternative to arnesi's with-call/cc, based on e.g. Screamer.
+
+ 6- implement interfaces that are compatible with existing distributed systems?
+ (Scheme) Termite, Askemos, Tube, Kali, Dreme, [Something by Queinnec]
+ (CL) StarLisp, NetCLOS, GBBopen, ...
+ Erlang
+
+ 7- use hunchentoot or such to support HTTP instead of the current protocol.
+
+ 8- implement a simple map/reduce for local-tasks, for worker-jobs
+
+ 9- implement pure streams, and mappings between pure & impure streams,
+ so that I/O in competing threads only gets committed in the winning thread;
+ also allows for easy backtracking with I/O operations. One thread may
+ (read-string "foo" s) and just catch the exception and die if not available.
+
+ 10- have a get-read-buffer interface to not read character-by-character
+
+ 11- have thread IDs, and guard various event-handlers with a test for the thread
+ still being alive. Alternatively have a mechanism to (atomically) remove event-handlers,
+ queued jobs, etc., from their respective queue, set, etc.
+
+ 12- enforce atomicity with a global flag \*atomic* or such; when this flag is on,
+ weak yields turn into NOPs and strong yields raise error when it is on.
+ Now we also have to distinguish between weak yields and strong yields.
+
+ 13- Have sub-queues, etc., for scheduling.
+
+ 14- provide some explicit mechanisms for mutual-exclusion and transactionality.
+
+ 15- merge with Erlang-in-Lisp
+
+
+Behaving like an Erlang node
+============================
+
+ * see distel: http://bc.tech.coop/blog/070528.html
+
+ * the Erlang distribution protocol can be found in the source release at
+ erts/emulator/internal_doc/erl_ext_dist.txt
+
+
+Using ancillary data to BSD UNIX sockets
+========================================
+
+If you are send multiple fds in a same sendmsg()
+make sure you send them all in a single CMSG
+because multiple CMSGs of same type are broken on many kernels.
+man cmsg. See SCM_RIGHTS.
--- /dev/null
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;;; Main loop for the Philip-Jose Farmer
+
+;;; Manage a farm of processes each doing a part of a grand job.
+
+(in-package :philip-jose)
+
+(defparameter *philip-jose-farmer-version* 0)
+
+(defun-request-handler :farmer-version ()
+ (reply *philip-jose-farmer-version*))
+
+(defun-request-handler :square (n)
+ (reply (* n n)))
+
+(defun-request-handler :1+ (n)
+ (reply (1+ n)))
+
+(defun-request-handler :reload ()
+ (reply (asdf:oos 'asdf:load-op *farmer-system*)))
+
+(defun-request-handler :register-client (id &key valid explanation)
+ (logger "~&Registering client~{ ~A~}" id)
+ (reply t))
+
+(defun-request-handler :armageddon ()
+ (flet ((kill-worker-id (worker-id)
+ (destructuring-bind (machine process connect-time) worker-id
+ (declare (ignorable connect-time))
+ (logger "~&Armageddon killing pid ~D on ~A" process machine)
+ (kill-machine-process machine process))))
+ (maphash (lambda (worker-id status)
+ (declare (ignorable status))
+ (kill-worker-id worker-id))
+ *registered-workers*)
+ (maphash (lambda (job-id job)
+ (declare (ignorable job-id))
+ (kill-worker-id (car (job-status job))))
+ *claimed-worker-jobs*))
+ (reply t)
+ (logger "~&Armageddon quitting")
+ (sb-ext:quit))
+
+(defun-job fake (m)
+ (setf *random-state* (make-random-state t))
+ (let ((x (* 1d-3 (random 10000))))
+ (DBG :fake m x)
+ (fsleep x)
+ (values m x)))
+
+
+(defun show-tasks ()
+ (let ((*print-level* 4))
+ (logger "~&scheduled local-tasks: ~S~%" (fifo-head *scheduled-local-tasks*))
+ (logger "~&timed local-tasks: ~S~%" (container-contents *timed-local-tasks*))
+ (logger "~&scheduled worker jobs: ~S~%" (fifo-head *scheduled-worker-jobs*))
+ (logger "~&claimed worker jobs: ~S~%" (hash-table->alist *claimed-worker-jobs*)))
+ nil)
+
+(defun clear-tasks ()
+ (fifo-empty! *scheduled-local-tasks*)
+ (empty-container! *timed-local-tasks*)
+ (fifo-empty! *scheduled-worker-jobs*)
+ (clrhash *claimed-worker-jobs*)
+ nil)
+
+#|
+(trace call-request-handler handle-sexp-request)
+
+(hash-table->alist *request-heads*)
+|#
--- /dev/null
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;;; Forking
+
+(in-package :philip-jose)
+
+#-sbcl
+(progn
+ (defvar *expected-children 2.1)
+ (defvar *expected-space* 16000000))
+
+#+sbcl
+(progn
+ ;; Simple heuristic: if we have allocated more than the given ratio
+ ;; of what is allowed between GCs, then trigger the GC.
+ ;; note: can possibly modify parameters and reset in sb-ext:*after-gc-hooks*
+ (defparameter *prefork-allocation-reserve-ratio* .10) ; default ratio: 10%
+ (defun should-i-gc-p ()
+ (let ((available-bytes (- (sb-alien:extern-alien "auto_gc_trigger" sb-alien:long)
+ (sb-kernel:dynamic-usage)))
+ (allocation-threshhold (sb-ext:bytes-consed-between-gcs)))
+ (< available-bytes (* *prefork-allocation-reserve-ratio* allocation-threshhold)))))
+
+(defun pre-fork-hook ()
+ #+sbcl
+ (when (should-i-gc-p)
+ (sb-ext:gc))
+ nil)
+
+(defun do-fork ()
+ #-sbcl (et:fork)
+ #+sbcl (sb-posix:fork))
+
+(defun post-fork-child-cleanup ()
+ nil)
+
+(defun post-fork-parent-hook (pid)
+ pid)
+
+(defun fork ()
+ (pre-fork-hook)
+ (let ((pid (do-fork)))
+ (if (zerop pid)
+ (post-fork-child-cleanup)
+ (post-fork-parent-hook pid))
+ pid))
+
+
+(defvar *cleanup-pusher* nil
+ "dynamically-scoped function for declaring cleanups")
+(defun push-cleanup (cleanup)
+ (if *cleanup-pusher*
+ (funcall *cleanup-pusher* cleanup)
+ (error "No cleanup pusher declared")))
+(defun call-with-cleanups (thunk)
+ (let ((cleanups nil))
+ (flet ((push-cleanup (cleanup) (push cleanup cleanups)))
+ (unwind-protect
+ (funcall thunk #'push-cleanup)
+ (dolist (cleanup cleanups)
+ (funcall cleanup))))))
+(defun
+
+(defun call-with-foreign-temporaries (thunk)
+ (let (temporaries)
+ (flet ((alloc (s)
+ (push s temporaries)))
+ (unwind-protect
+ (funcall thunk alloc)
+ (loop for s in temporaries do
+ (apply #'cffi:free-converted-object s)))))))
+
+(defmacro with-foreign-temporaries (&body body)
+ (with-gensyms (s)
+ `(call-with-foreign-temporaries
+ (lambda (,s)
+ (macrolet ()
+ ,@body)))))
+
+(cffi:defcenum frobork-kode
+ :return
+ :execve
+ :dup2
+ :close
+ :fchdir
+ :setuid
+ :setgid
+ :signal
+ :setsid
+ :setpgid
+ :tcsetpgrp)
+
+(cffi:defcfun (c-frobork "frobork") :int (kode :pointer))
+
+(defun frobork-convert (v)
+ (typecase v
+ (integer
+ v)
+ (keyword
+
+ (string
+ (cffi:convert-to-foreign v :string))
+ (integer
+ v)
+
+
+
+(defun frobork (kl)
+ (let ((l (length kl)))
+ (with-foreign-object (kode :int (1+ l))
+ (let ((alloc-state
+ (loop with converted = nil
+ with state = nil
+ for i below l
+ for v in kl
+ do (multiple-value-setq (converted state)
+ (frobork-convert v))
+ (setf (mem-aref kode :int i) converted)
+ when state
+ collect state)))
+ (setf (mem-aref kode :int l) 0)
+ (prog1
+ (c-frobork kode)
+ (loop for s in alloc-state
+ do (apply #'cffi:free-converted-object s)))))))
--- /dev/null
+#define _GNU_SOURCE
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <signal.h>
+
+enum frobork_kode {
+ FK_RETURN,
+ FK_EXECVE,
+ FK_DUP2,
+ FK_CLOSE,
+ FK_FCHDIR,
+ FK_SETUID,
+ FK_SETGID,
+ FK_SIGNAL,
+ FK_SETSID,
+ FK_SETPGID,
+ FK_TCSETPGRP
+};
+
+
+/*
+ * frobork.c
+ */
+
+#define XX (*kode++)
+#define T(type) ((type)XX)
+#define X T(int)
+#define TRY2(x,y) if((x)==(y)) { goto on_error; } ; break;
+#define TRY(x) TRY2(x,-1)
+#define OBJLEN(x) (x),sizeof(x)
+#define GETX {x=X;}
+
+#if 0 /* support for any of the types we use being larger than an int */
+#define MAXUINTP1 (1ULL+(unsigned long long)((unsigned int)-1))
+#define SIZUI (sizeof(unsigned int))
+#define SIZULL (sizeof(unsigned int))
+
+unsigned long long read_and_increment_pointer (unsigned int**p, unsigned int size) {
+ unsigned long long r;
+ unsigned int *q = *p;
+ unsigned int i = 0;
+ unsigned long long factor = 1;
+
+ if (size <= SIZUI) {
+ r = *q;
+ q++;
+ } else if (size > SIZULL) {
+ write(2,OBJLEN("frobork: bad int size\n"));
+ exit(43);
+ } else {
+ r = 0;
+ while (i<size) {
+ r += *q*f;
+ f *= MAXUINTP1;
+ q++;
+ i+=SIZUI;
+ }
+ }
+ *p = q;
+ return r;
+}
+#define T(type) ((type)read_and_increment_pointer((unsigned int**)&kode,sizeof(type)))
+#endif
+
+int frobork (int* kode)
+{
+ pid_t pid, p1,p2;
+ int x;
+ sighandler_t sh;
+ char *a,**b,**c;
+
+ pid = fork();
+ if (pid) {
+ return pid;
+ }
+
+ while (1) {
+ switch (X) {
+ case FK_RETURN:
+ return(0);
+ case FK_EXECVE:
+ a=T(char *);b=T(char **);c=T(char **);
+ TRY(execve(a,b,c));
+ break;
+ case FK_DUP2:
+ GETX; TRY(dup2(x,X));
+ break;
+ case FK_CLOSE:
+ TRY(close(X));
+ break;
+ case FK_FCHDIR:
+ TRY(fchdir(X));
+ break;
+ case FK_SETUID:
+ TRY(setuid(T(uid_it)));
+ break;
+ case FK_SETGID:
+ TRY(setgid(T(gid_it)));
+ break;
+ case FK_SIGNAL:
+ GETX;
+ switch(X) {
+ case 0: sh = SIG_...;
+ case 1: sh = SIG_...;
+ case 2: sh = SIG_...;
+ }
+ TRY2(signal(x,y,SIG_ERR);
+ break;
+ case FK_SETSID:
+ TRY(setsid());
+ break;
+ case FK_SETPGID:
+ p1=T(pid_t);p2=T(pid_t);
+ TRY(setpgid(p1,p2));
+ break;
+ case FK_TCSETPGRP:
+ GETX;TRY(tcsetpgrp(x,T(pid_t)));
+ break;
+ default:
+ write(2,OBJLEN("frobork: bad code\n"));
+ exit(42);
+ }
+ }
+ return 0;
+on_error:
+ perror("frobork");
+ exit(44);
+}
+
+#ifdef TEST_FROBORK
+int main (int argc, char** argv) {
+ int kode[] = {
+ FK_DUP2, 0, 4,
+ FK_CLOSE, 4,
+ FK_SETSID,
+ FK_RETURN
+ };
+ argc++;argv++;
+ printf("pid=%d frobork returned %d\n",getpid(),frobork(kode));
+ return 0;
+}
+#endif
--- /dev/null
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;;; Main loop for the Philip-Jose Farmer
+
+;;;;; Incremental parser protocol
+
+(in-package :philip-jose)
+
+;;;; The Incremental parser protocol
+;;
+;; This protocol allows for a single-threaded server to monitor many connections at once,
+;; possibly on many ports, with each connection having partial request message that will
+;; either be trivially handled, or lead to spawning after the request is complete.
+;;
+;; An incremental parser function is given a buffer (byte or char array),
+;; the size up to which the buffer is filled, and
+;; the previous (incomplete) state of the incremental parser, or nil on the first attempt.
+;; It returns two values. The first is a boolean that tells if the parse was successful.
+;; If the first value is T, then the second value is the result of the parse.
+;; If the first value is NIL, then the second value is the state of the incremental parser,
+;; from which to resume when the buffer is made more full.
+;;
+;; In a world with any kind of first-class (partial) continuations, either linear or multi-entry,
+;; any direct parser could be made incremental, by having the READ-CHAR primitive return the
+;; partial continuation of the parser to the caller.
+;; Maybe we can hook Screamer or arnesi's call/cc into here.
+
+;; TODO:
+;; * add a buffer management protocol to make the thing work for bigger requests and
+;; on-going conversations
+
+;;;; Trivial implementations of simple cases
+
+
+;;; Line parser -- the request in complete at the end of the first line
+(defun line-end-code-p (cc)
+ (member cc '(10 13)))
+(defun line-end-char-p (ch)
+ (line-end-code-p (char-code ch)))
+
+(defun trivial-incremental-line-parser (buffer size state)
+ ;; Implements the incremental parser protocol for character input, detecting a full line.
+ (let* ((line-end-pos (position-if #'line-end-char-p buffer :start (or state 0))))
+ (if line-end-pos
+ (values t line-end-pos)
+ (values nil size))))
+
+
+;;; Wrapper that prevents too many retries
+;; Drop (presumably buggy or malevolent) connections with too many packets for the initial request.
+;; As in attempts to flood with small packets, connection with a lot of fragmentation,
+;; malformed request, overlarge request, buffer overflow tentative.
+
+(define-condition too-many-attempts ()
+ ()
+ (:report (lambda (condition stream)
+ (declare (ignore condition))
+ (format stream "Too many attempts in incremental request parsing"))))
+
+(defun incremental-parser-attempt-limiter (max-attempts incremental-parser)
+ (labels ((check (x)
+ (when (>= x max-attempts)
+ (error 'too-many-attempts)))
+ (try (buffer size state)
+ (multiple-value-bind (count inner-state)
+ (if state (values (car state) (cdr state)) (values 0 nil))
+ (check count)
+ (multiple-value-bind (successp x)
+ (funcall incremental-parser buffer size inner-state)
+ (if successp
+ (values t x)
+ (let ((c (1+ count)))
+ (check c)
+ (values nil (cons c x))))))))
+ (if (typep max-attempts '(unsigned-byte 16))
+ #'try
+ incremental-parser)))
+
+
+;;; Trivial transformation of a parser into a pseudo incremental parser.
+;; Restarting from the beginning everytime.
+;; It's inefficient and a big DoS target unless combined with the limiter above.
+
+(defun pseudo-incremental-parser-from-reader (reader)
+ ;; Implements the incremental parser protocol for character input, given a reader.
+ ;; Not really incremental, always restart from scratch, so called pseudo.
+ ;; Could do better with call/cc and an open stream protocol.
+ #'(lambda (buffer size state &aux index)
+ (declare (ignore state))
+ (handler-case
+ (values t (cons (with-input-from-string (s buffer :index index :start 0 :end size)
+ (funcall reader s t))
+ index))
+ (end-of-file () (values nil nil)))))
+
+
+;;; Use of the Lisp reader as a parser.
+;; NOT FOR USE IN THE WILD! See warning in trivial-sexp-server.lisp
+
+(let ((f (pseudo-incremental-parser-from-reader #'safe-read)))
+ (defun make-incremental-reader (&optional (max-attempts 4))
+ (incremental-parser-attempt-limiter max-attempts f)))
+
+
+(defun incrementally-parsed-request-handler (connection buffer size state &optional (handler #'handle-sexp-request))
+ (with-trivial-logging ()
+ (destructuring-bind (request . index) state
+ (with-open-stream (bufin (make-string-input-stream buffer index size))
+ (with-open-stream (in (make-concatenated-stream bufin connection))
+ (let ((*standard-input* in)
+ (*standard-output* connection))
+ (funcall handler request)))))))
+
+(defun make-incrementally-parsed-request-handler (h)
+ #'(lambda (connection buffer size state)
+ (incrementally-parsed-request-handler connection buffer size state h)))
--- /dev/null
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;; Manage tasks in the local Lisp image
+;;;; Tasks can be scheduled on a queue,
+;;;; to be processed either immediately or when a given start time arrives.
+(in-package :philip-jose)
+
+;;(declaim (optimize (speed 3) (safety 3) (debug 3)));DEBUG
+
+;; All of this assumes single-threaded execution.
+;; If you want concurrent execution, you need to either modify this code and add locking,
+;; Or to isolate instances that run this code in separate threads that each have a
+;; thread-local binding for each of the special variables used.
+
+
+;;; Tasks
+(defclass local-task ()
+ ((callback :accessor task-callback :initarg :callback)
+ (name :accessor task-name :initarg :name :initform nil)))
+
+(defclass timed-local-task (local-task)
+ ((start-time :accessor task-start-time :initarg :start-time :initform nil)))
+
+(defvar *current-local-task* nil)
+
+
+;;; Scheduled tasks (to executed ASAP in FIFO order)
+
+(defparameter *scheduled-local-tasks*
+ (make-fifo)
+ "tasks that need to be executed locally")
+
+(defun enqueue-immediate-local-task (task)
+ (fifo-enqueue task *scheduled-local-tasks*))
+
+(defun schedule-immediate-local-task (callback &key name)
+ (enqueue-immediate-local-task
+ (make-instance 'local-task
+ :callback callback :name name))
+ nil)
+
+(defun process-local-tasks ()
+ (loop until (fifo-empty-p *scheduled-local-tasks*) do
+ (let ((*current-local-task* (fifo-dequeue *scheduled-local-tasks*)))
+ (funkall (task-callback *current-local-task*)))))
+
+
+;;; Timed tasks
+
+(defun time-sooner-p (t1 t2)
+ (cond
+ ((null t2) nil)
+ ((null t1) t)
+ (t (< t1 t2))))
+
+(defun task-scheduled-sooner-p (task1 task2)
+ (time-sooner-p (task-start-time task1) (task-start-time task2)))
+
+(defparameter *timed-local-tasks*
+ (make-instance 'binary-heap :lessp #'task-scheduled-sooner-p)
+ "tasks to be executed on farmer at a given time. e.g. timeout functions")
+
+(defun schedule-timed-local-task (callback &key start-time name)
+ (insert-item!
+ *timed-local-tasks*
+ (make-instance 'timed-local-task
+ :callback callback :start-time start-time :name name))
+ nil)
+
+(defun next-timed-task-start-time ()
+ (when-bind task
+ (unless (container-empty-p *timed-local-tasks*)
+ (least-item *timed-local-tasks*))
+ (task-start-time task)))
+
+(defun process-timed-task-heap (&optional (time (et:gettimeofday)))
+ (loop for next-start-time = (next-timed-task-start-time)
+ while (and next-start-time (< next-start-time time)) do
+ (enqueue-immediate-local-task (pop-least-item! *timed-local-tasks*))))
+
+(defun schedule-local-task (callback &key start-time name)
+ (if start-time
+ (schedule-timed-local-task callback :start-time start-time :name name)
+ (schedule-immediate-local-task callback :name name))
+ nil)
+
+
+;;; Event-Base
+
+(defun process-events ()
+ (if (iomux::event-base-empty-p *event-base*)
+ (process-no-events)
+ (do-process-events)))
+
+;;(defparameter *event-processing-task*
+;; (make-instance 'local-task
+;; :name "Event processor"
+;; :callback #'process-events))
+;;
+;;(defun reschedule-process-events ()
+;; (schedule-local-task *event-processing-task*))
+
+(defun process-no-events ()
+ (if-bind time (next-timed-task-start-time)
+ (let ((delay (- time (get-real-time))))
+ (fsleep delay)
+ (process-timed-task-heap))
+ (nothing-to-do)))
+
+(defun nothing-to-do ()
+ (logger "~&Nothing left to do. Quitting.")
+ (throw :exit 0))
+
+(defun do-process-events ()
+ (let* ((time (next-timed-task-start-time))
+ (now (get-real-time))
+ (timeout (when time (max 0 (- time now)))))
+ (event-dispatch *event-base* :timeout timeout :only-once t)))
+
+
+;;; General Loop
+
+(defun task-step ()
+ (process-local-tasks)
+ (process-timed-task-heap)
+ (process-local-tasks)
+ (process-events))
+
+(def*fun task-loop ()
+ (loop (task-step)))
+
+(defun task-steps (&optional (repeat 1))
+ (catch :exit
+ (if repeat
+ (loop :repeat repeat
+ :do (task-step))
+ (task-loop))))
+
+;;; Local tasks as a cooperative threading mechanism
+
+(defun/cc yield-local-task ()
+ (let/cc k (schedule-local-task k)))
+
+(defun/cc sleep/cc (duration)
+ (let/cc k (schedule-timed-local-task k :start-time (+ duration (get-real-time)))))
+
+
+(defun/cc call-in-forks (things &key first-one-later)
+ (loop for thing in (if first-one-later things (cdr things))
+ do (schedule-local-task thing))
+ (unless first-one-later
+ (funcall (wrap (car things)))))
+
+(defmacro do-in-forks (&body body)
+ `(call-in-forks (list ,@(mapcar (lambda (x) `(lambda () ,x)) body))))
+
+
+;; Rendez-vous
+(defparameter *local-task-rendez-vous* nil)
+
+(defun/cc call-with-rendez-vous (thunk)
+ (let/cc k
+ (let ((remaining-tasks 0))
+ (labels ((maybe-exit ()
+ (when (zerop remaining-tasks)
+ (kall k)))
+ (one-less ()
+ (decf remaining-tasks)
+ (maybe-exit))
+ (one-more (thing)
+ (incf remaining-tasks)
+ (schedule-local-task
+ (lambda () (funkall thing) (one-less)))))
+ (funcall thunk one-more)
+ (maybe-exit)))))
+
+(defun/cc call-with-rendez-vous-after (thunk &optional (rendez-vous *local-task-rendez-vous*))
+ (unless rendez-vous
+ (error "No rendez-vous."))
+ (funcall rendez-vous thunk))
+
+(defmacro with-rendez-vous-after ((&optional (rendez-vous '*local-task-rendez-vous*)) &body body)
+ `(call-with-rendez-vous-after
+ (lambda () ,@body)
+ ,rendez-vous))
+
+(defmacro with-rendez-vous ((&optional (rendez-vous '*local-task-rendez-vous*)) &body body)
+ `(call-with-rendez-vous
+ (lambda (,rendez-vous) ,@body)))
+
+
+;;; Do In Parallel (and join at the end)
+(defun/cc call-in-parallel (thunks)
+ (with-rendez-vous (done)
+ (dolist (thunk thunks)
+ (call-with-rendez-vous-after thunk done))))
+
+(defmacro do-in-parallel (&body body)
+ `(call-in-parallel (list ,@(mapcar (lambda (x) `(lambda () ,x)) body))))
+
+
+;;; Map In Parallel (return a list with results in first-terminates-last order)
+(defun/cc map-in-parallel (function &rest lists)
+ (let ((results ()))
+ (call-in-parallel
+ (apply mapcar
+ #'(lambda (&rest args)
+ #'(lambda () (push (apply function args) results)))
+ lists))
+ results))
+
+
+;; First to finish wins
+(defstruct (local-task-competition
+ (:conc-name local-task-competition-)
+ (:constructor make-local-task-competition (compete maybe-lose maybe-win)))
+ (compete)
+ (maybe-lose)
+ (maybe-win))
+
+(defun/cc call-with-local-task-competition (thunk)
+ (let/cc k
+ (let ((competition-over-p nil))
+ (labels ((maybe-win (&rest r)
+ (unless competition-over-p
+ (setf competition-over-p t)
+ (apply #'kall k r)))
+ (maybe-lose ()
+ (let/cc k
+ (unless competition-over-p
+ (kall k))))
+ (compete (thunk)
+ (maybe-lose)
+ (schedule-local-task
+ (lambda () (multiple-value-call #'maybe-win (funcall thunk))))))
+ (funcall thunk (make-local-task-competition #'compete #'maybe-lose #'maybe-win))))))
+
+(defparameter *local-task-competition* nil)
+
+(defun/cc call-as-local-task-competitor (thunk &optional (competition *local-task-competition*))
+ (unless competition
+ (error "No local task competition."))
+ (funcall (local-task-competition-compete competition) thunk))
+
+(defmacro local-task-competitor
+ ((&optional (competition '*local-task-competition*)) &body body)
+ `(call-as-local-task-competitor (lambda () ,@body) ,competition))
+
+(defun/cc maybe-lose-local-task-competition (&optional (competition *local-task-competition*))
+ (unless competition
+ (error "No local task competition."))
+ (funcall (local-task-competition-maybe-lose competition)))
+
+(defun/cc maybe-win-local-task-competition
+ (&optional (competition *local-task-competition*) &rest r)
+ (unless competition
+ (error "No local task competition."))
+ (apply (local-task-competition-maybe-win competition) r))
+
+(defmacro with-local-task-competition
+ ((&optional (competition '*local-task-competition*)) &body body)
+ `(call-with-local-task-competition
+ (lambda (,competition) ,@body)))
+
+#|;Working test:
+(defun/cc foo (nrepeat char maybe-lose)
+ (loop repeat nrepeat do (yield-local-task) (funcall maybe-lose) (write-char char)) char)
+(schedule-local-task
+ (lambda ()
+ (with-call/cc
+ (DBG (with-local-task-competition (competition)
+ (loop for c across "ABCDEFGHI" do
+ (let ((x c))
+ (local-task-competitor (competition)
+ (foo (+ 2 (random 100)) x
+ (lambda () (maybe-lose-local-task-competition competition)))))))))))
+(process-local-tasks)
+|#
+
+;; timeout
+(defun/cc call-with-local-timeout (timeout thunk)
+ (with-local-task-competition (speed-competition)
+ (when timeout
+ (local-task-competitor (speed-competition) (sleep/cc timeout)))
+ (maybe-lose-local-task-competition speed-competition)
+ (maybe-win-local-task-competition
+ speed-competition
+ (funcall thunk speed-competition))))
+
+(defmacro with-local-timeout ((timeout &optional (competition (gensym))) &body body)
+ `(call-with-local-timeout ,timeout (lambda (,competition) (declare (ignorable ,competition)) ,@body)))
--- /dev/null
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;;; Trivial Logging mechanism
+
+(in-package :philip-jose)
+
+(defun trivial-logger (s fmt &rest args)
+ (with-standard-io-syntax
+ (let ((s (or s *error-output*))
+ (*print-pretty* nil)
+ (*print-readably* nil))
+ (format s "~&")
+ (apply #'format s fmt args)
+ (format s "~&")
+ (finish-output s))))
+
+(def*fun logger (fmt &rest args)
+ (apply #'trivial-logger *error-output* fmt args))
+
+(def*macro with-trivial-logging ((&optional s) &body body)
+ `(let ((*error-output* (or ,s *error-output*)))
+ (handler-bind
+ ((condition #'(lambda (condition)
+ (logger "~&~S~%" condition)
+ #+sbcl (ignore-errors (sb-debug:backtrace 100)))))
+ ,@body)))
--- /dev/null
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;;; Machines and executing on them...
+
+
+(in-package :philip-jose)
+
+(defparameter *hostname* nil)
+(def*fun gethostname ()
+ (or *hostname* (setf *hostname* (sb-unix:unix-gethostname))))
+
+;;(defparameter *domainname* nil)
+;;(defun getdomainname ()
+;; (or *domainname* (setf *domainname* (sb-unix:unix-getdomainname))))
+
+(defparameter *localhost-names* nil)
+(def*fun get-localhost-names ()
+ (or *localhost-names* (setf *localhost-names* (compute-localhost-names))))
+(defun compute-localhost-names ()
+ (list nil
+ "localhost"
+ (gethostname)))
+;;; (concatenate 'string (gethostname) "." (getdomainname))))
+
+(defun machine-localhost-p (m)
+ ;;--- the most correct way would be to resolve and then compare to 127.0.0.1/8 and other local addresses from ifconfig output...
+ (member m (get-localhost-names) :test #'equal))
+
+
+
+(def*fun spawn-process-on (machine &rest process-spec)
+ (if (machine-localhost-p machine)
+ (apply #'spawn-process process-spec)
+ (apply #'spawn-remote-process machine process-spec)))
+
+;;;---*** make less SBCL-dependent?
+#-sbcl (error "Philip-Jose requires SBCL at this point")
+(def*fun spawn-process (program &rest args)
+ (sb-ext:run-program (->string program) (mapcar #'->string args)
+ :wait nil :pty nil :input nil :output nil :error nil))
+
+(defun ->string (x)
+ (typecase x
+ (string x)
+ (symbol (string-downcase (symbol-name x)))
+ (t (format nil "~A" x))))
+
+(def*fun spawn-remote-process (machine program &rest args)
+ (spawn-process *ssh-path*
+ machine (command-line-from-strings
+ (mapcar #'->string (cons program args)))))
+
+(defun kill-machine-process (machine pid)
+ (spawn-process-on machine "/bin/kill" -9 pid))
+
+(defun pts-old-p (p)
+ (let ((mtime (nth-value 11 (et:stat (namestring p)))))
+ (< mtime (- (et:gettimeofday) 3600))))
+
+(defun all-pts-old-p ()
+ (every #'pts-old-p
+ (directory (make-pathname :directory "/dev/pts/" :name :wild))))
+
+(defun low-loadavg-p ()
+ (< (nth-value 2 (read-loadavg)) .38))
+
+(cffi:defcfun getloadavg :int (loadavg :pointer) (nelem :int))
+
+(defun read-loadavg ()
+ (values-list
+ (cffi:with-foreign-object (loadavg :double 3)
+ (getloadavg loadavg 3)
+ (loop for i below 3 collect
+ (cffi:mem-aref loadavg :double i)))))
+
+(defun get-id ()
+ (or *id*
+ (setf *id* (list (gethostname) (iolib-posix:getpid) (get-real-time)))))
+
+(defun validate-and-start-client (server port)
+ (let* ((all-pts-old-p (all-pts-old-p))
+ (low-loadavg-p (low-loadavg-p))
+ (valid (and all-pts-old-p low-loadavg-p)))
+ (simple-client `(:register-client ,(get-id) :valid t ;;,valid
+ ;; :explanation (:all-pts-old-p ,all-pts-old-p :low-loadavg-p ,low-loadavg-p)
+ )
+ server port)
+ (if t ;valid
+ (worker-loop :server server :port port)
+ (logger "~&Client not taking part in the farm: ~S"
+ (list :all-pts-old-p all-pts-old-p :low-loadavg-p low-loadavg-p)))))
+
+(def*fun simple-client (message target port)
+ (let* ((address (ensure-address target))
+ (socket (make-tcp-connection address port)))
+ (unwind-protect
+ (progn
+ (safe-write message :stream socket)
+ (princ +crlf+ socket)
+ (finish-output socket)
+ ;;(shutdown socket :write)
+ (read socket))
+ (close socket))))
+
+(defun spawn-client (machine &optional (server (gethostname)) (port *port*))
+ (logger "~&Spawning client on ~S" machine)
+ (spawn-process-on machine *farmer-path* "client" server port))
+
+(def*fun start-clients ()
+ (spawn-all-clients)
+ nil)
+
+(def*fun spawn-all-clients ()
+ (dolist (machine (process-registry-log))
+ (loop repeat *workers-per-machine* do
+ (spawn-client machine))))
--- /dev/null
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;; Trivial Job Management
+(in-package :philip-jose)
+
+#|
+Advice from #lisp: maybe use cells or computed-class -- but not really
+ http://common-lisp.net/project/cells/
+ http://common-lisp.net/project/computed-class/
+lemonodor: Jim Firby's RAP (not free software, but a good read)
+ http://people.cs.uchicago.edu/~firby/raps/index.shtml
+
+Proper interface:
+a little language to dynamically build a dependency graph.
+A node remembers those existing nodes that still need to be completed
+before that node may be undertaken.
+When a node is completed, its dependencies are propagated,
+which may themselves generate more jobs...
+
+And the language is Lisp, thanks to CALL/CC!
+
+|#
+
+
+;;; Global lock to protect state (not used at this time)
+#|
+(defparameter *tracker-lock* (make-lock)
+ "global lock for multithreaded servers")
+
+(defmacro with-tracker-lock (() &body body)
+ `(with-lock-held (*tracker-lock*)
+ ,@body))
+|#
+
+;;; Workers have an id, a known status
+
+(defvar *current-worker*
+ nil
+ "the worker making current request")
+
+(defparameter *registered-workers*
+ (make-hash-table :test 'equal)
+ "workers that have registered a job")
+
+(defun register-worker (id &optional (status :working))
+ (Setf (gethash id *registered-workers*) status))
+
+(defun unregister-worker (id)
+ (remhash id *registered-workers*))
+
+
+;;; Your every worker job
+
+(defclass worker-job (simple-print-object-mixin)
+ ((id :accessor job-id :initarg :id :documentation
+ "nonce to identify the job")
+ (description :accessor job-description :initarg :description :documentation
+ "description of the job, a SEXP to be transmitted to the worker")
+ (status :accessor job-status :initarg :status :initform nil :documentation
+ "Status of the job, NIL if to be dispatched, T if done, a worker ID if being done")
+ (validator :accessor job-validator :initarg :validator :initform t :documentation
+ "thunk to validate whether a job is still needed by the time a worker is ready to claim it.")
+ (on-claim :accessor job-on-claim :initarg :on-claim :initform nil :documentation
+ "hook thunk to call when a worker claims the job")
+ (on-completion :accessor job-on-completion :initarg :on-completion :documentation
+ "continuation to call when a job returns")))
+
+(defparameter *worker-job-counter* 0)
+
+(defun make-worker-job-id ()
+ `(,@*id* :worker-job ,(incf *worker-job-counter*)))
+
+(defvar *current-worker-job*
+ nil
+ "the worker job currently examined")
+
+;;; Worker jobs can be scheduled or claimed
+
+(defparameter *claimed-worker-jobs*
+ (make-hash-table :test 'equal)
+ "jobs claimed by some worker")
+
+(defun claim-job (job &optional (worker *current-worker*))
+ (setf (gethash (job-id job) *claimed-worker-jobs*) job)
+ (setf (job-status job) (list worker (get-real-time)))
+ (notify-claimed-job job))
+
+(defun notify-claimed-job (job)
+ (funkall (job-on-claim job)))
+
+(defun notify-completed-job (job &rest results)
+ (setf (job-status job) :completed)
+ (apply #'funkall (job-on-completion job) results))
+
+(defun unclaim-job (job)
+ (setf (job-status job) '(:unclaimed))
+ (remhash (job-id job) *claimed-worker-jobs*)
+ nil)
+
+(defparameter *scheduled-worker-jobs*
+ (make-fifo)
+ "jobs that need to be executed by some worker")
+
+(defun format-job (stream job)
+ "Format only the interesting slots of job to stream."
+ (format stream "#<job ~d ~A>"
+ (job-id job)
+ (job-description job)))
+
+(defun enqueue-worker-job (job)
+ (logger "~&Enqueueing ~A" (format-job nil job))
+ (fifo-enqueue job *scheduled-worker-jobs*)
+ job)
+
+(defun issue-worker-job (description &rest keys
+ &key validator on-claim on-completion)
+ (declare (ignore validator on-claim on-completion))
+ (enqueue-worker-job
+ (apply #'make-instance 'worker-job
+ :id (make-worker-job-id)
+ :description description
+ keys)))
+
+(defun get-next-worker-job ()
+ (loop until (fifo-empty-p *scheduled-worker-jobs*)
+ do (let ((job (fifo-dequeue *scheduled-worker-jobs*)))
+ (when (validate-job job)
+ (logger "~&Issuing ~A" (job-id job))
+ (return job)))))
+
+;;; Global defaults when looking for a job
+
+(defun get-next-worker-job-specification ()
+ (if-bind job (get-next-worker-job)
+ (progn
+ (claim-job job)
+ (job-specification job))
+ (no-job-to-do)))
+
+(defun job-specification (job)
+ `(:job :id ,(job-id job) :description ,(job-description job)))
+
+(defparameter *all-done*
+ nil
+ "when everything is said and done, we'll tell each worker to die in peace")
+
+(defun no-job-to-do ()
+ (if *all-done*
+ (suicide-job)
+ (sleep-job)))
+
+(defun suicide-job (&optional (worker *current-worker*))
+ (unregister-worker worker)
+ '(:die))
+
+(defun sleep-job ()
+ `(:sleep ,*sleep-delay*))
+
+(defgeneric validate-job (job)
+ (:method ((x t))
+ t))
+
+(defmethod validate-job ((job worker-job))
+ (funkall (or (job-validator job) t)))
+
+(defun hand-job-over-to-worker (job &optional (worker *current-worker*))
+ (with-accessors ((id job-id) (desc job-description)
+ (on-claim job-on-claim) (on-completion job-on-completion))
+ job
+ (register-worker worker id)
+ (claim-job id worker)))
+
+(defun-request-handler worker-request (id &rest keys &key completed results error)
+ (logger "~&Got worker-request from~{ ~A~}, with keys ~S" id keys)
+ (when error
+ (logger "~&Error from worker~{ ~A~}: ~A" id error))
+ (let ((*current-worker* id))
+ (when completed
+ (job-done completed id results))
+ (reply (get-next-worker-job-specification))))
+
+(defun job-done (job-id worker results)
+ (if-bind job (gethash job-id *claimed-worker-jobs*)
+ (destructuring-bind (claimant start-time) (job-status job)
+ (if (equal worker claimant)
+ (logger "~&Job ~D completed by worker ~S in ~D seconds"
+ (job-id job) worker
+ (- (get-real-time) start-time))
+ (logger "~&Worker ~S announced he completed job ~A, originally claimed by ~S"
+ worker (format-job nil job) claimant))
+ (unclaim-job job)
+ (register-worker worker :idle)
+ (apply #'notify-completed-job job results))
+ (logger "~&Worker ~S announced completion of job ~A, that doesn't exist (anymore?)"
+ worker (format-job nil job))))
+
+(defun-request-handler show-job-status ()
+ (reply* :all-done *all-done* :claimed-worker-jobs (hash-table->alist *claimed-worker-jobs*)))
+
+(defun notify-worker-timeout (worker &aux (pity nil))
+ ;; In this Trotskyist/Guevarist approach to management, we simply kill those
+ ;; counter-revolutionary saboteurs who are late at delivering the results
+ ;; expected by the manager. Oh, of course, being good socialists, we never
+ ;; kill any worker. Mind you, the miserable insect is unregistered first, so
+ ;; that not being a worker anymore, it is killed without spilling sacred
+ ;; proletarian blood. Insects are worthless, and multiply anyway.
+ (unregister-worker worker)
+ (destructuring-bind (insect without rights) worker
+ (declare (ignore rights))
+ (kill-machine-process insect without) pity
+ (spawn-client insect)))
+
+
+;;; Advanced Control Flow
+
+(exporting-definitions
+
+(defvar *retry-on-timeout* nil)
+
+(defun/cc issue-worker-job/cc (description &key validator on-claim on-completion
+ retry-on-timeout)
+ "Issue one worker job"
+ (if retry-on-timeout
+ (loop
+ for attempts from 1 do
+ (let ((guard t) (worker nil))
+ (multiple-value-bind (successp &optional results)
+ (with-local-task-competition (c)
+ (issue-worker-job
+ description
+ :validator (lambda ()
+ (and guard (funkall (or validator t))))
+ :on-claim (lambda ()
+ (funkall
+ (prog1 on-claim
+ (setf on-claim nil
+ worker *current-worker*)
+ (local-task-competitor (c) (sleep/cc retry-on-timeout) nil))))
+ :on-completion (lambda (&rest results)
+ (setf worker nil)
+ (maybe-win-local-task-competition c t results))))
+ ;;(DBG :gah guard successp results attempts)
+ (setf guard nil)
+ (if successp
+ (return (apply on-completion results))
+ (when worker
+ (logger "~&Timed out:~{ ~A~}" worker)
+ (notify-worker-timeout worker))))))
+ (issue-worker-job description
+ :validator validator
+ :on-claim on-claim
+ :on-completion on-completion)))
+
+(defun/cc issue-sequential-job (description &key retry-on-timeout validator on-claim on-completion)
+ "Issue a job. When the job is completed, return the values returned by the worker."
+ (let/cc k
+ (issue-worker-job/cc
+ description
+ :retry-on-timeout retry-on-timeout
+ :validator validator
+ :on-claim on-claim
+ :on-completion (if on-completion
+ (lambda (&rest results)
+ (apply #'funkall on-completion results)
+ (apply #'kall k results))
+ k))))
+
+(defun/cc issue-sequential-job-with-timeout
+ (description &key timeout retry-on-timeout validator on-claim on-completion)
+ "Issue a job. If the job is completed before the timeout, return T and the values returned by the worker,
+otherwise, return NIL"
+ (let ((guard t)
+ (worker nil))
+ (multiple-value-bind (successp &optional results)
+ (with-local-timeout (timeout)
+ (let/cc k
+ (issue-worker-job/cc
+ description
+ :retry-on-timeout retry-on-timeout
+ :validator (lambda () (and guard (funkall (or validator t))))
+ :on-claim (lambda () (setf worker *current-worker*) (funkall on-claim))
+ :on-completion (lambda (&rest results)
+ (setf worker nil)
+ (apply #'funkall on-completion results)
+ (kall k t results)))))
+ (setf guard nil)
+ (when worker
+ (notify-worker-timeout worker))
+ (apply #'values successp results))))
+
+
+;;; Jobs to be run in parallel of each other.
+
+(defun/cc default-reducer (accumulator primary-value &rest other-values)
+ (declare (ignore other-values))
+ (cons primary-value accumulator))
+
+(defun/cc call-with-parallel-job-issuer (thunk &key
+ (reducer #'default-reducer)
+ (initial-value nil))
+ (let/cc k
+ (let ((remaining-jobs 0)
+ (accumulator initial-value)
+ (thunk-done nil))
+ (labels ((maybe-exit ()
+ (when (and thunk-done (zerop remaining-jobs))
+ (kall k accumulator)))
+ (one-less (values)
+ (setf accumulator (apply reducer accumulator values))
+ (decf remaining-jobs)
+ (maybe-exit))
+ (one-more (description &key retry-on-timeout on-claim on-completion)
+ ;; should we error out when thunk-done is true,
+ ;; or is it OK for jobs or escaped continuations
+ ;; to cause more jobs to be queued?
+ (let/cc k
+ (incf remaining-jobs)
+ (issue-worker-job/cc
+ description
+ :retry-on-timeout retry-on-timeout
+ :on-claim (lambda ()
+ (funkall on-claim)
+ (kall k))
+ :on-completion (lambda (&rest values)
+ (apply #'funkall on-completion values)
+ (one-less values))))))
+ (funcall thunk #'one-more)
+ (setf thunk-done t)
+ (maybe-exit)))))
+
+(defparameter *parallel-job-issuer* nil)
+
+(defun/cc issue-parallel-job (description
+ &key retry-on-timeout on-claim on-completion
+ (issuer *parallel-job-issuer*))
+ (unless issuer
+ (error "No parallel job issuer for job ~S" description))
+ (funcall issuer description
+ :retry-on-timeout retry-on-timeout :on-claim on-claim :on-completion on-completion))
+
+(defmacro with-parallel-jobs ((issuer &rest keys &key reducer initial-value) &body body)
+ "BODY will be executed where ISSUER is lexically bound and fbound
+(or if it is NIL, *PARALLEL-JOB-ISSUER* dynamically bound) to a function that will
+issue jobs to be run in parallel then pause until the job is claimed by a worker.
+
+When a worker returns, the REDUCER is applied to the accumulator and the values
+provided by the worker, where the accumulator is initialized with INITIAL-VALUE.
+When the body is fully evaluated and all the jobs issued so far have returned,
+then the value of the accumulator is returned.
+
+The default INITIAL-VALUE is NIL, and the default REDUCER is DEFAULT-REDUCER, which
+conses the primary value returned by each job into a list ordered by last-returned first.
+If you just want to drop those values, use (CONSTANTLY NIL) as the REDUCER."
+ (declare (ignore reducer initial-value))
+ (with-gensyms (args)
+ `(call-with-parallel-job-issuer
+ (lambda (,(or issuer '*parallel-job-issuer*))
+ (labels (,@(when issuer
+ `(,issuer (&rest ,args)
+ (apply ,issuer ,args))))
+ ,@body))
+ ,@keys)))
+)
--- /dev/null
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+
+(in-package :cl-user)
+
+(cl:defpackage :philip-jose
+ (:use :common-lisp
+ :fare-utils
+ :split-sequence :parse-number
+ :net.sockets :cffi :iomux
+ :threads
+ :arnesi :closer-mop)
+ (:shadowing-import-from :arnesi
+ #:aif2 #:acond2 #:aif #:acond #:it ;same anaphoric macros in fare-utils
+ #:let1 #:compose #:if-bind ;also in fare-utils
+ #:join-strings #:copy-array #:strcat #:quit ;also in fare-utils
+ #:_ ;conflicts with fare-utils
+ #:name ;conflicts with net.sockets
+ #:partition ;conflicts with split-sequence
+ )
+ (:export #:*farmer-path* #:*farmer-system* #:*id*
+ #:*local-task-name* #:*max-request-size*
+ #:*parallel-job-issuer* #:*registry-path*
+ #:*retry-on-timeout* #:*ssh-path* #:*startup-actions*
+ #:*usage* #:*workers-per-machine* #:+crlf+ #:+unix-epoch+
+ #:bork #:call-job-handler #:call-request-handler
+ #:call-with-parallel-job-issuer #:collect-slots
+ #:command-line-from-strings #:defun-job
+ #:defun-request-handler #:errexit #:errformat #:exit #:fsleep
+ #:funkall #:get-localhost-names #:get-real-time #:getarg
+ #:gethostname #:issue-parallel-job #:issue-sequential-job
+ #:issue-sequential-job-with-timeout #:issue-worker-job/cc
+ #:logger #:main #:no-more-arg #:no-more-args #:print-object
+ #:program-help #:register-job-handler #:register-request-head
+ #:reply #:reply* #:simple-client #:simple-print-object
+ #:simple-print-object-mixin #:simple-worker-loop
+ #:simple-worker-thing #:slots-to-print #:spawn-all-clients
+ #:spawn-process #:spawn-process-on #:spawn-remote-process
+ #:start-clients #:start-multi-threaded-server
+ #:start-single-threaded-server #:start-test #:startup
+ #:stop-server #:strings-from-command-line
+ #:strings-from-command-line-stream #:task-loop #:unexpected-end-of-text
+ #:with-error-message-handler #:with-parallel-jobs
+ #:with-rendez-vous #:with-rendez-vous-after
+ #:with-trivial-logging))
+
+;;(write (fare-utils:make-defpackage-form :philip-jose :gensym) :pretty t :case :downcase)(terpri)
--- /dev/null
+;;;-*- Lisp -*-
+
+(in-package :cl-user)
+
+(proclaim '(optimize (speed 3) (safety 2) (debug 1) (space 3)))
+
+(asdf:oos 'asdf:load-op :cl-launch)
+
+(asdf:defsystem :philip-jose
+ :name "Philip Jose Farmer"
+ :serial nil
+ :depends-on (:cl-launch ;; needed first, for fasl caching(?)
+ :cffi :iolib :io.multiplex :net.dns-client
+ :split-sequence :net-telent-date :parse-number
+ :bordeaux-threads :arnesi :closer-mop
+ :fare-utils)
+ :components ((:file "package")
+ (:file "utilities" :depends-on ("package"))
+ (:file "specials" :depends-on ("package"))
+ (:file "logger" :depends-on ("specials"))
+ (:file "registry" :depends-on ("logger" "utilities"))
+ (:file "trivial-sexp-server" :depends-on ("logger"))
+ (:file "incremental-parsing" :depends-on ("trivial-sexp-server"))
+ (:file "tcp-client" :depends-on ("logger"))
+ (:file "worker" :depends-on ("tcp-client" "utilities" "specials"))
+ (:file "machines" :depends-on ("trivial-command-line" "registry" "worker" "specials"))
+ (:file "trivial-command-line" :depends-on ("logger"))
+ (:file "tcp-server" :depends-on ("trivial-sexp-server" "incremental-parsing"
+ "logger"))
+ (:file "local-tasks" :depends-on ("logger" "utilities"))
+ (:file "manager" :depends-on
+ ("tcp-server" "tcp-client" "logger" "utilities" "local-tasks"))
+ (:file "farmer" :depends-on ("machines" "registry" "manager"))))
--- /dev/null
+#!/bin/zsh -f
+
+PROG="${0##*/}"
+case "$PROG" in
+ register-*|ohcreg) FOO=register ;;
+ *) FOO="${PROG%%-*}" ;;
+esac
+
+LOG=/home/fare/off-hours-computations/incoming/log
+
+DATE="$(date -u +"%Y-%m-%d.%H:%M:%S")"
+IPADDRS="$(/sbin/ifconfig | grep "inet addr" | grep -v 127.0.0.1 | \
+ perl -npe 's/^.*addr:([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+) .*$/$1/')"
+
+ENTRY="$(echo $FOO $USERNAME $HOST $DATE $IPADDRS ${1+# $*})"
+
+echo "$ENTRY" >> $LOG
+
+echo "The following entry has been logged:"
+echo "$ENTRY"
+echo
+echo "Thank you for participating in the off-hours computation farm."
--- /dev/null
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;;; Registering machines for use on the farm.
+
+(in-package :philip-jose)
+
+(defparameter *start-time* nil)
+(defun get-start-time ()
+ (or *start-time* (setf *start-time* (get-real-time))))
+
+(defun read-time (time)
+ (date:parse-time time :patterns
+ '((date::year (date::date-divider) date::month (date::date-divider) date::day
+ (date::time-divider) date::hour (date::time-divider) (date::minute)
+ (date::time-divider) (date::secondp)))))
+
+(defun decode-registry-log-line (x)
+ (destructuring-bind (op user machine time-string &rest rest) (split-sequence #\SPACE x)
+ (let ((time (- (read-time time-string) +unix-epoch+))
+ (hash (position "#" rest :test #'string-equal))
+ ips limit args)
+ (if hash
+ (setf ips (subseq rest 0 hash)
+ args (subseq rest (1+ hash))
+ limit (ignore-errors
+ (+ time
+ (* (parse-number:parse-positive-real-number x)
+ (* 24 60 60)))))
+ (setf ips rest))
+ (values op user machine time ips limit args))))
+
+(defparameter *registered-hosts* nil)
+
+(defun process-registry-log ()
+ (let ((*registered-hosts* (make-hash-table :test 'equal)))
+ (with-open-file (s *registry-path*)
+ (process-registry-log-stream s))
+ (mapcar #'car (sort (hash-table->alist *registered-hosts*) #'< :key #'cdr))))
+
+(defun process-registry-log-stream (s)
+ (loop for x = (read-line s nil)
+ while x do
+ (multiple-value-bind (op user machine time ips limit args) (decode-registry-log-line x)
+ (declare (ignore user ips args))
+ (when (and (<= time (get-start-time))
+ (or (null limit)
+ (< (get-start-time) limit)))
+ (cond
+ ((string= "register" op)
+ (setf (gethash machine *registered-hosts*) time))
+ ((string= "unregister" op)
+ (remhash machine *registered-hosts*)))))))
--- /dev/null
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;;; Default configuration for the Philip-Jose Farmer
+
+(in-package :philip-jose)
+
+(def*var *farmer-system*
+ :farmer
+ "Name of the system, as visible to the user. Override with the name of your application")
+
+(def*var *farmer-path*
+ "/home/fare/off-hours-computations/bin/farmer"
+ "path of the farmer, to spawn new clients with fork+exec or ssh")
+
+(def*var *registry-path*
+ "/home/fare/off-hours-computations/incoming/log"
+ "path of the registry identifying which machines are part of the farm")
+
+(def*var *ssh-path*
+ "/usr/bin/ssh"
+ "Path to the SSH executable, for fork+exec")
+
+(def*parameter +crlf+
+ (make-array
+ '(2) :element-type 'base-char
+ :initial-contents (list (code-char 13) (code-char 10)))
+ "Constant string for CR+LF, the standard line terminator on the net")
+
+(def*var *id*
+ nil
+ "Identity of the current process: a list with machine name, pid and start time")
+
+(def*var *local-task-name*
+ nil
+ "TODO: use this to give a name to the currently executed task, for debugging purpose")
+
+(def*var *max-request-size*
+ 4096 ;; for debugging, use 128. Normally, more like 4096
+ "Maximum size of a request, in bytes")
+;; Note that (1) the response can be of any size, and
+;; (2) at present, we preallocate a buffer for the whole request
+;; instead of making it extensible so don't make it too large without fixing the code.
+
+(def*var *workers-per-machine*
+ 4
+ "Number of workers to spawn per machine")
+;; TODO: have a dynamic strategy that takes the number of CPUs and/or the load into account.
+
+(defparameter *sleep-delay*
+ 10
+ "duration in seconds to ask workers to sleep when there is no job ready for them yet")
+
+
+;;; Global variables for server stuff
+(defparameter *server* nil)
+(defparameter *port* 6666)
+(defparameter *server-event* nil)
+
+(defparameter *event-base* nil
+ "Base object for I/O multiplexing")
--- /dev/null
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;; Trivial TCP client
+
+(in-package :philip-jose)
+
+(defun make-tcp-connection (address port)
+ (let ((socket (make-socket :address-family :internet :type :stream :connect :active
+ :protocol :default :ipv6 nil)))
+ (connect socket address :port port)
+ socket))
+
+(defun ensure-address (x)
+ (etypecase x
+ (sockaddr x)
+ (string
+ (let ((host (lookup-host x :ipv6 nil)))
+ (first (host-addresses host))))))
+
+
+(def*fun simple-client (message target port)
+ (let* ((address (ensure-address target))
+ (socket (make-tcp-connection address port)))
+ (unwind-protect
+ (progn
+ (safe-write message :stream socket)
+ (princ +crlf+ socket)
+ (finish-output socket)
+ ;;(shutdown socket :write)
+ (read socket))
+ (close socket))))
+
+
--- /dev/null
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;;; Simple TCP server
+
+(in-package :philip-jose)
+
+
+;;;; Generic TCP routines
+
+(defun make-tcp-server-socket (address port)
+ (let ((socket (make-socket :address-family :internet
+ :type :stream
+ :connect :passive
+ :ipv6 nil)))
+ (bind-address socket address :port (or port 0))
+ (socket-listen socket)
+ socket))
+
+(defun call-connection-handler (socket handler &rest args)
+ (with-trivial-logging ()
+ (unwind-protect
+ (let* ((*standard-input* socket)
+ (*standard-output* socket))
+ (apply handler args)
+ (finish-output))
+ (close socket))))
+
+
+;;;; Spawners for a multi-threaded system having one thread per connection.
+(defun make-connection-worker (socket handler &rest args)
+ #'(lambda ()
+ (apply #'call-connection-handler socket handler args)))
+
+(defun connection-spawner (listener-socket handler)
+ #'(lambda (fd evtype)
+ (if (eql evtype :read)
+ (let ((connection (accept-connection listener-socket)))
+ (make-thread (make-connection-worker connection handler)) :name "connection worker")
+ (logger "~&Error on FD ~A" fd))))
+
+(defun add-multi-threaded-server (event-base listener-socket handler)
+ (iomux::add-fd event-base (sockets::socket-fd listener-socket)
+ :read (connection-spawner listener-socket handler)
+ :persistent t))
+
+
+;;;; Spawners for a single-threaded system
+;; Generic code that implements an efficient non-blocking single-threaded server
+;; based on an event-loop and incremental message:
+;; maintain incrementally parsed request buffers
+;; and process requests when complete.
+;;
+;; The same single server thread can listen to many event sources at once and
+;; implement many servers using this (or other compatible) protocols.
+;;
+;; Reading bytes (or -- ugly hack -- latin1) would avoid ugly SB-IMPL::END-OF-INPUT-IN-CHARACTER
+;; when a packet is cut in the middle of a multi-byte character representation.
+;; For anything non-trivial, we really want bytes when reading/writing, and a
+;; flexible stream protocol.
+;;
+;; My design choice: I want to minimize the time spent in
+;; the single-threaded network message parser/dispatcher
+;; so although my source code for the parser will have characters and strings,
+;; I will be processing bytes directly the forked/threaded clients can do
+;; octets-to-string on whatever the parser gives them.
+;;
+;; As long as I only use ASCII this will do with BASE-CHAR.
+
+(defun add-connection-handler (event-base connection incremental-parser request-handler
+ &key (element-type 'base-char)
+ (external-format :iso-8859-1) ;; :default
+ (max-buffer-size *max-request-size*))
+ ;;; There is no race condition between setting handle and reading it in close-handler,
+ ;;; because events are not handled asynchronously, but only at the next event-dispatch
+ ;;; so everything we do inside a handler is properly isolated from other handlers
+ ;;; (but not from other threads).
+ (declare (ignorable element-type external-format max-buffer-size)) ;---***
+ (let* ((bytes-p (typep 0 element-type))
+ (initial-element (if bytes-p 0 #\space))
+ (buffer (make-array (list max-buffer-size)
+ :element-type element-type
+ :initial-element initial-element))
+ (size 0)
+ (state nil)
+ handle)
+ (labels
+ ((close-handler ()
+ (remove-event event-base handle)
+ (close connection))
+ (handler (fd evtype)
+ (declare (ignorable fd))
+ (handler-case
+ (cond
+ ((eql evtype :read)
+ ;;--- cannot read-sequence, because it will block. Have to use read.
+ (let ((packet-size
+ (cffi:with-pointer-to-vector-data (buf-ptr buffer)
+ (et:read fd (cffi:inc-pointer buf-ptr size) (- max-buffer-size size)))))
+ (incf size packet-size)
+ (logger "~&Incoming packet from ~A: ~D bytes~%"
+ (sockets::vector-to-dotted
+ (sockets::name
+ (sockets::remote-address connection)))
+ packet-size)
+ (multiple-value-bind (x y)
+ (funcall incremental-parser buffer size state)
+ (cond
+ (x
+ (funcall request-handler connection buffer size y)
+ (close-handler))
+ ((= size max-buffer-size)
+ ;;---*** log an error?
+ (close-handler))
+ (t
+ (setf state y)))))) ; get ready for next packet
+ (t
+ (close-handler)))
+ (condition (c)
+ (logger "~&~S~%" c)
+ (close-handler)))))
+ (setf handle (iomux::add-fd event-base (sockets::socket-fd connection)
+ :read #'handler :persistent t)))))
+
+(defun add-single-threaded-server (event-base socket incremental-parser request-handler &rest keys
+ &key element-type external-format max-buffer-size)
+ (declare (ignorable element-type external-format max-buffer-size))
+ (labels
+ ((handler (fd evtype)
+ (declare (ignorable fd))
+ (with-trivial-logging ()
+ (case evtype
+ (:read
+ (let ((connection (accept-connection socket)))
+ (logger "~&Connection opened with ~A~%"
+ (sockets::vector-to-dotted
+ (sockets::name (sockets::remote-address connection))))
+ (apply #'add-connection-handler event-base connection incremental-parser request-handler keys)))
+ (otherwise
+ (logger "~&Weird event ~S on ~S~%" evtype fd))))))
+ (iomux::add-fd event-base (sockets::socket-fd socket)
+ :read #'handler :persistent t)))
+
+
+
+
+
+(defun ensure-event-base ()
+ ; can't put this make-instance form in specials.lisp because compiletime != runtime
+ (if (null *event-base*)
+ (setf *event-base* (make-instance 'iomux:event-base))))
+
+;; A one central server socket
+(defun make-server (&optional (address +ipv4-unspecified+) (port *port*))
+ (when *server*
+ (close *server*))
+ (ensure-event-base)
+ (setf *server* (make-tcp-server-socket address port))
+ (setf *port* (sockets::local-port *server*)))
+
+(def*fun start-multi-threaded-server (&key (handler #'sexp-request-handler)
+ client-starter process-events)
+ (make-server)
+ (when client-starter
+ (funcall client-starter))
+ (format *error-output* "Starting server on port ~A~%" *port*)
+ (finish-output *error-output*)
+ (setf *server-event* (add-multi-threaded-server *event-base* *server* handler))
+ (when process-events
+ (event-dispatch *event-base*)))
+
+(def*fun start-single-threaded-server (&key (incremental-parser (make-incremental-reader))
+ (handler (make-incrementally-parsed-request-handler #'handle-sexp-request))
+ client-starter process-events)
+ (make-server)
+ (when client-starter
+ (funcall client-starter))
+ (format *error-output* "Starting server on port ~A~%" *port*)
+ (finish-output *error-output*)
+ (setf *server-event* (add-single-threaded-server
+ *event-base* *server* incremental-parser handler))
+ (when process-events
+ (event-dispatch *event-base*)))
+
+(def*fun stop-server ()
+ (close *server*)
+ (remove-event *event-base* *server-event*)
+ (setf *server* nil)
+ (setf *server-event* nil))
--- /dev/null
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;; Trivial Testing
+
+(in-package :philip-jose)
+
+#|
+(require :asdf) (pushnew "/home/fare/lib/lisp-systems/" asdf:*central-registry* :test (function equal)) (asdf:oos (quote asdf:load-op) :qultivator :verbose t) (in-package :philip-jose) (DBG :foo (simple-client (list 7 8) "localhost" 7))
+
+(unless *server* (start-single-threaded-server :process-events nil))
+(issue-worker-job '(:fake 0) :on-completion (lambda (&rest x) (DBG :fake x))) (task-test 4)
+
+(defvar *keys* nil)
+(setf *keys* (apply (function worker-step) :server "localhost" *keys*))
+|#
+
+(defun run-test (f &rest args)
+ (catch :exit
+ (let (x keys)
+ (with-call/cc
+ (DBG :test (apply f args))
+ (setf x t))
+ (loop until x do (task-step)))))
+
+(defmacro xtest (&body body)
+ `(run-test (with-call/cc (lambda () ,@body))))
+
+(defun task-loop-1 (n)
+ (DBG :tl1 n)
+ (when (plusp n)
+ (schedule-local-task
+ #'(lambda () (task-loop-1 (1- n)))))
+ nil)
+
+(defun test-seq (n)
+ (xtest
+ (loop for i from 1 to n do
+ (DBG :test-seq i (issue-sequential-job `(:fake ,i))))
+ (DBG :test-seq-end)))
+
+(defun test-para (n)
+ (xtest
+ (DBG :tpara)
+ (with-parallel-jobs (i)
+ (loop for i from 1 to n do
+ (DBG :i i)
+ (i `(:fake ,i) :on-completion (lambda (&rest x) (DBG :fake x))))
+ (DBG :no-issue-left))
+ (DBG :z)))
+
+(defun test-timeout (x y)
+ (xtest
+ (with-local-timeout (x c)
+ (sleep/cc y)
+ (maybe-win-local-task-competition c 42))))
+
+#|(defun spawn-workers ()
+ (let ((num-workers (hash-table-count *registered-hosts*)))
+ (loop for x being the hash-keys of *registered-hosts* doing
+ (shell-command (format nil "ssh ~a" x))
+ (|#
+
+#|
+(defun run-client (server)
+ (loop for count from 0 do
+ (multiple-value-bind (start-block num-blocks) (get-next-slice server)
+ ;;---*** finish me
+ ))
+|#
+
--- /dev/null
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;;; Trivial Command-Line server.
+
+;; This provides a simple command-line dispatch framework,
+;; where each set of arguments
+;; a list of non-whitespace words separated by whitespace (duh!)
+;; where the first word is the operator and other words are arguments.
+
+(in-package :philip-jose)
+
+;;; A bit of parsing and unparsing between shell command-lines and list of shell strings
+(defmacro define-ascii-char-code-predicates (name (x) &body body)
+ ;; assumes ASCII / latin1 / unicode (!)
+ (let* ((code-p-name (conc-symbol "ASCII-" name "-CODE-P"))
+ (char-p-name (conc-symbol "ASCII-" name "-CHAR-P")))
+ `(progn
+ (defun ,code-p-name (,x) ,@body)
+ (defun ,char-p-name (c) (,code-p-name (char-code c))))))
+(define-ascii-char-code-predicates uppercase-letter (x)
+ (<= 65 x 90))
+(define-ascii-char-code-predicates lowercase-letter (x)
+ (<= 97 x 122))
+(define-ascii-char-code-predicates digit (x)
+ (let ((d (- x #x30)))
+ (when (<= 0 d 9)
+ d)))
+(define-ascii-char-code-predicates whitespace (x)
+ (or (<= x 32)
+ (= x 127)
+ (= x #xA0)))
+(define-ascii-char-code-predicates letter (x)
+ (or (ascii-uppercase-letter-code-p x)
+ (ascii-lowercase-letter-code-p x)))
+(define-ascii-char-code-predicates letter-or-digit (x)
+ (or (ascii-letter-code-p x)
+ (ascii-digit-code-p x)))
+(define-ascii-char-code-predicates normal-shell (x)
+ (or (ascii-letter-code-p x)
+ (<= #x2B x #x3A) ; +,-./0123456789:
+ (member x '(#x25 #x3D #x5F)))) ; %=_
+
+(defun normal-shell-string-p (s)
+ (and (stringp s)
+ (every #'ascii-normal-shell-char-p s)))
+
+(defun write-string-single-quoted-stream (string stream)
+ (write-char #\' stream)
+ (loop for c across string do
+ (case c
+ ((#\' #\\) (write-char #\\ stream)))
+ (write-char c stream))
+ (write-char #\' stream))
+
+(def*fun command-line-from-strings (strings &optional stream)
+ (with-output (stream)
+ (loop for (s . rest) on strings do
+ (if (normal-shell-string-p s)
+ (write-string s stream)
+ (write-string-single-quoted-stream s stream))
+ (when rest (write-char #\space stream)))))
+
+(def*fun strings-from-command-line (command-line)
+ (with-input-from-string (s command-line)
+ (strings-from-command-line-stream s)))
+
+(define*-condition unexpected-end-of-text (simple-error) ())
+
+(def*fun strings-from-command-line-stream (command-line-stream)
+ ;; handles quoting and escaping like a shell, but does not handle $ and other subtleties
+ (declare (optimize (speed 3) (safety 2) (debug 1)))
+ (let* ((s (make-string-output-stream))
+ (r nil))
+ (labels
+ ((flush ()
+ (push (get-output-stream-string s) r))
+ (exit ()
+ (return-from strings-from-command-line-stream (nreverse r)))
+ (putc (c)
+ (write-char c s))
+ (getc ()
+ (read-char command-line-stream nil))
+ (ungetc (c)
+ (unread-char c command-line-stream))
+ (eot (x &rest args)
+ (error 'unexpected-end-of-text
+ :format-control x
+ :format-arguments args))
+ (skip-spaces ()
+ (loop for c = (getc)
+ while (and c (ascii-whitespace-char-p c))
+ finally
+ (if c
+ (ungetc c)
+ (exit)))
+ (get-normal-char))
+ (get-normal-char ()
+ (let ((c (getc)))
+ (case c
+ ((nil) (flush) (exit))
+ (#\\ (get-escaped-char))
+ (#\' (get-single-quoted-char))
+ (#\" (get-double-quoted-char))
+ (otherwise
+ (if
+ (ascii-whitespace-char-p c)
+ (progn (flush) (skip-spaces))
+ (progn (putc c) (get-normal-char)))))))
+ (get-escaped-char ()
+ (let ((c (getc)))
+ (case c
+ ((nil) (eot "Command-line ended where escaped character expected"))
+ (otherwise (putc c) (get-normal-char)))))
+ (get-double-quoted-char ()
+ (let ((c (getc)))
+ (case c
+ ((nil) (eot "Command-line ended inside double quotes"))
+ (#\\ (get-escaped-double-quoted-char))
+ (#\" (get-normal-char))
+ (otherwise (putc c) (get-double-quoted-char)))))
+ (get-escaped-double-quoted-char ()
+ (let ((c (getc)))
+ (case c
+ ((nil) (eot "Command-line ended where escaped double quoted character expected"))
+ (otherwise (putc c) (get-double-quoted-char)))))
+ (get-single-quoted-char ()
+ (let ((c (getc)))
+ (case c
+ ((nil) (eot "Command-line ended inside single quotes"))
+ (#\\ (get-escaped-single-quoted-char))
+ (#\' (get-normal-char))
+ (otherwise (putc c) (get-single-quoted-char)))))
+ (get-escaped-single-quoted-char ()
+ (let ((c (getc)))
+ (case c
+ ((nil) (eot "Command-line ended where escaped character expected"))
+ ((#\\ #\') (putc c) (get-single-quoted-char))
+ (otherwise (putc #\\) (putc c) (get-single-quoted-char))))))
+ (skip-spaces))))
+
+(def*var *usage*
+ "farmer [action] [args...]
+
+For a list of valid actions, try
+ farmer help
+")
+
+(def*parameter *startup-actions*
+ '(("server" start-server "[port]") ; the main server tracks what happens
+ ("client" start-client "server port"); each client is a worker (future plan: spawns and monitors workers)
+ ;;("worker" start-worker "server port id job") ; each worker does a minutes worth of work
+ ("test" start-test) ; test
+ ("status" query-status) ; queries the current status
+ ("help" program-help) ; queries available actions
+ (nil start-server "server"))) ; default: start the server
+
+(def*fun bork (&rest r)
+ (declare (ignore r))
+ (error "bork"))
+
+(def*fun exit (&optional (code 0) &rest r)
+ (declare (ignore r))
+ (throw :exit code))
+
+(def*fun errformat (fmt &rest args)
+ (apply #'format *error-output* fmt args))
+
+(def*fun errexit (code fmt &rest args)
+ (apply #'errformat fmt args)
+ (exit code))
+
+(defun call-argument-getter (x decoder-function default-thunk errmsg)
+ (handler-case
+ (if x
+ (funcall decoder-function x)
+ (funcall default-thunk))
+ (t () (errexit 3 "~&Invalid argument~@[ ~A~]~%~A" errmsg *usage*))))
+
+(def*macro getarg (args decoder &optional default errmsg)
+ `(call-argument-getter
+ (pop ,args)
+ (function ,decoder)
+ #'(lambda () ,default)
+ ,errmsg))
+
+(def*macro no-more-args (args)
+ `(getarg ,args bork nil "(too many arguments)"))
+
+(def*fun start-test (&rest r)
+ (let ((foo (getarg r parse-integer 0)))
+ (no-more-args r)
+ (DBG :test
+ foo
+ (gethostname)
+ (process-registry-log))))
+
+(def*fun program-help (&rest r)
+ (let ((s (if (streamp (car r)) (car r) *error-output*)))
+ (format s "~&Available actions:~%")
+ (loop with d = nil for x in *startup-actions* do
+ (if (car x)
+ (format s " ~{~A~*~#[~; ~A~]~}~%" x)
+ (setf d (third x)))
+ finally (when d (format s "default action: ~A~%" d)))))
+
+(defun start-server (&rest r)
+ (let* ((port (getarg r parse-integer 6666)))
+ (no-more-args r)
+ (start-single-threaded-server :client-starter #'start-clients)
+ (task-loop)))
+
+(defun start-client (&rest r)
+ (let* ((server (getarg r identity (bork)))
+ (port (getarg r parse-integer (bork))))
+ (no-more-args r)
+ (validate-and-start-client server port)))
+
+(defun start-worker (&rest r)
+ (let* ((server (getarg r identity (bork)))
+ (port (getarg r parse-integer (bork)))
+ (id (getarg r read-from-string (bork)))
+ (job (getarg r read-from-string (bork))))
+ (no-more-args r)
+ ;;---***
+ (DBG :start-worker server port id job)))
+
+(defun query-status (&rest r)
+ (no-more-args r)
+ ;;---***
+ (DBG :query-status))
+
+(def*fun main (args)
+ (let* ((action (pop args))
+ (fun (second (assoc action *startup-actions* :test #'equalp))))
+ (catch :exit
+ (if fun
+ (progn (apply fun args) 0)
+ (errexit 2 "~&Invalid action ~S~%~A" action *usage*)))))
+
+(def*fun startup ()
+ (cl-launch:quit (main cl-launch:*arguments*)))
--- /dev/null
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;; Trivial SEXP server.
+
+;; BIG FAT WARNING!!!!
+;; safe-read is subject to DoS attacks whereby attackers force the server
+;; to intern a lot of useless symbols until it runs out of memory.
+;; THIS NOT FOR USE IN THE WILD! Only on safe (virtual) private networks.
+;;
+;; If you want a robust server, you'll have to supply your own robust parser.
+;; And handle all kind of weird issues such as handling character encoding,
+;; not to speak about avoiding extra consing for performance.
+;; I recommend then trying UDF-A.
+
+(in-package :philip-jose)
+
+(defparameter *request-heads* (make-hash-table :test #'equal))
+
+(def*fun register-request-head (x func)
+ (setf (gethash x *request-heads*) func))
+
+(def*macro defun-request-handler (name formals &body body)
+ `(progn
+ (defun ,name ,formals ,@body)
+ (register-request-head ,(conc-keyword name) (function ,name))))
+
+(def*fun call-request-handler (f &rest args)
+ (let ((fun (gethash f *request-heads*)))
+ (if fun
+ (apply fun args)
+ (error "Not a registered request type ~S" f))))
+
+
+(defun handle-sexp-request (request)
+ (if (consp request)
+ (apply #'call-request-handler request)
+ (error "Bad request ~S" request)))
+
+(defun sexp-request-handler ()
+ (handle-sexp-request (safe-read)))
+
+(defun handle-sexp-request-from-stream (i &optional (o i))
+ (let* ((*standard-input* i)
+ (*standard-output* o))
+ (sexp-request-handler)))
+
+(defun handle-sexp-request-from-socket (socket)
+ (handle-sexp-request-from-stream socket))
+
+(def*fun reply (x)
+ (safe-write x)
+ (princ +crlf+)
+ (finish-output))
+
+(def*fun reply* (&rest x)
+ (reply x))
--- /dev/null
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;; Various utilities (to move to some other package...)
+
+(in-package :philip-jose)
+
+(exporting-definitions
+
+;; depends on closer-mop
+(defun collect-slots (object &optional (slot-list t))
+ (loop with class = (class-of object)
+ with slots = (if (eq slot-list t) (compute-slots class) slot-list)
+ for slot in slots
+ for name = (slot-definition-name slot)
+ for iarg = (or (first (slot-definition-initargs slot)) name)
+ nconc (when (slot-boundp object name) `(,iarg ,(slot-value object name)))))
+
+(defun simple-print-object (object stream &key identity (slots t))
+ (with-output (stream)
+ (print-unreadable-object (object stream :type t :identity identity)
+ (write (collect-slots object slots) :stream stream))))
+
+(defgeneric slots-to-print (object)
+ (:method ((object t))
+ t))
+
+(defclass simple-print-object-mixin (standard-object)
+ ())
+
+(defmethod print-object ((object simple-print-object-mixin) stream)
+ (simple-print-object object stream :slots (slots-to-print object)))
+
+;; depends on arnesi
+(defun funkall (f &rest args)
+ (etypecase f
+ (function
+ (apply f args)) ; fast path for functions
+ (boolean
+ f) ; booleans evaluate as constant functions returning self -- must be before symbols
+ ((and symbol (satisfies arnesi::fdefinition/cc))
+ (with-call/cc (apply (arnesi::fdefinition/cc f) args)))
+ (symbol
+ (apply f args))
+ (arnesi::closure/cc
+ (with-call/cc (apply f args)))
+ (cons
+ (apply #'kall f args)))) ; arnesi represents continuations as lists
+
+;; This code should be moved to iolib-posix and exported.
+;; gettime should be renamed get-monotonic-time and exported, too.
+(defun get-real-time ()
+ (multiple-value-bind (sec nsec)
+ (et:clock-get-time et:clock-realtime)
+ (+ sec (* nsec 1d-9))))
+
+(defconstant +unix-epoch+ (encode-universal-time 0 0 0 1 1 1970 0))
+
+(defun fsleep (delay)
+ (when (plusp delay)
+ (multiple-value-bind (i d) (floor delay)
+ (unless (zerop i)
+ ;; TODO: here insert check for overflow...
+ (et::sleep i))
+ (unless (zerop d)
+ (et::usleep (floor (* d 1000000))))))
+ nil)
+
+(defmacro with-error-message-handler ((&rest handler) &body body)
+ (with-gensyms (err)
+ `(handler-bind
+ ((simple-condition (lambda (,err)
+ (,@handler
+ (apply #'format nil
+ (simple-condition-format-control ,err)
+ (simple-condition-format-arguments ,err)))))
+ (t (lambda (,err) (,@handler (format nil "~A" ,err)))))
+ ,@body)))
+
+
+);exporting-definitions
--- /dev/null
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;; Trivial Job tracker
+(in-package :philip-jose)
+
+;;; Simple worker...
+(defparameter *job-handlers* (make-hash-table :test #'equal))
+
+(def*fun register-job-handler (x func)
+ (setf (gethash x *job-handlers*) func))
+
+(def*macro defun-job (name formals &body body)
+ `(progn
+ (defun ,name ,formals ,@body)
+ (register-job-handler ,(conc-keyword name) #',name)))
+
+(def*fun call-job-handler (f &rest args)
+ (let ((fun (gethash f *job-handlers*)))
+ (if fun
+ (apply fun args)
+ (error "Not a registered response type ~S" f))))
+
+(defun worker-step (&key completed results error (server "localhost") (port *port*))
+ (flet ((result-keys ()
+ (when completed
+ (list* :completed completed
+ (append
+ (when results (list :results results))
+ (when error (list :error error)))))))
+ (let ((work
+ (flet ((handle-error (x)
+ (logger "~&Error while talking to server: ~A" x)
+ #+sbcl (sb-impl::backtrace 20 t)
+ (throw 'wemh nil)))
+ (catch 'wemh
+ (with-error-message-handler (handle-error)
+ (simple-client
+ (list* :worker-request (get-id) (result-keys))
+ server port))))))
+ (setf completed nil results nil error nil)
+ (cond
+ ((null work)
+ (logger "~&No work to do")
+ (fsleep *sleep-delay*))
+ (t
+ (logger "~&Work to do: ~S" work)
+ ((lambda (x) (apply x (rest work)))
+ (ecase (first work)
+ (:sleep #'fsleep)
+ (:die #'exit)
+ (:job (lambda (&key id description)
+ (setf completed id results
+ (flet ((handle-error (x)
+ (logger "~&Error while processing ~S: ~A" description x)
+ (setf error x)
+ (throw 'wemh nil)))
+ (catch 'wemh
+ (with-error-message-handler (handle-error)
+ (multiple-value-list
+ (apply #'call-job-handler description))))))))))))
+ (result-keys))))
+
+(defun worker-loop (&key (server "localhost") (port *port*))
+ (let ((keys nil))
+ (loop
+ (setf keys
+ (handler-case
+ (apply #'worker-step :server server :port port keys))))))
+