aboutsummaryrefslogtreecommitdiff
path: root/guix-build-coordinator
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2024-10-14 13:03:09 +0100
committerChristopher Baines <mail@cbaines.net>2024-10-14 13:12:41 +0100
commit2f826346449bcc9dbdd7e5368a366ded68b2d32b (patch)
tree69368009087032650b1b8f1aa1b86ce7fdd4ea95 /guix-build-coordinator
parent492d4628887ea726cedc1c2b55743acfe32dfe0c (diff)
downloadbuild-coordinator-2f826346449bcc9dbdd7e5368a366ded68b2d32b.tar
build-coordinator-2f826346449bcc9dbdd7e5368a366ded68b2d32b.tar.gz
Add make-discrete-priority-queueing-channels
Diffstat (limited to 'guix-build-coordinator')
-rw-r--r--guix-build-coordinator/utils/fibers.scm46
1 files changed, 45 insertions, 1 deletions
diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm
index a064175..450c36b 100644
--- a/guix-build-coordinator/utils/fibers.scm
+++ b/guix-build-coordinator/utils/fibers.scm
@@ -34,7 +34,8 @@
with-fibers-timeout
with-fibers-port-timeouts
- make-queueing-channel)
+ make-queueing-channel
+ make-discrete-priority-queueing-channels)
#:replace (retry-on-error))
(define %worker-thread-args
@@ -577,3 +578,46 @@ If already in the worker thread, call PROC immediately."
(lambda _
(q-pop! queue))))))))))
queue-channel))
+
+(define (make-discrete-priority-queueing-channels channel num-priorities)
+ (define all-queues
+ (map (lambda _ (make-q))
+ (iota num-priorities)))
+
+ (define queue-channels
+ (map (lambda _ (make-channel))
+ (iota num-priorities)))
+
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (let loop ((queues all-queues))
+ (let ((queue (car queues)))
+ (if (q-empty? queue)
+ (let ((next (cdr queues)))
+ (if (null? next)
+ (perform-operation
+ (apply
+ choice-operation
+ (map (lambda (queue queue-channel)
+ (wrap-operation (get-operation queue-channel)
+ (lambda (val)
+ (enq! queue val))))
+ all-queues
+ queue-channels)))
+ (loop next)))
+ (let ((front (q-front queue)))
+ (perform-operation
+ (apply
+ choice-operation
+ (cons
+ (wrap-operation (put-operation channel front)
+ (lambda _
+ (q-pop! queue)))
+ (map (lambda (queue queue-channel)
+ (wrap-operation (get-operation queue-channel)
+ (lambda (val)
+ (enq! queue val))))
+ all-queues
+ queue-channels)))))))))))
+ (apply values queue-channels))