Import from mtn repo
authorFrancois-Rene Rideau <fare@tunes.org>
Sat, 26 Sep 2009 02:40:01 +0000 (22:40 -0400)
committerFrancois-Rene Rideau <fare@tunes.org>
Sat, 26 Sep 2009 02:40:01 +0000 (22:40 -0400)
22 files changed:
Makefile [new file with mode: 0644]
README [new file with mode: 0644]
farmer.lisp [new file with mode: 0644]
fork.lisp [new file with mode: 0644]
frobork.c [new file with mode: 0644]
incremental-parsing.lisp [new file with mode: 0644]
local-tasks.lisp [new file with mode: 0644]
logger.lisp [new file with mode: 0644]
machines.lisp [new file with mode: 0644]
manager.lisp [new file with mode: 0644]
package.lisp [new file with mode: 0644]
philip-jose.asd [new file with mode: 0644]
register-machine [new file with mode: 0755]
registry.lisp [new file with mode: 0644]
specials.lisp [new file with mode: 0644]
tcp-client.lisp [new file with mode: 0644]
tcp-server.lisp [new file with mode: 0644]
test.lisp [new file with mode: 0644]
trivial-command-line.lisp [new file with mode: 0644]
trivial-sexp-server.lisp [new file with mode: 0644]
utilities.lisp [new file with mode: 0644]
worker.lisp [new file with mode: 0644]

