aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2023-03-28 12:05:26 +0100
committerChristopher Baines <mail@cbaines.net>2023-03-28 12:05:26 +0100
commit0ab4f17e57fb3e02a4f9869fe48b498164caa61d (patch)
treede6f2404b6e8a9486b55ad53944f42dc50a7001f
parent868ac19db4ca7a8e6f95b5029b443f84c15e58cd (diff)
downloadbuild-coordinator-0ab4f17e57fb3e02a4f9869fe48b498164caa61d.tar
build-coordinator-0ab4f17e57fb3e02a4f9869fe48b498164caa61d.tar.gz
Change how parallel hook processing works
The previous approach was inefficient, since there was a thread that just repeatedly tries to queue every unprocessed hook event for processing. Instead, use a thread pool that pulls events from the database. This still involves some work to not process the same event in different threads, but it should hopefully scale better.
-rw-r--r--guix-build-coordinator/coordinator.scm161
1 files changed, 72 insertions, 89 deletions
diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm
index 87c2345..6741446 100644
--- a/guix-build-coordinator/coordinator.scm
+++ b/guix-build-coordinator/coordinator.scm
@@ -1037,105 +1037,88 @@
`((event . ,event))))
#:unwind? #t))
- (define (single-thread-process-events mtx condvar event-name handler)
- (call-with-new-thread
- (lambda ()
- (lock-mutex mtx)
- (while #t
- (with-exception-handler
- (lambda (exn)
- (with-exception-handler
- (lambda _
- ;; Things are really going wrong if logging about
- ;; the hook processing thread crashing, also raises
- ;; an exception, so just try and sleep and hope
- ;; things go better next time
- (sleep 10))
- (lambda ()
- (log-msg (build-coordinator-logger build-coordinator)
- 'CRITICAL
- "hook processing thread " event-name
- " exception: " exn))
- #:unwind? #t))
- (lambda ()
- (while #t
- (match (datastore-list-unprocessed-hook-events datastore event-name 1)
- (()
- (wait-condition-variable condvar
- mtx
- (+ (time-second (current-time))
- wait-timeout-seconds)))
- (((id event arguments))
- (process-event id event arguments handler)))))
- #:unwind? #t)))))
-
- (define (work-queue-process-events mtx condvar event-name handler thread-count)
- (let-values (((process-job count-jobs
- count-threads
- list-jobs)
- (create-work-queue
- thread-count
+ (define (single-thread-process-events event-name handler)
+ (let ((mtx (make-mutex))
+ (condvar (make-condition-variable)))
+ (call-with-new-thread
+ (lambda ()
+ (lock-mutex mtx)
+ (while #t
+ (with-exception-handler
+ (lambda (exn)
+ (with-exception-handler
+ (lambda _
+ ;; Things are really going wrong if logging about
+ ;; the hook processing thread crashing, also raises
+ ;; an exception, so just try and sleep and hope
+ ;; things go better next time
+ (sleep 10))
+ (lambda ()
+ (log-msg (build-coordinator-logger build-coordinator)
+ 'CRITICAL
+ "hook processing thread " event-name
+ " exception: " exn))
+ #:unwind? #t))
+ (lambda ()
+ (while #t
+ (match (datastore-list-unprocessed-hook-events datastore event-name 1)
+ (()
+ (wait-condition-variable condvar
+ mtx
+ (+ (time-second (current-time))
+ wait-timeout-seconds)))
+ (((id event arguments))
+ (process-event id event arguments handler)))))
+ #:unwind? #t))))
+ condvar))
+
+ (define (work-queue-process-events event-name handler thread-count)
+ (let-values (((pool-mutex job-available count-threads list-jobs)
+ (create-thread-pool
+ (lambda ()
+ (length
+ (datastore-list-unprocessed-hook-events
+ datastore
+ event-name
+ thread-count)))
+ (lambda (running-jobs)
+ (let* ((in-progress-ids
+ (map car running-jobs))
+ (potential-jobs
+ (map
+ (match-lambda
+ ((id _ _) (list id)))
+ (datastore-list-unprocessed-hook-events
+ datastore
+ event-name
+ (+ 1 (length in-progress-ids))))))
+ (find
+ (match-lambda
+ ((id)
+ (not (member id in-progress-ids))))
+ potential-jobs)))
(lambda (id)
(match (datastore-find-unprocessed-hook-event
datastore
id)
(#f #f) ; already processed
((event arguments)
- (process-event id event arguments handler)))))))
-
- (call-with-new-thread
- (lambda ()
- (lock-mutex mtx)
- (while #t
- (match (datastore-list-unprocessed-hook-events
- datastore
- event-name
- 100000)
- (()
- (wait-condition-variable condvar
- mtx
- (+ (time-second (current-time))
- 10)))
- (job-args
- (let* ((jobs
- (list-jobs))
- (running-ids
- (make-hash-table (length jobs))))
- (for-each (lambda (id)
- (hash-set! running-ids
- id
- #t))
- (map car jobs))
-
- (for-each (match-lambda
- ((id _ _)
- (unless (hash-ref running-ids id)
- (process-job id))))
- job-args))
- (wait-condition-variable condvar
- mtx
- (+ (time-second (current-time))
- 10)))))))))
+ (process-event id event arguments handler))))
+ #:name (symbol->string event-name))))
+ job-available))
(map
(match-lambda
((event-name . handler)
- (let ((mtx (make-mutex))
- (condvar (make-condition-variable)))
- (or (and=>
- (assq-ref parallel-hooks event-name)
- (lambda (thread-count)
- (work-queue-process-events mtx
- condvar
- event-name
- handler
- thread-count)))
- (single-thread-process-events mtx
- condvar
- event-name
- handler))
-
- (cons event-name condvar))))
+ (cons event-name
+ (or (and=>
+ (assq-ref parallel-hooks event-name)
+ (lambda (thread-count)
+ (work-queue-process-events event-name
+ handler
+ thread-count)))
+ (single-thread-process-events event-name
+ handler)))))
(build-coordinator-hooks build-coordinator)))
(define (fetch-builds build-coordinator agent systems