aboutsummaryrefslogtreecommitdiff
path: root/guix-build-coordinator/utils/fibers.scm
diff options
context:
space:
mode:
Diffstat (limited to 'guix-build-coordinator/utils/fibers.scm')
-rw-r--r--guix-build-coordinator/utils/fibers.scm162
1 files changed, 161 insertions, 1 deletions
diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm
index dc3b92e..938d06f 100644
--- a/guix-build-coordinator/utils/fibers.scm
+++ b/guix-build-coordinator/utils/fibers.scm
@@ -1,9 +1,14 @@
(define-module (guix-build-coordinator utils fibers)
#:use-module (ice-9 match)
+ #:use-module (ice-9 atomic)
#:use-module (ice-9 threads)
+ #:use-module (ice-9 exceptions)
+ #:use-module (ice-9 ports internal)
+ #:use-module (ice-9 suspendable-ports)
#:use-module (fibers)
#:use-module (fibers timers)
#:use-module (fibers channels)
+ #:use-module (fibers scheduler)
#:use-module (fibers operations)
#:use-module (fibers conditions)
#:use-module ((guix build syscalls)
@@ -17,9 +22,12 @@
run-server/patched
+ spawn-port-monitoring-fiber
+
letpar&
- with-fibers-timeout))
+ with-fibers-timeout
+ with-fibers-port-timeouts))
(define %worker-thread-args
(make-parameter #f))
@@ -280,6 +288,26 @@ If already in the worker thread, call PROC immediately."
(sigaction SIGPIPE SIG_IGN)
(spawn-fiber (lambda () (socket-loop socket handler))))))
+(define (spawn-port-monitoring-fiber port error-condition)
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format (current-error-port)
+ "port monitoring fiber failed to connect to ~A: ~A\n"
+ port exn)
+ (signal-condition! error-condition))
+ (lambda ()
+ (with-fibers-port-timeouts
+ (lambda ()
+ (let ((sock (socket PF_INET SOCK_STREAM 0)))
+ (connect sock AF_INET INADDR_LOOPBACK port)
+ (close-port sock)))
+ #:timeout 20))
+ #:unwind? #t)
+ (sleep 20)))))
+
(define (defer-to-fiber thunk)
(let ((reply (make-channel)))
(spawn-fiber
@@ -364,3 +392,135 @@ If already in the worker thread, call PROC immediately."
(raise-exception exn))
(vals
(apply values vals)))))
+
+(define &port-timeout
+ (make-exception-type '&port-timeout
+ &external-error
+ '(port)))
+
+(define make-port-timeout-error
+ (record-constructor &port-timeout))
+
+(define port-timeout-error?
+ (record-predicate &port-timeout))
+
+(define &port-read-timeout
+ (make-exception-type '&port-read-timeout
+ &port-timeout
+ '()))
+
+(define make-port-read-timeout-error
+ (record-constructor &port-read-timeout))
+
+(define port-read-timeout-error?
+ (record-predicate &port-read-timeout))
+
+(define &port-write-timeout
+ (make-exception-type '&port-write-timeout
+ &port-timeout
+ '()))
+
+(define make-port-write-timeout-error
+ (record-constructor &port-write-timeout))
+
+(define port-write-timeout-error?
+ (record-predicate &port-write-timeout))
+
+;; These procedure are subject to spurious wakeups.
+
+(define (readable? port)
+ "Test if PORT is writable."
+ (match (select (vector port) #() #() 0)
+ ((#() #() #()) #f)
+ ((#(_) #() #()) #t)))
+
+(define (writable? port)
+ "Test if PORT is writable."
+ (match (select #() (vector port) #() 0)
+ ((#() #() #()) #f)
+ ((#() #(_) #()) #t)))
+
+(define (make-wait-operation ready? schedule-when-ready port port-ready-fd this-procedure)
+ (make-base-operation #f
+ (lambda _
+ (and (ready? (port-ready-fd port)) values))
+ (lambda (flag sched resume)
+ (define (commit)
+ (match (atomic-box-compare-and-swap! flag 'W 'S)
+ ('W (resume values))
+ ('C (commit))
+ ('S #f)))
+ (schedule-when-ready
+ sched (port-ready-fd port) commit))))
+
+(define (wait-until-port-readable-operation port)
+ "Make an operation that will succeed when PORT is readable."
+ (unless (input-port? port)
+ (error "refusing to wait forever for input on non-input port"))
+ (make-wait-operation readable? schedule-task-when-fd-readable port
+ port-read-wait-fd
+ wait-until-port-readable-operation))
+
+(define (wait-until-port-writable-operation port)
+ "Make an operation that will succeed when PORT is writable."
+ (unless (output-port? port)
+ (error "refusing to wait forever for output on non-output port"))
+ (make-wait-operation writable? schedule-task-when-fd-writable port
+ port-write-wait-fd
+ wait-until-port-writable-operation))
+
+(define* (with-fibers-port-timeouts thunk
+ #:key timeout
+ (read-timeout timeout)
+ (write-timeout timeout))
+ (define (no-fibers-wait port mode timeout)
+ (define poll-timeout-ms 200)
+
+ ;; When the GC runs, it restarts the poll syscall, but the timeout
+ ;; remains unchanged! When the timeout is longer than the time
+ ;; between the syscall restarting, I think this renders the
+ ;; timeout useless. Therefore, this code uses a short timeout, and
+ ;; repeatedly calls poll while watching the clock to see if it has
+ ;; timed out overall.
+ (let ((timeout-internal
+ (+ (get-internal-real-time)
+ (* internal-time-units-per-second
+ (/ timeout 1000)))))
+ (let loop ((poll-value
+ (port-poll port mode poll-timeout-ms)))
+ (if (= poll-value 0)
+ (if (> (get-internal-real-time)
+ timeout-internal)
+ (raise-exception
+ (if (string=? mode "r")
+ (make-port-read-timeout-error port)
+ (make-port-write-timeout-error port)))
+ (loop (port-poll port mode poll-timeout-ms)))
+ poll-value))))
+
+ (parameterize
+ ((current-read-waiter
+ (lambda (port)
+ (if (current-scheduler)
+ (perform-operation
+ (choice-operation
+ (wait-until-port-readable-operation port)
+ (wrap-operation
+ (sleep-operation read-timeout)
+ (lambda ()
+ (raise-exception
+ (make-port-read-timeout-error thunk port))))))
+ (no-fibers-wait port "r" read-timeout))))
+ (current-write-waiter
+ (lambda (port)
+ (if (current-scheduler)
+ (perform-operation
+ (choice-operation
+ (wait-until-port-writable-operation port)
+ (wrap-operation
+ (sleep-operation write-timeout)
+ (lambda ()
+ (raise-exception
+ (make-port-write-timeout-error thunk port))))))
+ (no-fibers-wait port "w" write-timeout)))))
+ (thunk)))