diff options
Diffstat (limited to 'guix-build-coordinator/utils/fibers.scm')
-rw-r--r-- | guix-build-coordinator/utils/fibers.scm | 55 |
1 files changed, 44 insertions, 11 deletions
diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm index 5362b18..293429c 100644 --- a/guix-build-coordinator/utils/fibers.scm +++ b/guix-build-coordinator/utils/fibers.scm @@ -1,4 +1,5 @@ (define-module (guix-build-coordinator utils fibers) + #:use-module (ice-9 q) #:use-module (ice-9 match) #:use-module (ice-9 atomic) #:use-module (ice-9 threads) @@ -15,6 +16,7 @@ #:select (set-thread-name)) #:use-module (guix-build-coordinator utils) #:export (make-worker-thread-channel + %worker-thread-default-timeout call-with-worker-thread worker-thread-timeout-error? @@ -26,8 +28,13 @@ letpar& + port-timeout-error? + port-read-timeout-error? + port-write-timeout-error? with-fibers-timeout - with-fibers-port-timeouts) + with-fibers-port-timeouts + + make-queueing-channel) #:replace (retry-on-error)) (define %worker-thread-args @@ -202,8 +209,11 @@ arguments of the worker thread procedure." (define worker-thread-timeout-error? (record-predicate &worker-thread-timeout)) +(define %worker-thread-default-timeout + (make-parameter 30)) + (define* (call-with-worker-thread channel proc #:key duration-logger - (timeout 30)) + (timeout (%worker-thread-default-timeout))) "Send PROC to the worker thread through CHANNEL. Return the result of PROC. If already in the worker thread, call PROC immediately." (let ((args (%worker-thread-args))) @@ -306,7 +316,9 @@ If already in the worker thread, call PROC immediately." (lambda () (with-fibers-port-timeouts (lambda () - (let ((sock (socket PF_INET SOCK_STREAM 0))) + (let ((sock + (non-blocking-port + (socket PF_INET SOCK_STREAM 0)))) (connect sock AF_INET INADDR_LOOPBACK port) (close-port sock))) #:timeout 20)) @@ -401,7 +413,7 @@ If already in the worker thread, call PROC immediately." (define &port-timeout (make-exception-type '&port-timeout &external-error - '(port))) + '(thunk port))) (define make-port-timeout-error (record-constructor &port-timeout)) @@ -478,7 +490,7 @@ If already in the worker thread, call PROC immediately." #:key timeout (read-timeout timeout) (write-timeout timeout)) - (define (no-fibers-wait port mode timeout) + (define (no-fibers-wait thunk port mode timeout) (define poll-timeout-ms 200) ;; When the GC runs, it restarts the poll syscall, but the timeout @@ -489,8 +501,7 @@ If already in the worker thread, call PROC immediately." ;; timed out overall. (let ((timeout-internal (+ (get-internal-real-time) - (* internal-time-units-per-second - (/ timeout 1000))))) + (* internal-time-units-per-second timeout)))) (let loop ((poll-value (port-poll port mode poll-timeout-ms))) (if (= poll-value 0) @@ -498,8 +509,8 @@ If already in the worker thread, call PROC immediately." timeout-internal) (raise-exception (if (string=? mode "r") - (make-port-read-timeout-error port) - (make-port-write-timeout-error port))) + (make-port-read-timeout-error thunk port) + (make-port-write-timeout-error thunk port))) (loop (port-poll port mode poll-timeout-ms))) poll-value)))) @@ -515,7 +526,7 @@ If already in the worker thread, call PROC immediately." (lambda () (raise-exception (make-port-read-timeout-error thunk port)))))) - (no-fibers-wait port "r" read-timeout)))) + (no-fibers-wait thunk port "r" read-timeout)))) (current-write-waiter (lambda (port) (if (current-scheduler) @@ -527,7 +538,7 @@ If already in the worker thread, call PROC immediately." (lambda () (raise-exception (make-port-write-timeout-error thunk port)))))) - (no-fibers-wait port "w" write-timeout))))) + (no-fibers-wait thunk port "w" write-timeout))))) (thunk))) ;; Use the fibers sleep @@ -537,3 +548,25 @@ If already in the worker thread, call PROC immediately." (append args (list #:sleep-impl sleep)))) + +(define (make-queueing-channel channel) + (define queue (make-q)) + + (let ((queue-channel (make-channel))) + (spawn-fiber + (lambda () + (while #t + (if (q-empty? queue) + (enq! queue + (perform-operation + (get-operation queue-channel))) + (let ((front (q-front queue))) + (perform-operation + (choice-operation + (wrap-operation (get-operation queue-channel) + (lambda (val) + (enq! queue val))) + (wrap-operation (put-operation channel front) + (lambda _ + (q-pop! queue)))))))))) + queue-channel)) |