From 638e0442c380f256b4ca08e6144599c66cd1ad29 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Thu, 27 Apr 2023 11:49:31 +0200 Subject: Support request timeouts in the thread pool --- guix-data-service/utils.scm | 51 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 39 insertions(+), 12 deletions(-) (limited to 'guix-data-service') diff --git a/guix-data-service/utils.scm b/guix-data-service/utils.scm index c5c89a4..b7124d5 100644 --- a/guix-data-service/utils.scm +++ b/guix-data-service/utils.scm @@ -32,6 +32,7 @@ prevent-inlining-for-tests thread-pool-channel + thread-pool-request-timeout make-thread-pool-channel parallel-via-thread-pool-channel par-map& @@ -129,30 +130,56 @@ (iota threads)) channel)) +(define &thread-pool-request-timeout + (make-exception-type '&thread-pool-request-timeout + &error + '())) + +(define make-thread-pool-request-timeout-error + (record-constructor &thread-pool-request-timeout)) + +(define thread-pool-request-timeout-error? + (record-predicate &thread-pool-request-timeout)) + (define thread-pool-channel (make-parameter #f)) +(define thread-pool-request-timeout + (make-parameter #f)) + (define (defer-to-thread-pool-channel thunk) (let ((reply (make-channel))) (spawn-fiber (lambda () - (put-message (thread-pool-channel) - (list reply - (get-internal-real-time) - thunk)))) + (let ((val + (perform-operation + (let ((put + (wrap-operation + (put-operation (thread-pool-channel) + (list reply + (get-internal-real-time) + thunk)) + (const 'success)))) + (or + (and=> (thread-pool-request-timeout) + (lambda (timeout) + (choice-operation + put + (wrap-operation (sleep-operation timeout) + (const 'request-timeout))))) + put))))) + (when (eq? val 'request-timeout) + (put-message reply val))))) reply)) -(define (fetch-result-of-defered-thunk reply-channel) - (match (get-message reply-channel) - (('worker-thread-error . exn) - (raise-exception exn)) - (result - (apply values result)))) - (define (fetch-result-of-defered-thunks . reply-channels) - (let ((responses (map get-message reply-channels))) + (let ((responses (map get-message + reply-channels))) (map (match-lambda + ('request-timeout + (raise-exception + (make-thread-pool-request-timeout-error))) (('worker-thread-error . exn) (raise-exception exn)) (result -- cgit v1.2.3