aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2023-09-09 12:33:05 +0100
committerChristopher Baines <mail@cbaines.net>2023-09-12 13:11:00 +0100
commit3b996e52e36a574451cb5a5860116a638062db09 (patch)
treeafe6949bc203d18aa603a9d931be8bc0a45dc2f5
parentc72e141eb45323625d52c505cdf1edccec6fcfc8 (diff)
downloadnar-herder-3b996e52e36a574451cb5a5860116a638062db09.tar
nar-herder-3b996e52e36a574451cb5a5860116a638062db09.tar.gz
Add new fibers timeout utils
This depends on a tweaked version of (fibers io-wakeup), which calls select of the file descriptors, rather than the ports.
-rw-r--r--guix-dev.scm2
-rw-r--r--nar-herder/utils.scm157
2 files changed, 156 insertions, 3 deletions
diff --git a/guix-dev.scm b/guix-dev.scm
index dd5ba72..61e282c 100644
--- a/guix-dev.scm
+++ b/guix-dev.scm
@@ -53,7 +53,7 @@
(inputs
`(("guix" ,guix)
("guile-json" ,guile-json-4)
- ("guile-fibers" ,guile-fibers)
+ ("guile-fibers" ,guile-fibers-1.3)
("guile-gcrypt" ,guile-gcrypt)
("guile-readline" ,guile-readline)
("guile-lzlib" ,guile-lzlib)
diff --git a/nar-herder/utils.scm b/nar-herder/utils.scm
index b841d83..371584e 100644
--- a/nar-herder/utils.scm
+++ b/nar-herder/utils.scm
@@ -25,21 +25,26 @@
#:use-module (ice-9 match)
#:use-module (ice-9 format)
#:use-module (ice-9 threads)
+ #:use-module (ice-9 atomic)
#:use-module (ice-9 textual-ports)
#:use-module (ice-9 rdelim)
#:use-module (ice-9 binary-ports)
#:use-module (ice-9 exceptions)
#:use-module (rnrs bytevectors)
#:use-module (ice-9 suspendable-ports)
- #:use-module ((ice-9 ports internal) #:select (port-poll))
+ #:use-module ((ice-9 ports internal) #:select (port-read-wait-fd
+ port-write-wait-fd))
#:use-module (web uri)
#:use-module (web http)
#:use-module (web client)
#:use-module (web request)
#:use-module (web response)
#:use-module (fibers)
+ #:use-module (fibers timers)
#:use-module (fibers channels)
+ #:use-module (fibers scheduler)
#:use-module (fibers conditions)
+ #:use-module (fibers operations)
#:use-module ((guix build syscalls)
#:select (set-thread-name))
#:export (make-worker-thread-set
@@ -57,7 +62,14 @@
open-socket-for-uri*
call-with-sigint
- run-server/patched))
+ run-server/patched
+
+ timeout-error?
+ with-fibers-timeout
+
+ port-read-timeout-error?
+ port-write-timeout-error?
+ with-fibers-port-timeouts))
(define* (retry-on-error f #:key times delay ignore)
(let loop ((attempt 1))
@@ -627,3 +639,144 @@ If already in the worker thread, call PROC immediately."
(sigaction SIGPIPE SIG_IGN)
(spawn-fiber (lambda () (socket-loop socket handler))))))
+(define &timeout
+ (make-exception-type '&timeout
+ &external-error
+ '(thunk)))
+
+(define make-timeout-error
+ (record-constructor &timeout))
+
+(define timeout-error?
+ (record-predicate &timeout))
+
+(define* (with-fibers-timeout thunk #:key timeout)
+ ;; Maybe there's a way of doing this directly with operations, and
+ ;; without the channel and fiber?
+ (let ((channel (make-channel)))
+ (spawn-fiber
+ (lambda ()
+ (call-with-values
+ (thunk)
+ (lambda vals
+ (perform-operation
+ (choice-operation
+ (put-operation channel vals)
+ ;; I don't know if this is useful to avoid just forever
+ ;; waiting to write to channel, but maybe it is
+ (sleep-operation timeout)))))))
+ (perform-operation
+ (choice-operation
+ (wrap-operation
+ (get-operation channel)
+ apply)
+ (wrap-operation
+ (sleep-operation timeout)
+ (lambda ()
+ (raise-exception
+ (make-timeout-error thunk))))))))
+
+;; These procedure are subject to spurious wakeups.
+
+(define (readable? port)
+ "Test if PORT is writable."
+ (match (select (vector port) #() #() 0)
+ ((#() #() #()) #f)
+ ((#(_) #() #()) #t)))
+
+(define (writable? port)
+ "Test if PORT is writable."
+ (match (select #() (vector port) #() 0)
+ ((#() #() #()) #f)
+ ((#() #(_) #()) #t)))
+
+(define (make-wait-operation ready? schedule-when-ready port port-ready-fd this-procedure)
+ (make-base-operation #f
+ (lambda _
+ (and (ready? (port-ready-fd port)) values))
+ (lambda (flag sched resume)
+ (define (commit)
+ (match (atomic-box-compare-and-swap! flag 'W 'S)
+ ('W (resume values))
+ ('C (commit))
+ ('S #f)))
+ (schedule-when-ready
+ sched (port-ready-fd port) commit))))
+
+(define (wait-until-port-readable-operation port)
+ "Make an operation that will succeed when PORT is readable."
+ (unless (input-port? port)
+ (error "refusing to wait forever for input on non-input port"))
+ (make-wait-operation readable? schedule-task-when-fd-readable port
+ port-read-wait-fd
+ wait-until-port-readable-operation))
+
+(define (wait-until-port-writable-operation port)
+ "Make an operation that will succeed when PORT is writable."
+ (unless (output-port? port)
+ (error "refusing to wait forever for output on non-output port"))
+ (make-wait-operation writable? schedule-task-when-fd-writable port
+ port-write-wait-fd
+ wait-until-port-writable-operation))
+
+
+
+(define &port-timeout
+ (make-exception-type '&port-timeout
+ &external-error
+ '(port)))
+
+(define make-port-timeout-error
+ (record-constructor &port-timeout))
+
+(define port-timeout-error?
+ (record-predicate &port-timeout))
+
+(define &port-read-timeout
+ (make-exception-type '&port-read-timeout
+ &port-timeout
+ '()))
+
+(define make-port-read-timeout-error
+ (record-constructor &port-read-timeout))
+
+(define port-read-timeout-error?
+ (record-predicate &port-read-timeout))
+
+(define &port-write-timeout
+ (make-exception-type '&port-write-timeout
+ &port-timeout
+ '()))
+
+(define make-port-write-timeout-error
+ (record-constructor &port-write-timeout))
+
+(define port-write-timeout-error?
+ (record-predicate &port-write-timeout))
+
+(define* (with-fibers-port-timeouts thunk
+ #:key timeout
+ (read-timeout timeout)
+ (write-timeout timeout))
+ (parameterize
+ ((current-read-waiter
+ (lambda (port)
+ (perform-operation
+ (choice-operation
+ (wait-until-port-readable-operation port)
+ (wrap-operation
+ (sleep-operation read-timeout)
+ (lambda ()
+ (raise-exception
+ (make-port-read-timeout-error thunk port))))))))
+ (current-write-waiter
+ (lambda (port)
+ (perform-operation
+ (choice-operation
+ (wait-until-port-writable-operation port)
+ (wrap-operation
+ (sleep-operation write-timeout)
+ (lambda ()
+ (raise-exception
+ (make-port-write-timeout-error thunk port)))))))))
+ (thunk)))