aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2024-01-19 10:39:45 +0000
committerChristopher Baines <mail@cbaines.net>2024-01-19 10:41:51 +0000
commitdc04b747048638a753bd044646306fcdd33c241a (patch)
treec1b3a65f0e3e05a527f81715a2af0d15c47035eb
parenta9fccb34184c91f5ef29ed0de69185e7191e9a9e (diff)
downloadbuild-coordinator-dc04b747048638a753bd044646306fcdd33c241a.tar
build-coordinator-dc04b747048638a753bd044646306fcdd33c241a.tar.gz
Add inbuilt port monitoring
There seems to be some issue which can lead to the client and agent ports no longer being listened on. I've got no idea how to track this down, so just try and monitor for it and kill the process if it happens.
-rw-r--r--guix-build-coordinator/coordinator.scm7
-rw-r--r--guix-build-coordinator/utils/fibers.scm162
2 files changed, 168 insertions, 1 deletions
diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm
index ca579f2..14dd160 100644
--- a/guix-build-coordinator/coordinator.scm
+++ b/guix-build-coordinator/coordinator.scm
@@ -577,6 +577,13 @@
(uri-port client-communication-uri)
build-coordinator)
+ ;; Guile seems to just stop listening on ports, so try to
+ ;; monitor that internally and just quit if it happens
+ (spawn-port-monitoring-fiber (uri-port agent-communication-uri)
+ finished?)
+ (spawn-port-monitoring-fiber (uri-port client-communication-uri)
+ finished?)
+
(wait finished?))
#:hz 10
#:parallelism 2))
diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm
index dc3b92e..938d06f 100644
--- a/guix-build-coordinator/utils/fibers.scm
+++ b/guix-build-coordinator/utils/fibers.scm
@@ -1,9 +1,14 @@
(define-module (guix-build-coordinator utils fibers)
#:use-module (ice-9 match)
+ #:use-module (ice-9 atomic)
#:use-module (ice-9 threads)
+ #:use-module (ice-9 exceptions)
+ #:use-module (ice-9 ports internal)
+ #:use-module (ice-9 suspendable-ports)
#:use-module (fibers)
#:use-module (fibers timers)
#:use-module (fibers channels)
+ #:use-module (fibers scheduler)
#:use-module (fibers operations)
#:use-module (fibers conditions)
#:use-module ((guix build syscalls)
@@ -17,9 +22,12 @@
run-server/patched
+ spawn-port-monitoring-fiber
+
letpar&
- with-fibers-timeout))
+ with-fibers-timeout
+ with-fibers-port-timeouts))
(define %worker-thread-args
(make-parameter #f))
@@ -280,6 +288,26 @@ If already in the worker thread, call PROC immediately."
(sigaction SIGPIPE SIG_IGN)
(spawn-fiber (lambda () (socket-loop socket handler))))))
+(define (spawn-port-monitoring-fiber port error-condition)
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format (current-error-port)
+ "port monitoring fiber failed to connect to ~A: ~A\n"
+ port exn)
+ (signal-condition! error-condition))
+ (lambda ()
+ (with-fibers-port-timeouts
+ (lambda ()
+ (let ((sock (socket PF_INET SOCK_STREAM 0)))
+ (connect sock AF_INET INADDR_LOOPBACK port)
+ (close-port sock)))
+ #:timeout 20))
+ #:unwind? #t)
+ (sleep 20)))))
+
(define (defer-to-fiber thunk)
(let ((reply (make-channel)))
(spawn-fiber
@@ -364,3 +392,135 @@ If already in the worker thread, call PROC immediately."
(raise-exception exn))
(vals
(apply values vals)))))
+
+(define &port-timeout
+ (make-exception-type '&port-timeout
+ &external-error
+ '(port)))
+
+(define make-port-timeout-error
+ (record-constructor &port-timeout))
+
+(define port-timeout-error?
+ (record-predicate &port-timeout))
+
+(define &port-read-timeout
+ (make-exception-type '&port-read-timeout
+ &port-timeout
+ '()))
+
+(define make-port-read-timeout-error
+ (record-constructor &port-read-timeout))
+
+(define port-read-timeout-error?
+ (record-predicate &port-read-timeout))
+
+(define &port-write-timeout
+ (make-exception-type '&port-write-timeout
+ &port-timeout
+ '()))
+
+(define make-port-write-timeout-error
+ (record-constructor &port-write-timeout))
+
+(define port-write-timeout-error?
+ (record-predicate &port-write-timeout))
+
+;; These procedure are subject to spurious wakeups.
+
+(define (readable? port)
+ "Test if PORT is writable."
+ (match (select (vector port) #() #() 0)
+ ((#() #() #()) #f)
+ ((#(_) #() #()) #t)))
+
+(define (writable? port)
+ "Test if PORT is writable."
+ (match (select #() (vector port) #() 0)
+ ((#() #() #()) #f)
+ ((#() #(_) #()) #t)))
+
+(define (make-wait-operation ready? schedule-when-ready port port-ready-fd this-procedure)
+ (make-base-operation #f
+ (lambda _
+ (and (ready? (port-ready-fd port)) values))
+ (lambda (flag sched resume)
+ (define (commit)
+ (match (atomic-box-compare-and-swap! flag 'W 'S)
+ ('W (resume values))
+ ('C (commit))
+ ('S #f)))
+ (schedule-when-ready
+ sched (port-ready-fd port) commit))))
+
+(define (wait-until-port-readable-operation port)
+ "Make an operation that will succeed when PORT is readable."
+ (unless (input-port? port)
+ (error "refusing to wait forever for input on non-input port"))
+ (make-wait-operation readable? schedule-task-when-fd-readable port
+ port-read-wait-fd
+ wait-until-port-readable-operation))
+
+(define (wait-until-port-writable-operation port)
+ "Make an operation that will succeed when PORT is writable."
+ (unless (output-port? port)
+ (error "refusing to wait forever for output on non-output port"))
+ (make-wait-operation writable? schedule-task-when-fd-writable port
+ port-write-wait-fd
+ wait-until-port-writable-operation))
+
+(define* (with-fibers-port-timeouts thunk
+ #:key timeout
+ (read-timeout timeout)
+ (write-timeout timeout))
+ (define (no-fibers-wait port mode timeout)
+ (define poll-timeout-ms 200)
+
+ ;; When the GC runs, it restarts the poll syscall, but the timeout
+ ;; remains unchanged! When the timeout is longer than the time
+ ;; between the syscall restarting, I think this renders the
+ ;; timeout useless. Therefore, this code uses a short timeout, and
+ ;; repeatedly calls poll while watching the clock to see if it has
+ ;; timed out overall.
+ (let ((timeout-internal
+ (+ (get-internal-real-time)
+ (* internal-time-units-per-second
+ (/ timeout 1000)))))
+ (let loop ((poll-value
+ (port-poll port mode poll-timeout-ms)))
+ (if (= poll-value 0)
+ (if (> (get-internal-real-time)
+ timeout-internal)
+ (raise-exception
+ (if (string=? mode "r")
+ (make-port-read-timeout-error port)
+ (make-port-write-timeout-error port)))
+ (loop (port-poll port mode poll-timeout-ms)))
+ poll-value))))
+
+ (parameterize
+ ((current-read-waiter
+ (lambda (port)
+ (if (current-scheduler)
+ (perform-operation
+ (choice-operation
+ (wait-until-port-readable-operation port)
+ (wrap-operation
+ (sleep-operation read-timeout)
+ (lambda ()
+ (raise-exception
+ (make-port-read-timeout-error thunk port))))))
+ (no-fibers-wait port "r" read-timeout))))
+ (current-write-waiter
+ (lambda (port)
+ (if (current-scheduler)
+ (perform-operation
+ (choice-operation
+ (wait-until-port-writable-operation port)
+ (wrap-operation
+ (sleep-operation write-timeout)
+ (lambda ()
+ (raise-exception
+ (make-port-write-timeout-error thunk port))))))
+ (no-fibers-wait port "w" write-timeout)))))
+ (thunk)))