diff options
Diffstat (limited to 'src/cuirass/utils.scm')
-rw-r--r-- | src/cuirass/utils.scm | 61 |
1 files changed, 48 insertions, 13 deletions
diff --git a/src/cuirass/utils.scm b/src/cuirass/utils.scm index 8eb0ed2..f32e3a1 100644 --- a/src/cuirass/utils.scm +++ b/src/cuirass/utils.scm @@ -106,9 +106,19 @@ delimited continuations and fibers." (make-parameter #f)) (define* (make-worker-thread-channel initializer - #:key (parallelism 1)) + #:key + (parallelism 1) + queue-size + (queue-proc (const #t))) "Return a channel used to offload work to a dedicated thread. ARGS are the -arguments of the worker thread procedure." +arguments of the worker thread procedure. This procedure supports deferring +work sent to the worker. If QUEUE-SIZE is set, each work query will be +appended to a queue that will be run once it reaches QUEUE-SIZE elements. + +When that happens, the QUEUE-PROC procedure is called with %WORKER-THREAD-ARGS +and a procedure running the queued work as arguments. The worker thread can +be passed options. When #:FORCE? option is set, the worker runs the sent work +immediately even if QUEUE-SIZE has been set." (parameterize (((@@ (fibers internal) current-fiber) #f)) (let ((channel (make-channel))) (for-each @@ -117,16 +127,37 @@ arguments of the worker thread procedure." (call-with-new-thread (lambda () (parameterize ((%worker-thread-args args)) - (let loop () + (let loop ((queue '())) (match (get-message channel) - (((? channel? reply) . (? procedure? proc)) - (put-message reply - (catch #t - (lambda () - (apply proc args)) - (lambda (key . args) - (cons* 'worker-thread-error key args)))))) - (loop))))))) + (((? channel? reply) options (? procedure? proc)) + (put-message + reply + (catch #t + (lambda () + (cond + ((or (not queue-size) + (assq-ref options #:force?)) + (apply proc args)) + (else + (length queue)))) + (lambda (key . args) + (cons* 'worker-thread-error key args)))) + (let ((new-queue + (cond + ((or (not queue-size) + (assq-ref options #:force?)) + '()) + ((= (1+ (length queue)) queue-size) + (let ((run-queue + (lambda () + (for-each (lambda (thunk) + (apply thunk args)) + (append queue (list proc)))))) + (apply queue-proc (append args (list run-queue))) + '())) + (else + (append queue (list proc)))))) + (loop new-queue)))))))))) (iota parallelism)) channel))) @@ -194,6 +225,7 @@ put-operation until it succeeds." (define* (call-with-worker-thread channel proc #:key + options send-timeout send-timeout-proc receive-timeout @@ -207,12 +239,15 @@ to a worker thread. The same goes for RECEIVE-TIMEOUT and RECEIVE-TIMEOUT-PROC, except that the timer expires if there is no response from the database worker PROC was sent -to." +to. + +OPTIONS are forwarded to the worker thread. See MAKE-WORKER-THREAD-CHANNEL +for a description of the supported options." (let ((args (%worker-thread-args))) (if args (apply proc args) (let* ((reply (make-channel)) - (message (cons reply proc))) + (message (list reply options proc))) (if (and send-timeout (current-fiber)) (put-message-with-timeout channel message #:seconds send-timeout |