summaryrefslogtreecommitdiff
path: root/src/cuirass/utils.scm
diff options
context:
space:
mode:
Diffstat (limited to 'src/cuirass/utils.scm')
-rw-r--r--src/cuirass/utils.scm61
1 files changed, 48 insertions, 13 deletions
diff --git a/src/cuirass/utils.scm b/src/cuirass/utils.scm
index 8eb0ed2..f32e3a1 100644
--- a/src/cuirass/utils.scm
+++ b/src/cuirass/utils.scm
@@ -106,9 +106,19 @@ delimited continuations and fibers."
(make-parameter #f))
(define* (make-worker-thread-channel initializer
- #:key (parallelism 1))
+ #:key
+ (parallelism 1)
+ queue-size
+ (queue-proc (const #t)))
"Return a channel used to offload work to a dedicated thread. ARGS are the
-arguments of the worker thread procedure."
+arguments of the worker thread procedure. This procedure supports deferring
+work sent to the worker. If QUEUE-SIZE is set, each work query will be
+appended to a queue that will be run once it reaches QUEUE-SIZE elements.
+
+When that happens, the QUEUE-PROC procedure is called with %WORKER-THREAD-ARGS
+and a procedure running the queued work as arguments. The worker thread can
+be passed options. When #:FORCE? option is set, the worker runs the sent work
+immediately even if QUEUE-SIZE has been set."
(parameterize (((@@ (fibers internal) current-fiber) #f))
(let ((channel (make-channel)))
(for-each
@@ -117,16 +127,37 @@ arguments of the worker thread procedure."
(call-with-new-thread
(lambda ()
(parameterize ((%worker-thread-args args))
- (let loop ()
+ (let loop ((queue '()))
(match (get-message channel)
- (((? channel? reply) . (? procedure? proc))
- (put-message reply
- (catch #t
- (lambda ()
- (apply proc args))
- (lambda (key . args)
- (cons* 'worker-thread-error key args))))))
- (loop)))))))
+ (((? channel? reply) options (? procedure? proc))
+ (put-message
+ reply
+ (catch #t
+ (lambda ()
+ (cond
+ ((or (not queue-size)
+ (assq-ref options #:force?))
+ (apply proc args))
+ (else
+ (length queue))))
+ (lambda (key . args)
+ (cons* 'worker-thread-error key args))))
+ (let ((new-queue
+ (cond
+ ((or (not queue-size)
+ (assq-ref options #:force?))
+ '())
+ ((= (1+ (length queue)) queue-size)
+ (let ((run-queue
+ (lambda ()
+ (for-each (lambda (thunk)
+ (apply thunk args))
+ (append queue (list proc))))))
+ (apply queue-proc (append args (list run-queue)))
+ '()))
+ (else
+ (append queue (list proc))))))
+ (loop new-queue))))))))))
(iota parallelism))
channel)))
@@ -194,6 +225,7 @@ put-operation until it succeeds."
(define* (call-with-worker-thread channel proc
#:key
+ options
send-timeout
send-timeout-proc
receive-timeout
@@ -207,12 +239,15 @@ to a worker thread.
The same goes for RECEIVE-TIMEOUT and RECEIVE-TIMEOUT-PROC, except that the
timer expires if there is no response from the database worker PROC was sent
-to."
+to.
+
+OPTIONS are forwarded to the worker thread. See MAKE-WORKER-THREAD-CHANNEL
+for a description of the supported options."
(let ((args (%worker-thread-args)))
(if args
(apply proc args)
(let* ((reply (make-channel))
- (message (cons reply proc)))
+ (message (list reply options proc)))
(if (and send-timeout (current-fiber))
(put-message-with-timeout channel message
#:seconds send-timeout