Skip to content
forker.lisp 7.91 KiB
Newer Older
#+xcvb
(module
 (:build-depends-on nil
  :depends-on
  ("/xcvb/driver"
   (:when (:featurep :sbcl)
     (:require :sb-grovel)
     (:require :sb-posix)))))

#+sbcl ;;; SBCL specific fork support
(eval-when (:compile-toplevel :load-toplevel :execute)
  (require :sb-grovel)
  (require :sb-posix))
;; Simple heuristic: if we have allocated more than the given ratio
;; of what is allowed between GCs, then trigger the GC.
;; Note: can possibly modify parameters and reset in sb-ext:*after-gc-hooks*
(defparameter *prefork-allocation-reserve-ratio* .80) ; default ratio: 80%

(defun should-i-gc-p ()
  (let ((available-bytes (- (sb-alien:extern-alien "auto_gc_trigger" sb-alien:long)
                            (sb-kernel:dynamic-usage)))
        (allocation-threshhold (sb-ext:bytes-consed-between-gcs)))
    (< available-bytes (* *prefork-allocation-reserve-ratio* allocation-threshhold))))

(defun posix-fork ()
  (when (should-i-gc-p)
    (sb-ext:gc))
  (sb-posix:fork))

(defun posix-close (x)
  (sb-posix:close x))

(defun posix-setpgrp ()
  (sb-posix:setpgrp))

(defun posix-waitpid (pid options)
  (sb-posix:waitpid pid options))

(eval-when (:compile-toplevel :load-toplevel :execute)
(defun posix-waitpid-options (&key nohang untraced)
  (logior (if nohang sb-unix:wnohang 0)
          (if untraced sb-unix:wuntraced 0))))

(defun posix-wexitstatus (x)
  (sb-posix:wexitstatus x))

(defun posix-pipe ()
  (sb-posix:pipe))

);#+sbcl


#+clozure ;;; Clozure CL specific fork support
(progn

(defun posix-fork ()
  (finish-outputs)
  (ccl:external-call "fork" :int))

(defun posix-close (x)
  (ccl::fd-close x))

(defun posix-setpgrp ()
  (ccl::external-call "setpgrp" :int))

(eval-when (:compile-toplevel :load-toplevel :execute)
(defun posix-waitpid-options (&key nohang untraced)
  (logior (if nohang (load-time-value (read-from-string "#$WNOHANG")) 0)
          (if untraced (load-time-value (read-from-string "#$WUNTRACED")) 0))))

(defun posix-waitpid (pid options)
  (ccl::rlet ((status :signed))
    (let* ((retval (ccl::external-call
                    "waitpid"
                    :signed pid :address status :signed options :signed)))
      (values retval (ccl::pref status :signed)))))

(defun posix-wexitstatus (x)
  (ccl::wexitstatus x))

(defun posix-pipe ()
  (ccl::pipe))

);#+clozure

