aboutsummaryrefslogtreecommitdiff
path: root/guix-build-coordinator
diff options
context:
space:
mode:
Diffstat (limited to 'guix-build-coordinator')
-rw-r--r--guix-build-coordinator/agent.scm41
-rw-r--r--guix-build-coordinator/client-communication.scm21
-rw-r--r--guix-build-coordinator/coordinator.scm66
-rw-r--r--guix-build-coordinator/datastore/sqlite.scm8
-rw-r--r--guix-build-coordinator/hooks.scm190
-rw-r--r--guix-build-coordinator/utils.scm57
-rw-r--r--guix-build-coordinator/utils/fibers.scm65
7 files changed, 294 insertions, 154 deletions
diff --git a/guix-build-coordinator/agent.scm b/guix-build-coordinator/agent.scm
index 8144947..7c00ca1 100644
--- a/guix-build-coordinator/agent.scm
+++ b/guix-build-coordinator/agent.scm
@@ -46,7 +46,7 @@
#:use-module (guix base32)
#:use-module (guix serialization)
#:use-module ((guix build syscalls)
- #:select (set-thread-name))
+ #:select (set-thread-name free-disk-space))
#:use-module (guix-build-coordinator utils)
#:use-module (guix-build-coordinator agent-messaging)
#:use-module (guix-build-coordinator agent-messaging abstract)
@@ -77,16 +77,22 @@
derivation-substitute-urls
non-derivation-substitute-urls
metrics-file
- max-1min-load-average)
+ max-1min-load-average
+ timestamp-log-output?)
(define lgr (make <logger>))
(define port-log (make <port-log>
#:port (current-output-port)
#:formatter
- (lambda (lvl time str)
- (format #f "~a (~5a): ~a~%"
- (strftime "%F %H:%M:%S" (localtime time))
- lvl
- str))))
+ ;; In guile-lib v0.2.8 onwards, the formatter is
+ ;; called with more arguments
+ (lambda args ; lvl, time, str
+ (format #f "~a(~5a): ~a~%"
+ (if timestamp-log-output?
+ (strftime "%F %H:%M:%S " (localtime
+ (second args)))
+ "")
+ (first args)
+ (third args)))))
(define metrics-enabled?
(and (not (string-null? metrics-file))
@@ -513,12 +519,21 @@
'())
(lambda ()
- (fetch-builds-for-agent
- coordinator-interface
- systems
- (+ (max current-threads 1)
- (count-post-build-jobs))
- #:log (build-log-procedure lgr)))
+ (let ((free-space (free-disk-space "/gnu/store")))
+ (if (< free-space (* 2 (expt 2 30))) ; 2G
+ (begin
+ (log-msg
+ lgr 'WARN
+ "low space on /gnu/store, "
+ "not fetching new builds")
+ (sleep 30)
+ '())
+ (fetch-builds-for-agent
+ coordinator-interface
+ systems
+ (+ (max current-threads 1)
+ (count-post-build-jobs))
+ #:log (build-log-procedure lgr)))))
#:unwind? #t))
(new-builds
(remove (lambda (build)
diff --git a/guix-build-coordinator/client-communication.scm b/guix-build-coordinator/client-communication.scm
index c0118aa..2ea7ce2 100644
--- a/guix-build-coordinator/client-communication.scm
+++ b/guix-build-coordinator/client-communication.scm
@@ -151,7 +151,8 @@
(alist-delete
'end-time
build-details))
- ,@(if (assq-ref build-details 'processed)
+ ,@(if (or (assq-ref build-details 'processed)
+ (assq-ref build-details 'canceled))
'()
(datastore-find-unprocessed-build-entry datastore uuid))
(created-at . ,(or (and=>
@@ -483,15 +484,17 @@
'WARN
"exception substituting derivation " derivation-file
": " exn)
-
- (if (null? (or substitute-urls '()))
- ;; Try again
- (read-drv/substitute derivation-file)
- (read-derivation-through-substitutes
- derivation-file
- substitute-urls)))
+ (raise-exception exn))
(lambda ()
- (read-drv/substitute derivation-file))
+ (with-throw-handler #t
+ (lambda ()
+ (retry-on-error
+ (lambda ()
+ (read-drv/substitute derivation-file))
+ #:times 5
+ #:delay 5))
+ (lambda args
+ (backtrace))))
#:unwind? #t))
,@(let ((priority (assoc-ref body "priority")))
(if priority
diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm
index 3830d88..9e5987f 100644
--- a/guix-build-coordinator/coordinator.scm
+++ b/guix-build-coordinator/coordinator.scm
@@ -50,6 +50,7 @@
#:use-module (prometheus)
#:use-module ((guix build syscalls)
#:select (set-thread-name))
+ #:use-module (guix store)
#:use-module (guix derivations)
#:use-module (guix build utils)
#:use-module (guix-build-coordinator utils)
@@ -363,7 +364,8 @@
(not (client-error? exn))))))
hooks
(allocation-strategy
- basic-build-allocation-strategy))
+ basic-build-allocation-strategy)
+ (timestamp-log-output? #t))
(and (or (list? hooks)
(begin
(simple-format
@@ -394,11 +396,16 @@
(port-log (make <custom-port-log>
#:port (current-output-port)
#:formatter
- (lambda (lvl time str)
- (format #f "~a (~5a): ~a~%"
- (strftime "%F %H:%M:%S" (localtime time))
- lvl
- str))))
+ ;; In guile-lib v0.2.8 onwards, the formatter is
+ ;; called with more arguments
+ (lambda args ; lvl, time, str
+ (format #f "~a(~5a): ~a~%"
+ (if timestamp-log-output?
+ (strftime "%F %H:%M:%S " (localtime
+ (second args)))
+ "")
+ (first args)
+ (third args)))))
(build-coordinator
(make-build-coordinator-record datastore
hooks
@@ -420,6 +427,11 @@
;; The logger assumes this
(set-port-encoding! (current-output-port) "UTF-8")
+ ;; Work around my broken with-store/non-blocking in Guix
+ (let ((socket-file (%daemon-socket-uri)))
+ (%daemon-socket-uri
+ (string-append "file://" socket-file)))
+
(with-exception-handler
(lambda (exn)
(simple-format #t "failed enabling core dumps: ~A\n" exn))
@@ -441,14 +453,14 @@
(lambda (scheduler port)
(display "#<scheduler>" port)))
- (when update-datastore?
- (datastore-update (build-coordinator-datastore build-coordinator)))
-
(when pid-file
(call-with-output-file pid-file
(lambda (port)
(simple-format port "~A\n" (getpid)))))
+ (when update-datastore?
+ (datastore-update (build-coordinator-datastore build-coordinator)))
+
(set-build-coordinator-allocator-thread!
build-coordinator
(make-build-allocator-thread build-coordinator))
@@ -585,8 +597,8 @@
finished?)
(wait finished?))
- #:hz 10
- #:parallelism 2))
+ #:hz 0
+ #:parallelism 1))
finished?)))))
(define* (submit-build build-coordinator derivation-file
@@ -619,9 +631,19 @@
(derivation
(if derivation-exists-in-database?
#f ; unnecessary to fetch derivation
- (call-with-delay-logging read-drv
- #:threshold 10
- #:args (list derivation-file))))
+ ;; Bit of a hack, but offload reading the derivation to a
+ ;; thread so that it doesn't block the fibers thread, since
+ ;; local I/O doesn't cooperate with fibers
+ (datastore-call-with-transaction
+ datastore
+ (lambda _
+ (with-fibers-port-timeouts
+ (lambda ()
+ (call-with-delay-logging read-drv
+ #:threshold 10
+ #:args (list derivation-file)))
+ #:timeout 240))
+ #:readonly? #t)))
(system
(or system-from-database
@@ -740,9 +762,19 @@
(unless (datastore-find-derivation datastore derivation-file)
(datastore-store-derivation
datastore
- (call-with-delay-logging read-drv
- #:threshold 10
- #:args (list derivation-file))))
+ ;; Bit of a hack, but offload reading the derivation to a thread so
+ ;; that it doesn't block the fibers thread, since local I/O doesn't
+ ;; cooperate with fibers
+ (datastore-call-with-transaction
+ datastore
+ (lambda _
+ (with-fibers-port-timeouts
+ (lambda ()
+ (call-with-delay-logging read-drv
+ #:threshold 10
+ #:args (list derivation-file)))
+ #:timeout 30))
+ #:readonly? #t)))
(let ((related-derivations-lacking-builds
(if ensure-all-related-derivation-outputs-have-builds?
diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm
index e67a940..96751a5 100644
--- a/guix-build-coordinator/datastore/sqlite.scm
+++ b/guix-build-coordinator/datastore/sqlite.scm
@@ -358,6 +358,14 @@ PRAGMA optimize;")
(define-method (datastore-spawn-fibers
(datastore <sqlite-datastore>))
+ ;; Queue messages to the writer thread, so that they're handled in a first
+ ;; come first served manor
+ (slot-set!
+ datastore
+ 'worker-writer-thread-channel
+ (make-queueing-channel
+ (slot-ref datastore 'worker-writer-thread-channel)))
+
(spawn-fiber
(lambda ()
(while #t
diff --git a/guix-build-coordinator/hooks.scm b/guix-build-coordinator/hooks.scm
index bf4b576..3eeb299 100644
--- a/guix-build-coordinator/hooks.scm
+++ b/guix-build-coordinator/hooks.scm
@@ -32,8 +32,9 @@
#:use-module (guix config)
#:use-module (guix derivations)
#:use-module (guix serialization)
- #:use-module ((guix utils) #:select (default-keyword-arguments))
- #:use-module (guix build utils)
+ #:use-module ((guix utils) #:select (default-keyword-arguments
+ call-with-decompressed-port))
+ #:use-module ((guix build utils) #:select (dump-port mkdir-p))
#:use-module (guix-build-coordinator config)
#:use-module (guix-build-coordinator utils)
#:use-module (guix-build-coordinator datastore)
@@ -81,6 +82,20 @@
build-id agent-id)
(current-error-port))))
+(define* (default-nar-compressions #:key output-filename nar-size
+ source-compression source-size)
+ (define MiB (* (expt 2 20) 1.))
+
+ (if (eq? 'none source-compression)
+ ;; If the agent didn't compress the nar, don't change that here
+ (list 'none)
+ (let* ((compression-proportion
+ (/ source-size nar-size)))
+ (if (or (> compression-proportion 0.95)
+ (< nar-size (* 0.5 MiB)))
+ '(none)
+ (list source-compression)))))
+
(define* (build-success-publish-hook
publish-directory
#:key
@@ -94,7 +109,8 @@
post-publish-hook
combined-post-publish-hook
(publish-referenced-derivation-source-files? #t)
- derivation-substitute-urls)
+ derivation-substitute-urls
+ (nar-compressions-proc default-nar-compressions))
(mkdir-p (string-append publish-directory "/nar/lzip"))
(lambda (build-coordinator build-id)
@@ -218,8 +234,7 @@
nar-filename))
#:unwind? #t))
- (cons narinfo-filename
- nar-filename)))))
+ narinfo-filename))))
referenced-source-files))))))
(define (process-output drv-name output)
@@ -230,12 +245,6 @@
(nar-location
(build-output-file-location datastore build-id
output-name))
- (nar-filename
- (string-append "nar/lzip/"
- (basename output-filename)))
- (nar-destination
- (string-append publish-directory "/"
- nar-filename))
(narinfo-filename
(string-append (string-take (basename output-filename) 32)
".narinfo"))
@@ -246,8 +255,78 @@
(if (skip-publishing-proc narinfo-filename narinfo-directory)
#f
- (begin
- (copy-file nar-location nar-destination)
+ (let ((compressions
+ (nar-compressions-proc
+ #:output-filename output-filename
+ #:nar-size (assq-ref output 'size)
+ ;; TODO Don't hardcode this
+ #:source-compression 'lzip
+ #:source-size (stat:size (stat nar-location #f)))))
+
+ (for-each
+ (lambda (compression)
+ (if (or (and (pair? compression)
+ (eq? (car compression) 'lzip))
+ (eq? compression 'lzip))
+ ;; TODO If the agents start uploading uncompressed files
+ ;; or files compressed differently, this might not be
+ ;; right
+ (let ((nar-destination
+ (string-append publish-directory "/"
+ "nar/lzip/"
+ (basename output-filename))))
+ (copy-file nar-location nar-destination))
+ (let* ((target-compression
+ (if (pair? compression)
+ (car compression)
+ compression))
+ ;; TODO This logic should sit elsewhere
+ (nar-destination
+ (string-append
+ publish-directory "/"
+ "nar/"
+ (if (eq? compression 'none)
+ ""
+ (string-append
+ (symbol->string target-compression) "/"))
+ (basename output-filename)))
+ (temporary-destination
+ (string-append nar-destination ".tmp")))
+ (mkdir-p (dirname temporary-destination))
+ (call-with-input-file nar-location
+ (lambda (source-port)
+ (call-with-decompressed-port
+ ;; TODO Don't assume the source compression
+ 'lzip
+ source-port
+ (lambda (decompressed-source-port)
+ (let ((call-with-compressed-output-port*
+ (match target-compression
+ ('gzip
+ (@ (zlib) call-with-gzip-output-port))
+ ('lzip
+ (@ (lzlib) call-with-lzip-output-port))
+ ('zstd
+ (@ (zstd) call-with-zstd-output-port))
+ ('none
+ (lambda (port proc)
+ (proc port)
+ (close-port port))))))
+ (when (file-exists? temporary-destination)
+ (delete-file temporary-destination))
+ (apply
+ call-with-compressed-output-port*
+ (open-output-file temporary-destination
+ #:binary #t)
+ (lambda (compressed-port)
+ (dump-port decompressed-source-port
+ compressed-port))
+ (if (pair? compression)
+ (cdr compression)
+ '())))))))
+ (rename-file temporary-destination
+ nar-destination))))
+ compressions)
(call-with-output-file narinfo-location
(lambda (port)
@@ -257,7 +336,23 @@
(assq-ref output 'size)
(vector->list
(assq-ref output 'references))
- `((lzip ,(stat:size (stat nar-location #f))))
+ (map
+ (lambda (compression-details)
+ (let* ((compression
+ (if (pair? compression-details)
+ (car compression-details)
+ compression-details))
+ ;; TODO This logic should sit elsewhere
+ (file (string-append
+ publish-directory "/"
+ "nar/"
+ (if (eq? compression 'none)
+ ""
+ (string-append
+ (symbol->string compression) "/"))
+ (basename output-filename))))
+ (list compression (stat:size (stat file #f)))))
+ compressions)
#:system (datastore-find-derivation-system
datastore
drv-name)
@@ -276,18 +371,16 @@
(raise-exception exn))
(lambda ()
(post-publish-hook publish-directory
- narinfo-filename
- nar-filename))
+ narinfo-filename))
#:unwind? #t))
- (cons narinfo-filename
- nar-filename)))))
+ narinfo-filename))))
(let* ((build-details
(datastore-find-build datastore build-id))
(drv-name
(assq-ref build-details 'derivation-name))
- (narinfos-and-nars
+ (narinfos
(append
(if publish-referenced-derivation-source-files?
(process-referenced-derivation-source-files drv-name)
@@ -297,23 +390,22 @@
(process-output drv-name output))
(datastore-list-build-outputs datastore build-id)))))
(when (and combined-post-publish-hook
- (not (null? narinfos-and-nars)))
+ (not (null? narinfos)))
(with-exception-handler
(lambda (exn)
;; Rollback narinfo creation, to make this more
;; transactional
(for-each
- (match-lambda
- ((narinfo-filename . _)
- (delete-file
- (string-append
- narinfo-directory "/" narinfo-filename))))
- narinfos-and-nars)
+ (lambda
+ (narinfo-filename)
+ (delete-file
+ (string-append
+ narinfo-directory "/" narinfo-filename)))
+ narinfos)
(raise-exception exn))
(lambda ()
- (combined-post-publish-hook publish-directory
- narinfos-and-nars))
+ (combined-post-publish-hook publish-directory narinfos))
#:unwind? #t)))))
(define* (build-success-s3-publish-hook
@@ -528,24 +620,30 @@
(unless (eq? source-compression recompress-to)
(when (file-exists? tmp-output-log-file)
(delete-file tmp-output-log-file))
- (with-timeout timeout
- (raise-exception
- (make-exception-with-message "timeout recompressing log file"))
- (call-with-compressed-input-file
- source-log-file
- source-compression
- (lambda (input-port)
- (call-with-compressed-output-file
- tmp-output-log-file
- recompress-to
- (lambda (output-port)
- (dump-port input-port output-port))))))
+ (with-port-timeouts
+ (lambda ()
+ (call-with-compressed-input-file
+ source-log-file
+ source-compression
+ (lambda (input-port)
+ (call-with-compressed-output-file
+ tmp-output-log-file
+ recompress-to
+ (lambda (output-port)
+ (dump-port input-port output-port))))))
+ #:timeout timeout)
(rename-file tmp-output-log-file
output-log-file)
(delete-file source-log-file)))))
-(define (default-build-missing-inputs-hook build-coordinator
- build-id missing-inputs)
+(define* (default-build-missing-inputs-hook
+ build-coordinator
+ build-id missing-inputs
+ #:key (submit-build? (lambda* (missing-input
+ #:key successful-builds
+ pending-builds)
+ (and (null? successful-builds)
+ (null? pending-builds)))))
(define datastore
(build-coordinator-datastore build-coordinator))
@@ -579,8 +677,9 @@
(not (assq-ref build-details 'processed))
(not (assq-ref build-details 'canceled))))
builds-for-output)))
- (if (and (null? successful-builds)
- (null? pending-builds))
+ (if (submit-build? missing-input
+ #:successful-builds successful-builds
+ #:pending-builds pending-builds)
(begin
(simple-format #t
"submitting build for ~A\n"
@@ -590,8 +689,7 @@
#:tags (datastore-fetch-build-tags
datastore
build-id)))
- (simple-format #t "~A builds exist for ~A, skipping\n"
- (length builds-for-output)
+ (simple-format #t "skipping submitting build for ~A\n"
missing-input)))
(begin
(simple-format (current-error-port)
diff --git a/guix-build-coordinator/utils.scm b/guix-build-coordinator/utils.scm
index a549f20..0a3383c 100644
--- a/guix-build-coordinator/utils.scm
+++ b/guix-build-coordinator/utils.scm
@@ -39,9 +39,6 @@
#:use-module ((guix http-client)
#:select (http-fetch))
#:use-module (guix serialization)
- #:use-module ((guix build download)
- #:select ((open-connection-for-uri
- . guix:open-connection-for-uri)))
#:use-module ((guix build syscalls)
#:select (set-thread-name))
#:use-module (guix scripts substitute)
@@ -66,7 +63,7 @@
read-derivation-from-file*
- with-store/non-blocking
+ non-blocking-port
substitute-derivation
read-derivation-through-substitutes
@@ -86,9 +83,6 @@
create-work-queue
create-thread-pool
- with-timeout
- reset-timeout
-
throttle
get-load-average
@@ -399,23 +393,6 @@
(fcntl port F_SETFL (logior O_NONBLOCK flags)))
port))
-(define (ensure-non-blocking-store-connection store)
- "Mark the file descriptor that backs STORE, a <store-connection>, as
-O_NONBLOCK."
- (match (store-connection-socket store)
- ((? file-port? port)
- (non-blocking-port port))
- (_ #f)))
-
-(define-syntax-rule (with-store/non-blocking store exp ...)
- "Like 'with-store', bind STORE to a connection to the store, but ensure that
-said connection is non-blocking (O_NONBLOCK). Evaluate EXP... in that
-context."
- (with-store store
- (ensure-non-blocking-store-connection store)
- (let ()
- exp ...)))
-
(define* (substitute-derivation store
derivation-name
#:key substitute-urls)
@@ -487,10 +464,9 @@ context."
(match (assoc-ref cache key)
(#f
(let ((socket
- (guix:open-connection-for-uri
+ (open-socket-for-uri*
uri
- #:verify-certificate? verify-certificate?
- #:timeout timeout)))
+ #:verify-certificate? verify-certificate?)))
(set! cache (alist-cons key socket cache))
socket))
(socket
@@ -591,9 +567,9 @@ context."
(define* (store-item->recutils compression file-size)
(let ((url (encode-and-join-uri-path
`(,@(split-and-decode-uri-path nar-path)
- ,@(if compression
- (list (symbol->string compression))
- '())
+ ,@(if (eq? compression 'none)
+ '()
+ (list (symbol->string compression)))
,(basename store-path)))))
(format #f "URL: ~a~%Compression: ~a~%~@[FileSize: ~a~%~]"
url
@@ -1268,27 +1244,6 @@ References: ~a~%"
(values pool-mutex job-available count-threads list-jobs)))
-;; copied from (guix scripts substitute)
-(define-syntax-rule (with-timeout duration handler body ...)
- "Run BODY; when DURATION seconds have expired, call HANDLER, and run BODY
-again."
- (begin
- (sigaction SIGALRM
- (lambda (signum)
- (sigaction SIGALRM SIG_DFL)
- handler))
- (alarm duration)
- (call-with-values
- (lambda ()
- body ...)
- (lambda result
- (alarm 0)
- (sigaction SIGALRM SIG_DFL)
- (apply values result)))))
-
-(define (reset-timeout duration)
- (alarm duration))
-
(define (throttle min-duration thunk)
(let ((next-min-runtime 0))
(lambda ()
diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm
index 5362b18..2c7307f 100644
--- a/guix-build-coordinator/utils/fibers.scm
+++ b/guix-build-coordinator/utils/fibers.scm
@@ -1,4 +1,5 @@
(define-module (guix-build-coordinator utils fibers)
+ #:use-module (ice-9 q)
#:use-module (ice-9 match)
#:use-module (ice-9 atomic)
#:use-module (ice-9 threads)
@@ -15,6 +16,7 @@
#:select (set-thread-name))
#:use-module (guix-build-coordinator utils)
#:export (make-worker-thread-channel
+ %worker-thread-default-timeout
call-with-worker-thread
worker-thread-timeout-error?
@@ -26,8 +28,13 @@
letpar&
+ port-timeout-error?
+ port-read-timeout-error?
+ port-write-timeout-error?
with-fibers-timeout
- with-fibers-port-timeouts)
+ with-fibers-port-timeouts
+
+ make-queueing-channel)
#:replace (retry-on-error))
(define %worker-thread-args
@@ -202,8 +209,11 @@ arguments of the worker thread procedure."
(define worker-thread-timeout-error?
(record-predicate &worker-thread-timeout))
+(define %worker-thread-default-timeout
+ (make-parameter 30))
+
(define* (call-with-worker-thread channel proc #:key duration-logger
- (timeout 30))
+ (timeout (%worker-thread-default-timeout)))
"Send PROC to the worker thread through CHANNEL. Return the result of PROC.
If already in the worker thread, call PROC immediately."
(let ((args (%worker-thread-args)))
@@ -306,7 +316,9 @@ If already in the worker thread, call PROC immediately."
(lambda ()
(with-fibers-port-timeouts
(lambda ()
- (let ((sock (socket PF_INET SOCK_STREAM 0)))
+ (let ((sock
+ (non-blocking-port
+ (socket PF_INET SOCK_STREAM 0))))
(connect sock AF_INET INADDR_LOOPBACK port)
(close-port sock)))
#:timeout 20))
@@ -401,7 +413,7 @@ If already in the worker thread, call PROC immediately."
(define &port-timeout
(make-exception-type '&port-timeout
&external-error
- '(port)))
+ '(thunk port)))
(define make-port-timeout-error
(record-constructor &port-timeout))
@@ -435,20 +447,16 @@ If already in the worker thread, call PROC immediately."
(define (readable? port)
"Test if PORT is writable."
- (match (select (vector port) #() #() 0)
- ((#() #() #()) #f)
- ((#(_) #() #()) #t)))
+ (= 1 (port-poll port "r" 0)))
(define (writable? port)
"Test if PORT is writable."
- (match (select #() (vector port) #() 0)
- ((#() #() #()) #f)
- ((#() #(_) #()) #t)))
+ (= 1 (port-poll port "w" 0)))
(define (make-wait-operation ready? schedule-when-ready port port-ready-fd this-procedure)
(make-base-operation #f
(lambda _
- (and (ready? (port-ready-fd port)) values))
+ (and (ready? port) values))
(lambda (flag sched resume)
(define (commit)
(match (atomic-box-compare-and-swap! flag 'W 'S)
@@ -478,7 +486,7 @@ If already in the worker thread, call PROC immediately."
#:key timeout
(read-timeout timeout)
(write-timeout timeout))
- (define (no-fibers-wait port mode timeout)
+ (define (no-fibers-wait thunk port mode timeout)
(define poll-timeout-ms 200)
;; When the GC runs, it restarts the poll syscall, but the timeout
@@ -489,8 +497,7 @@ If already in the worker thread, call PROC immediately."
;; timed out overall.
(let ((timeout-internal
(+ (get-internal-real-time)
- (* internal-time-units-per-second
- (/ timeout 1000)))))
+ (* internal-time-units-per-second timeout))))
(let loop ((poll-value
(port-poll port mode poll-timeout-ms)))
(if (= poll-value 0)
@@ -498,8 +505,8 @@ If already in the worker thread, call PROC immediately."
timeout-internal)
(raise-exception
(if (string=? mode "r")
- (make-port-read-timeout-error port)
- (make-port-write-timeout-error port)))
+ (make-port-read-timeout-error thunk port)
+ (make-port-write-timeout-error thunk port)))
(loop (port-poll port mode poll-timeout-ms)))
poll-value))))
@@ -515,7 +522,7 @@ If already in the worker thread, call PROC immediately."
(lambda ()
(raise-exception
(make-port-read-timeout-error thunk port))))))
- (no-fibers-wait port "r" read-timeout))))
+ (no-fibers-wait thunk port "r" read-timeout))))
(current-write-waiter
(lambda (port)
(if (current-scheduler)
@@ -527,7 +534,7 @@ If already in the worker thread, call PROC immediately."
(lambda ()
(raise-exception
(make-port-write-timeout-error thunk port))))))
- (no-fibers-wait port "w" write-timeout)))))
+ (no-fibers-wait thunk port "w" write-timeout)))))
(thunk)))
;; Use the fibers sleep
@@ -537,3 +544,25 @@ If already in the worker thread, call PROC immediately."
(append
args
(list #:sleep-impl sleep))))
+
+(define (make-queueing-channel channel)
+ (define queue (make-q))
+
+ (let ((queue-channel (make-channel)))
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (if (q-empty? queue)
+ (enq! queue
+ (perform-operation
+ (get-operation queue-channel)))
+ (let ((front (q-front queue)))
+ (perform-operation
+ (choice-operation
+ (wrap-operation (get-operation queue-channel)
+ (lambda (val)
+ (enq! queue val)))
+ (wrap-operation (put-operation channel front)
+ (lambda _
+ (q-pop! queue))))))))))
+ queue-channel))