;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
;;;; Trivial Job Management
(in-package :philip-jose)
#|
Advice from #lisp: maybe use cells or computed-class -- but not really
http://common-lisp.net/project/cells/
http://common-lisp.net/project/computed-class/
lemonodor: Jim Firby's RAP (not free software, but a good read)
http://people.cs.uchicago.edu/~firby/raps/index.shtml
Proper interface:
a little language to dynamically build a dependency graph.
A node remembers those existing nodes that still need to be completed
before that node may be undertaken.
When a node is completed, its dependencies are propagated,
which may themselves generate more jobs...
And the language is Lisp, thanks to CALL/CC!
|#
;;; Global lock to protect state (not used at this time)
#|
(defparameter *tracker-lock* (make-lock)
"global lock for multithreaded servers")
(defmacro with-tracker-lock (() &body body)
`(with-lock-held (*tracker-lock*)
,@body))
|#
;;; Workers have an id, a known status
(defvar *current-worker* nil
  "ID of the worker whose request is being handled.
Bound by the WORKER-REQUEST handler for the duration of a request and used as
the default WORKER argument of CLAIM-JOB, SUICIDE-JOB and
HAND-JOB-OVER-TO-WORKER. Its global value is NIL.")
(defparameter *registered-workers* (make-hash-table :test 'equal)
  "EQUAL hash table mapping a worker ID to the status last recorded for it
by REGISTER-WORKER.")
(defun register-worker (id &optional (status :working))
  "Record STATUS (default :WORKING) as the status of worker ID in
*REGISTERED-WORKERS*, replacing any previous entry. Returns STATUS."
  (setf (gethash id *registered-workers*) status))
(defun unregister-worker (id)
  "Forget worker ID: remove its entry from *REGISTERED-WORKERS*.
Returns true if there was such an entry, false otherwise."
  (remhash id *registered-workers*))
;;; Your every worker job
(defclass worker-job (simple-print-object-mixin)
  ((id
    :initarg :id
    :accessor job-id
    :documentation
    "nonce to identify the job")
   (description
    :initarg :description
    :accessor job-description
    :documentation
    "description of the job, a SEXP to be transmitted to the worker")
   (status
    :initarg :status
    :initform nil
    :accessor job-status
    :documentation
    "Status of the job: NIL when freshly created, a list (WORKER CLAIM-TIME)
while claimed by a worker, (:UNCLAIMED) once the claim has been released,
and :COMPLETED once completion has been notified.")
   (validator
    :initarg :validator
    :initform t
    :accessor job-validator
    :documentation
    "thunk to validate whether a job is still needed by the time a worker is ready to claim it.")
   (on-claim
    :initarg :on-claim
    :initform nil
    :accessor job-on-claim
    :documentation
    "hook thunk to call when a worker claims the job")
   ;; Defaults to NIL, like ON-CLAIM, so that a job created without an
   ;; :ON-COMPLETION initarg does not signal an unbound-slot error when
   ;; NOTIFY-COMPLETED-JOB reads the slot.
   (on-completion
    :initarg :on-completion
    :initform nil
    :accessor job-on-completion
    :documentation
    "continuation to call when a job returns"))
  (:documentation
   "A unit of work to be handed to a worker, together with the hooks to run
when it is validated, claimed and completed."))
;; Number of worker job IDs handed out so far by MAKE-WORKER-JOB-ID.
(defparameter *worker-job-counter* 0)

(defun make-worker-job-id ()
  "Return a fresh job ID: the elements of *ID* followed by :WORKER-JOB and
the newly incremented value of *WORKER-JOB-COUNTER*."
  (append *id*
          (list :worker-job (incf *worker-job-counter*))))
;; NOTE(review): not referenced by any definition in this section.
(defvar *current-worker-job* nil
  "The worker job currently being examined; globally NIL.")
;;; Worker jobs can be scheduled or claimed
(defparameter *claimed-worker-jobs* (make-hash-table :test 'equal)
  "EQUAL hash table mapping a job ID to the job object, for every job
currently claimed by some worker (see CLAIM-JOB and UNCLAIM-JOB).")
(defun claim-job (job &optional (worker *current-worker*))
  "Mark JOB as claimed by WORKER (default: *CURRENT-WORKER*).
The job is indexed by its ID in *CLAIMED-WORKER-JOBS*, its status becomes the
list (WORKER claim-time) with the time read from GET-REAL-TIME, and finally
its on-claim hook is run. Returns the value of NOTIFY-CLAIMED-JOB."
  (setf (gethash (job-id job) *claimed-worker-jobs*) job
        (job-status job) (list worker (get-real-time)))
  (notify-claimed-job job))
(defun notify-claimed-job (job)
  "Invoke JOB's on-claim hook through FUNKALL and return its result."
  (let ((hook (job-on-claim job)))
    (funkall hook)))
(defun notify-completed-job (job &rest results)
  "Set JOB's status to :COMPLETED, then pass RESULTS to its on-completion
continuation through FUNKALL."
  (setf (job-status job) :completed)
  (let ((continuation (job-on-completion job)))
    (apply #'funkall continuation results)))
(defun unclaim-job (job)
  "Release the claim on JOB: its status becomes (:UNCLAIMED) and it is
removed from *CLAIMED-WORKER-JOBS*. Always returns NIL."
  (setf (job-status job) '(:unclaimed))
  (remhash (job-id job) *claimed-worker-jobs*)
  ;; Explicit NIL so callers never see REMHASH's found/not-found flag.
  nil)
(defparameter *scheduled-worker-jobs* (make-fifo)
  "FIFO queue of the jobs waiting to be handed to a worker; filled by
ENQUEUE-WORKER-JOB and drained by GET-NEXT-WORKER-JOB.")
(defun format-job (stream job)
  "Print a short representation of JOB, showing only its ID and description.
STREAM is a FORMAT destination; with NIL the text is returned as a string."
  (let ((id (job-id job))
        (description (job-description job)))
    (format stream "#<job ~d ~A>" id description)))
(defun enqueue-worker-job (job)
  "Log JOB, append it to *SCHEDULED-WORKER-JOBS*, and return JOB."
  (prog1 job
    (logger "~&Enqueueing ~A" (format-job nil job))
    (fifo-enqueue job *scheduled-worker-jobs*)))
(defun issue-worker-job (description &rest keys
                         &key validator on-claim on-completion)
  "Create a WORKER-JOB for DESCRIPTION with a fresh ID, enqueue it and
return it. The keyword arguments are forwarded untouched as initargs, so
only those actually supplied override the class defaults."
  ;; The keywords are named only to restrict what callers may pass;
  ;; the values travel through KEYS.
  (declare (ignore validator on-claim on-completion))
  (let ((job (apply #'make-instance 'worker-job
                    :id (make-worker-job-id)
                    :description description
                    keys)))
    (enqueue-worker-job job)))
(defun get-next-worker-job ()
  "Dequeue jobs from *SCHEDULED-WORKER-JOBS* until one passes VALIDATE-JOB;
log and return that job. Jobs failing validation are dropped.
Returns NIL once the queue is empty."
  (loop
    (when (fifo-empty-p *scheduled-worker-jobs*)
      (return nil))
    (let ((candidate (fifo-dequeue *scheduled-worker-jobs*)))
      (when (validate-job candidate)
        (logger "~&Issuing ~A" (job-id candidate))
        (return candidate)))))
;;; Global defaults when looking for a job
(defun get-next-worker-job-specification ()
  "Return what the current worker should do next.
If a valid job is queued, it is claimed for *CURRENT-WORKER* and its
specification is returned; otherwise the result of NO-JOB-TO-DO is returned."
  (let ((job (get-next-worker-job)))
    (cond (job
           (claim-job job)
           (job-specification job))
          (t
           (no-job-to-do)))))
(defun job-specification (job)
  "Build the message describing JOB to a worker:
\(:job :id ID :description DESCRIPTION)."
  (list :job
        :id (job-id job)
        :description (job-description job)))
(defparameter *all-done* nil
  "Flag consulted by NO-JOB-TO-DO: when true, a worker asking for work while
the queue is empty is told to die instead of to sleep.")
(defun no-job-to-do ()
  "Instruction for a worker when no job is available: the result of
SUICIDE-JOB if *ALL-DONE* is true, of SLEEP-JOB otherwise."
  (cond (*all-done* (suicide-job))
        (t (sleep-job))))
(defun suicide-job (&optional (worker *current-worker*))
  "Unregister WORKER (default: *CURRENT-WORKER*) and return the (:die)
instruction."
  (unregister-worker worker)
  '(:die))
(defun sleep-job ()
  "Build the (:sleep DELAY) instruction, DELAY being the current value of
*SLEEP-DELAY*."
  (list :sleep *sleep-delay*))
(defgeneric validate-job (job)
  (:documentation
   "Return true if JOB should still be handed to a worker.
The default method accepts any object.")
  (:method ((anything t))
    (declare (ignorable anything))
    t))
(defmethod validate-job ((job worker-job))
  "Run JOB's validator through FUNKALL; a NIL validator is replaced by T."
  (let ((validator (or (job-validator job) t)))
    (funkall validator)))
(defun hand-job-over-to-worker (job &optional (worker *current-worker*))
  "Assign JOB to WORKER (default: *CURRENT-WORKER*).
WORKER is registered with the job's ID as its status, then JOB is claimed on
its behalf. Returns the value of CLAIM-JOB."
  (register-worker worker (job-id job))
  ;; CLAIM-JOB needs the job object itself (it reads JOB-ID and sets
  ;; JOB-STATUS on it), not the job's ID.
  (claim-job job worker))
;; Request handler for a worker checking in.
;; ID identifies the requesting worker (it is printed with ~{ ~A~}, so it is
;; expected to be a list). COMPLETED, when supplied, is the ID of a job the
;; worker reports as finished and RESULTS the list of values it produced;
;; ERROR, when supplied, is only logged.
;; The value handed to REPLY is the next job specification, or the :sleep /
;; :die instruction produced by NO-JOB-TO-DO when nothing is queued.
(defun-request-handler worker-request (id &rest keys &key completed results error)
(logger "~&Got worker-request from~{ ~A~}, with keys ~S" id keys)
(when error
(logger "~&Error from worker~{ ~A~}: ~A" id error))
;; Make this worker the default for CLAIM-JOB and SUICIDE-JOB while the
;; request is processed.
(let ((*current-worker* id))
(when completed
(job-done completed id results))
(reply (get-next-worker-job-specification))))
(defun job-done (job-id worker results)
  "Process WORKER's report that the job identified by JOB-ID is finished.
RESULTS is the list of values produced by the worker.
If the job is currently claimed, the completion is logged (with a different
message when WORKER is not the recorded claimant), the claim is released,
WORKER is registered as :IDLE and the job's on-completion continuation
receives RESULTS. If no such claimed job exists, the anomaly is only logged."
  (if-bind job (gethash job-id *claimed-worker-jobs*)
    ;; A claimed job's status is the list (CLAIMANT START-TIME); see CLAIM-JOB.
    (destructuring-bind (claimant start-time) (job-status job)
      (if (equal worker claimant)
          (logger "~&Job ~D completed by worker ~S in ~D seconds"
                  (job-id job) worker
                  (- (get-real-time) start-time))
          (logger "~&Worker ~S announced he completed job ~A, originally claimed by ~S"
                  worker (format-job nil job) claimant))
      (unclaim-job job)
      (register-worker worker :idle)
      (apply #'notify-completed-job job results))
    ;; No job object exists on this branch, so report the ID we were given
    ;; rather than calling FORMAT-JOB on a non-job.
    (logger "~&Worker ~S announced completion of job ~A, that doesn't exist (anymore?)"
            worker job-id)))
;; Request handler reporting the manager's state: the *ALL-DONE* flag and the
;; contents of *CLAIMED-WORKER-JOBS* converted to an alist, both handed to
;; REPLY* as a keyword list.
(defun-request-handler show-job-status ()
(reply* :all-done *all-done* :claimed-worker-jobs (hash-table->alist *claimed-worker-jobs*)))
;; Handle a worker that did not deliver in time: unregister it, call
;; KILL-MACHINE-PROCESS on it, then call SPAWN-CLIENT.
;; WORKER is destructured as a three-element list; its first two elements are
;; passed to KILL-MACHINE-PROCESS and its first element to SPAWN-CLIENT.
;; NOTE(review): the meaning of those three elements is not visible in this
;; file section -- confirm against the code that builds worker IDs.
(defun notify-worker-timeout (worker &aux (pity nil))
;; In this Trotskyist/Guevarist approach to management, we simply kill those
;; counter-revolutionary saboteurs who are late at delivering the results
;; expected by the manager. Oh, of course, being good socialists, we never
;; kill any worker. Mind you, the miserable insect is unregistered first, so
;; that not being a worker anymore, it is killed without spilling sacred
;; proletarian blood. Insects are worthless, and multiply anyway.
(unregister-worker worker)
(destructuring-bind (insect without rights) worker
(declare (ignore rights))
;; PITY is the &aux variable bound to NIL above; evaluating it here has no
;; effect. It exists so that the form reads "kill ... without pity".
(kill-machine-process insect without) pity
(spawn-client insect)))
;;; Advanced Control Flow
(exporting-definitions
;; NOTE(review): not referenced by any definition in this section; the
;; functions below take RETRY-ON-TIMEOUT as a keyword argument instead.
;; Presumably meant as a global default -- confirm against callers.
(defvar *retry-on-timeout* nil)
;; Issue a worker job from code defined with DEFUN/CC.
;; Without RETRY-ON-TIMEOUT this simply forwards to ISSUE-WORKER-JOB.
;; With RETRY-ON-TIMEOUT (passed to SLEEP/CC as the delay) the job is issued
;; repeatedly: each attempt races the job's completion against a sleeper that
;; is started when a worker claims the job. When an attempt ends without
;; success and a claimant was recorded, that worker is reported through
;; NOTIFY-WORKER-TIMEOUT and a new attempt is made.
;; NOTE(review): the exact semantics of WITH-LOCAL-TASK-COMPETITION,
;; LOCAL-TASK-COMPETITOR and MAYBE-WIN-LOCAL-TASK-COMPETITION are defined
;; elsewhere; the description above assumes what their names suggest.
(defun/cc issue-worker-job/cc (description &key validator on-claim on-completion
retry-on-timeout)
"Issue one worker job"
(if retry-on-timeout
(loop
;; ATTEMPTS is only counted; its sole use is the commented-out DBG call below.
for attempts from 1 do
;; GUARD stays true while this attempt is live; WORKER holds the claimant
;; of this attempt, or NIL.
(let ((guard t) (worker nil))
;; NOTE(review): &OPTIONAL is not part of the standard MULTIPLE-VALUE-BIND
;; syntax; how it is treated here depends on the implementation and on
;; DEFUN/CC's code walker -- confirm which values RESULTS receives.
(multiple-value-bind (successp &optional results)
(with-local-task-competition (c)
(issue-worker-job
description
;; Once GUARD has been cleared, a copy of the job still sitting in the
;; queue from this attempt no longer validates.
:validator (lambda ()
(and guard (funkall (or validator t))))
;; On claim: remember the claimant, start the timeout competitor, and
;; call the caller's ON-CLAIM. ON-CLAIM is set to NIL afterwards, so
;; the caller's hook is only passed to FUNKALL on the first claim.
:on-claim (lambda ()
(funkall
(prog1 on-claim
(setf on-claim nil
worker *current-worker*)
(local-task-competitor (c) (sleep/cc retry-on-timeout) nil))))
;; On completion: the worker delivered, so forget it (it must not be
;; reported as timed out) and try to win the competition with T and
;; the list of results.
:on-completion (lambda (&rest results)
(setf worker nil)
(maybe-win-local-task-competition c t results))))
;;(DBG :gah guard successp results attempts)
(setf guard nil)
(if successp
(return (apply on-completion results))
(when worker
(logger "~&Timed out:~{ ~A~}" worker)
(notify-worker-timeout worker))))))
;; No retry requested: issue the job once, passing the hooks through unchanged.
(issue-worker-job description
:validator validator
:on-claim on-claim
:on-completion on-completion)))
;; Captures the current continuation K with LET/CC and issues the job through
;; ISSUE-WORKER-JOB/CC so that the job's completion resumes K with the
;; worker's results. RETRY-ON-TIMEOUT, VALIDATOR and ON-CLAIM are passed
;; through unchanged.
(defun/cc issue-sequential-job (description &key retry-on-timeout validator on-claim on-completion)
"Issue a job. When the job is completed, return the values returned by the worker."
(let/cc k
(issue-worker-job/cc
description
:retry-on-timeout retry-on-timeout
:validator validator
:on-claim on-claim
;; With a caller-supplied ON-COMPLETION, call it with the results first and
;; then resume K with the same results; otherwise K itself is the
;; completion continuation.
:on-completion (if on-completion
(lambda (&rest results)
(apply #'funkall on-completion results)
(apply #'kall k results))
k))))
;; Like ISSUE-SEQUENTIAL-JOB, but the wait for completion is wrapped in
;; WITH-LOCAL-TIMEOUT with the given TIMEOUT.
;; NOTE(review): the behavior of WITH-LOCAL-TIMEOUT on expiry is defined
;; elsewhere; the code below relies on SUCCESSP being false in that case.
(defun/cc issue-sequential-job-with-timeout
(description &key timeout retry-on-timeout validator on-claim on-completion)
"Issue a job. If the job is completed before the timeout, return T and the values returned by the worker,
otherwise, return NIL"
;; GUARD stays true while the job is still wanted; WORKER holds the claimant,
;; or NIL when the job is unclaimed or already completed.
(let ((guard t)
(worker nil))
;; NOTE(review): &OPTIONAL is not part of the standard MULTIPLE-VALUE-BIND
;; syntax; how it is treated here depends on the implementation and on
;; DEFUN/CC's code walker -- confirm which values RESULTS receives.
(multiple-value-bind (successp &optional results)
(with-local-timeout (timeout)
(let/cc k
(issue-worker-job/cc
description
:retry-on-timeout retry-on-timeout
;; Once GUARD has been cleared the job no longer validates.
:validator (lambda () (and guard (funkall (or validator t))))
;; Remember the claimant, then call the caller's ON-CLAIM hook.
:on-claim (lambda () (setf worker *current-worker*) (funkall on-claim))
;; The worker delivered: forget it, call the caller's ON-COMPLETION with
;; the results, then resume K with T and the results as a single list.
:on-completion (lambda (&rest results)
(setf worker nil)
(apply #'funkall on-completion results)
(kall k t results)))))
(setf guard nil)
;; A worker still recorded at this point claimed the job but did not complete it.
(when worker
(notify-worker-timeout worker))
;; Spread the list of results into multiple values after SUCCESSP.
(apply #'values successp results))))
;;; Jobs to be run in parallel of each other.
;; Default REDUCER for CALL-WITH-PARALLEL-JOB-ISSUER: push the primary value of
;; a finished job onto the front of ACCUMULATOR and return the new list.
;; Any further values returned by the job are ignored.
(defun/cc default-reducer (accumulator primary-value &rest other-values)
  (declare (ignore other-values))
  (cons primary-value accumulator))
;; Call THUNK with one argument, a job-issuing function, and resume the
;; caller's continuation with the accumulated result once THUNK has returned
;; and every job issued through that function has completed.
;; REDUCER is applied to the accumulator followed by the values of each
;; completed job, and its result becomes the new accumulator; the accumulator
;; starts as INITIAL-VALUE.
(defun/cc call-with-parallel-job-issuer (thunk &key
(reducer #'default-reducer)
(initial-value nil))
;; K is the continuation of the whole call; it receives the final accumulator.
(let/cc k
;; REMAINING-JOBS counts the jobs issued but not yet completed; THUNK-DONE
;; becomes true once THUNK has returned.
(let ((remaining-jobs 0)
(accumulator initial-value)
(thunk-done nil))
;; MAYBE-EXIT resumes K only when both conditions hold.
(labels ((maybe-exit ()
(when (and thunk-done (zerop remaining-jobs))
(kall k accumulator)))
;; ONE-LESS folds the list VALUES of one finished job into the accumulator.
(one-less (values)
(setf accumulator (apply reducer accumulator values))
(decf remaining-jobs)
(maybe-exit))
;; ONE-MORE is the issuer handed to THUNK. The inner LET/CC rebinds K to the
;; continuation of the ONE-MORE call, which is resumed from the job's
;; on-claim hook.
(one-more (description &key retry-on-timeout on-claim on-completion)
;; should we error out when thunk-done is true,
;; or is it OK for jobs or escaped continuations
;; to cause more jobs to be queued?
(let/cc k
(incf remaining-jobs)
(issue-worker-job/cc
description
:retry-on-timeout retry-on-timeout
:on-claim (lambda ()
(funkall on-claim)
(kall k))
:on-completion (lambda (&rest values)
(apply #'funkall on-completion values)
(one-less values))))))
(funcall thunk #'one-more)
(setf thunk-done t)
;; Covers the case where every job had already completed before THUNK returned.
(maybe-exit)))))
;; Default issuer used by ISSUE-PARALLEL-JOB; globally NIL, and bound by
;; WITH-PARALLEL-JOBS when that macro is given NIL as its ISSUER name.
(defparameter *parallel-job-issuer* nil)
(defun/cc issue-parallel-job (description
                              &key retry-on-timeout on-claim on-completion
                                (issuer *parallel-job-issuer*))
  "Issue a job for DESCRIPTION through ISSUER (default: the current value of
*PARALLEL-JOB-ISSUER*), forwarding RETRY-ON-TIMEOUT, ON-CLAIM and
ON-COMPLETION. Signals an error when there is no issuer."
  (when (null issuer)
    (error "No parallel job issuer for job ~S" description))
  (funcall issuer description
           :retry-on-timeout retry-on-timeout
           :on-claim on-claim
           :on-completion on-completion))
(defmacro with-parallel-jobs ((issuer &rest keys &key reducer initial-value) &body body)
  "BODY will be executed where ISSUER is lexically bound and fbound
\(or if it is NIL, *PARALLEL-JOB-ISSUER* dynamically bound) to a function that will
issue jobs to be run in parallel then pause until the job is claimed by a worker.
When a worker returns, the REDUCER is applied to the accumulator and the values
provided by the worker, where the accumulator is initialized with INITIAL-VALUE.
When the body is fully evaluated and all the jobs issued so far have returned,
then the value of the accumulator is returned.
The default INITIAL-VALUE is NIL, and the default REDUCER is DEFAULT-REDUCER, which
conses the primary value returned by each job into a list ordered by last-returned first.
If you just want to drop those values, use (CONSTANTLY NIL) as the REDUCER."
  ;; REDUCER and INITIAL-VALUE are named only to restrict the accepted
  ;; keywords; they are forwarded to CALL-WITH-PARALLEL-JOB-ISSUER via KEYS.
  (declare (ignore reducer initial-value))
  (let ((args (gensym "ARGS")))
    `(call-with-parallel-job-issuer
      (lambda (,(or issuer '*parallel-job-issuer*))
        ;; LABELS takes a list of bindings, so the single local function must
        ;; itself be wrapped in a list before being spliced in. When ISSUER is
        ;; NIL no local function is defined at all.
        (labels (,@(when issuer
                     `((,issuer (&rest ,args)
                         (apply ,issuer ,args)))))
          ,@body))
      ,@keys)))
)