diff --git a/Makefile b/Makefile
new file mode 100644 (file)
index 0000000..3e42153
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,7 @@
+FARMER_PATH=/home/fare/off-hours-computations/lisp/
+LISP_SYSTEMS=/home/fare/lib/lisp-systems/
+SYSTEM=philip-jose
+LISP=sbcl
+
+farmer.sh: setup.lisp
+       cl-launch -p ${LISP_SYSTEMS} -p ${FARMER_PATH} -l ${LISP} -f $< -s ${SYSTEM} -r philip-jose::farmer-start -o $@
diff --git a/README b/README
new file mode 100644 (file)
index 0000000..2c0efbe
--- /dev/null
+++ b/README
@@ -0,0 +1,188 @@
+==========================================================
+PHILIP-JOSE, a farmer for distributed computations in Lisp
+==========================================================
+
+Summary
+=======
+
+This package implements a "farmer": a program to control a farm of computers
+and coordinate them into achieving a large computation.
+
+The basic usage model is that all the flow control happens in the farmer,
+whereas all the intensive computation happens in worker processes
+each of which needs only know how to do a simple part of the whole.
+The overall computation may have many components and sub-components
+that include both massive parallelism and sequential dependencies,
+where the control-flow that may depend on the intermediary results,
+where failure of components can happen, be detected and acted upon, etc.
+
+
+License
+=======
+
+This software is released under the bugroff license. Use at your own risk.
+
+       http://www.geocities.com/SoHo/Cafe/5947/bugroff.html
+
+At the insistence of several hackers, I hereby state what is obvious to me,
+that they can reuse any software released under the bugroff license and
+publish it as part or totality of packages under any other license they see fit
+if it really matters to them, including a BSD-style license or a MIT license.
+Yes they can. Of course, if they choose a proprietary software license,
+they only deserve scorn. But even that, they may do!
+
+
+Communication
+=============
+
+Communication happens through a simple request/response protocol (like HTTP).
+The current protocol is designed to make it very easy to pass around
+Lisp data as the arguments and results of computations. It does that well,
+but its current incarnation has many limitations. Most importantly,
+the current system is intrinsically unsecure. DO NOT USE IT ON THE INTERNET.
+Use it only within a trusted network, or wrapped over encrypted lines.
+Also, the protocol is NOT designed for transmitting large amounts of data.
+If workers need share data with the farmer and/or with each other,
+they should access it through independent means, such as a file server.
+The farmer as it currently is is only designed to handle
+the general flow control of the computation.
+
+Workers are spawned by the farmer itself, using ssh or fork+exec,
+from a farm of registered machines.
+A simple mechanism to register and unregister machines is provided.
+A validation may be run on each worker machine
+to qualify it or not for the current computation.
+Because simple forking (without exec) is not (currently) supported
+(and because it is not usually available for distributed computations),
+an independent means of storing and retrieving data
+is once again needed if workers are to share it.
+
+
+Flow Control
+============
+
+On the plus side, the flow-control of the computation
+can be written in a very natural way.
+The farmer supports for sequential, parallel and first-wins subcomputations.
+Moreover, it provides a universal primitive (i.e. delimited continuations)
+on top of of which you can build your own control structures
+if you ever need something more elaborate.
+And indeed, the previous high-level control structures
+were built in terms of these primitives.
+
+I am told the implementation of continuations I am using (arnesi)
+allows continuations to be serialized, so in theory,
+you could build upon this to achieve mobile code between several farmers
+(as in the Tube), persistent threads, load-balancing, etc.
+
+Note that arnesi's call/cc and kall are more like shift and reset
+than like Scheme's call/cc. I admit I haven't tried nesting them
+syntactically and see if it still does the right thing, and even less to
+see if it handles exceptions properly.
+I'd have to compare the results to those of the examples given
+in the following documents:
+
+       * http://okmij.org/ftp/Computation/Continuations.html
+       * http://mumble.net/~campbell/scheme/bshift.scm
+       * http://calculist.blogspot.com/2007/01/non-native-shiftreset-with-exceptions.html
+
+
+Threading
+=========
+
+The farmer is written with the assumption that it will work
+on a sequential Lisp implementation, or
+in a single thread of a multithreaded Lisp implementation.
+IT WILL NOT WORK CORRECTLY IN A MULTITHREADED ENVIRONMENT.
+Its concurrent programming activities are built out of
+a home-grown green thread mechanism on top of arnesi's call/cc.
+
+Philip-jose provides green threads intended to be used on the farmer.
+Because of their execution model, all computations between two calls
+to some threading or I/O primitives are done in a same atomic transaction.
+
+The upside is that you can get a lot done without ever having to use locks
+or any other explicit mutual exclusion mechanism --
+just avoid calling any computation with complex control flow between two
+operations the results of which should be seen atomically by other threads.
+Many places in the existing code indeed assume this sequential execution
+to access to various shared data-structures in atomic transactions
+without the explicit use of locks.
+
+The downside is that when you really need such mutual exclusion mechanism,
+it is not provided (yet), and other parts of the code haven't been made
+to use any such mechanism (yet). Writing and providing such mechanisms
+should be pretty easy however, and are let as an exercise to the astute
+contributor.
+
+Though a skeleton of multithreaded server is provided,
+it is not tested, probably buggy, lacking in features, and
+will not work in conjunction with other code in philip-jose.
+I'm told that the author of IOLib is building
+a better, more featureful, more stable, infrastructure for I/O in Lisp.
+Philip-Jose is no such infrastructure.
+
+
+TO-DO List
+==========
+
+  1- package the system into something usable. Provide documentation, etc.
+
+  2- use non-blocking IO to provide for better networking in farmers:
+     the ability to be both a server and client, etc.
+
+  3- find how to serialize continuations to provide richer capabilities:
+   persistent (more robust) threads, and mobile code (including load balancing).
+
+  4- Implement fork() as a more efficient (though trickier) alternative to
+   fork+exec() for spawning new clients that share initial state.
+
+  5- provide a faster alternative to arnesi's with-call/cc, based on e.g. Screamer.
+
+  6- implement interfaces that are compatible with existing distributed systems?
+   (Scheme) Termite, Askemos, Tube, Kali, Dreme, [Something by Queinnec]
+   (CL) StarLisp, NetCLOS, GBBopen, ...
+   Erlang
+
+  7- use hunchentoot or such to support HTTP instead of the current protocol.
+
+  8- implement a simple map/reduce for local-tasks, for worker-jobs
+
+  9- implement pure streams, and mappings between pure & impure streams,
+   so that I/O in competing threads only gets committed in the winning thread;
+   also allows for easy backtracking with I/O operations. One thread may
+   (read-string "foo" s) and just catch the exception and die if not available.
+
+  10- have a get-read-buffer interface to not read character-by-character
+
+  11- have thread IDs, and guard various event-handlers with a test for the thread
+   still being alive. Alternatively have a mechanism to (atomically) remove event-handlers,
+   queued jobs, etc., from their respective queue, set, etc.
+
+  12- enforce atomicity with a global flag \*atomic* or such; when this flag is on,
+   weak yields turn into NOPs and strong yields raise error when it is on.
+   Now we also have to distinguish between weak yields and strong yields.
+
+  13- Have sub-queues, etc., for scheduling.
+
+  14- provide some explicit mechanisms for mutual-exclusion and transactionality.
+
+  15- merge with Erlang-in-Lisp
+
+
+Behaving like an Erlang node
+============================
+
+  * see distel: http://bc.tech.coop/blog/070528.html
+
+  * the Erlang distribution protocol can be found in the source release at
+    erts/emulator/internal_doc/erl_ext_dist.txt
+
+
+Using ancillary data to BSD UNIX sockets
+========================================
+
+If you are send multiple fds in a same sendmsg()
+make sure you send them all in a single CMSG
+because multiple CMSGs of same type are broken on many kernels.
+man cmsg. See SCM_RIGHTS.
diff --git a/farmer.lisp b/farmer.lisp
new file mode 100644 (file)
index 0000000..866572c
--- /dev/null
@@ -0,0 +1,71 @@
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;;; Main loop for the Philip-Jose Farmer
+
+;;; Manage a farm of processes each doing a part of a grand job.
+
+(in-package :philip-jose)
+
+(defparameter *philip-jose-farmer-version* 0)
+
+(defun-request-handler :farmer-version ()
+  (reply *philip-jose-farmer-version*))
+
+(defun-request-handler :square (n)
+  (reply (* n n)))
+
+(defun-request-handler :1+ (n)
+  (reply (1+ n)))
+
+(defun-request-handler :reload ()
+  (reply (asdf:oos 'asdf:load-op *farmer-system*)))
+
+(defun-request-handler :register-client (id &key valid explanation)
+  (logger "~&Registering client~{ ~A~}" id)
+  (reply t))
+
+(defun-request-handler :armageddon ()
+  (flet ((kill-worker-id (worker-id)
+           (destructuring-bind (machine process connect-time) worker-id
+             (declare (ignorable connect-time))
+             (logger "~&Armageddon killing pid ~D on ~A" process machine)
+             (kill-machine-process machine process))))
+    (maphash (lambda (worker-id status)
+               (declare (ignorable status))
+               (kill-worker-id worker-id))
+             *registered-workers*)
+    (maphash (lambda (job-id job)
+               (declare (ignorable job-id))
+               (kill-worker-id (car (job-status job))))
+             *claimed-worker-jobs*))
+  (reply t)
+  (logger "~&Armageddon quitting")
+  (sb-ext:quit))
+
+(defun-job fake (m)
+  (setf *random-state* (make-random-state t))
+  (let ((x (* 1d-3 (random 10000))))
+    (DBG :fake m x)
+    (fsleep x)
+    (values m x)))
+
+
+(defun show-tasks ()
+  (let ((*print-level* 4))
+    (logger "~&scheduled local-tasks: ~S~%" (fifo-head *scheduled-local-tasks*))
+    (logger "~&timed local-tasks: ~S~%" (container-contents *timed-local-tasks*))
+    (logger "~&scheduled worker jobs: ~S~%" (fifo-head *scheduled-worker-jobs*))
+    (logger "~&claimed worker jobs: ~S~%" (hash-table->alist *claimed-worker-jobs*)))
+  nil)
+
+(defun clear-tasks ()
+  (fifo-empty! *scheduled-local-tasks*)
+  (empty-container! *timed-local-tasks*)
+  (fifo-empty! *scheduled-worker-jobs*)
+  (clrhash *claimed-worker-jobs*)
+  nil)
+
+#|
+(trace call-request-handler handle-sexp-request)
+
+(hash-table->alist *request-heads*)
+|#
diff --git a/fork.lisp b/fork.lisp
new file mode 100644 (file)
index 0000000..e36c5f0
--- /dev/null
+++ b/fork.lisp
@@ -0,0 +1,124 @@
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;;; Forking
+
+(in-package :philip-jose)
+
+#-sbcl
+(progn
+  (defvar *expected-children 2.1)
+  (defvar *expected-space* 16000000))
+
+#+sbcl
+(progn
+  ;; Simple heuristic: if we have allocated more than the given ratio
+  ;; of what is allowed between GCs, then trigger the GC.
+  ;; note: can possibly modify parameters and reset in sb-ext:*after-gc-hooks*
+  (defparameter *prefork-allocation-reserve-ratio* .10) ; default ratio: 10%
+  (defun should-i-gc-p ()
+    (let ((available-bytes (- (sb-alien:extern-alien "auto_gc_trigger" sb-alien:long)
+                              (sb-kernel:dynamic-usage)))
+          (allocation-threshhold (sb-ext:bytes-consed-between-gcs)))
+      (< available-bytes (* *prefork-allocation-reserve-ratio* allocation-threshhold)))))
+
+(defun pre-fork-hook ()
+  #+sbcl
+  (when (should-i-gc-p)
+    (sb-ext:gc))
+  nil)
+
+(defun do-fork ()
+  #-sbcl (et:fork)
+  #+sbcl (sb-posix:fork))
+
+(defun post-fork-child-cleanup ()
+  nil)
+
+(defun post-fork-parent-hook (pid)
+  pid)
+
+(defun fork ()
+  (pre-fork-hook)
+  (let ((pid (do-fork)))
+    (if (zerop pid)
+        (post-fork-child-cleanup)
+        (post-fork-parent-hook pid))
+    pid))
+
+
+(defvar *cleanup-pusher* nil
+  "dynamically-scoped function for declaring cleanups")
+(defun push-cleanup (cleanup)
+  (if *cleanup-pusher*
+      (funcall *cleanup-pusher* cleanup)
+      (error "No cleanup pusher declared")))
+(defun call-with-cleanups (thunk)
+  (let ((cleanups nil))
+    (flet ((push-cleanup (cleanup) (push cleanup cleanups)))
+      (unwind-protect
+          (funcall thunk #'push-cleanup)
+       (dolist (cleanup cleanups)
+         (funcall cleanup))))))
+(defun
+
+(defun call-with-foreign-temporaries (thunk)
+  (let (temporaries)
+    (flet ((alloc (s)
+            (push s temporaries)))
+      (unwind-protect
+          (funcall thunk alloc)
+       (loop for s in temporaries do
+             (apply #'cffi:free-converted-object s)))))))
+
+(defmacro with-foreign-temporaries (&body body)
+  (with-gensyms (s)
+    `(call-with-foreign-temporaries
+      (lambda (,s)
+       (macrolet ()
+         ,@body)))))
+
+(cffi:defcenum frobork-kode
+  :return
+  :execve
+  :dup2
+  :close
+  :fchdir
+  :setuid
+  :setgid
+  :signal
+  :setsid
+  :setpgid
+  :tcsetpgrp)
+
+(cffi:defcfun (c-frobork "frobork") :int (kode :pointer))
+
+(defun frobork-convert (v)
+  (typecase v
+    (integer
+     v)
+    (keyword
+     
+    (string
+     (cffi:convert-to-foreign v :string))
+    (integer
+     v)
+
+
+
+(defun frobork (kl)
+  (let ((l (length kl)))
+    (with-foreign-object (kode :int (1+ l))
+      (let ((alloc-state
+            (loop with converted = nil
+                  with state = nil
+                  for i below l
+                  for v in kl
+                  do (multiple-value-setq (converted state)
+                       (frobork-convert v))
+                  (setf (mem-aref kode :int i) converted)
+                  when state
+                  collect state)))
+       (setf (mem-aref kode :int l) 0)
+       (prog1
+           (c-frobork kode)
+         (loop for s in alloc-state
+               do (apply #'cffi:free-converted-object s)))))))
diff --git a/frobork.c b/frobork.c
new file mode 100644 (file)
index 0000000..2bddbb8
--- /dev/null
+++ b/frobork.c
@@ -0,0 +1,143 @@
+#define _GNU_SOURCE
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <signal.h>
+
+enum frobork_kode {
+        FK_RETURN,
+        FK_EXECVE,
+        FK_DUP2,
+        FK_CLOSE,
+        FK_FCHDIR,
+        FK_SETUID,
+        FK_SETGID,
+        FK_SIGNAL,
+        FK_SETSID,
+        FK_SETPGID,
+        FK_TCSETPGRP
+};
+
+
+/*
+ * frobork.c
+ */
+
+#define XX (*kode++)
+#define T(type) ((type)XX)
+#define X T(int)
+#define TRY2(x,y) if((x)==(y)) { goto on_error; } ; break;
+#define TRY(x) TRY2(x,-1)
+#define OBJLEN(x) (x),sizeof(x)
+#define GETX {x=X;}
+
+#if 0 /* support for any of the types we use being larger than an int */
+#define MAXUINTP1 (1ULL+(unsigned long long)((unsigned int)-1))
+#define SIZUI (sizeof(unsigned int))
+#define SIZULL (sizeof(unsigned int))
+
+unsigned long long read_and_increment_pointer (unsigned int**p, unsigned int size) {
+  unsigned long long r;
+  unsigned int *q = *p;
+  unsigned int i = 0;
+  unsigned long long factor = 1;
+
+  if (size <= SIZUI) {
+    r = *q;
+    q++;
+  } else   if (size > SIZULL) {
+      write(2,OBJLEN("frobork: bad int size\n"));
+      exit(43);
+  } else {
+    r = 0;
+    while (i<size) {
+      r += *q*f;
+      f *= MAXUINTP1;
+      q++;
+      i+=SIZUI;
+    }
+  }
+  *p = q;
+  return r;
+}
+#define T(type) ((type)read_and_increment_pointer((unsigned int**)&kode,sizeof(type)))
+#endif
+
+int frobork (int* kode)
+{
+        pid_t pid, p1,p2;
+        int x;
+       sighandler_t sh;
+        char *a,**b,**c;
+
+        pid = fork();
+        if (pid) {
+                return pid;
+        }
+
+        while (1) {
+                switch (X) {
+                case FK_RETURN:
+                        return(0);
+                case FK_EXECVE:
+                        a=T(char *);b=T(char **);c=T(char **);
+                        TRY(execve(a,b,c));
+                        break;
+                case FK_DUP2:
+                        GETX; TRY(dup2(x,X));
+                        break;
+                case FK_CLOSE:
+                        TRY(close(X));
+                        break;
+                case FK_FCHDIR:
+                        TRY(fchdir(X));
+                        break;
+                case FK_SETUID:
+                        TRY(setuid(T(uid_it)));
+                        break;
+                case FK_SETGID:
+                        TRY(setgid(T(gid_it)));
+                        break;
+                case FK_SIGNAL:
+                        GETX;
+                       switch(X) {
+                       case 0: sh = SIG_...;
+                       case 1: sh = SIG_...;
+                       case 2: sh = SIG_...;
+                       }
+                       TRY2(signal(x,y,SIG_ERR);
+                        break;
+                case FK_SETSID:
+                        TRY(setsid());
+                        break;
+                case FK_SETPGID:
+                        p1=T(pid_t);p2=T(pid_t);
+                        TRY(setpgid(p1,p2));
+                        break;
+                case FK_TCSETPGRP:
+                        GETX;TRY(tcsetpgrp(x,T(pid_t)));
+                        break;
+                default:
+                        write(2,OBJLEN("frobork: bad code\n"));
+                        exit(42);
+                }
+        }
+        return 0;
+on_error:
+        perror("frobork");
+        exit(44);
+}
+
+#ifdef TEST_FROBORK
+int main (int argc, char** argv) {
+        int kode[] = {
+                FK_DUP2, 0, 4,
+                FK_CLOSE, 4,
+                FK_SETSID,
+                FK_RETURN
+        };
+        argc++;argv++;
+        printf("pid=%d frobork returned %d\n",getpid(),frobork(kode));
+        return 0;
+}
+#endif
diff --git a/incremental-parsing.lisp b/incremental-parsing.lisp
new file mode 100644 (file)
index 0000000..fba825c
--- /dev/null
@@ -0,0 +1,115 @@
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;;; Main loop for the Philip-Jose Farmer
+
+;;;;; Incremental parser protocol
+
+(in-package :philip-jose)
+
+;;;; The Incremental parser protocol
+;;
+;; This protocol allows for a single-threaded server to monitor many connections at once,
+;; possibly on many ports, with each connection having partial request message that will
+;; either be trivially handled, or lead to spawning after the request is complete.
+;;
+;; An incremental parser function is given a buffer (byte or char array),
+;; the size up to which the buffer is filled, and
+;; the previous (incomplete) state of the incremental parser, or nil on the first attempt.
+;; It returns two values. The first is a boolean that tells if the parse was successful.
+;; If the first value is T, then the second value is the result of the parse.
+;; If the first value is NIL, then the second value is the state of the incremental parser,
+;; from which to resume when the buffer is made more full.
+;;
+;; In a world with any kind of first-class (partial) continuations, either linear or multi-entry,
+;; any direct parser could be made incremental, by having the READ-CHAR primitive return the
+;; partial continuation of the parser to the caller.
+;; Maybe we can hook Screamer or arnesi's call/cc into here.
+
+;; TODO:
+;; * add a buffer management protocol to make the thing work for bigger requests and
+;;  on-going conversations
+
+;;;; Trivial implementations of simple cases
+
+
+;;; Line parser -- the request in complete at the end of the first line
+(defun line-end-code-p (cc)
+  (member cc '(10 13)))
+(defun line-end-char-p (ch)
+  (line-end-code-p (char-code ch)))
+
+(defun trivial-incremental-line-parser (buffer size state)
+  ;; Implements the incremental parser protocol for character input, detecting a full line.
+  (let* ((line-end-pos (position-if #'line-end-char-p buffer :start (or state 0))))
+    (if line-end-pos
+      (values t line-end-pos)
+      (values nil size))))
+
+
+;;; Wrapper that prevents too many retries
+;; Drop (presumably buggy or malevolent) connections with too many packets for the initial request.
+;; As in attempts to flood with small packets, connection with a lot of fragmentation,
+;; malformed request, overlarge request, buffer overflow tentative.
+
+(define-condition too-many-attempts ()
+  ()
+  (:report (lambda (condition stream)
+             (declare (ignore condition))
+            (format stream "Too many attempts in incremental request parsing"))))
+
+(defun incremental-parser-attempt-limiter (max-attempts incremental-parser)
+  (labels ((check (x)
+               (when (>= x max-attempts)
+                 (error 'too-many-attempts)))
+           (try (buffer size state)
+             (multiple-value-bind (count inner-state)
+                 (if state (values (car state) (cdr state)) (values 0 nil))
+               (check count)
+               (multiple-value-bind (successp x)
+                   (funcall incremental-parser buffer size inner-state)
+                 (if successp
+                   (values t x)
+                   (let ((c (1+ count)))
+                     (check c)
+                     (values nil (cons c x))))))))
+    (if (typep max-attempts '(unsigned-byte 16))
+      #'try
+      incremental-parser)))
+
+
+;;; Trivial transformation of a parser into a pseudo incremental parser.
+;; Restarting from the beginning everytime.
+;; It's inefficient and a big DoS target unless combined with the limiter above.
+
+(defun pseudo-incremental-parser-from-reader (reader)
+  ;; Implements the incremental parser protocol for character input, given a reader.
+  ;; Not really incremental, always restart from scratch, so called pseudo.
+  ;; Could do better with call/cc and an open stream protocol.
+  #'(lambda (buffer size state &aux index)
+      (declare (ignore state))
+      (handler-case
+          (values t (cons (with-input-from-string (s buffer :index index :start 0 :end size)
+                           (funcall reader s t))
+                         index))
+       (end-of-file () (values nil nil)))))
+
+
+;;; Use of the Lisp reader as a parser.
+;; NOT FOR USE IN THE WILD! See warning in trivial-sexp-server.lisp
+
+(let ((f (pseudo-incremental-parser-from-reader #'safe-read)))
+  (defun make-incremental-reader (&optional (max-attempts 4))
+    (incremental-parser-attempt-limiter max-attempts f)))
+
+
+(defun incrementally-parsed-request-handler (connection buffer size state &optional (handler #'handle-sexp-request))
+  (with-trivial-logging ()
+    (destructuring-bind (request . index) state
+      (with-open-stream (bufin (make-string-input-stream buffer index size))
+        (with-open-stream (in (make-concatenated-stream bufin connection))
+          (let ((*standard-input* in)
+                (*standard-output* connection))
+            (funcall handler request)))))))
+
+(defun make-incrementally-parsed-request-handler (h)
+  #'(lambda (connection buffer size state)
+      (incrementally-parsed-request-handler connection buffer size state h)))
diff --git a/local-tasks.lisp b/local-tasks.lisp
new file mode 100644 (file)
index 0000000..433ca28
--- /dev/null
@@ -0,0 +1,290 @@
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;; Manage tasks in the local Lisp image
+;;;; Tasks can be scheduled on a queue,
+;;;; to be processed either immediately or when a given start time arrives.
+(in-package :philip-jose)
+
+;;(declaim (optimize (speed 3) (safety 3) (debug 3)));DEBUG
+
+;; All of this assumes single-threaded execution.
+;; If you want concurrent execution, you need to either modify this code and add locking,
+;; Or to isolate instances that run this code in separate threads that each have a
+;; thread-local binding for each of the special variables used.
+
+
+;;; Tasks
+(defclass local-task ()
+  ((callback :accessor task-callback :initarg :callback)
+   (name :accessor task-name :initarg :name :initform nil)))
+
+(defclass timed-local-task (local-task)
+  ((start-time :accessor task-start-time :initarg :start-time :initform nil)))
+
+(defvar *current-local-task* nil)
+
+
+;;; Scheduled tasks (to executed ASAP in FIFO order)
+
+(defparameter *scheduled-local-tasks*
+  (make-fifo)
+  "tasks that need to be executed locally")
+
+(defun enqueue-immediate-local-task (task)
+  (fifo-enqueue task *scheduled-local-tasks*))
+
+(defun schedule-immediate-local-task (callback &key name)
+  (enqueue-immediate-local-task
+   (make-instance 'local-task
+     :callback callback :name name))
+  nil)
+
+(defun process-local-tasks ()
+  (loop until (fifo-empty-p *scheduled-local-tasks*) do
+    (let ((*current-local-task* (fifo-dequeue *scheduled-local-tasks*)))
+      (funkall (task-callback *current-local-task*)))))
+
+
+;;; Timed tasks
+
+(defun time-sooner-p (t1 t2)
+  (cond
+    ((null t2) nil)
+    ((null t1) t)
+    (t (< t1 t2))))
+
+(defun task-scheduled-sooner-p (task1 task2)
+  (time-sooner-p (task-start-time task1) (task-start-time task2)))
+
+(defparameter *timed-local-tasks*
+  (make-instance 'binary-heap :lessp #'task-scheduled-sooner-p)
+  "tasks to be executed on farmer at a given time. e.g. timeout functions")
+
+(defun schedule-timed-local-task (callback &key start-time name)
+  (insert-item!
+   *timed-local-tasks*
+   (make-instance 'timed-local-task
+     :callback callback :start-time start-time :name name))
+  nil)
+
+(defun next-timed-task-start-time ()
+  (when-bind task
+      (unless (container-empty-p *timed-local-tasks*)
+        (least-item *timed-local-tasks*))
+    (task-start-time task)))
+
+(defun process-timed-task-heap (&optional (time (et:gettimeofday)))
+  (loop for next-start-time = (next-timed-task-start-time)
+    while (and next-start-time (< next-start-time time)) do
+    (enqueue-immediate-local-task (pop-least-item! *timed-local-tasks*))))
+
+(defun schedule-local-task (callback &key start-time name)
+  (if start-time
+      (schedule-timed-local-task callback :start-time start-time :name name)
+      (schedule-immediate-local-task callback :name name))
+  nil)
+
+
+;;; Event-Base
+
+(defun process-events ()
+  (if (iomux::event-base-empty-p *event-base*)
+      (process-no-events)
+      (do-process-events)))
+
+;;(defparameter *event-processing-task*
+;;  (make-instance 'local-task
+;;    :name "Event processor"
+;;    :callback #'process-events))
+;;
+;;(defun reschedule-process-events ()
+;;  (schedule-local-task *event-processing-task*))
+
+(defun process-no-events ()
+  (if-bind time (next-timed-task-start-time)
+    (let ((delay (- time (get-real-time))))
+      (fsleep delay)
+      (process-timed-task-heap))
+    (nothing-to-do)))
+
+(defun nothing-to-do ()
+  (logger "~&Nothing left to do. Quitting.")
+  (throw :exit 0))
+
+(defun do-process-events ()
+  (let* ((time (next-timed-task-start-time))
+         (now (get-real-time))
+         (timeout (when time (max 0 (- time now)))))
+    (event-dispatch *event-base* :timeout timeout :only-once t)))
+
+
+;;; General Loop
+
+(defun task-step ()
+  (process-local-tasks)
+  (process-timed-task-heap)
+  (process-local-tasks)
+  (process-events))
+
+(def*fun task-loop ()
+  (loop (task-step)))
+
+(defun task-steps (&optional (repeat 1))
+  (catch :exit
+    (if repeat
+        (loop :repeat repeat
+          :do (task-step))
+        (task-loop))))
+
+;;; Local tasks as a cooperative threading mechanism
+
+(defun/cc yield-local-task ()
+  (let/cc k (schedule-local-task k)))
+
+(defun/cc sleep/cc (duration)
+  (let/cc k (schedule-timed-local-task k :start-time (+ duration (get-real-time)))))
+
+
+(defun/cc call-in-forks (things &key first-one-later)
+  (loop for thing in (if first-one-later things (cdr things))
+        do (schedule-local-task thing))
+  (unless first-one-later
+    (funcall (wrap (car things)))))
+
+(defmacro do-in-forks (&body body)
+  `(call-in-forks (list ,@(mapcar (lambda (x) `(lambda () ,x)) body))))
+
+
+;; Rendez-vous
+(defparameter *local-task-rendez-vous* nil)
+
+(defun/cc call-with-rendez-vous (thunk)
+  (let/cc k
+    (let ((remaining-tasks 0))
+      (labels ((maybe-exit ()
+                 (when (zerop remaining-tasks)
+                   (kall k)))
+               (one-less ()
+                 (decf remaining-tasks)
+                 (maybe-exit))
+               (one-more (thing)
+                 (incf remaining-tasks)
+                 (schedule-local-task
+                  (lambda () (funkall thing) (one-less)))))
+        (funcall thunk one-more)
+        (maybe-exit)))))
+
+(defun/cc call-with-rendez-vous-after (thunk &optional (rendez-vous *local-task-rendez-vous*))
+  (unless rendez-vous
+    (error "No rendez-vous."))
+  (funcall rendez-vous thunk))
+
+(defmacro with-rendez-vous-after ((&optional (rendez-vous '*local-task-rendez-vous*)) &body body)
+  `(call-with-rendez-vous-after
+    (lambda () ,@body)
+    ,rendez-vous))
+
+(defmacro with-rendez-vous ((&optional (rendez-vous '*local-task-rendez-vous*)) &body body)
+  `(call-with-rendez-vous
+    (lambda (,rendez-vous) ,@body)))
+
+
+;;; Do In Parallel (and join at the end)
+(defun/cc call-in-parallel (thunks)
+  (with-rendez-vous (done)
+    (dolist (thunk thunks)
+      (call-with-rendez-vous-after thunk done))))
+
+(defmacro do-in-parallel (&body body)
+  `(call-in-parallel (list ,@(mapcar (lambda (x) `(lambda () ,x)) body))))
+
+
+;;; Map In Parallel (return a list with results in first-terminates-last order)
+(defun/cc map-in-parallel (function &rest lists)
+  (let ((results ()))
+    (call-in-parallel
+     (apply mapcar
+            #'(lambda (&rest args)
+                #'(lambda () (push (apply function args) results)))
+            lists))
+    results))
+
+
+;; First to finish wins
+(defstruct (local-task-competition
+             (:conc-name local-task-competition-)
+             (:constructor make-local-task-competition (compete maybe-lose maybe-win)))
+  (compete)
+  (maybe-lose)
+  (maybe-win))
+
+(defun/cc call-with-local-task-competition (thunk)
+  (let/cc k
+    (let ((competition-over-p nil))
+      (labels ((maybe-win (&rest r)
+                 (unless competition-over-p
+                   (setf competition-over-p t)
+                   (apply #'kall k r)))
+               (maybe-lose ()
+                 (let/cc k
+                   (unless competition-over-p
+                     (kall k))))
+               (compete (thunk)
+                 (maybe-lose)
+                 (schedule-local-task
+                  (lambda () (multiple-value-call #'maybe-win (funcall thunk))))))
+        (funcall thunk (make-local-task-competition #'compete #'maybe-lose #'maybe-win))))))
+
+(defparameter *local-task-competition* nil)
+
+(defun/cc call-as-local-task-competitor (thunk &optional (competition *local-task-competition*))
+  (unless competition
+    (error "No local task competition."))
+  (funcall (local-task-competition-compete competition) thunk))
+
+(defmacro local-task-competitor
+    ((&optional (competition '*local-task-competition*)) &body body)
+  `(call-as-local-task-competitor (lambda () ,@body) ,competition))
+
+(defun/cc maybe-lose-local-task-competition (&optional (competition *local-task-competition*))
+  (unless competition
+    (error "No local task competition."))
+  (funcall (local-task-competition-maybe-lose competition)))
+
+(defun/cc maybe-win-local-task-competition
+    (&optional (competition *local-task-competition*) &rest r)
+  (unless competition
+    (error "No local task competition."))
+  (apply (local-task-competition-maybe-win competition) r))
+
+(defmacro with-local-task-competition
+    ((&optional (competition '*local-task-competition*)) &body body)
+  `(call-with-local-task-competition
+    (lambda (,competition) ,@body)))
+
+#|;Working test:
+(defun/cc foo (nrepeat char maybe-lose)
+  (loop repeat nrepeat do (yield-local-task) (funcall maybe-lose) (write-char char)) char)
+(schedule-local-task
+ (lambda ()
+   (with-call/cc
+     (DBG (with-local-task-competition (competition)
+            (loop for c across "ABCDEFGHI" do
+              (let ((x c))
+                (local-task-competitor (competition)
+                  (foo (+ 2 (random 100)) x
+                       (lambda () (maybe-lose-local-task-competition competition)))))))))))
+(process-local-tasks)
+|#
+
+;; timeout
+(defun/cc call-with-local-timeout (timeout thunk)
+  (with-local-task-competition (speed-competition)
+    (when timeout
+      (local-task-competitor (speed-competition) (sleep/cc timeout)))
+    (maybe-lose-local-task-competition speed-competition)
+    (maybe-win-local-task-competition
+     speed-competition
+     (funcall thunk speed-competition))))
+
+(defmacro with-local-timeout ((timeout &optional (competition (gensym))) &body body)
+  `(call-with-local-timeout ,timeout (lambda (,competition) (declare (ignorable ,competition)) ,@body)))
diff --git a/logger.lisp b/logger.lisp
new file mode 100644 (file)
index 0000000..6999ec3
--- /dev/null
@@ -0,0 +1,25 @@
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;;; Trivial Logging mechanism
+
+(in-package :philip-jose)
+
+(defun trivial-logger (s fmt &rest args)
+  (with-standard-io-syntax
+    (let ((s (or s *error-output*))
+          (*print-pretty* nil)
+          (*print-readably* nil))
+      (format s "~&")
+      (apply #'format s fmt args)
+      (format s "~&")
+      (finish-output s))))
+
+(def*fun logger (fmt &rest args)
+  (apply #'trivial-logger *error-output* fmt args))
+
+(def*macro with-trivial-logging ((&optional s) &body body)
+  `(let ((*error-output* (or ,s *error-output*)))
+    (handler-bind
+        ((condition #'(lambda (condition)
+                        (logger "~&~S~%" condition)
+                        #+sbcl (ignore-errors (sb-debug:backtrace 100)))))
+      ,@body)))
diff --git a/machines.lisp b/machines.lisp
new file mode 100644 (file)
index 0000000..278315a
--- /dev/null
@@ -0,0 +1,115 @@
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;;; Machines and executing on them...
+
+
+(in-package :philip-jose)
+
+(defparameter *hostname* nil)
+(def*fun gethostname ()
+  (or *hostname* (setf *hostname* (sb-unix:unix-gethostname))))
+
+;;(defparameter *domainname* nil)
+;;(defun getdomainname ()
+;;  (or *domainname* (setf *domainname* (sb-unix:unix-getdomainname))))
+
+(defparameter *localhost-names* nil)
+(def*fun get-localhost-names ()
+  (or *localhost-names* (setf *localhost-names* (compute-localhost-names))))
+(defun compute-localhost-names ()
+  (list nil
+        "localhost"
+        (gethostname)))
+;;;  (concatenate 'string (gethostname) "." (getdomainname))))
+
+(defun machine-localhost-p (m)
+  ;;--- the most correct way would be to resolve and then compare to 127.0.0.1/8 and other local addresses from ifconfig output...
+  (member m (get-localhost-names) :test #'equal))
+
+
+
+(def*fun spawn-process-on (machine &rest process-spec)
+  (if (machine-localhost-p machine)
+    (apply #'spawn-process process-spec)
+    (apply #'spawn-remote-process machine process-spec)))
+
+;;;---*** make less SBCL-dependent?
+#-sbcl (error "Philip-Jose requires SBCL at this point")
+(def*fun spawn-process (program &rest args)
+  (sb-ext:run-program (->string program) (mapcar #'->string args)
+                      :wait nil :pty nil :input nil :output nil :error nil))
+
+(defun ->string (x)
+  (typecase x
+    (string x)
+    (symbol (string-downcase (symbol-name x)))
+    (t (format nil "~A" x))))
+
+(def*fun spawn-remote-process (machine program &rest args)
+  (spawn-process *ssh-path*
+                 machine (command-line-from-strings
+                          (mapcar #'->string (cons program args)))))
+
+(defun kill-machine-process (machine pid)
+  (spawn-process-on machine "/bin/kill" -9 pid))
+
+(defun pts-old-p (p)
+  (let ((mtime (nth-value 11 (et:stat (namestring p)))))
+    (< mtime (- (et:gettimeofday) 3600))))
+
+(defun all-pts-old-p ()
+  (every #'pts-old-p
+         (directory (make-pathname :directory "/dev/pts/" :name :wild))))
+
+(defun low-loadavg-p ()
+  (< (nth-value 2 (read-loadavg)) .38))
+
+(cffi:defcfun getloadavg :int (loadavg :pointer) (nelem :int))
+
+(defun read-loadavg ()
+  (values-list
+   (cffi:with-foreign-object (loadavg :double 3)
+     (getloadavg loadavg 3)
+     (loop for i below 3 collect
+           (cffi:mem-aref loadavg :double i)))))
+
+(defun get-id ()
+  (or *id*
+      (setf *id* (list (gethostname) (iolib-posix:getpid) (get-real-time)))))
+
+(defun validate-and-start-client (server port)
+  (let* ((all-pts-old-p (all-pts-old-p))
+         (low-loadavg-p (low-loadavg-p))
+         (valid (and all-pts-old-p low-loadavg-p)))
+    (simple-client `(:register-client ,(get-id) :valid t ;;,valid
+                     ;; :explanation (:all-pts-old-p ,all-pts-old-p :low-loadavg-p ,low-loadavg-p)
+                     )
+                   server port)
+    (if t ;valid
+        (worker-loop :server server :port port)
+        (logger "~&Client not taking part in the farm: ~S"
+                (list :all-pts-old-p all-pts-old-p :low-loadavg-p low-loadavg-p)))))
+
+(def*fun simple-client (message target port)
+  (let* ((address (ensure-address target))
+         (socket (make-tcp-connection address port)))
+    (unwind-protect
+         (progn
+           (safe-write message :stream socket)
+           (princ +crlf+ socket)
+           (finish-output socket)
+           ;;(shutdown socket :write)
+           (read socket))
+      (close socket))))
+
+(defun spawn-client (machine &optional (server (gethostname)) (port *port*))
+  (logger "~&Spawning client on ~S" machine)
+  (spawn-process-on machine *farmer-path* "client" server port))
+
+(def*fun start-clients ()
+  (spawn-all-clients)
+  nil)
+
+(def*fun spawn-all-clients ()
+  (dolist (machine (process-registry-log))
+    (loop repeat *workers-per-machine* do
+      (spawn-client machine))))
diff --git a/manager.lisp b/manager.lisp
new file mode 100644 (file)
index 0000000..a08fd42
--- /dev/null
@@ -0,0 +1,362 @@
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;; Trivial Job Management
+(in-package :philip-jose)
+
+#|
+Advice from #lisp: maybe use cells or computed-class -- but not really
+       http://common-lisp.net/project/cells/
+       http://common-lisp.net/project/computed-class/
+lemonodor: Jim Firby's RAP (not free software, but a good read)
+       http://people.cs.uchicago.edu/~firby/raps/index.shtml
+
+Proper interface:
+a little language to dynamically build a dependency graph.
+A node remembers those existing nodes that still need to be completed
+before that node may be undertaken.
+When a node is completed, its dependencies are propagated,
+which may themselves generate more jobs...
+
+And the language is Lisp, thanks to CALL/CC!
+
+|#
+
+
+;;; Global lock to protect state (not used at this time)
+#|
+(defparameter *tracker-lock* (make-lock)
+  "global lock for multithreaded servers")
+
+(defmacro with-tracker-lock (() &body body)
+  `(with-lock-held (*tracker-lock*)
+     ,@body))
+|#
+
+;;; Workers have an id, a known status
+
+(defvar *current-worker*
+  nil
+  "the worker making current request")
+
+(defparameter *registered-workers*
+  (make-hash-table :test 'equal)
+  "workers that have registered a job")
+
+(defun register-worker (id &optional (status :working))
+  (Setf (gethash id *registered-workers*) status))
+
+(defun unregister-worker (id)
+  (remhash id *registered-workers*))
+
+
+;;; Your every worker job
+
+(defclass worker-job (simple-print-object-mixin)
+  ((id :accessor job-id :initarg :id :documentation
+       "nonce to identify the job")
+   (description :accessor job-description :initarg :description :documentation
+                "description of the job, a SEXP to be transmitted to the worker")
+   (status :accessor job-status :initarg :status :initform nil :documentation
+           "Status of the job, NIL if to be dispatched, T if done, a worker ID if being done")
+   (validator :accessor job-validator :initarg :validator :initform t :documentation
+              "thunk to validate whether a job is still needed by the time a worker is ready to claim it.")
+   (on-claim :accessor job-on-claim :initarg :on-claim :initform nil :documentation
+             "hook thunk to call when a worker claims the job")
+   (on-completion :accessor job-on-completion :initarg :on-completion :documentation
+                  "continuation to call when a job returns")))
+
+(defparameter *worker-job-counter* 0)
+
+(defun make-worker-job-id ()
+  `(,@*id* :worker-job ,(incf *worker-job-counter*)))
+
+(defvar *current-worker-job*
+  nil
+  "the worker job currently examined")
+
+;;; Worker jobs can be scheduled or claimed
+
+(defparameter *claimed-worker-jobs*
+  (make-hash-table :test 'equal)
+  "jobs claimed by some worker")
+
+(defun claim-job (job &optional (worker *current-worker*))
+  (setf (gethash (job-id job) *claimed-worker-jobs*) job)
+  (setf (job-status job) (list worker (get-real-time)))
+  (notify-claimed-job job))
+
+(defun notify-claimed-job (job)
+  (funkall (job-on-claim job)))
+
+(defun notify-completed-job (job &rest results)
+  (setf (job-status job) :completed)
+  (apply #'funkall (job-on-completion job) results))
+
+(defun unclaim-job (job)
+  (setf (job-status job) '(:unclaimed))
+  (remhash (job-id job) *claimed-worker-jobs*)
+  nil)
+
+(defparameter *scheduled-worker-jobs*
+  (make-fifo)
+  "jobs that need to be executed by some worker")
+
+(defun format-job (stream job)
+  "Format only the interesting slots of job to stream."
+  (format stream "#<job ~d ~A>"
+          (job-id job)
+          (job-description job)))
+
+(defun enqueue-worker-job (job)
+  (logger "~&Enqueueing ~A" (format-job nil job))
+  (fifo-enqueue job *scheduled-worker-jobs*)
+  job)
+
+(defun issue-worker-job (description &rest keys
+                        &key validator on-claim on-completion)
+  (declare (ignore validator on-claim on-completion))
+  (enqueue-worker-job
+   (apply #'make-instance 'worker-job
+          :id (make-worker-job-id)
+          :description description
+          keys)))
+
+(defun get-next-worker-job ()
+  (loop until (fifo-empty-p *scheduled-worker-jobs*)
+        do (let ((job (fifo-dequeue *scheduled-worker-jobs*)))
+             (when (validate-job job)
+               (logger "~&Issuing ~A" (job-id job))
+               (return job)))))
+
+;;; Global defaults when looking for a job
+
+(defun get-next-worker-job-specification ()
+  (if-bind job (get-next-worker-job)
+    (progn
+      (claim-job job)
+      (job-specification job))
+    (no-job-to-do)))
+
+(defun job-specification (job)
+  `(:job :id ,(job-id job) :description ,(job-description job)))
+
+(defparameter *all-done*
+  nil
+  "when everything is said and done, we'll tell each worker to die in peace")
+
+(defun no-job-to-do ()
+  (if *all-done*
+      (suicide-job)
+      (sleep-job)))
+
+(defun suicide-job (&optional (worker *current-worker*))
+  (unregister-worker worker)
+  '(:die))
+
+(defun sleep-job ()
+  `(:sleep ,*sleep-delay*))
+
+(defgeneric validate-job (job)
+  (:method ((x t))
+    t))
+
+(defmethod validate-job ((job worker-job))
+  (funkall (or (job-validator job) t)))
+
+(defun hand-job-over-to-worker (job &optional (worker *current-worker*))
+  (with-accessors ((id job-id) (desc job-description)
+                   (on-claim job-on-claim) (on-completion job-on-completion))
+      job
+    (register-worker worker id)
+    (claim-job id worker)))
+
+(defun-request-handler worker-request (id &rest keys &key completed results error)
+  (logger "~&Got worker-request from~{ ~A~}, with keys ~S" id keys)
+  (when error
+    (logger "~&Error from worker~{ ~A~}: ~A" id error))
+  (let ((*current-worker* id))
+    (when completed
+      (job-done completed id results))
+    (reply (get-next-worker-job-specification))))
+
+(defun job-done (job-id worker results)
+  (if-bind job (gethash job-id *claimed-worker-jobs*)
+    (destructuring-bind (claimant start-time) (job-status job)
+      (if (equal worker claimant)
+          (logger "~&Job ~D completed by worker ~S in ~D seconds"
+                  (job-id job) worker
+                  (- (get-real-time) start-time))
+          (logger "~&Worker ~S announced he completed job ~A, originally claimed by ~S"
+                  worker (format-job nil job) claimant))
+      (unclaim-job job)
+      (register-worker worker :idle)
+      (apply #'notify-completed-job job results))
+    (logger "~&Worker ~S announced completion of job ~A, that doesn't exist (anymore?)"
+            worker (format-job nil job))))
+
+(defun-request-handler show-job-status ()
+  (reply* :all-done *all-done* :claimed-worker-jobs (hash-table->alist *claimed-worker-jobs*)))
+
+(defun notify-worker-timeout (worker &aux (pity nil))
+  ;; In this Trotskyist/Guevarist approach to management, we simply kill those
+  ;; counter-revolutionary saboteurs who are late at delivering the results
+  ;; expected by the manager. Oh, of course, being good socialists, we never
+  ;; kill any worker. Mind you, the miserable insect is unregistered first, so
+  ;; that not being a worker anymore, it is killed without spilling sacred
+  ;; proletarian blood. Insects are worthless, and multiply anyway.
+  (unregister-worker worker)
+  (destructuring-bind (insect without rights) worker
+    (declare (ignore rights))
+    (kill-machine-process insect without) pity
+    (spawn-client insect)))
+
+
+;;; Advanced Control Flow
+
+(exporting-definitions
+
+(defvar *retry-on-timeout* nil)
+
+(defun/cc issue-worker-job/cc (description &key validator on-claim on-completion
+                                           retry-on-timeout)
+  "Issue one worker job"
+  (if retry-on-timeout
+      (loop
+        for attempts from 1 do
+        (let ((guard t) (worker nil))
+          (multiple-value-bind (successp &optional results)
+              (with-local-task-competition (c)
+                (issue-worker-job
+                   description
+                   :validator (lambda ()
+                                (and guard (funkall (or validator t))))
+                   :on-claim (lambda ()
+                               (funkall
+                                (prog1 on-claim
+                                  (setf on-claim nil
+                                        worker *current-worker*)
+                                  (local-task-competitor (c) (sleep/cc retry-on-timeout) nil))))
+                   :on-completion (lambda (&rest results)
+                                    (setf worker nil)
+                                    (maybe-win-local-task-competition c t results))))
+            ;;(DBG :gah guard successp results attempts)
+            (setf guard nil)
+            (if successp
+                (return (apply on-completion results))
+                (when worker
+                  (logger "~&Timed out:~{ ~A~}" worker)
+                  (notify-worker-timeout worker))))))
+      (issue-worker-job description
+                        :validator validator
+                        :on-claim on-claim
+                        :on-completion on-completion)))
+
+(defun/cc issue-sequential-job (description &key retry-on-timeout validator on-claim on-completion)
+  "Issue a job. When the job is completed, return the values returned by the worker."
+  (let/cc k
+    (issue-worker-job/cc
+     description
+     :retry-on-timeout retry-on-timeout
+     :validator validator
+     :on-claim on-claim
+     :on-completion (if on-completion
+                        (lambda (&rest results)
+                          (apply #'funkall on-completion results)
+                          (apply #'kall k results))
+                        k))))
+
+(defun/cc issue-sequential-job-with-timeout
+    (description &key timeout retry-on-timeout validator on-claim on-completion)
+  "Issue a job. If the job is completed before the timeout, return T and the values returned by the worker,
+otherwise, return NIL"
+  (let ((guard t)
+        (worker nil))
+    (multiple-value-bind (successp &optional results)
+        (with-local-timeout (timeout)
+          (let/cc k
+            (issue-worker-job/cc
+             description
+             :retry-on-timeout retry-on-timeout
+             :validator (lambda () (and guard (funkall (or validator t))))
+             :on-claim (lambda () (setf worker *current-worker*) (funkall on-claim))
+             :on-completion (lambda (&rest results)
+                              (setf worker nil)
+                              (apply #'funkall on-completion results)
+                              (kall k t results)))))
+      (setf guard nil)
+      (when worker
+        (notify-worker-timeout worker))
+      (apply #'values successp results))))
+
+
+;;; Jobs to be run in parallel of each other.
+
+(defun/cc default-reducer (accumulator primary-value &rest other-values)
+  (declare (ignore other-values))
+  (cons primary-value accumulator))
+
+(defun/cc call-with-parallel-job-issuer (thunk &key
+                                               (reducer #'default-reducer)
+                                               (initial-value nil))
+  (let/cc k
+    (let ((remaining-jobs 0)
+          (accumulator initial-value)
+          (thunk-done nil))
+      (labels ((maybe-exit ()
+                 (when (and thunk-done (zerop remaining-jobs))
+                   (kall k accumulator)))
+               (one-less (values)
+                 (setf accumulator (apply reducer accumulator values))
+                 (decf remaining-jobs)
+                 (maybe-exit))
+               (one-more (description &key retry-on-timeout on-claim on-completion)
+                 ;; should we error out when thunk-done is true,
+                 ;; or is it OK for jobs or escaped continuations
+                 ;; to cause more jobs to be queued?
+                 (let/cc k
+                   (incf remaining-jobs)
+                   (issue-worker-job/cc
+                    description
+                    :retry-on-timeout retry-on-timeout
+                    :on-claim (lambda ()
+                                (funkall on-claim)
+                                (kall k))
+                    :on-completion (lambda (&rest values)
+                                     (apply #'funkall on-completion values)
+                                     (one-less values))))))
+        (funcall thunk #'one-more)
+        (setf thunk-done t)
+        (maybe-exit)))))
+
+(defparameter *parallel-job-issuer* nil)
+
+(defun/cc issue-parallel-job (description
+                              &key retry-on-timeout on-claim on-completion
+                              (issuer *parallel-job-issuer*))
+  (unless issuer
+    (error "No parallel job issuer for job ~S" description))
+  (funcall issuer description
+           :retry-on-timeout retry-on-timeout :on-claim on-claim :on-completion on-completion))
+
+(defmacro with-parallel-jobs ((issuer &rest keys &key reducer initial-value) &body body)
+  "BODY will be executed where ISSUER is lexically bound and fbound
+(or if it is NIL, *PARALLEL-JOB-ISSUER* dynamically bound) to a function that will
+issue jobs to be run in parallel then pause until the job is claimed by a worker.
+
+When a worker returns, the REDUCER is applied to the accumulator and the values
+provided by the worker, where the accumulator is initialized with INITIAL-VALUE.
+When the body is fully evaluated and all the jobs issued so far have returned,
+then the value of the accumulator is returned.
+
+The default INITIAL-VALUE is NIL, and the default REDUCER is DEFAULT-REDUCER, which
+conses the primary value returned by each job into a list ordered by last-returned first.
+If you just want to drop those values, use (CONSTANTLY NIL) as the REDUCER."
+  (declare (ignore reducer initial-value))
+  (with-gensyms (args)
+    `(call-with-parallel-job-issuer
+      (lambda (,(or issuer '*parallel-job-issuer*))
+        (labels (,@(when issuer
+                    `(,issuer (&rest ,args)
+                      (apply ,issuer ,args))))
+          ,@body))
+      ,@keys)))
+)
diff --git a/package.lisp b/package.lisp
new file mode 100644 (file)
index 0000000..94943ee
--- /dev/null
@@ -0,0 +1,46 @@
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+
+(in-package :cl-user)
+
+(cl:defpackage :philip-jose
+  (:use :common-lisp
+        :fare-utils
+        :split-sequence :parse-number
+        :net.sockets :cffi :iomux
+        :threads
+        :arnesi :closer-mop)
+  (:shadowing-import-from :arnesi
+      #:aif2 #:acond2 #:aif #:acond #:it ;same anaphoric macros in fare-utils
+      #:let1 #:compose #:if-bind ;also in fare-utils
+      #:join-strings #:copy-array #:strcat #:quit ;also in fare-utils
+      #:_ ;conflicts with fare-utils
+      #:name ;conflicts with net.sockets
+      #:partition ;conflicts with split-sequence
+      )
+  (:export #:*farmer-path* #:*farmer-system* #:*id*
+           #:*local-task-name* #:*max-request-size*
+           #:*parallel-job-issuer* #:*registry-path*
+           #:*retry-on-timeout* #:*ssh-path* #:*startup-actions*
+           #:*usage* #:*workers-per-machine* #:+crlf+ #:+unix-epoch+
+           #:bork #:call-job-handler #:call-request-handler
+           #:call-with-parallel-job-issuer #:collect-slots
+           #:command-line-from-strings #:defun-job
+           #:defun-request-handler #:errexit #:errformat #:exit #:fsleep
+           #:funkall #:get-localhost-names #:get-real-time #:getarg
+           #:gethostname #:issue-parallel-job #:issue-sequential-job
+           #:issue-sequential-job-with-timeout #:issue-worker-job/cc
+           #:logger #:main #:no-more-arg #:no-more-args #:print-object
+           #:program-help #:register-job-handler #:register-request-head
+           #:reply #:reply* #:simple-client #:simple-print-object
+           #:simple-print-object-mixin #:simple-worker-loop
+           #:simple-worker-thing #:slots-to-print #:spawn-all-clients
+           #:spawn-process #:spawn-process-on #:spawn-remote-process
+           #:start-clients #:start-multi-threaded-server
+           #:start-single-threaded-server #:start-test #:startup
+           #:stop-server #:strings-from-command-line
+           #:strings-from-command-line-stream #:task-loop #:unexpected-end-of-text
+           #:with-error-message-handler #:with-parallel-jobs
+           #:with-rendez-vous #:with-rendez-vous-after
+           #:with-trivial-logging))
+
+;;(write (fare-utils:make-defpackage-form :philip-jose :gensym) :pretty t :case :downcase)(terpri)
diff --git a/philip-jose.asd b/philip-jose.asd
new file mode 100644 (file)
index 0000000..86bdb23
--- /dev/null
@@ -0,0 +1,33 @@
+;;;-*- Lisp -*-
+
+(in-package :cl-user)
+
+(proclaim '(optimize (speed 3) (safety 2) (debug 1) (space 3)))
+
+(asdf:oos 'asdf:load-op :cl-launch)
+
+(asdf:defsystem :philip-jose
+  :name "Philip Jose Farmer"
+  :serial nil
+  :depends-on (:cl-launch ;; needed first, for fasl caching(?)
+               :cffi :iolib :io.multiplex :net.dns-client
+               :split-sequence :net-telent-date :parse-number
+               :bordeaux-threads :arnesi :closer-mop
+               :fare-utils)
+  :components ((:file "package")
+              (:file "utilities" :depends-on ("package"))
+              (:file "specials" :depends-on ("package"))
+              (:file "logger" :depends-on ("specials"))
+              (:file "registry" :depends-on ("logger" "utilities"))
+              (:file "trivial-sexp-server" :depends-on ("logger"))
+              (:file "incremental-parsing" :depends-on ("trivial-sexp-server"))
+              (:file "tcp-client" :depends-on ("logger"))
+              (:file "worker" :depends-on ("tcp-client" "utilities" "specials"))
+              (:file "machines" :depends-on ("trivial-command-line" "registry" "worker" "specials"))
+               (:file "trivial-command-line" :depends-on ("logger"))
+              (:file "tcp-server" :depends-on ("trivial-sexp-server" "incremental-parsing"
+                                                                      "logger"))
+              (:file "local-tasks" :depends-on ("logger" "utilities"))
+              (:file "manager" :depends-on
+                      ("tcp-server" "tcp-client" "logger" "utilities" "local-tasks"))
+              (:file "farmer" :depends-on ("machines" "registry" "manager"))))
diff --git a/register-machine b/register-machine
new file mode 100755 (executable)
index 0000000..a456dff
--- /dev/null
@@ -0,0 +1,22 @@
+#!/bin/zsh -f
+
+PROG="${0##*/}"
+case "$PROG" in
+  register-*|ohcreg) FOO=register ;;
+  *) FOO="${PROG%%-*}" ;;
+esac
+
+LOG=/home/fare/off-hours-computations/incoming/log
+
+DATE="$(date -u +"%Y-%m-%d.%H:%M:%S")"
+IPADDRS="$(/sbin/ifconfig | grep "inet addr" | grep -v 127.0.0.1 | \
+          perl -npe 's/^.*addr:([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+) .*$/$1/')"
+
+ENTRY="$(echo $FOO $USERNAME $HOST $DATE $IPADDRS ${1+# $*})"
+
+echo "$ENTRY" >> $LOG
+
+echo "The following entry has been logged:"
+echo "$ENTRY"
+echo
+echo "Thank you for participating in the off-hours computation farm."
diff --git a/registry.lisp b/registry.lisp
new file mode 100644 (file)
index 0000000..0ee2769
--- /dev/null
@@ -0,0 +1,51 @@
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;;; Registering machines for use on the farm.
+
+(in-package :philip-jose)
+
+(defparameter *start-time* nil)
+(defun get-start-time ()
+  (or *start-time* (setf *start-time* (get-real-time))))
+
+(defun read-time (time)
+  (date:parse-time time :patterns
+                   '((date::year (date::date-divider) date::month (date::date-divider) date::day
+                      (date::time-divider) date::hour (date::time-divider) (date::minute)
+                      (date::time-divider) (date::secondp)))))
+
+(defun decode-registry-log-line (x)
+  (destructuring-bind (op user machine time-string &rest rest) (split-sequence #\SPACE x)
+    (let ((time (- (read-time time-string) +unix-epoch+))
+          (hash (position "#" rest :test #'string-equal))
+          ips limit args)
+      (if hash
+        (setf ips (subseq rest 0 hash)
+              args (subseq rest (1+ hash))
+              limit (ignore-errors
+                      (+ time
+                         (* (parse-number:parse-positive-real-number x)
+                            (* 24 60 60)))))
+        (setf ips rest))
+      (values op user machine time ips limit args))))
+
+(defparameter *registered-hosts* nil)
+
+(defun process-registry-log ()
+  (let ((*registered-hosts* (make-hash-table :test 'equal)))
+    (with-open-file (s *registry-path*)
+      (process-registry-log-stream s))
+    (mapcar #'car (sort (hash-table->alist *registered-hosts*) #'< :key #'cdr))))
+
+(defun process-registry-log-stream (s)
+  (loop for x = (read-line s nil)
+        while x do
+    (multiple-value-bind (op user machine time ips limit args) (decode-registry-log-line x)
+      (declare (ignore user ips args))
+      (when (and (<= time (get-start-time))
+                 (or (null limit)
+                     (< (get-start-time) limit)))
+        (cond
+          ((string= "register" op)
+           (setf (gethash machine *registered-hosts*) time))
+          ((string= "unregister" op)
+           (remhash machine *registered-hosts*)))))))
diff --git a/specials.lisp b/specials.lisp
new file mode 100644 (file)
index 0000000..61b030c
--- /dev/null
@@ -0,0 +1,59 @@
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;;; Default configuration for the Philip-Jose Farmer
+
+(in-package :philip-jose)
+
+(def*var *farmer-system*
+    :farmer
+  "Name of the system, as visible to the user. Override with the name of your application")
+
+(def*var *farmer-path*
+    "/home/fare/off-hours-computations/bin/farmer"
+  "path of the farmer, to spawn new clients with fork+exec or ssh")
+
+(def*var *registry-path*
+    "/home/fare/off-hours-computations/incoming/log"
+  "path of the registry identifying which machines are part of the farm")
+
+(def*var *ssh-path*
+    "/usr/bin/ssh"
+  "Path to the SSH executable, for fork+exec")
+
+(def*parameter +crlf+
+    (make-array
+     '(2) :element-type 'base-char
+     :initial-contents (list (code-char 13) (code-char 10)))
+  "Constant string for CR+LF, the standard line terminator on the net")
+
+(def*var *id*
+    nil
+  "Identity of the current process: a list with machine name, pid and start time")
+
+(def*var *local-task-name*
+    nil
+  "TODO: use this to give a name to the currently executed task, for debugging purpose")
+
+(def*var *max-request-size*
+    4096  ;; for debugging, use 128. Normally, more like 4096
+  "Maximum size of a request, in bytes")
+;; Note that (1) the response can be of any size, and
+;; (2) at present, we preallocate a buffer for the whole request
+;; instead of making it extensible so don't make it too large without fixing the code.
+
+(def*var *workers-per-machine*
+    4
+  "Number of workers to spawn per machine")
+;; TODO: have a dynamic strategy that takes the number of CPUs and/or the load into account.
+
+(defparameter *sleep-delay*
+  10
+  "duration in seconds to ask workers to sleep when there is no job ready for them yet")
+
+
+;;; Global variables for server stuff
+(defparameter *server* nil)
+(defparameter *port* 6666)
+(defparameter *server-event* nil)
+
+(defparameter *event-base* nil
+  "Base object for I/O multiplexing")
diff --git a/tcp-client.lisp b/tcp-client.lisp
new file mode 100644 (file)
index 0000000..878c9d4
--- /dev/null
@@ -0,0 +1,32 @@
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;; Trivial TCP client
+
+(in-package :philip-jose)
+
+(defun make-tcp-connection (address port)
+  (let ((socket (make-socket :address-family :internet :type :stream :connect :active
+                             :protocol :default :ipv6 nil)))
+    (connect socket address :port port)
+    socket))
+
+(defun ensure-address (x)
+  (etypecase x
+    (sockaddr x)
+    (string
+     (let ((host (lookup-host x :ipv6 nil)))
+       (first (host-addresses host))))))
+
+
+(def*fun simple-client (message target port)
+  (let* ((address (ensure-address target))
+         (socket (make-tcp-connection address port)))
+    (unwind-protect
+         (progn
+           (safe-write message :stream socket)
+           (princ +crlf+ socket)
+           (finish-output socket)
+           ;;(shutdown socket :write)
+           (read socket))
+      (close socket))))
+
+
diff --git a/tcp-server.lisp b/tcp-server.lisp
new file mode 100644 (file)
index 0000000..3a3b4e2
--- /dev/null
@@ -0,0 +1,187 @@
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;;; Simple TCP server
+
+(in-package :philip-jose)
+
+
+;;;; Generic TCP routines
+
+(defun make-tcp-server-socket (address port)
+  (let ((socket (make-socket :address-family :internet
+                             :type :stream
+                             :connect :passive
+                             :ipv6 nil)))
+    (bind-address socket address :port (or port 0))
+    (socket-listen socket)
+    socket))
+
+(defun call-connection-handler (socket handler &rest args)
+  (with-trivial-logging ()
+    (unwind-protect
+         (let* ((*standard-input* socket)
+                (*standard-output* socket))
+           (apply handler args)
+           (finish-output))
+      (close socket))))
+
+
+;;;; Spawners for a multi-threaded system having one thread per connection.
+(defun make-connection-worker (socket handler &rest args)
+  #'(lambda ()
+      (apply #'call-connection-handler socket handler args)))
+
+(defun connection-spawner (listener-socket handler)
+  #'(lambda (fd evtype)
+      (if (eql evtype :read)
+        (let ((connection (accept-connection listener-socket)))
+          (make-thread (make-connection-worker connection handler)) :name "connection worker")
+        (logger "~&Error on FD ~A" fd))))
+
+(defun add-multi-threaded-server (event-base listener-socket handler)
+  (iomux::add-fd event-base (sockets::socket-fd listener-socket)
+                 :read (connection-spawner listener-socket handler)
+                 :persistent t))
+
+
+;;;; Spawners for a single-threaded system
+;; Generic code that implements an efficient non-blocking single-threaded server
+;; based on an event-loop and incremental message:
+;; maintain incrementally parsed request buffers
+;; and process requests when complete.
+;;
+;; The same single server thread can listen to many event sources at once and
+;; implement many servers using this (or other compatible) protocols.
+;;
+;; Reading bytes (or -- ugly hack -- latin1) would avoid ugly SB-IMPL::END-OF-INPUT-IN-CHARACTER
+;; when a packet is cut in the middle of a multi-byte character representation.
+;; For anything non-trivial, we really want bytes when reading/writing, and a
+;; flexible stream protocol.
+;;
+;; My design choice: I want to minimize the time spent in
+;; the single-threaded network message parser/dispatcher
+;; so although my source code for the parser will have characters and strings,
+;; I will be processing bytes directly the forked/threaded clients can do
+;; octets-to-string on whatever the parser gives them.
+;;
+;; As long as I only use ASCII this will do with BASE-CHAR.
+
+(defun add-connection-handler (event-base connection incremental-parser request-handler
+                                          &key (element-type 'base-char)
+                                          (external-format :iso-8859-1) ;; :default
+                                          (max-buffer-size *max-request-size*))
+  ;;; There is no race condition between setting handle and reading it in close-handler,
+  ;;; because events are not handled asynchronously, but only at the next event-dispatch
+  ;;; so everything we do inside a handler is properly isolated from other handlers
+  ;;; (but not from other threads).
+  (declare (ignorable element-type external-format max-buffer-size)) ;---***
+  (let* ((bytes-p (typep 0 element-type))
+        (initial-element (if bytes-p 0 #\space))
+        (buffer (make-array (list max-buffer-size)
+                            :element-type element-type
+                            :initial-element initial-element))
+        (size 0)
+         (state nil)
+         handle)
+    (labels
+        ((close-handler ()
+           (remove-event event-base handle)
+           (close connection))
+         (handler (fd evtype)
+           (declare (ignorable fd))
+           (handler-case
+               (cond
+                 ((eql evtype :read)
+                  ;;--- cannot read-sequence, because it will block. Have to use read.
+                  (let ((packet-size
+                         (cffi:with-pointer-to-vector-data (buf-ptr buffer)
+                           (et:read fd (cffi:inc-pointer buf-ptr size) (- max-buffer-size size)))))
+                    (incf size packet-size)
+                    (logger "~&Incoming packet from ~A: ~D bytes~%"
+                            (sockets::vector-to-dotted
+                             (sockets::name
+                              (sockets::remote-address connection)))
+                            packet-size)
+                    (multiple-value-bind (x y)
+                        (funcall incremental-parser buffer size state)
+                      (cond
+                        (x
+                         (funcall request-handler connection buffer size y)
+                         (close-handler))
+                        ((= size max-buffer-size)
+                         ;;---*** log an error?
+                         (close-handler))
+                        (t
+                         (setf state y)))))) ; get ready for next packet
+                  (t
+                   (close-handler)))
+                 (condition (c)
+                            (logger "~&~S~%" c)
+                            (close-handler)))))
+         (setf handle (iomux::add-fd event-base (sockets::socket-fd connection)
+                                     :read #'handler :persistent t)))))
+
+(defun add-single-threaded-server (event-base socket incremental-parser request-handler &rest keys
+                               &key element-type external-format max-buffer-size)
+  (declare (ignorable element-type external-format max-buffer-size))
+  (labels
+      ((handler (fd evtype)
+         (declare (ignorable fd))
+         (with-trivial-logging ()
+           (case evtype
+             (:read
+              (let ((connection (accept-connection socket)))
+                (logger "~&Connection opened with ~A~%"
+                        (sockets::vector-to-dotted
+                         (sockets::name (sockets::remote-address connection))))
+                (apply #'add-connection-handler event-base connection incremental-parser request-handler keys)))
+             (otherwise
+              (logger "~&Weird event ~S on ~S~%" evtype fd))))))
+    (iomux::add-fd event-base (sockets::socket-fd socket)
+                   :read #'handler :persistent t)))
+
+
+
+
+
+(defun ensure-event-base ()
+  ; can't put this make-instance form in specials.lisp because compiletime != runtime
+  (if (null *event-base*)
+    (setf *event-base* (make-instance 'iomux:event-base))))
+
+;; A one central server socket
+(defun make-server (&optional (address +ipv4-unspecified+) (port *port*))
+  (when *server*
+    (close *server*))
+  (ensure-event-base)
+  (setf *server* (make-tcp-server-socket address port))
+  (setf *port* (sockets::local-port *server*)))
+
+(def*fun start-multi-threaded-server (&key (handler #'sexp-request-handler)
+                                     client-starter process-events)
+  (make-server)
+  (when client-starter
+    (funcall client-starter))
+  (format *error-output* "Starting server on port ~A~%" *port*)
+  (finish-output *error-output*)
+  (setf *server-event* (add-multi-threaded-server *event-base* *server* handler))
+  (when process-events
+    (event-dispatch *event-base*)))
+
+(def*fun start-single-threaded-server (&key (incremental-parser (make-incremental-reader))
+                                            (handler (make-incrementally-parsed-request-handler #'handle-sexp-request))
+                                            client-starter process-events)
+  (make-server)
+  (when client-starter
+    (funcall client-starter))
+  (format *error-output* "Starting server on port ~A~%" *port*)
+  (finish-output *error-output*)
+  (setf *server-event* (add-single-threaded-server
+                        *event-base* *server* incremental-parser handler))
+  (when process-events
+    (event-dispatch *event-base*)))
+
+(def*fun stop-server ()
+  (close *server*)
+  (remove-event *event-base* *server-event*)
+  (setf *server* nil)
+  (setf *server-event* nil))
diff --git a/test.lisp b/test.lisp
new file mode 100644 (file)
index 0000000..9891e85
--- /dev/null
+++ b/test.lisp
@@ -0,0 +1,69 @@
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;; Trivial Testing
+
+(in-package :philip-jose)
+
+#|
+(require :asdf) (pushnew "/home/fare/lib/lisp-systems/" asdf:*central-registry* :test (function equal)) (asdf:oos (quote asdf:load-op) :qultivator :verbose t) (in-package :philip-jose) (DBG :foo (simple-client (list 7 8) "localhost" 7))
+
+(unless *server* (start-single-threaded-server :process-events nil))
+(issue-worker-job '(:fake 0) :on-completion (lambda (&rest x) (DBG :fake x))) (task-test 4)
+
+(defvar *keys* nil)
+(setf *keys* (apply (function worker-step) :server "localhost" *keys*))
+|#
+
+(defun run-test (f &rest args)
+  (catch :exit
+    (let (x keys)
+      (with-call/cc
+        (DBG :test (apply f args))
+        (setf x t))
+      (loop until x do (task-step)))))
+
+(defmacro xtest (&body body)
+  `(run-test (with-call/cc (lambda () ,@body))))
+
+(defun task-loop-1 (n)
+  (DBG :tl1 n)
+  (when (plusp n)
+    (schedule-local-task
+     #'(lambda () (task-loop-1 (1- n)))))
+  nil)
+
+(defun test-seq (n)
+  (xtest
+    (loop for i from 1 to n do
+      (DBG :test-seq i (issue-sequential-job `(:fake ,i))))
+    (DBG :test-seq-end)))
+
+(defun test-para (n)
+  (xtest
+    (DBG :tpara)
+    (with-parallel-jobs (i)
+      (loop for i from 1 to n do
+        (DBG :i i)
+        (i `(:fake ,i) :on-completion (lambda (&rest x) (DBG :fake x))))
+      (DBG :no-issue-left))
+    (DBG :z)))
+
+(defun test-timeout (x y)
+  (xtest
+    (with-local-timeout (x c)
+      (sleep/cc y)
+      (maybe-win-local-task-competition c 42))))
+
+#|(defun spawn-workers ()
+  (let ((num-workers (hash-table-count *registered-hosts*)))
+    (loop for x being the hash-keys of *registered-hosts* doing
+         (shell-command (format nil "ssh ~a" x))
+         (|#
+
+#|
+(defun run-client (server)
+  (loop for count from 0 do
+        (multiple-value-bind (start-block num-blocks) (get-next-slice server)
+        ;;---*** finish me
+        ))
+|#
+
diff --git a/trivial-command-line.lisp b/trivial-command-line.lisp
new file mode 100644 (file)
index 0000000..2a8bdac
--- /dev/null
@@ -0,0 +1,240 @@
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;;; Trivial Command-Line server.
+
+;; This provides a simple command-line dispatch framework,
+;; where each set of arguments 
+;; a list of non-whitespace words separated by whitespace (duh!)
+;; where the first word is the operator and other words are arguments.
+
+(in-package :philip-jose)
+
+;;; A bit of parsing and unparsing between shell command-lines and list of shell strings
+(defmacro define-ascii-char-code-predicates (name (x) &body body)
+  ;; assumes ASCII / latin1 / unicode (!)
+  (let* ((code-p-name (conc-symbol "ASCII-" name "-CODE-P"))
+         (char-p-name (conc-symbol "ASCII-" name "-CHAR-P")))
+    `(progn
+      (defun ,code-p-name (,x) ,@body)
+      (defun ,char-p-name (c) (,code-p-name (char-code c))))))
+(define-ascii-char-code-predicates uppercase-letter (x)
+  (<= 65 x 90))
+(define-ascii-char-code-predicates lowercase-letter (x)
+  (<= 97 x 122))
+(define-ascii-char-code-predicates digit (x)
+  (let ((d (- x #x30)))
+    (when (<= 0 d 9)
+      d)))
+(define-ascii-char-code-predicates whitespace (x)
+    (or (<= x 32)
+        (= x 127)
+        (= x #xA0)))
+(define-ascii-char-code-predicates letter (x)
+  (or (ascii-uppercase-letter-code-p x)
+      (ascii-lowercase-letter-code-p x)))
+(define-ascii-char-code-predicates letter-or-digit (x)
+  (or (ascii-letter-code-p x)
+      (ascii-digit-code-p x)))
+(define-ascii-char-code-predicates normal-shell (x)
+  (or (ascii-letter-code-p x)
+      (<= #x2B x #x3A) ; +,-./0123456789:
+      (member x '(#x25 #x3D #x5F)))) ; %=_
+
+(defun normal-shell-string-p (s)
+  (and (stringp s)
+       (every #'ascii-normal-shell-char-p s)))
+
+(defun write-string-single-quoted-stream (string stream)
+  (write-char #\' stream)
+  (loop for c across string do
+        (case c
+          ((#\' #\\) (write-char #\\ stream)))
+        (write-char c stream))
+  (write-char #\' stream))
+
+(def*fun command-line-from-strings (strings &optional stream)
+  (with-output (stream)
+    (loop for (s . rest) on strings do
+          (if (normal-shell-string-p s)
+            (write-string s stream)
+            (write-string-single-quoted-stream s stream))
+          (when rest (write-char #\space stream)))))
+
+(def*fun strings-from-command-line (command-line)
+  (with-input-from-string (s command-line)
+    (strings-from-command-line-stream s)))
+
+(define*-condition unexpected-end-of-text (simple-error) ())
+
+(def*fun strings-from-command-line-stream (command-line-stream)
+  ;; handles quoting and escaping like a shell, but does not handle $ and other subtleties
+  (declare (optimize (speed 3) (safety 2) (debug 1)))
+  (let* ((s (make-string-output-stream))
+         (r nil))
+    (labels
+        ((flush ()
+           (push (get-output-stream-string s) r))
+         (exit ()
+           (return-from strings-from-command-line-stream (nreverse r)))
+         (putc (c)
+           (write-char c s))
+         (getc ()
+           (read-char command-line-stream nil))
+         (ungetc (c)
+           (unread-char c command-line-stream))
+         (eot (x &rest args)
+           (error 'unexpected-end-of-text
+                  :format-control x
+                  :format-arguments args))
+         (skip-spaces ()
+           (loop for c = (getc)
+                 while (and c (ascii-whitespace-char-p c))
+                 finally
+                 (if c
+                   (ungetc c)
+                   (exit)))
+           (get-normal-char))
+         (get-normal-char ()
+           (let ((c (getc)))
+             (case c
+               ((nil) (flush) (exit))
+               (#\\ (get-escaped-char))
+               (#\' (get-single-quoted-char))
+               (#\" (get-double-quoted-char))
+               (otherwise
+                (if
+                  (ascii-whitespace-char-p c)
+                  (progn (flush) (skip-spaces))
+                  (progn (putc c) (get-normal-char)))))))
+         (get-escaped-char ()
+           (let ((c (getc)))
+             (case c
+               ((nil) (eot "Command-line ended where escaped character expected"))
+               (otherwise (putc c) (get-normal-char)))))
+         (get-double-quoted-char ()
+           (let ((c (getc)))
+             (case c
+               ((nil) (eot "Command-line ended inside double quotes"))
+               (#\\ (get-escaped-double-quoted-char))
+               (#\" (get-normal-char))
+               (otherwise (putc c) (get-double-quoted-char)))))
+         (get-escaped-double-quoted-char ()
+           (let ((c (getc)))
+             (case c
+               ((nil) (eot "Command-line ended where escaped double quoted character expected"))
+               (otherwise (putc c) (get-double-quoted-char)))))
+         (get-single-quoted-char ()
+           (let ((c (getc)))
+             (case c
+               ((nil) (eot "Command-line ended inside single quotes"))
+               (#\\ (get-escaped-single-quoted-char))
+               (#\' (get-normal-char))
+               (otherwise (putc c) (get-single-quoted-char)))))
+         (get-escaped-single-quoted-char ()
+           (let ((c (getc)))
+             (case c
+               ((nil) (eot "Command-line ended where escaped character expected"))
+               ((#\\ #\') (putc c) (get-single-quoted-char))
+               (otherwise (putc #\\) (putc c) (get-single-quoted-char))))))
+      (skip-spaces))))
+
+(def*var *usage*
+  "farmer [action] [args...]
+
+For a list of valid actions, try
+  farmer help
+")
+
+(def*parameter *startup-actions*
+  '(("server" start-server "[port]") ; the main server tracks what happens
+    ("client" start-client "server port"); each client is a worker (future plan: spawns and monitors workers)
+    ;;("worker" start-worker "server port id job") ; each worker does a minutes worth of work
+    ("test" start-test) ; test
+    ("status" query-status) ; queries the current status
+    ("help" program-help) ; queries available actions
+    (nil start-server "server"))) ; default: start the server
+
+(def*fun bork (&rest r)
+  (declare (ignore r))
+  (error "bork"))
+
+(def*fun exit (&optional (code 0) &rest r)
+  (declare (ignore r))
+  (throw :exit code))
+
+(def*fun errformat (fmt &rest args)
+  (apply #'format *error-output* fmt args))
+
+(def*fun errexit (code fmt &rest args)
+  (apply #'errformat fmt args)
+  (exit code))
+
+(defun call-argument-getter (x decoder-function default-thunk errmsg)
+  (handler-case
+      (if x
+        (funcall decoder-function x)
+        (funcall default-thunk))
+    (t () (errexit 3 "~&Invalid argument~@[ ~A~]~%~A" errmsg *usage*))))
+
+(def*macro getarg (args decoder &optional default errmsg)
+  `(call-argument-getter
+    (pop ,args)
+    (function ,decoder)
+    #'(lambda () ,default)
+    ,errmsg))
+
+(def*macro no-more-args (args)
+  `(getarg ,args bork nil "(too many arguments)"))
+
+(def*fun start-test (&rest r)
+  (let ((foo (getarg r parse-integer 0)))
+    (no-more-args r)
+    (DBG :test
+         foo
+         (gethostname)
+         (process-registry-log))))
+
+(def*fun program-help (&rest r)
+  (let ((s (if (streamp (car r)) (car r) *error-output*)))
+  (format s "~&Available actions:~%")
+  (loop with d = nil for x in *startup-actions* do
+        (if (car x)
+          (format s "   ~{~A~*~#[~; ~A~]~}~%" x)
+          (setf d (third x)))
+        finally (when d (format s "default action: ~A~%" d)))))
+
+(defun start-server (&rest r)
+  (let* ((port (getarg r parse-integer 6666)))
+    (no-more-args r)
+    (start-single-threaded-server :client-starter #'start-clients)
+    (task-loop)))
+
+(defun start-client (&rest r)
+  (let* ((server (getarg r identity (bork)))
+         (port (getarg r parse-integer (bork))))
+    (no-more-args r)
+    (validate-and-start-client server port)))
+
+(defun start-worker (&rest r)
+  (let* ((server (getarg r identity (bork)))
+         (port (getarg r parse-integer (bork)))
+         (id (getarg r read-from-string (bork)))
+         (job (getarg r read-from-string (bork))))
+    (no-more-args r)
+    ;;---***
+    (DBG :start-worker server port id job)))
+
+(defun query-status (&rest r)
+  (no-more-args r)
+  ;;---***
+  (DBG :query-status))
+
+(def*fun main (args)
+  (let* ((action (pop args))
+         (fun (second (assoc action *startup-actions* :test #'equalp))))
+    (catch :exit
+      (if fun
+        (progn (apply fun args) 0)
+        (errexit 2 "~&Invalid action ~S~%~A" action *usage*)))))
+
+(def*fun startup ()
+  (cl-launch:quit (main cl-launch:*arguments*)))
diff --git a/trivial-sexp-server.lisp b/trivial-sexp-server.lisp
new file mode 100644 (file)
index 0000000..fbea1c6
--- /dev/null
@@ -0,0 +1,55 @@
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;; Trivial SEXP server.
+
+;; BIG FAT WARNING!!!!
+;; safe-read is subject to DoS attacks whereby attackers force the server
+;; to intern a lot of useless symbols until it runs out of memory.
+;; THIS NOT FOR USE IN THE WILD! Only on safe (virtual) private networks.
+;;
+;; If you want a robust server, you'll have to supply your own robust parser.
+;; And handle all kind of weird issues such as handling character encoding,
+;; not to speak about avoiding extra consing for performance.
+;; I recommend then trying UDF-A.
+
+(in-package :philip-jose)
+
+(defparameter *request-heads* (make-hash-table :test #'equal))
+
+(def*fun register-request-head (x func)
+  (setf (gethash x *request-heads*) func))
+
+(def*macro defun-request-handler (name formals &body body)
+  `(progn
+     (defun ,name ,formals ,@body)
+     (register-request-head ,(conc-keyword name) (function ,name))))
+
+(def*fun call-request-handler (f &rest args)
+  (let ((fun (gethash f *request-heads*)))
+    (if fun
+      (apply fun args)
+      (error "Not a registered request type ~S" f))))
+
+
+(defun handle-sexp-request (request)
+  (if (consp request)
+    (apply #'call-request-handler request)
+    (error "Bad request ~S" request)))
+
+(defun sexp-request-handler ()
+  (handle-sexp-request (safe-read)))
+
+(defun handle-sexp-request-from-stream (i &optional (o i))
+  (let* ((*standard-input* i)
+         (*standard-output* o))
+    (sexp-request-handler)))
+
+(defun handle-sexp-request-from-socket (socket)
+  (handle-sexp-request-from-stream socket))
+
+(def*fun reply (x)
+  (safe-write x)
+  (princ +crlf+)
+  (finish-output))
+
+(def*fun reply* (&rest x)
+  (reply x))
diff --git a/utilities.lisp b/utilities.lisp
new file mode 100644 (file)
index 0000000..5560edd
--- /dev/null
@@ -0,0 +1,79 @@
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;; Various utilities (to move to some other package...)
+
+(in-package :philip-jose)
+
+(exporting-definitions
+
+;; depends on closer-mop
+(defun collect-slots (object &optional (slot-list t))
+  (loop with class = (class-of object)
+        with slots = (if (eq slot-list t) (compute-slots class) slot-list)
+        for slot in slots
+        for name = (slot-definition-name slot)
+        for iarg = (or (first (slot-definition-initargs slot)) name)
+        nconc (when (slot-boundp object name) `(,iarg ,(slot-value object name)))))
+
+(defun simple-print-object (object stream &key identity (slots t))
+  (with-output (stream)
+    (print-unreadable-object (object stream :type t :identity identity)
+      (write (collect-slots object slots) :stream stream))))
+
+(defgeneric slots-to-print (object)
+  (:method ((object t))
+    t))
+
+(defclass simple-print-object-mixin (standard-object)
+  ())
+
+(defmethod print-object ((object simple-print-object-mixin) stream)
+  (simple-print-object object stream :slots (slots-to-print object)))
+
+;; depends on arnesi
+(defun funkall (f &rest args)
+  (etypecase f
+    (function
+     (apply f args)) ; fast path for functions
+    (boolean
+     f) ; booleans evaluate as constant functions returning self -- must be before symbols
+    ((and symbol (satisfies arnesi::fdefinition/cc))
+     (with-call/cc (apply (arnesi::fdefinition/cc f) args)))
+    (symbol
+     (apply f args))
+    (arnesi::closure/cc
+     (with-call/cc (apply f args)))
+    (cons
+     (apply #'kall f args)))) ; arnesi represents continuations as lists
+
+;; This code should be moved to iolib-posix and exported.
+;; gettime should be renamed get-monotonic-time and exported, too.
+(defun get-real-time ()
+  (multiple-value-bind (sec nsec)
+      (et:clock-get-time et:clock-realtime)
+    (+ sec (* nsec 1d-9))))
+
+(defconstant +unix-epoch+ (encode-universal-time 0 0 0 1 1 1970 0))
+
+(defun fsleep (delay)
+  (when (plusp delay)
+    (multiple-value-bind (i d) (floor delay)
+      (unless (zerop i)
+        ;; TODO: here insert check for overflow...
+        (et::sleep i))
+      (unless (zerop d)
+        (et::usleep (floor (* d 1000000))))))
+  nil)
+
+(defmacro with-error-message-handler ((&rest handler) &body body)
+  (with-gensyms (err)
+    `(handler-bind
+         ((simple-condition (lambda (,err)
+                             (,@handler
+                              (apply #'format nil
+                                     (simple-condition-format-control ,err)
+                                     (simple-condition-format-arguments ,err)))))
+          (t (lambda (,err) (,@handler (format nil "~A" ,err)))))
+       ,@body)))
+
+
+);exporting-definitions
diff --git a/worker.lisp b/worker.lisp
new file mode 100644 (file)
index 0000000..d0b83df
--- /dev/null
@@ -0,0 +1,68 @@
+;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
+;;;; Trivial Job tracker
+(in-package :philip-jose)
+
+;;; Simple worker...
+(defparameter *job-handlers* (make-hash-table :test #'equal))
+
+(def*fun register-job-handler (x func)
+  (setf (gethash x *job-handlers*) func))
+
+(def*macro defun-job (name formals &body body)
+  `(progn
+     (defun ,name ,formals ,@body)
+     (register-job-handler ,(conc-keyword name) #',name)))
+
+(def*fun call-job-handler (f &rest args)
+  (let ((fun (gethash f *job-handlers*)))
+    (if fun
+      (apply fun args)
+      (error "Not a registered response type ~S" f))))
+
+(defun worker-step (&key completed results error (server "localhost") (port *port*))
+  (flet ((result-keys ()
+           (when completed
+             (list* :completed completed
+                    (append
+                     (when results (list :results results))
+                     (when error (list :error error)))))))
+    (let ((work
+           (flet ((handle-error (x)
+                    (logger "~&Error while talking to server: ~A" x)
+                    #+sbcl (sb-impl::backtrace 20 t)
+                    (throw 'wemh nil)))
+             (catch 'wemh
+               (with-error-message-handler (handle-error)
+                 (simple-client
+                  (list* :worker-request (get-id) (result-keys))
+                  server port))))))
+      (setf completed nil results nil error nil)
+      (cond
+        ((null work)
+         (logger "~&No work to do")
+         (fsleep *sleep-delay*))
+        (t         
+         (logger "~&Work to do: ~S" work)
+         ((lambda (x) (apply x (rest work)))
+          (ecase (first work)
+            (:sleep  #'fsleep)
+            (:die #'exit)
+            (:job (lambda (&key id description)
+                    (setf completed id results
+                          (flet ((handle-error (x)
+                                   (logger "~&Error while processing ~S: ~A" description x)
+                                   (setf error x)
+                                   (throw 'wemh nil)))
+                            (catch 'wemh
+                              (with-error-message-handler (handle-error)
+                                (multiple-value-list
+                                 (apply #'call-job-handler description))))))))))))
+      (result-keys))))
+
+(defun worker-loop (&key (server "localhost") (port *port*))
+  (let ((keys nil))
+    (loop
+      (setf keys
+            (handler-case
+                (apply #'worker-step :server server :port port keys))))))
+