diff options
author | Christopher Baines <mail@cbaines.net> | 2023-03-28 12:03:32 +0100 |
---|---|---|
committer | Christopher Baines <mail@cbaines.net> | 2023-03-28 12:03:32 +0100 |
commit | 868ac19db4ca7a8e6f95b5029b443f84c15e58cd (patch) | |
tree | 8ab3a13cf090fd3bf1a0741ef28afae202829e9a | |
parent | 4a5b22b2de20211a9df4a62988edfaa25afee990 (diff) | |
download | build-coordinator-868ac19db4ca7a8e6f95b5029b443f84c15e58cd.tar build-coordinator-868ac19db4ca7a8e6f95b5029b443f84c15e58cd.tar.gz |
Add create-thread-pool
This is like create-work-queue, but pulls the jobs, rather than the jobs being
pushed to it. This should work more efficiently for the hooks, where there are
often lots of events to process.
-rw-r--r-- | guix-build-coordinator/utils.scm | 160 |
1 files changed, 160 insertions, 0 deletions
diff --git a/guix-build-coordinator/utils.scm b/guix-build-coordinator/utils.scm index fc0d410..30b26fe 100644 --- a/guix-build-coordinator/utils.scm +++ b/guix-build-coordinator/utils.scm @@ -82,6 +82,7 @@ with-time-logging create-work-queue + create-thread-pool with-timeout reset-timeout @@ -1145,6 +1146,165 @@ References: ~a~%" (values process-job count-jobs count-threads list-jobs))) +(define* (create-thread-pool thread-count-parameter get-job proc + #:key thread-start-delay + (thread-stop-delay + (make-time time-duration 0 0)) + (name "unnamed")) + (let ((pool-mutex (make-mutex)) + (job-available (make-condition-variable)) + (running-job-args (make-hash-table))) + + (define get-thread-count + (cond + ((number? thread-count-parameter) + (const thread-count-parameter)) + (else + thread-count-parameter))) + + (define (count-threads) + (hash-count (const #t) running-job-args)) + + (define (list-jobs) + (append (hash-fold (lambda (key val result) + (or (and val + (cons val result)) + result)) + '() + running-job-args))) + + (define (thread-process-job job-args) + (with-exception-handler + (lambda (exn) + (simple-format (current-error-port) + "job raised exception: ~A\n" + job-args)) + (lambda () + (with-throw-handler #t + (lambda () + (apply proc job-args)) + (lambda (key . args) + (simple-format (current-error-port) + "exception when handling job: ~A ~A\n" + key args) + (backtrace)))) + #:unwind? #t)) + + (define (start-thread thread-index) + (define (too-many-threads?) + (let ((running-jobs-count + (hash-count (lambda (index val) + (list? val)) + running-job-args)) + (desired-thread-count (get-thread-count))) + + (>= running-jobs-count + desired-thread-count))) + + (define (thread-idle-for-too-long? last-job-finished-at) + (time>=? + (time-difference (current-time time-monotonic) + last-job-finished-at) + thread-stop-delay)) + + (define (stop-thread) + (hash-remove! running-job-args + thread-index) + (unlock-mutex pool-mutex)) + + (call-with-new-thread + (lambda () + (catch 'system-error + (lambda () + (set-thread-name + (string-append "gbc " name " p t " + (number->string thread-index)))) + (const #t)) + + (let loop ((last-job-finished-at (current-time time-monotonic))) + (lock-mutex pool-mutex) + + (if (too-many-threads?) + (stop-thread) + (let* ((running-jobs + (list-jobs)) + (job-args + (or (get-job running-jobs) + ;; #f from wait-condition-variable indicates a timeout + (if (wait-condition-variable + job-available + pool-mutex + (+ 9 (time-second (current-time)))) + (get-job running-jobs) + #f)))) + (if job-args + (begin + (hash-set! running-job-args + thread-index + job-args) + + (unlock-mutex pool-mutex) + (thread-process-job job-args) + + (with-mutex pool-mutex + (hash-set! running-job-args + thread-index + #f)) + + (loop (current-time time-monotonic))) + (if (thread-idle-for-too-long? last-job-finished-at) + (stop-thread) + (begin + (unlock-mutex pool-mutex) + + (loop last-job-finished-at)))))))))) + + (define start-new-threads-if-necessary + (let ((previous-thread-started-at (make-time time-monotonic 0 0))) + (lambda (desired-count) + (let* ((thread-count + (hash-count (const #t) running-job-args)) + (threads-to-start + (- desired-count thread-count))) + (when (> threads-to-start 0) + (for-each + (lambda (thread-index) + (when (eq? (hash-ref running-job-args + thread-index + 'slot-free) + 'slot-free) + (let* ((now (current-time time-monotonic)) + (elapsed (time-difference now + previous-thread-started-at))) + (when (or (eq? #f thread-start-delay) + (time>=? elapsed thread-start-delay)) + (set! previous-thread-started-at now) + (hash-set! running-job-args + thread-index + #f) + (start-thread thread-index))))) + (iota desired-count))))))) + + (if (procedure? thread-count-parameter) + (call-with-new-thread + (lambda () + (lock-mutex pool-mutex) + (while #t + (let ((idle-threads (hash-count (lambda (index val) + (eq? #f val)) + running-job-args))) + (when (= 0 idle-threads) + (start-new-threads-if-necessary (get-thread-count)))) + + (wait-condition-variable + job-available + pool-mutex + (+ 15 (time-second (current-time))))))) + + (start-new-threads-if-necessary (get-thread-count))) + + (values pool-mutex job-available count-threads list-jobs))) + ;; copied from (guix scripts substitute) (define-syntax-rule (with-timeout duration handler body ...) "Run BODY; when DURATION seconds have expired, call HANDLER, and run BODY |