aboutsummaryrefslogtreecommitdiff
path: root/guix-build-coordinator/utils
diff options
context:
space:
mode:
Diffstat (limited to 'guix-build-coordinator/utils')
-rw-r--r--guix-build-coordinator/utils/fibers.scm587
-rw-r--r--guix-build-coordinator/utils/timeout.scm81
2 files changed, 140 insertions, 528 deletions
diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm
index 450c36b..d836ceb 100644
--- a/guix-build-coordinator/utils/fibers.scm
+++ b/guix-build-coordinator/utils/fibers.scm
@@ -1,4 +1,5 @@
(define-module (guix-build-coordinator utils fibers)
+ #:use-module (srfi srfi-9)
#:use-module (ice-9 q)
#:use-module (ice-9 match)
#:use-module (ice-9 atomic)
@@ -12,305 +13,20 @@
#:use-module (fibers scheduler)
#:use-module (fibers operations)
#:use-module (fibers conditions)
+ #:use-module (knots timeout)
#:use-module ((guix build syscalls)
#:select (set-thread-name))
#:use-module (guix-build-coordinator utils)
- #:export (make-worker-thread-channel
- %worker-thread-default-timeout
- call-with-worker-thread
- worker-thread-timeout-error?
+ #:export (spawn-port-monitoring-fiber
- call-with-sigint
+ make-discrete-priority-queueing-channels
- run-server/patched
-
- spawn-port-monitoring-fiber
-
- letpar&
-
- port-timeout-error?
- port-read-timeout-error?
- port-write-timeout-error?
- with-fibers-timeout
- with-fibers-port-timeouts
-
- make-queueing-channel
- make-discrete-priority-queueing-channels)
+ make-reusable-condition
+ reusable-condition?
+ signal-reusable-condition!
+ reusable-condition-wait)
#:replace (retry-on-error))
-(define %worker-thread-args
- (make-parameter #f))
-
-(define* (make-worker-thread-channel initializer
- #:key (parallelism 1)
- (delay-logger (lambda _ #f))
- (duration-logger (const #f))
- destructor
- lifetime
- (log-exception? (const #t))
- (expire-on-exception? #f)
- (name "unnamed"))
- "Return a channel used to offload work to a dedicated thread. ARGS are the
-arguments of the worker thread procedure."
- (define thread-proc-vector
- (make-vector parallelism #f))
-
- (define (initializer/safe)
- (let ((args
- (with-exception-handler
- (lambda (exn)
- (simple-format
- (current-error-port)
- "exception running initializer in worker thread (~A): ~A:\n ~A\n"
- name
- initializer
- exn)
- #f)
- (lambda ()
- (with-throw-handler #t
- initializer
- (lambda args
- (backtrace))))
- #:unwind? #t)))
-
- (if args
- args
- ;; never give up, just keep retrying
- (begin
- (sleep 1)
- (initializer/safe)))))
-
- (define (destructor/safe args)
- (let ((success?
- (with-exception-handler
- (lambda (exn)
- (simple-format
- (current-error-port)
- "exception running destructor in worker thread (~A): ~A:\n ~A\n"
- name
- destructor
- exn)
- #f)
- (lambda ()
- (with-throw-handler #t
- (lambda ()
- (apply destructor args)
- #t)
- (lambda _
- (backtrace))))
- #:unwind? #t)))
-
- (or success?
- #t
- (begin
- (sleep 1)
- (destructor/safe args)))))
-
- (define (process thread-index channel args)
- (let loop ((current-lifetime lifetime))
- (let ((exception?
- (match (get-message channel)
- (((? channel? reply) sent-time (? procedure? proc))
- (let ((time-delay
- (- (get-internal-real-time)
- sent-time)))
- (delay-logger (/ time-delay
- internal-time-units-per-second))
-
- (let* ((start-time (get-internal-real-time))
- (response
- (with-exception-handler
- (lambda (exn)
- (list 'worker-thread-error
- (/ (- (get-internal-real-time)
- start-time)
- internal-time-units-per-second)
- exn))
- (lambda ()
- (vector-set! thread-proc-vector
- thread-index
- proc)
- (with-throw-handler #t
- (lambda ()
- (call-with-values
- (lambda ()
- (start-stack
- 'worker-thread
- (apply proc args)))
- (lambda vals
- (cons (/ (- (get-internal-real-time)
- start-time)
- internal-time-units-per-second)
- vals))))
- (lambda args
- (when (match args
- (('%exception exn)
- (log-exception? exn))
- (_ #t))
- (simple-format
- (current-error-port)
- "worker-thread: exception: ~A\n" args)
- (backtrace)))))
- #:unwind? #t)))
- (put-message reply
- response)
-
- (vector-set! thread-proc-vector
- thread-index
- #f)
-
- (match response
- (('worker-thread-error duration _)
- (when duration-logger
- (duration-logger duration proc))
- #t)
- ((duration . _)
- (when duration-logger
- (duration-logger duration proc))
- #f))))))))
- (unless (and expire-on-exception?
- exception?)
- (if (number? current-lifetime)
- (unless (< current-lifetime 0)
- (loop (if current-lifetime
- (- current-lifetime 1)
- #f)))
- (loop #f))))))
-
- (let ((channel (make-channel)))
- (for-each
- (lambda (thread-index)
- (call-with-new-thread
- (lambda ()
- (catch 'system-error
- (lambda ()
- (set-thread-name
- (string-append
- name " w t "
- (number->string thread-index))))
- (const #t))
-
- (let init ((args (initializer/safe)))
- (with-exception-handler
- (lambda (exn)
- (simple-format
- (current-error-port)
- "worker-thread-channel: exception: ~A\n" exn))
- (lambda ()
- (parameterize ((%worker-thread-args args))
- (process thread-index channel args)))
- #:unwind? #t)
-
- (when destructor
- (destructor/safe args))
-
- (init (initializer/safe))))))
- (iota parallelism))
-
- (values channel
- thread-proc-vector)))
-
-(define &worker-thread-timeout
- (make-exception-type '&worker-thread-timeout
- &error
- '()))
-
-(define make-worker-thread-timeout-error
- (record-constructor &worker-thread-timeout))
-
-(define worker-thread-timeout-error?
- (record-predicate &worker-thread-timeout))
-
-(define %worker-thread-default-timeout
- (make-parameter 30))
-
-(define* (call-with-worker-thread channel proc #:key duration-logger
- (timeout (%worker-thread-default-timeout)))
- "Send PROC to the worker thread through CHANNEL. Return the result of PROC.
-If already in the worker thread, call PROC immediately."
- (let ((args (%worker-thread-args)))
- (if args
- (call-with-delay-logging proc #:args args)
- (let* ((reply (make-channel))
- (operation-success?
- (perform-operation
- (let ((put
- (wrap-operation
- (put-operation channel
- (list reply
- (get-internal-real-time)
- proc))
- (const #t))))
-
- (if timeout
- (choice-operation
- put
- (wrap-operation (sleep-operation timeout)
- (const #f)))
- put)))))
-
- (unless operation-success?
- (raise-exception
- (make-worker-thread-timeout-error)))
-
- (match (get-message reply)
- (('worker-thread-error duration exn)
- (when duration-logger
- (duration-logger duration))
- (raise-exception exn))
- ((duration . result)
- (when duration-logger
- (duration-logger duration))
- (apply values result)))))))
-
-;; Copied from (fibers web server)
-(define (call-with-sigint thunk cvar)
- (let ((handler #f))
- (dynamic-wind
- (lambda ()
- (set! handler
- (sigaction SIGINT (lambda (sig) (signal-condition! cvar)))))
- thunk
- (lambda ()
- (if handler
- ;; restore Scheme handler, SIG_IGN or SIG_DFL.
- (sigaction SIGINT (car handler) (cdr handler))
- ;; restore original C handler.
- (sigaction SIGINT #f))))))
-
-;; This variant of run-server from the fibers library supports running
-;; multiple servers within one process.
-(define run-server/patched
- (let ((fibers-web-server-module
- (resolve-module '(fibers web server))))
-
- (define set-nonblocking!
- (module-ref fibers-web-server-module 'set-nonblocking!))
-
- (define make-default-socket
- (module-ref fibers-web-server-module 'make-default-socket))
-
- (define socket-loop
- (module-ref fibers-web-server-module 'socket-loop))
-
- (lambda* (handler
- #:key
- (host #f)
- (family AF_INET)
- (addr (if host
- (inet-pton family host)
- INADDR_LOOPBACK))
- (port 8080)
- (socket (make-default-socket family addr port)))
- ;; We use a large backlog by default. If the server is suddenly hit
- ;; with a number of connections on a small backlog, clients won't
- ;; receive confirmation for their SYN, leading them to retry --
- ;; probably successfully, but with a large latency.
- (listen socket 1024)
- (set-nonblocking! socket)
- (sigaction SIGPIPE SIG_IGN)
- (spawn-fiber (lambda () (socket-loop socket handler))))))
-
(define (spawn-port-monitoring-fiber port error-condition)
(spawn-fiber
(lambda ()
@@ -326,7 +42,7 @@ If already in the worker thread, call PROC immediately."
"port monitoring fiber error-condition unresponsive")
(primitive-exit 1))
(lambda ()
- (with-fibers-port-timeouts
+ (with-port-timeouts
(lambda ()
(let ((sock
(non-blocking-port
@@ -337,218 +53,6 @@ If already in the worker thread, call PROC immediately."
#:unwind? #t)
(sleep 20)))))
-(define (defer-to-fiber thunk)
- (let ((reply (make-channel)))
- (spawn-fiber
- (lambda ()
- (put-message
- reply
- (with-exception-handler
- (lambda (exn)
- (cons 'worker-fiber-error exn))
- (lambda ()
- (with-exception-handler
- (lambda (exn)
- (simple-format
- (current-error-port)
- "worker fiber: exception: ~A\n"
- exn)
- (backtrace)
- (raise-exception exn))
- (lambda ()
- (call-with-values
- thunk
- (lambda vals
- vals)))))
- #:unwind? #t)))
- #:parallel? #t)
- reply))
-
-(define (fetch-result-of-defered-thunks . reply-channels)
- (let ((responses (map get-message reply-channels)))
- (map
- (match-lambda
- (('worker-thread-error . exn)
- (raise-exception exn))
- (result
- (apply values result)))
- responses)))
-
-(define-syntax parallel-via-fibers
- (lambda (x)
- (syntax-case x ()
- ((_ e0 ...)
- (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
- #'(let ((tmp0 (defer-to-fiber
- (lambda ()
- e0)))
- ...)
- (apply values (fetch-result-of-defered-thunks tmp0 ...))))))))
-
-(define-syntax-rule (letpar& ((v e) ...) b0 b1 ...)
- (call-with-values
- (lambda () (parallel-via-fibers e ...))
- (lambda (v ...)
- b0 b1 ...)))
-
-(define* (with-fibers-timeout thunk #:key timeout on-timeout)
- (let ((channel (make-channel)))
- (spawn-fiber
- (lambda ()
- (with-exception-handler
- (lambda (exn)
- (perform-operation
- (choice-operation
- (put-operation channel (cons 'exception exn))
- (sleep-operation timeout))))
- (lambda ()
- (call-with-values thunk
- (lambda vals
- (perform-operation
- (choice-operation
- (put-operation channel vals)
- (sleep-operation timeout))))))
- #:unwind? #t)))
-
- (match (perform-operation
- (choice-operation
- (get-operation channel)
- (wrap-operation (sleep-operation timeout)
- (const 'timeout))))
- ('timeout
- (on-timeout))
- (('exception . exn)
- (raise-exception exn))
- (vals
- (apply values vals)))))
-
-(define &port-timeout
- (make-exception-type '&port-timeout
- &external-error
- '(thunk port)))
-
-(define make-port-timeout-error
- (record-constructor &port-timeout))
-
-(define port-timeout-error?
- (record-predicate &port-timeout))
-
-(define &port-read-timeout
- (make-exception-type '&port-read-timeout
- &port-timeout
- '()))
-
-(define make-port-read-timeout-error
- (record-constructor &port-read-timeout))
-
-(define port-read-timeout-error?
- (record-predicate &port-read-timeout))
-
-(define &port-write-timeout
- (make-exception-type '&port-write-timeout
- &port-timeout
- '()))
-
-(define make-port-write-timeout-error
- (record-constructor &port-write-timeout))
-
-(define port-write-timeout-error?
- (record-predicate &port-write-timeout))
-
-;; These procedure are subject to spurious wakeups.
-
-(define (readable? port)
- "Test if PORT is writable."
- (= 1 (port-poll port "r" 0)))
-
-(define (writable? port)
- "Test if PORT is writable."
- (= 1 (port-poll port "w" 0)))
-
-(define (make-wait-operation ready? schedule-when-ready port port-ready-fd this-procedure)
- (make-base-operation #f
- (lambda _
- (and (ready? port) values))
- (lambda (flag sched resume)
- (define (commit)
- (match (atomic-box-compare-and-swap! flag 'W 'S)
- ('W (resume values))
- ('C (commit))
- ('S #f)))
- (schedule-when-ready
- sched (port-ready-fd port) commit))))
-
-(define (wait-until-port-readable-operation port)
- "Make an operation that will succeed when PORT is readable."
- (unless (input-port? port)
- (error "refusing to wait forever for input on non-input port"))
- (make-wait-operation readable? schedule-task-when-fd-readable port
- port-read-wait-fd
- wait-until-port-readable-operation))
-
-(define (wait-until-port-writable-operation port)
- "Make an operation that will succeed when PORT is writable."
- (unless (output-port? port)
- (error "refusing to wait forever for output on non-output port"))
- (make-wait-operation writable? schedule-task-when-fd-writable port
- port-write-wait-fd
- wait-until-port-writable-operation))
-
-(define* (with-fibers-port-timeouts thunk
- #:key timeout
- (read-timeout timeout)
- (write-timeout timeout))
- (define (no-fibers-wait thunk port mode timeout)
- (define poll-timeout-ms 200)
-
- ;; When the GC runs, it restarts the poll syscall, but the timeout
- ;; remains unchanged! When the timeout is longer than the time
- ;; between the syscall restarting, I think this renders the
- ;; timeout useless. Therefore, this code uses a short timeout, and
- ;; repeatedly calls poll while watching the clock to see if it has
- ;; timed out overall.
- (let ((timeout-internal
- (+ (get-internal-real-time)
- (* internal-time-units-per-second timeout))))
- (let loop ((poll-value
- (port-poll port mode poll-timeout-ms)))
- (if (= poll-value 0)
- (if (> (get-internal-real-time)
- timeout-internal)
- (raise-exception
- (if (string=? mode "r")
- (make-port-read-timeout-error thunk port)
- (make-port-write-timeout-error thunk port)))
- (loop (port-poll port mode poll-timeout-ms)))
- poll-value))))
-
- (parameterize
- ((current-read-waiter
- (lambda (port)
- (if (current-scheduler)
- (perform-operation
- (choice-operation
- (wait-until-port-readable-operation port)
- (wrap-operation
- (sleep-operation read-timeout)
- (lambda ()
- (raise-exception
- (make-port-read-timeout-error thunk port))))))
- (no-fibers-wait thunk port "r" read-timeout))))
- (current-write-waiter
- (lambda (port)
- (if (current-scheduler)
- (perform-operation
- (choice-operation
- (wait-until-port-writable-operation port)
- (wrap-operation
- (sleep-operation write-timeout)
- (lambda ()
- (raise-exception
- (make-port-write-timeout-error thunk port))))))
- (no-fibers-wait thunk port "w" write-timeout)))))
- (thunk)))
-
;; Use the fibers sleep
(define (retry-on-error . args)
(apply
@@ -557,28 +61,6 @@ If already in the worker thread, call PROC immediately."
args
(list #:sleep-impl sleep))))
-(define (make-queueing-channel channel)
- (define queue (make-q))
-
- (let ((queue-channel (make-channel)))
- (spawn-fiber
- (lambda ()
- (while #t
- (if (q-empty? queue)
- (enq! queue
- (perform-operation
- (get-operation queue-channel)))
- (let ((front (q-front queue)))
- (perform-operation
- (choice-operation
- (wrap-operation (get-operation queue-channel)
- (lambda (val)
- (enq! queue val)))
- (wrap-operation (put-operation channel front)
- (lambda _
- (q-pop! queue))))))))))
- queue-channel))
-
(define (make-discrete-priority-queueing-channels channel num-priorities)
(define all-queues
(map (lambda _ (make-q))
@@ -588,6 +70,11 @@ If already in the worker thread, call PROC immediately."
(map (lambda _ (make-channel))
(iota num-priorities)))
+ (define (stats)
+ (map (lambda (queue)
+ `((length . ,(q-length queue))))
+ all-queues))
+
(spawn-fiber
(lambda ()
(while #t
@@ -620,4 +107,48 @@ If already in the worker thread, call PROC immediately."
(enq! queue val))))
all-queues
queue-channels)))))))))))
- (apply values queue-channels))
+ (values (list-copy queue-channels)
+ stats))
+
+(define-record-type <reusable-condition>
+ (%make-reusable-condition atomic-box channel)
+ reusable-condition?
+ (atomic-box reusable-condition-atomic-box)
+ (channel reusable-condition-channel))
+
+(define (make-reusable-condition)
+ (%make-reusable-condition (make-atomic-box #f)
+ (make-channel)))
+
+(define* (signal-reusable-condition! reusable-condition
+ #:optional (scheduler (current-scheduler)))
+ (match (atomic-box-compare-and-swap!
+ (reusable-condition-atomic-box reusable-condition)
+ #f
+ #t)
+ (#f
+ (spawn-fiber
+ (lambda ()
+ (put-message (reusable-condition-channel reusable-condition)
+ #t))
+ scheduler)
+ #t)
+ (#t #f)))
+
+(define* (reusable-condition-wait reusable-condition
+ #:key (timeout #f))
+ (let ((val
+ (if (atomic-box-ref (reusable-condition-atomic-box reusable-condition))
+ #t
+ ;; Not great as this is subject to race conditions, but it should
+ ;; roughly work
+ (if timeout
+ (perform-operation
+ (choice-operation
+ (get-operation (reusable-condition-channel reusable-condition))
+ (wrap-operation (sleep-operation timeout)
+ (const #f))))
+ (get-message (reusable-condition-channel reusable-condition))))))
+ (atomic-box-set! (reusable-condition-atomic-box reusable-condition)
+ #f)
+ val))
diff --git a/guix-build-coordinator/utils/timeout.scm b/guix-build-coordinator/utils/timeout.scm
new file mode 100644
index 0000000..bb133d7
--- /dev/null
+++ b/guix-build-coordinator/utils/timeout.scm
@@ -0,0 +1,81 @@
+(define-module (guix-build-coordinator utils timeout)
+ #:use-module (ice-9 exceptions)
+ #:use-module (ice-9 suspendable-ports)
+ #:use-module ((ice-9 ports internal) #:select (port-poll))
+ #:export (&port-timeout
+ &port-read-timeout
+ &port-write-timeout
+
+ port-timeout-error?
+ port-read-timeout-error?
+ port-write-timeout-error?
+
+ with-port-timeouts))
+
+(define &port-timeout
+ (make-exception-type '&port-timeout
+ &external-error
+ '(port)))
+
+(define make-port-timeout-error
+ (record-constructor &port-timeout))
+
+(define port-timeout-error?
+ (record-predicate &port-timeout))
+
+(define &port-read-timeout
+ (make-exception-type '&port-read-timeout
+ &port-timeout
+ '()))
+
+(define make-port-read-timeout-error
+ (record-constructor &port-read-timeout))
+
+(define port-read-timeout-error?
+ (record-predicate &port-read-timeout))
+
+(define &port-write-timeout
+ (make-exception-type '&port-write-timeout
+ &port-timeout
+ '()))
+
+(define make-port-write-timeout-error
+ (record-constructor &port-write-timeout))
+
+(define port-write-timeout-error?
+ (record-predicate &port-write-timeout))
+
+(define* (with-port-timeouts thunk #:key timeout)
+
+ ;; When the GC runs, it restarts the poll syscall, but the timeout remains
+ ;; unchanged! When the timeout is longer than the time between the syscall
+ ;; restarting, I think this renders the timeout useless. Therefore, this
+ ;; code uses a short timeout, and repeatedly calls poll while watching the
+ ;; clock to see if it has timed out overall.
+ (define poll-timeout-ms 200)
+
+ (define (wait port mode)
+ (let ((timeout-internal
+ (+ (get-internal-real-time)
+ (* internal-time-units-per-second timeout))))
+ (let loop ((poll-value
+ (port-poll port mode poll-timeout-ms)))
+ (if (= poll-value 0)
+ (if (> (get-internal-real-time)
+ timeout-internal)
+ (raise-exception
+ (if (string=? mode "r")
+ (make-port-read-timeout-error port)
+ (make-port-write-timeout-error port)))
+ (loop (port-poll port mode poll-timeout-ms)))
+ poll-value))))
+
+ (parameterize
+ ((current-read-waiter
+ (lambda (port)
+ (wait port "r")))
+ (current-write-waiter
+ (lambda (port)
+ (wait port "w"))))
+ (thunk)))
+