diff options
author | Christopher Baines <mail@cbaines.net> | 2024-05-15 17:58:40 +0100 |
---|---|---|
committer | Christopher Baines <mail@cbaines.net> | 2024-05-15 17:58:40 +0100 |
commit | ff72b37e353ff8d1869467680c309da8f1dc1e7c (patch) | |
tree | 36f43f4ca911e99c42203015246a18d53d04cdc7 | |
parent | 300d706f2cc8425b237770b9fd0e4d7ccceb5328 (diff) | |
download | build-coordinator-ff72b37e353ff8d1869467680c309da8f1dc1e7c.tar build-coordinator-ff72b37e353ff8d1869467680c309da8f1dc1e7c.tar.gz |
Order messages to the writer thread channel
So that they're processed in a first come first served manor.
-rw-r--r-- | guix-build-coordinator/datastore/sqlite.scm | 8 | ||||
-rw-r--r-- | guix-build-coordinator/utils/fibers.scm | 27 |
2 files changed, 34 insertions, 1 deletions
diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm index e67a940..96751a5 100644 --- a/guix-build-coordinator/datastore/sqlite.scm +++ b/guix-build-coordinator/datastore/sqlite.scm @@ -358,6 +358,14 @@ PRAGMA optimize;") (define-method (datastore-spawn-fibers (datastore <sqlite-datastore>)) + ;; Queue messages to the writer thread, so that they're handled in a first + ;; come first served manor + (slot-set! + datastore + 'worker-writer-thread-channel + (make-queueing-channel + (slot-ref datastore 'worker-writer-thread-channel))) + (spawn-fiber (lambda () (while #t diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm index 4f6431c..293429c 100644 --- a/guix-build-coordinator/utils/fibers.scm +++ b/guix-build-coordinator/utils/fibers.scm @@ -1,4 +1,5 @@ (define-module (guix-build-coordinator utils fibers) + #:use-module (ice-9 q) #:use-module (ice-9 match) #:use-module (ice-9 atomic) #:use-module (ice-9 threads) @@ -31,7 +32,9 @@ port-read-timeout-error? port-write-timeout-error? with-fibers-timeout - with-fibers-port-timeouts) + with-fibers-port-timeouts + + make-queueing-channel) #:replace (retry-on-error)) (define %worker-thread-args @@ -545,3 +548,25 @@ If already in the worker thread, call PROC immediately." (append args (list #:sleep-impl sleep)))) + +(define (make-queueing-channel channel) + (define queue (make-q)) + + (let ((queue-channel (make-channel))) + (spawn-fiber + (lambda () + (while #t + (if (q-empty? queue) + (enq! queue + (perform-operation + (get-operation queue-channel))) + (let ((front (q-front queue))) + (perform-operation + (choice-operation + (wrap-operation (get-operation queue-channel) + (lambda (val) + (enq! queue val))) + (wrap-operation (put-operation channel front) + (lambda _ + (q-pop! queue)))))))))) + queue-channel)) |