aboutsummaryrefslogtreecommitdiff
path: root/guix-build-coordinator
diff options
context:
space:
mode:
Diffstat (limited to 'guix-build-coordinator')
-rw-r--r--guix-build-coordinator/datastore/sqlite.scm8
-rw-r--r--guix-build-coordinator/utils/fibers.scm27
2 files changed, 34 insertions, 1 deletions
diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm
index e67a940..96751a5 100644
--- a/guix-build-coordinator/datastore/sqlite.scm
+++ b/guix-build-coordinator/datastore/sqlite.scm
@@ -358,6 +358,14 @@ PRAGMA optimize;")
(define-method (datastore-spawn-fibers
(datastore <sqlite-datastore>))
+ ;; Queue messages to the writer thread, so that they're handled in a first
+ ;; come first served manor
+ (slot-set!
+ datastore
+ 'worker-writer-thread-channel
+ (make-queueing-channel
+ (slot-ref datastore 'worker-writer-thread-channel)))
+
(spawn-fiber
(lambda ()
(while #t
diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm
index 4f6431c..293429c 100644
--- a/guix-build-coordinator/utils/fibers.scm
+++ b/guix-build-coordinator/utils/fibers.scm
@@ -1,4 +1,5 @@
(define-module (guix-build-coordinator utils fibers)
+ #:use-module (ice-9 q)
#:use-module (ice-9 match)
#:use-module (ice-9 atomic)
#:use-module (ice-9 threads)
@@ -31,7 +32,9 @@
port-read-timeout-error?
port-write-timeout-error?
with-fibers-timeout
- with-fibers-port-timeouts)
+ with-fibers-port-timeouts
+
+ make-queueing-channel)
#:replace (retry-on-error))
(define %worker-thread-args
@@ -545,3 +548,25 @@ If already in the worker thread, call PROC immediately."
(append
args
(list #:sleep-impl sleep))))
+
+(define (make-queueing-channel channel)
+ (define queue (make-q))
+
+ (let ((queue-channel (make-channel)))
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (if (q-empty? queue)
+ (enq! queue
+ (perform-operation
+ (get-operation queue-channel)))
+ (let ((front (q-front queue)))
+ (perform-operation
+ (choice-operation
+ (wrap-operation (get-operation queue-channel)
+ (lambda (val)
+ (enq! queue val)))
+ (wrap-operation (put-operation channel front)
+ (lambda _
+ (q-pop! queue))))))))))
+ queue-channel))