aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2023-05-20 11:38:46 +0100
committerChristopher Baines <mail@cbaines.net>2023-05-20 11:51:29 +0100
commite5ba65d106e7aacc325a06d87a5e19df4e44813d (patch)
tree3f42c83de81f12012b292831dfccbf88f1446d63
parent01ca366eb547dec12ab34b5f6bb2c8ae97e40287 (diff)
downloadnar-herder-e5ba65d106e7aacc325a06d87a5e19df4e44813d.tar
nar-herder-e5ba65d106e7aacc325a06d87a5e19df4e44813d.tar.gz
Port work queue improvements from the build coordinator
-rw-r--r--nar-herder/utils.scm110
1 files changed, 71 insertions, 39 deletions
diff --git a/nar-herder/utils.scm b/nar-herder/utils.scm
index 50c6cf9..5374fbe 100644
--- a/nar-herder/utils.scm
+++ b/nar-herder/utils.scm
@@ -40,6 +40,8 @@
#:use-module (fibers)
#:use-module (fibers channels)
#:use-module (fibers conditions)
+ #:use-module ((guix build syscalls)
+ #:select (set-thread-name))
#:export (make-worker-thread-set
call-with-worker-thread
@@ -127,7 +129,9 @@
(define* (create-work-queue thread-count-parameter proc
#:key thread-start-delay
(thread-stop-delay
- (make-time time-duration 0 0)))
+ (make-time time-duration 0 0))
+ (name "unnamed")
+ priority<?)
(let ((queue (make-q))
(queue-mutex (make-mutex))
(job-available (make-condition-variable))
@@ -147,11 +151,26 @@
(else
thread-count-parameter)))
- (define (process-job . args)
- (with-mutex queue-mutex
- (enq! queue args)
- (start-new-threads-if-necessary (get-thread-count))
- (signal-condition-variable job-available)))
+ (define process-job
+ (if priority<?
+ (lambda* (args #:key priority)
+ (with-mutex queue-mutex
+ (enq! queue (cons priority args))
+ (set-car!
+ queue
+ (stable-sort! (car queue)
+ (lambda (a b)
+ (priority<?
+ (car a)
+ (car b)))))
+ (sync-q! queue)
+ (start-new-threads-if-necessary (get-thread-count))
+ (signal-condition-variable job-available)))
+ (lambda args
+ (with-mutex queue-mutex
+ (enq! queue args)
+ (start-new-threads-if-necessary (get-thread-count))
+ (signal-condition-variable job-available)))))
(define (count-threads)
(with-mutex queue-mutex
@@ -166,11 +185,12 @@
(define (list-jobs)
(with-mutex queue-mutex
- (append (list-copy
- (car queue))
+ (append (if priority<?
+ (map cdr (car queue))
+ (list-copy (car queue)))
(hash-fold (lambda (key val result)
- (or (and val
- (cons val result))
+ (if val
+ (cons val result)
result))
'()
running-job-args))))
@@ -179,16 +199,17 @@
(with-exception-handler
(lambda (exn)
(simple-format (current-error-port)
- "job raised exception: ~A\n"
- job-args))
+ "~A work queue, job raised exception ~A: ~A\n"
+ name job-args exn))
(lambda ()
(with-throw-handler #t
(lambda ()
(apply proc job-args))
(lambda (key . args)
- (simple-format (current-error-port)
- "exception when handling job: ~A ~A\n"
- key args)
+ (simple-format
+ (current-error-port)
+ "~A thread pool, exception when handling job: ~A ~A\n"
+ name key args)
(backtrace))))
#:unwind? #t))
@@ -216,6 +237,13 @@
(call-with-new-thread
(lambda ()
+ (catch 'system-error
+ (lambda ()
+ (set-thread-name
+ (string-append name " q t "
+ (number->string thread-index))))
+ (const #t))
+
(let loop ((last-job-finished-at (current-time time-monotonic)))
(lock-mutex queue-mutex)
@@ -232,9 +260,13 @@
;; the job in the mean time
(if (q-empty? queue)
#f
- (deq! queue))
+ (if priority<?
+ (cdr (deq! queue))
+ (deq! queue)))
#f)
- (deq! queue))))
+ (if priority<?
+ (cdr (deq! queue))
+ (deq! queue)))))
(if job-args
(begin
@@ -262,28 +294,28 @@
(define start-new-threads-if-necessary
(let ((previous-thread-started-at (make-time time-monotonic 0 0)))
(lambda (desired-count)
- (let* ((thread-count
- (hash-count (const #t) running-job-args))
- (threads-to-start
- (- desired-count thread-count)))
- (when (> threads-to-start 0)
- (for-each
- (lambda (thread-index)
- (when (eq? (hash-ref running-job-args
- thread-index
- 'slot-free)
- 'slot-free)
- (let* ((now (current-time time-monotonic))
- (elapsed (time-difference now
- previous-thread-started-at)))
- (when (or (eq? #f thread-start-delay)
- (time>=? elapsed thread-start-delay))
- (set! previous-thread-started-at now)
- (hash-set! running-job-args
- thread-index
- #f)
- (start-thread thread-index)))))
- (iota desired-count)))))))
+ (let* ((thread-count
+ (hash-count (const #t) running-job-args))
+ (threads-to-start
+ (- desired-count thread-count)))
+ (when (> threads-to-start 0)
+ (for-each
+ (lambda (thread-index)
+ (when (eq? (hash-ref running-job-args
+ thread-index
+ 'slot-free)
+ 'slot-free)
+ (let* ((now (current-time time-monotonic))
+ (elapsed (time-difference now
+ previous-thread-started-at)))
+ (when (or (eq? #f thread-start-delay)
+ (time>=? elapsed thread-start-delay))
+ (set! previous-thread-started-at now)
+ (hash-set! running-job-args
+ thread-index
+ #f)
+ (start-thread thread-index)))))
+ (iota desired-count)))))))
(if (procedure? thread-count-parameter)
(call-with-new-thread