diff options
author | Christopher Baines <mail@cbaines.net> | 2023-05-20 11:38:46 +0100 |
---|---|---|
committer | Christopher Baines <mail@cbaines.net> | 2023-05-20 11:51:29 +0100 |
commit | e5ba65d106e7aacc325a06d87a5e19df4e44813d (patch) | |
tree | 3f42c83de81f12012b292831dfccbf88f1446d63 | |
parent | 01ca366eb547dec12ab34b5f6bb2c8ae97e40287 (diff) | |
download | nar-herder-e5ba65d106e7aacc325a06d87a5e19df4e44813d.tar nar-herder-e5ba65d106e7aacc325a06d87a5e19df4e44813d.tar.gz |
Port work queue improvements from the build coordinator
-rw-r--r-- | nar-herder/utils.scm | 110 |
1 files changed, 71 insertions, 39 deletions
diff --git a/nar-herder/utils.scm b/nar-herder/utils.scm index 50c6cf9..5374fbe 100644 --- a/nar-herder/utils.scm +++ b/nar-herder/utils.scm @@ -40,6 +40,8 @@ #:use-module (fibers) #:use-module (fibers channels) #:use-module (fibers conditions) + #:use-module ((guix build syscalls) + #:select (set-thread-name)) #:export (make-worker-thread-set call-with-worker-thread @@ -127,7 +129,9 @@ (define* (create-work-queue thread-count-parameter proc #:key thread-start-delay (thread-stop-delay - (make-time time-duration 0 0))) + (make-time time-duration 0 0)) + (name "unnamed") + priority<?) (let ((queue (make-q)) (queue-mutex (make-mutex)) (job-available (make-condition-variable)) @@ -147,11 +151,26 @@ (else thread-count-parameter))) - (define (process-job . args) - (with-mutex queue-mutex - (enq! queue args) - (start-new-threads-if-necessary (get-thread-count)) - (signal-condition-variable job-available))) + (define process-job + (if priority<? + (lambda* (args #:key priority) + (with-mutex queue-mutex + (enq! queue (cons priority args)) + (set-car! + queue + (stable-sort! (car queue) + (lambda (a b) + (priority<? + (car a) + (car b))))) + (sync-q! queue) + (start-new-threads-if-necessary (get-thread-count)) + (signal-condition-variable job-available))) + (lambda args + (with-mutex queue-mutex + (enq! queue args) + (start-new-threads-if-necessary (get-thread-count)) + (signal-condition-variable job-available))))) (define (count-threads) (with-mutex queue-mutex @@ -166,11 +185,12 @@ (define (list-jobs) (with-mutex queue-mutex - (append (list-copy - (car queue)) + (append (if priority<? + (map cdr (car queue)) + (list-copy (car queue))) (hash-fold (lambda (key val result) - (or (and val - (cons val result)) + (if val + (cons val result) result)) '() running-job-args)))) @@ -179,16 +199,17 @@ (with-exception-handler (lambda (exn) (simple-format (current-error-port) - "job raised exception: ~A\n" - job-args)) + "~A work queue, job raised exception ~A: ~A\n" + name job-args exn)) (lambda () (with-throw-handler #t (lambda () (apply proc job-args)) (lambda (key . args) - (simple-format (current-error-port) - "exception when handling job: ~A ~A\n" - key args) + (simple-format + (current-error-port) + "~A thread pool, exception when handling job: ~A ~A\n" + name key args) (backtrace)))) #:unwind? #t)) @@ -216,6 +237,13 @@ (call-with-new-thread (lambda () + (catch 'system-error + (lambda () + (set-thread-name + (string-append name " q t " + (number->string thread-index)))) + (const #t)) + (let loop ((last-job-finished-at (current-time time-monotonic))) (lock-mutex queue-mutex) @@ -232,9 +260,13 @@ ;; the job in the mean time (if (q-empty? queue) #f - (deq! queue)) + (if priority<? + (cdr (deq! queue)) + (deq! queue))) #f) - (deq! queue)))) + (if priority<? + (cdr (deq! queue)) + (deq! queue))))) (if job-args (begin @@ -262,28 +294,28 @@ (define start-new-threads-if-necessary (let ((previous-thread-started-at (make-time time-monotonic 0 0))) (lambda (desired-count) - (let* ((thread-count - (hash-count (const #t) running-job-args)) - (threads-to-start - (- desired-count thread-count))) - (when (> threads-to-start 0) - (for-each - (lambda (thread-index) - (when (eq? (hash-ref running-job-args - thread-index - 'slot-free) - 'slot-free) - (let* ((now (current-time time-monotonic)) - (elapsed (time-difference now - previous-thread-started-at))) - (when (or (eq? #f thread-start-delay) - (time>=? elapsed thread-start-delay)) - (set! previous-thread-started-at now) - (hash-set! running-job-args - thread-index - #f) - (start-thread thread-index))))) - (iota desired-count))))))) + (let* ((thread-count + (hash-count (const #t) running-job-args)) + (threads-to-start + (- desired-count thread-count))) + (when (> threads-to-start 0) + (for-each + (lambda (thread-index) + (when (eq? (hash-ref running-job-args + thread-index + 'slot-free) + 'slot-free) + (let* ((now (current-time time-monotonic)) + (elapsed (time-difference now + previous-thread-started-at))) + (when (or (eq? #f thread-start-delay) + (time>=? elapsed thread-start-delay)) + (set! previous-thread-started-at now) + (hash-set! running-job-args + thread-index + #f) + (start-thread thread-index))))) + (iota desired-count))))))) (if (procedure? thread-count-parameter) (call-with-new-thread |