aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2024-05-15 17:58:40 +0100
committerChristopher Baines <mail@cbaines.net>2024-05-15 17:58:40 +0100
commitff72b37e353ff8d1869467680c309da8f1dc1e7c (patch)
tree36f43f4ca911e99c42203015246a18d53d04cdc7
parent300d706f2cc8425b237770b9fd0e4d7ccceb5328 (diff)
downloadbuild-coordinator-ff72b37e353ff8d1869467680c309da8f1dc1e7c.tar
build-coordinator-ff72b37e353ff8d1869467680c309da8f1dc1e7c.tar.gz
Order messages to the writer thread channel
So that they're processed in a first come first served manor.
-rw-r--r--guix-build-coordinator/datastore/sqlite.scm8
-rw-r--r--guix-build-coordinator/utils/fibers.scm27
2 files changed, 34 insertions, 1 deletions
diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm
index e67a940..96751a5 100644
--- a/guix-build-coordinator/datastore/sqlite.scm
+++ b/guix-build-coordinator/datastore/sqlite.scm
@@ -358,6 +358,14 @@ PRAGMA optimize;")
(define-method (datastore-spawn-fibers
(datastore <sqlite-datastore>))
+ ;; Queue messages to the writer thread, so that they're handled in a first
+ ;; come first served manor
+ (slot-set!
+ datastore
+ 'worker-writer-thread-channel
+ (make-queueing-channel
+ (slot-ref datastore 'worker-writer-thread-channel)))
+
(spawn-fiber
(lambda ()
(while #t
diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm
index 4f6431c..293429c 100644
--- a/guix-build-coordinator/utils/fibers.scm
+++ b/guix-build-coordinator/utils/fibers.scm
@@ -1,4 +1,5 @@
(define-module (guix-build-coordinator utils fibers)
+ #:use-module (ice-9 q)
#:use-module (ice-9 match)
#:use-module (ice-9 atomic)
#:use-module (ice-9 threads)
@@ -31,7 +32,9 @@
port-read-timeout-error?
port-write-timeout-error?
with-fibers-timeout
- with-fibers-port-timeouts)
+ with-fibers-port-timeouts
+
+ make-queueing-channel)
#:replace (retry-on-error))
(define %worker-thread-args
@@ -545,3 +548,25 @@ If already in the worker thread, call PROC immediately."
(append
args
(list #:sleep-impl sleep))))
+
+(define (make-queueing-channel channel)
+ (define queue (make-q))
+
+ (let ((queue-channel (make-channel)))
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (if (q-empty? queue)
+ (enq! queue
+ (perform-operation
+ (get-operation queue-channel)))
+ (let ((front (q-front queue)))
+ (perform-operation
+ (choice-operation
+ (wrap-operation (get-operation queue-channel)
+ (lambda (val)
+ (enq! queue val)))
+ (wrap-operation (put-operation channel front)
+ (lambda _
+ (q-pop! queue))))))))))
+ queue-channel))