diff options
author | Christopher Baines <mail@cbaines.net> | 2024-10-14 13:03:09 +0100 |
---|---|---|
committer | Christopher Baines <mail@cbaines.net> | 2024-10-14 13:12:41 +0100 |
commit | 2f826346449bcc9dbdd7e5368a366ded68b2d32b (patch) | |
tree | 69368009087032650b1b8f1aa1b86ce7fdd4ea95 /guix-build-coordinator/utils | |
parent | 492d4628887ea726cedc1c2b55743acfe32dfe0c (diff) | |
download | build-coordinator-2f826346449bcc9dbdd7e5368a366ded68b2d32b.tar build-coordinator-2f826346449bcc9dbdd7e5368a366ded68b2d32b.tar.gz |
Add make-discrete-priority-queueing-channels
Diffstat (limited to 'guix-build-coordinator/utils')
-rw-r--r-- | guix-build-coordinator/utils/fibers.scm | 46 |
1 files changed, 45 insertions, 1 deletions
diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm index a064175..450c36b 100644 --- a/guix-build-coordinator/utils/fibers.scm +++ b/guix-build-coordinator/utils/fibers.scm @@ -34,7 +34,8 @@ with-fibers-timeout with-fibers-port-timeouts - make-queueing-channel) + make-queueing-channel + make-discrete-priority-queueing-channels) #:replace (retry-on-error)) (define %worker-thread-args @@ -577,3 +578,46 @@ If already in the worker thread, call PROC immediately." (lambda _ (q-pop! queue)))))))))) queue-channel)) + +(define (make-discrete-priority-queueing-channels channel num-priorities) + (define all-queues + (map (lambda _ (make-q)) + (iota num-priorities))) + + (define queue-channels + (map (lambda _ (make-channel)) + (iota num-priorities))) + + (spawn-fiber + (lambda () + (while #t + (let loop ((queues all-queues)) + (let ((queue (car queues))) + (if (q-empty? queue) + (let ((next (cdr queues))) + (if (null? next) + (perform-operation + (apply + choice-operation + (map (lambda (queue queue-channel) + (wrap-operation (get-operation queue-channel) + (lambda (val) + (enq! queue val)))) + all-queues + queue-channels))) + (loop next))) + (let ((front (q-front queue))) + (perform-operation + (apply + choice-operation + (cons + (wrap-operation (put-operation channel front) + (lambda _ + (q-pop! queue))) + (map (lambda (queue queue-channel) + (wrap-operation (get-operation queue-channel) + (lambda (val) + (enq! queue val)))) + all-queues + queue-channels))))))))))) + (apply values queue-channels)) |