aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--guix-build-coordinator/utils/fibers.scm46
1 files changed, 45 insertions, 1 deletions
diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm
index a064175..450c36b 100644
--- a/guix-build-coordinator/utils/fibers.scm
+++ b/guix-build-coordinator/utils/fibers.scm
@@ -34,7 +34,8 @@
with-fibers-timeout
with-fibers-port-timeouts
- make-queueing-channel)
+ make-queueing-channel
+ make-discrete-priority-queueing-channels)
#:replace (retry-on-error))
(define %worker-thread-args
@@ -577,3 +578,46 @@ If already in the worker thread, call PROC immediately."
(lambda _
(q-pop! queue))))))))))
queue-channel))
+
+(define (make-discrete-priority-queueing-channels channel num-priorities)
+ (define all-queues
+ (map (lambda _ (make-q))
+ (iota num-priorities)))
+
+ (define queue-channels
+ (map (lambda _ (make-channel))
+ (iota num-priorities)))
+
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (let loop ((queues all-queues))
+ (let ((queue (car queues)))
+ (if (q-empty? queue)
+ (let ((next (cdr queues)))
+ (if (null? next)
+ (perform-operation
+ (apply
+ choice-operation
+ (map (lambda (queue queue-channel)
+ (wrap-operation (get-operation queue-channel)
+ (lambda (val)
+ (enq! queue val))))
+ all-queues
+ queue-channels)))
+ (loop next)))
+ (let ((front (q-front queue)))
+ (perform-operation
+ (apply
+ choice-operation
+ (cons
+ (wrap-operation (put-operation channel front)
+ (lambda _
+ (q-pop! queue)))
+ (map (lambda (queue queue-channel)
+ (wrap-operation (get-operation queue-channel)
+ (lambda (val)
+ (enq! queue val))))
+ all-queues
+ queue-channels)))))))))))
+ (apply values queue-channels))