diff options
Diffstat (limited to 'guix-build-coordinator')
-rw-r--r-- | guix-build-coordinator/utils/fibers.scm | 46 |
1 files changed, 45 insertions, 1 deletions
diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm index a064175..450c36b 100644 --- a/guix-build-coordinator/utils/fibers.scm +++ b/guix-build-coordinator/utils/fibers.scm @@ -34,7 +34,8 @@ with-fibers-timeout with-fibers-port-timeouts - make-queueing-channel) + make-queueing-channel + make-discrete-priority-queueing-channels) #:replace (retry-on-error)) (define %worker-thread-args @@ -577,3 +578,46 @@ If already in the worker thread, call PROC immediately." (lambda _ (q-pop! queue)))))))))) queue-channel)) + +(define (make-discrete-priority-queueing-channels channel num-priorities) + (define all-queues + (map (lambda _ (make-q)) + (iota num-priorities))) + + (define queue-channels + (map (lambda _ (make-channel)) + (iota num-priorities))) + + (spawn-fiber + (lambda () + (while #t + (let loop ((queues all-queues)) + (let ((queue (car queues))) + (if (q-empty? queue) + (let ((next (cdr queues))) + (if (null? next) + (perform-operation + (apply + choice-operation + (map (lambda (queue queue-channel) + (wrap-operation (get-operation queue-channel) + (lambda (val) + (enq! queue val)))) + all-queues + queue-channels))) + (loop next))) + (let ((front (q-front queue))) + (perform-operation + (apply + choice-operation + (cons + (wrap-operation (put-operation channel front) + (lambda _ + (q-pop! queue))) + (map (lambda (queue queue-channel) + (wrap-operation (get-operation queue-channel) + (lambda (val) + (enq! queue val)))) + all-queues + queue-channels))))))))))) + (apply values queue-channels)) |