diff options
author | Christopher Baines <mail@cbaines.net> | 2023-09-09 12:33:05 +0100 |
---|---|---|
committer | Christopher Baines <mail@cbaines.net> | 2023-09-12 13:11:00 +0100 |
commit | 3b996e52e36a574451cb5a5860116a638062db09 (patch) | |
tree | afe6949bc203d18aa603a9d931be8bc0a45dc2f5 | |
parent | c72e141eb45323625d52c505cdf1edccec6fcfc8 (diff) | |
download | nar-herder-3b996e52e36a574451cb5a5860116a638062db09.tar nar-herder-3b996e52e36a574451cb5a5860116a638062db09.tar.gz |
Add new fibers timeout utils
This depends on a tweaked version of (fibers io-wakeup), which calls
select of the file descriptors, rather than the ports.
-rw-r--r-- | guix-dev.scm | 2 | ||||
-rw-r--r-- | nar-herder/utils.scm | 157 |
2 files changed, 156 insertions, 3 deletions
diff --git a/guix-dev.scm b/guix-dev.scm index dd5ba72..61e282c 100644 --- a/guix-dev.scm +++ b/guix-dev.scm @@ -53,7 +53,7 @@ (inputs `(("guix" ,guix) ("guile-json" ,guile-json-4) - ("guile-fibers" ,guile-fibers) + ("guile-fibers" ,guile-fibers-1.3) ("guile-gcrypt" ,guile-gcrypt) ("guile-readline" ,guile-readline) ("guile-lzlib" ,guile-lzlib) diff --git a/nar-herder/utils.scm b/nar-herder/utils.scm index b841d83..371584e 100644 --- a/nar-herder/utils.scm +++ b/nar-herder/utils.scm @@ -25,21 +25,26 @@ #:use-module (ice-9 match) #:use-module (ice-9 format) #:use-module (ice-9 threads) + #:use-module (ice-9 atomic) #:use-module (ice-9 textual-ports) #:use-module (ice-9 rdelim) #:use-module (ice-9 binary-ports) #:use-module (ice-9 exceptions) #:use-module (rnrs bytevectors) #:use-module (ice-9 suspendable-ports) - #:use-module ((ice-9 ports internal) #:select (port-poll)) + #:use-module ((ice-9 ports internal) #:select (port-read-wait-fd + port-write-wait-fd)) #:use-module (web uri) #:use-module (web http) #:use-module (web client) #:use-module (web request) #:use-module (web response) #:use-module (fibers) + #:use-module (fibers timers) #:use-module (fibers channels) + #:use-module (fibers scheduler) #:use-module (fibers conditions) + #:use-module (fibers operations) #:use-module ((guix build syscalls) #:select (set-thread-name)) #:export (make-worker-thread-set @@ -57,7 +62,14 @@ open-socket-for-uri* call-with-sigint - run-server/patched)) + run-server/patched + + timeout-error? + with-fibers-timeout + + port-read-timeout-error? + port-write-timeout-error? + with-fibers-port-timeouts)) (define* (retry-on-error f #:key times delay ignore) (let loop ((attempt 1)) @@ -627,3 +639,144 @@ If already in the worker thread, call PROC immediately." (sigaction SIGPIPE SIG_IGN) (spawn-fiber (lambda () (socket-loop socket handler)))))) +(define &timeout + (make-exception-type '&timeout + &external-error + '(thunk))) + +(define make-timeout-error + (record-constructor &timeout)) + +(define timeout-error? + (record-predicate &timeout)) + +(define* (with-fibers-timeout thunk #:key timeout) + ;; Maybe there's a way of doing this directly with operations, and + ;; without the channel and fiber? + (let ((channel (make-channel))) + (spawn-fiber + (lambda () + (call-with-values + (thunk) + (lambda vals + (perform-operation + (choice-operation + (put-operation channel vals) + ;; I don't know if this is useful to avoid just forever + ;; waiting to write to channel, but maybe it is + (sleep-operation timeout))))))) + (perform-operation + (choice-operation + (wrap-operation + (get-operation channel) + apply) + (wrap-operation + (sleep-operation timeout) + (lambda () + (raise-exception + (make-timeout-error thunk)))))))) + +;; These procedure are subject to spurious wakeups. + +(define (readable? port) + "Test if PORT is writable." + (match (select (vector port) #() #() 0) + ((#() #() #()) #f) + ((#(_) #() #()) #t))) + +(define (writable? port) + "Test if PORT is writable." + (match (select #() (vector port) #() 0) + ((#() #() #()) #f) + ((#() #(_) #()) #t))) + +(define (make-wait-operation ready? schedule-when-ready port port-ready-fd this-procedure) + (make-base-operation #f + (lambda _ + (and (ready? (port-ready-fd port)) values)) + (lambda (flag sched resume) + (define (commit) + (match (atomic-box-compare-and-swap! flag 'W 'S) + ('W (resume values)) + ('C (commit)) + ('S #f))) + (schedule-when-ready + sched (port-ready-fd port) commit)))) + +(define (wait-until-port-readable-operation port) + "Make an operation that will succeed when PORT is readable." + (unless (input-port? port) + (error "refusing to wait forever for input on non-input port")) + (make-wait-operation readable? schedule-task-when-fd-readable port + port-read-wait-fd + wait-until-port-readable-operation)) + +(define (wait-until-port-writable-operation port) + "Make an operation that will succeed when PORT is writable." + (unless (output-port? port) + (error "refusing to wait forever for output on non-output port")) + (make-wait-operation writable? schedule-task-when-fd-writable port + port-write-wait-fd + wait-until-port-writable-operation)) + + + +(define &port-timeout + (make-exception-type '&port-timeout + &external-error + '(port))) + +(define make-port-timeout-error + (record-constructor &port-timeout)) + +(define port-timeout-error? + (record-predicate &port-timeout)) + +(define &port-read-timeout + (make-exception-type '&port-read-timeout + &port-timeout + '())) + +(define make-port-read-timeout-error + (record-constructor &port-read-timeout)) + +(define port-read-timeout-error? + (record-predicate &port-read-timeout)) + +(define &port-write-timeout + (make-exception-type '&port-write-timeout + &port-timeout + '())) + +(define make-port-write-timeout-error + (record-constructor &port-write-timeout)) + +(define port-write-timeout-error? + (record-predicate &port-write-timeout)) + +(define* (with-fibers-port-timeouts thunk + #:key timeout + (read-timeout timeout) + (write-timeout timeout)) + (parameterize + ((current-read-waiter + (lambda (port) + (perform-operation + (choice-operation + (wait-until-port-readable-operation port) + (wrap-operation + (sleep-operation read-timeout) + (lambda () + (raise-exception + (make-port-read-timeout-error thunk port)))))))) + (current-write-waiter + (lambda (port) + (perform-operation + (choice-operation + (wait-until-port-writable-operation port) + (wrap-operation + (sleep-operation write-timeout) + (lambda () + (raise-exception + (make-port-write-timeout-error thunk port))))))))) + (thunk))) |