aboutsummaryrefslogtreecommitdiff
path: root/guix-data-service
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2023-04-27 11:49:31 +0200
committerChristopher Baines <mail@cbaines.net>2023-04-27 14:58:44 +0200
commit638e0442c380f256b4ca08e6144599c66cd1ad29 (patch)
treeb414e777e819e4f8cb1aa0fe7c8a3e0cce5d0b2f /guix-data-service
parent5bb7cf0c1c2e7d9a140c205b7341f91b1021aca5 (diff)
downloaddata-service-638e0442c380f256b4ca08e6144599c66cd1ad29.tar
data-service-638e0442c380f256b4ca08e6144599c66cd1ad29.tar.gz
Support request timeouts in the thread pool
Diffstat (limited to 'guix-data-service')
-rw-r--r--guix-data-service/utils.scm51
1 files changed, 39 insertions, 12 deletions
diff --git a/guix-data-service/utils.scm b/guix-data-service/utils.scm
index c5c89a4..b7124d5 100644
--- a/guix-data-service/utils.scm
+++ b/guix-data-service/utils.scm
@@ -32,6 +32,7 @@
prevent-inlining-for-tests
thread-pool-channel
+ thread-pool-request-timeout
make-thread-pool-channel
parallel-via-thread-pool-channel
par-map&
@@ -129,30 +130,56 @@
(iota threads))
channel))
+(define &thread-pool-request-timeout
+ (make-exception-type '&thread-pool-request-timeout
+ &error
+ '()))
+
+(define make-thread-pool-request-timeout-error
+ (record-constructor &thread-pool-request-timeout))
+
+(define thread-pool-request-timeout-error?
+ (record-predicate &thread-pool-request-timeout))
+
(define thread-pool-channel
(make-parameter #f))
+(define thread-pool-request-timeout
+ (make-parameter #f))
+
(define (defer-to-thread-pool-channel thunk)
(let ((reply (make-channel)))
(spawn-fiber
(lambda ()
- (put-message (thread-pool-channel)
- (list reply
- (get-internal-real-time)
- thunk))))
+ (let ((val
+ (perform-operation
+ (let ((put
+ (wrap-operation
+ (put-operation (thread-pool-channel)
+ (list reply
+ (get-internal-real-time)
+ thunk))
+ (const 'success))))
+ (or
+ (and=> (thread-pool-request-timeout)
+ (lambda (timeout)
+ (choice-operation
+ put
+ (wrap-operation (sleep-operation timeout)
+ (const 'request-timeout)))))
+ put)))))
+ (when (eq? val 'request-timeout)
+ (put-message reply val)))))
reply))
-(define (fetch-result-of-defered-thunk reply-channel)
- (match (get-message reply-channel)
- (('worker-thread-error . exn)
- (raise-exception exn))
- (result
- (apply values result))))
-
(define (fetch-result-of-defered-thunks . reply-channels)
- (let ((responses (map get-message reply-channels)))
+ (let ((responses (map get-message
+ reply-channels)))
(map
(match-lambda
+ ('request-timeout
+ (raise-exception
+ (make-thread-pool-request-timeout-error)))
(('worker-thread-error . exn)
(raise-exception exn))
(result