aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2023-03-28 12:03:32 +0100
committerChristopher Baines <mail@cbaines.net>2023-03-28 12:03:32 +0100
commit868ac19db4ca7a8e6f95b5029b443f84c15e58cd (patch)
tree8ab3a13cf090fd3bf1a0741ef28afae202829e9a
parent4a5b22b2de20211a9df4a62988edfaa25afee990 (diff)
downloadbuild-coordinator-868ac19db4ca7a8e6f95b5029b443f84c15e58cd.tar
build-coordinator-868ac19db4ca7a8e6f95b5029b443f84c15e58cd.tar.gz
Add create-thread-pool
This is like create-work-queue, but pulls the jobs, rather than the jobs being pushed to it. This should work more efficiently for the hooks, where there are often lots of events to process.
-rw-r--r--guix-build-coordinator/utils.scm160
1 files changed, 160 insertions, 0 deletions
diff --git a/guix-build-coordinator/utils.scm b/guix-build-coordinator/utils.scm
index fc0d410..30b26fe 100644
--- a/guix-build-coordinator/utils.scm
+++ b/guix-build-coordinator/utils.scm
@@ -82,6 +82,7 @@
with-time-logging
create-work-queue
+ create-thread-pool
with-timeout
reset-timeout
@@ -1145,6 +1146,165 @@ References: ~a~%"
(values process-job count-jobs count-threads list-jobs)))
+(define* (create-thread-pool thread-count-parameter get-job proc
+ #:key thread-start-delay
+ (thread-stop-delay
+ (make-time time-duration 0 0))
+ (name "unnamed"))
+ (let ((pool-mutex (make-mutex))
+ (job-available (make-condition-variable))
+ (running-job-args (make-hash-table)))
+
+ (define get-thread-count
+ (cond
+ ((number? thread-count-parameter)
+ (const thread-count-parameter))
+ (else
+ thread-count-parameter)))
+
+ (define (count-threads)
+ (hash-count (const #t) running-job-args))
+
+ (define (list-jobs)
+ (append (hash-fold (lambda (key val result)
+ (or (and val
+ (cons val result))
+ result))
+ '()
+ running-job-args)))
+
+ (define (thread-process-job job-args)
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format (current-error-port)
+ "job raised exception: ~A\n"
+ job-args))
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (apply proc job-args))
+ (lambda (key . args)
+ (simple-format (current-error-port)
+ "exception when handling job: ~A ~A\n"
+ key args)
+ (backtrace))))
+ #:unwind? #t))
+
+ (define (start-thread thread-index)
+ (define (too-many-threads?)
+ (let ((running-jobs-count
+ (hash-count (lambda (index val)
+ (list? val))
+ running-job-args))
+ (desired-thread-count (get-thread-count)))
+
+ (>= running-jobs-count
+ desired-thread-count)))
+
+ (define (thread-idle-for-too-long? last-job-finished-at)
+ (time>=?
+ (time-difference (current-time time-monotonic)
+ last-job-finished-at)
+ thread-stop-delay))
+
+ (define (stop-thread)
+ (hash-remove! running-job-args
+ thread-index)
+ (unlock-mutex pool-mutex))
+
+ (call-with-new-thread
+ (lambda ()
+ (catch 'system-error
+ (lambda ()
+ (set-thread-name
+ (string-append "gbc " name " p t "
+ (number->string thread-index))))
+ (const #t))
+
+ (let loop ((last-job-finished-at (current-time time-monotonic)))
+ (lock-mutex pool-mutex)
+
+ (if (too-many-threads?)
+ (stop-thread)
+ (let* ((running-jobs
+ (list-jobs))
+ (job-args
+ (or (get-job running-jobs)
+ ;; #f from wait-condition-variable indicates a timeout
+ (if (wait-condition-variable
+ job-available
+ pool-mutex
+ (+ 9 (time-second (current-time))))
+ (get-job running-jobs)
+ #f))))
+ (if job-args
+ (begin
+ (hash-set! running-job-args
+ thread-index
+ job-args)
+
+ (unlock-mutex pool-mutex)
+ (thread-process-job job-args)
+
+ (with-mutex pool-mutex
+ (hash-set! running-job-args
+ thread-index
+ #f))
+
+ (loop (current-time time-monotonic)))
+ (if (thread-idle-for-too-long? last-job-finished-at)
+ (stop-thread)
+ (begin
+ (unlock-mutex pool-mutex)
+
+ (loop last-job-finished-at))))))))))
+
+ (define start-new-threads-if-necessary
+ (let ((previous-thread-started-at (make-time time-monotonic 0 0)))
+ (lambda (desired-count)
+ (let* ((thread-count
+ (hash-count (const #t) running-job-args))
+ (threads-to-start
+ (- desired-count thread-count)))
+ (when (> threads-to-start 0)
+ (for-each
+ (lambda (thread-index)
+ (when (eq? (hash-ref running-job-args
+ thread-index
+ 'slot-free)
+ 'slot-free)
+ (let* ((now (current-time time-monotonic))
+ (elapsed (time-difference now
+ previous-thread-started-at)))
+ (when (or (eq? #f thread-start-delay)
+ (time>=? elapsed thread-start-delay))
+ (set! previous-thread-started-at now)
+ (hash-set! running-job-args
+ thread-index
+ #f)
+ (start-thread thread-index)))))
+ (iota desired-count)))))))
+
+ (if (procedure? thread-count-parameter)
+ (call-with-new-thread
+ (lambda ()
+ (lock-mutex pool-mutex)
+ (while #t
+ (let ((idle-threads (hash-count (lambda (index val)
+ (eq? #f val))
+ running-job-args)))
+ (when (= 0 idle-threads)
+ (start-new-threads-if-necessary (get-thread-count))))
+
+ (wait-condition-variable
+ job-available
+ pool-mutex
+ (+ 15 (time-second (current-time)))))))
+
+ (start-new-threads-if-necessary (get-thread-count)))
+
+ (values pool-mutex job-available count-threads list-jobs)))
+
;; copied from (guix scripts substitute)
(define-syntax-rule (with-timeout duration handler body ...)
"Run BODY; when DURATION seconds have expired, call HANDLER, and run BODY