#+clisp ;;; CLISP specific fork support
(progn

(defun posix-fork ()
  (linux:fork))

(defun posix-close (x)
  (linux:close x))

(defun posix-setpgrp ()
  (posix:setpgrp))

(defun posix-waitpid (pid options)
  (multiple-value-list (apply #'posix:wait :pid pid options)))

(eval-when (:compile-toplevel :load-toplevel :execute)
(defun posix-waitpid-options (&rest keys &key nohang untraced)
  (declare (ignore nohang untraced))
  keys))

(defun posix-wexitstatus (x)
  (if (eq :exited (second x))
    (third x)
    (cons (second x) (third x))))

(defun posix-pipe ()
  (linux:pipe))

);#+clisp

(defun xdelete (sequence item &rest keys)
  (apply #'delete item sequence keys))
(define-modify-macro deletef (item &rest keys)
  xdelete "Delete item from list")

;;; Do we claim to catch all forks? Probably.
;;; Then we need a hook per process.
;;(defvar *subprocesses-lock* (make-lock))
(defvar *subprocesses* (make-hash-table :test 'equal))
(defvar *only-reap-registered-subprocesses* nil)
(defun add-subprocess (pid handler)
  (setf (gethash pid *subprocesses*) handler))
(defun reap-subprocess (pid status)
  (multiple-value-bind (handler foundp) (gethash pid *subprocesses*)
    (unless foundp
      (error "Trying to reap unregistered PID ~D" pid))
    (remhash pid *subprocesses*)
    (funcall handler status)))
(defun maybe-reap-specified-subprocess (pidspec options)
  (multiple-value-bind (pid status) (posix-waitpid pidspec options)
    (unless (zerop pid)
      (reap-subprocess pid status)
      t)))
(defun maybe-reap-one-subprocess (&key nohang)
  (unless (zerop (hash-table-count *subprocesses*))
    (if *only-reap-registered-subprocesses*
      (loop :with options = (posix-waitpid-options :nohang t)
        :thereis (loop :for pid :being :the :hash-keys :of *subprocesses*
                   :thereis (maybe-reap-specified-subprocess pid options))
        :until nohang :do (sleep .1))
      (maybe-reap-specified-subprocess -1 (posix-waitpid-options :nohang nohang)))))
(defun reap-subprocesses (&key nohang)
  (loop :while (maybe-reap-one-subprocess :nohang nohang)))

(defun initialize-child-process (thunk)
  (setf *subprocesses* (make-hash-table :test 'equal))
  ;; run atfork equivalent hooks?
  ;; reset stack, free resources from other threads
  (unwind-protect
       (progn
         (funcall thunk)
         (quit 0))
    (quit 99)))

(defun in-fork (in-fork &optional (reaper (constantly nil)))
  (finish-outputs)
  (let ((pid (posix-fork)))
    (cond
      ((zerop pid) ;; child
       (initialize-child-process in-fork))
      (t
       (add-subprocess pid reaper)
       pid))))

(defun read-eval-loop (&optional (i *standard-input*))
  (format *error-output* "~&Starting read-eval-loop at ~36R" (get-universal-time))
  (loop :with eof = '#:eof
    #|:with size = 4096
    :with buffer = (make-array size :element-type 'character)
    :for start = 0 :then newstart
    :for end = 0 :then newend
    :for position = (- end start)
    :for () = (replace buffer buffer :start1 0 :end1 position :start2 start :end2 end)
    :for newend = (read-sequence buffer i :start position :end size)
    :for (form newstart) =
    (multiple-value-list (read-from-string buffer nil eof :start 0 :end newend))|#
    :for form = (read-preserving-whitespace i nil eof)
    (format *error-output* "~&At ~36R received form~%  ~S~%" (get-universal-time) form)
    (finish-outputs)
    (let ((values (multiple-value-list (eval form))))
      (format *error-output* "~&; At ~36R it returned~%~{  ~S~%~}" (get-universal-time) values))
    (finish-outputs))
  (format *error-output* "~&; EOF at ~36R~%" (get-universal-time))
  (finish-outputs))
(defun work-on-job (id inpath outpath errpath commands close)
  (when close (close *error-output*))
  (with-open-file (*error-output* errpath :direction :output
                                  :if-exists :append :if-does-not-exist :create)
    (when close
      (setf *stderr* *error-output*)
      (close *standard-output*))
    (with-open-file (*standard-output* outpath :direction :output
                                       :if-exists :overwrite :if-does-not-exist :create)
      (write-sequence (format nil "(:ready ~S)~%" id) *standard-output*) ; handshake
      (finish-outputs)
      (when close (close *standard-input*))
      (with-open-file (*standard-input* inpath
                                        :direction :input :if-does-not-exist :error)
        (let ((*trace-output* *error-output*))
          (run-commands commands))))))
(defun fork-job (id inpath outpath errpath &rest commands)
  ;; NB: we open the new input/output pipes from the parent,
  ;; and close them right after the fork,
  ;; so that closing of the pipe be a signal to the controller
  ;; that the controlled process is dead.
  ;; Note that the controller process will have to open the pipe
  ;; in non-blocking mode, and to select on it.
  (format *error-output* "Forked process ~D to run job ~S ~S ~S ~S~{ ~S~}~%"
          (in-fork
           (lambda ()
             (work-on-job id inpath outpath errpath commands t))) ;; t
          id inpath outpath errpath commands)
  (finish-outputs))

(defun execute-job (id inpath outpath errpath &rest commands)
  (work-on-job id inpath outpath errpath commands nil)
  (format *error-output* "Terminating process at ~36R" (get-universal-time))
  (let ((*standard-output* *error-output*))
    (format t "~&Running job ~S at ~36R:~%  ~{ ~S~}~%" id (get-universal-time) commands)
    (format t "~&Done with job ~S at ~36R~%" id (get-universal-time)))
  (write-sequence (format nil "(:done ~S)~%" id) *standard-output*)
  (finish-outputs))