aboutsummaryrefslogtreecommitdiff
path: root/guix-build-coordinator/utils/fibers.scm
diff options
context:
space:
mode:
Diffstat (limited to 'guix-build-coordinator/utils/fibers.scm')
-rw-r--r--guix-build-coordinator/utils/fibers.scm55
1 files changed, 44 insertions, 11 deletions
diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm
index 5362b18..293429c 100644
--- a/guix-build-coordinator/utils/fibers.scm
+++ b/guix-build-coordinator/utils/fibers.scm
@@ -1,4 +1,5 @@
(define-module (guix-build-coordinator utils fibers)
+ #:use-module (ice-9 q)
#:use-module (ice-9 match)
#:use-module (ice-9 atomic)
#:use-module (ice-9 threads)
@@ -15,6 +16,7 @@
#:select (set-thread-name))
#:use-module (guix-build-coordinator utils)
#:export (make-worker-thread-channel
+ %worker-thread-default-timeout
call-with-worker-thread
worker-thread-timeout-error?
@@ -26,8 +28,13 @@
letpar&
+ port-timeout-error?
+ port-read-timeout-error?
+ port-write-timeout-error?
with-fibers-timeout
- with-fibers-port-timeouts)
+ with-fibers-port-timeouts
+
+ make-queueing-channel)
#:replace (retry-on-error))
(define %worker-thread-args
@@ -202,8 +209,11 @@ arguments of the worker thread procedure."
(define worker-thread-timeout-error?
(record-predicate &worker-thread-timeout))
+(define %worker-thread-default-timeout
+ (make-parameter 30))
+
(define* (call-with-worker-thread channel proc #:key duration-logger
- (timeout 30))
+ (timeout (%worker-thread-default-timeout)))
"Send PROC to the worker thread through CHANNEL. Return the result of PROC.
If already in the worker thread, call PROC immediately."
(let ((args (%worker-thread-args)))
@@ -306,7 +316,9 @@ If already in the worker thread, call PROC immediately."
(lambda ()
(with-fibers-port-timeouts
(lambda ()
- (let ((sock (socket PF_INET SOCK_STREAM 0)))
+ (let ((sock
+ (non-blocking-port
+ (socket PF_INET SOCK_STREAM 0))))
(connect sock AF_INET INADDR_LOOPBACK port)
(close-port sock)))
#:timeout 20))
@@ -401,7 +413,7 @@ If already in the worker thread, call PROC immediately."
(define &port-timeout
(make-exception-type '&port-timeout
&external-error
- '(port)))
+ '(thunk port)))
(define make-port-timeout-error
(record-constructor &port-timeout))
@@ -478,7 +490,7 @@ If already in the worker thread, call PROC immediately."
#:key timeout
(read-timeout timeout)
(write-timeout timeout))
- (define (no-fibers-wait port mode timeout)
+ (define (no-fibers-wait thunk port mode timeout)
(define poll-timeout-ms 200)
;; When the GC runs, it restarts the poll syscall, but the timeout
@@ -489,8 +501,7 @@ If already in the worker thread, call PROC immediately."
;; timed out overall.
(let ((timeout-internal
(+ (get-internal-real-time)
- (* internal-time-units-per-second
- (/ timeout 1000)))))
+ (* internal-time-units-per-second timeout))))
(let loop ((poll-value
(port-poll port mode poll-timeout-ms)))
(if (= poll-value 0)
@@ -498,8 +509,8 @@ If already in the worker thread, call PROC immediately."
timeout-internal)
(raise-exception
(if (string=? mode "r")
- (make-port-read-timeout-error port)
- (make-port-write-timeout-error port)))
+ (make-port-read-timeout-error thunk port)
+ (make-port-write-timeout-error thunk port)))
(loop (port-poll port mode poll-timeout-ms)))
poll-value))))
@@ -515,7 +526,7 @@ If already in the worker thread, call PROC immediately."
(lambda ()
(raise-exception
(make-port-read-timeout-error thunk port))))))
- (no-fibers-wait port "r" read-timeout))))
+ (no-fibers-wait thunk port "r" read-timeout))))
(current-write-waiter
(lambda (port)
(if (current-scheduler)
@@ -527,7 +538,7 @@ If already in the worker thread, call PROC immediately."
(lambda ()
(raise-exception
(make-port-write-timeout-error thunk port))))))
- (no-fibers-wait port "w" write-timeout)))))
+ (no-fibers-wait thunk port "w" write-timeout)))))
(thunk)))
;; Use the fibers sleep
@@ -537,3 +548,25 @@ If already in the worker thread, call PROC immediately."
(append
args
(list #:sleep-impl sleep))))
+
+(define (make-queueing-channel channel)
+ (define queue (make-q))
+
+ (let ((queue-channel (make-channel)))
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (if (q-empty? queue)
+ (enq! queue
+ (perform-operation
+ (get-operation queue-channel)))
+ (let ((front (q-front queue)))
+ (perform-operation
+ (choice-operation
+ (wrap-operation (get-operation queue-channel)
+ (lambda (val)
+ (enq! queue val)))
+ (wrap-operation (put-operation channel front)
+ (lambda _
+ (q-pop! queue))))))))))
+ queue-channel))