aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2025-02-10 12:03:29 +0000
committerChristopher Baines <mail@cbaines.net>2025-02-10 12:04:35 +0000
commitf5f50341ae37b98fb7ffbbf77873da921cbd6d1d (patch)
treeeee6396ab2d5c554c53722135ce0088f0ca953bc
parent68968ce66d2900dceaaba9e4a62867c8a232ff7b (diff)
downloadbuild-coordinator-f5f50341ae37b98fb7ffbbf77873da921cbd6d1d.tar
build-coordinator-f5f50341ae37b98fb7ffbbf77873da921cbd6d1d.tar.gz
Alter how parallel hooks are implemented
Use fibers and a knots thread pool, rather than a custom thread pool. I'm hoping this'll be more efficient and easier to debug when it's not working.
-rw-r--r--guix-build-coordinator/coordinator.scm206
1 files changed, 123 insertions, 83 deletions
diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm
index a47f61a..4086621 100644
--- a/guix-build-coordinator/coordinator.scm
+++ b/guix-build-coordinator/coordinator.scm
@@ -479,11 +479,6 @@
build-coordinator
(make-build-allocator-thread build-coordinator))
- (set-build-coordinator-hook-condvars!
- build-coordinator
- (start-hook-processing-threads build-coordinator
- parallel-hooks))
-
(when trigger-build-allocation?
(trigger-build-allocation build-coordinator)))
@@ -562,6 +557,11 @@
(datastore-spawn-fibers
(build-coordinator-datastore build-coordinator))
+ (set-build-coordinator-hook-condvars!
+ build-coordinator
+ (start-hook-processing-threads build-coordinator
+ parallel-hooks))
+
(set-build-coordinator-background-job-conditions!
build-coordinator
(start-background-job-processing-fibers build-coordinator))
@@ -1042,7 +1042,15 @@
(and=> (assoc-ref (build-coordinator-hook-condvars build-coordinator)
event-name)
(lambda (condvar)
- (signal-condition-variable condvar)
+ (cond
+ ((condition-variable? condvar)
+ (signal-condition-variable condvar))
+ ((reusable-condition? condvar)
+ (signal-reusable-condition! condvar))
+ (else
+ (error
+ (simple-format #f "unrecognised condvar ~A"
+ condvar))))
#t)))
(define (update-build-allocation-plan build-coordinator)
@@ -1455,84 +1463,116 @@
(define (single-thread-process-events event-name handler)
(let ((mtx (make-mutex))
(condvar (make-condition-variable)))
- (call-with-new-thread
+ (call-with-default-io-waiters
(lambda ()
- (catch 'system-error
- (lambda ()
- (set-thread-name
- (symbol->string event-name)))
- (const #t))
+ (call-with-new-thread
+ (lambda ()
+ (catch 'system-error
+ (lambda ()
+ (set-thread-name
+ (symbol->string event-name)))
+ (const #t))
+
+ (lock-mutex mtx)
+ (while #t
+ (with-exception-handler
+ (lambda (exn)
+ (with-exception-handler
+ (lambda _
+ ;; Things are really going wrong if logging about
+ ;; the hook processing thread crashing, also raises
+ ;; an exception, so just try and sleep and hope
+ ;; things go better next time
+ (sleep 10))
+ (lambda ()
+ (log-msg (build-coordinator-logger build-coordinator)
+ 'CRITICAL
+ "hook processing thread " event-name
+ " exception: " exn))
+ #:unwind? #t)
+ (sleep 10))
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (while #t
+ (match (datastore-list-unprocessed-hook-events datastore event-name 1)
+ (()
+ (wait-condition-variable condvar mtx))
+ (((id event arguments))
+ (process-event id event arguments handler)))))
+ (lambda (key . args)
+ (log-msg
+ (build-coordinator-logger build-coordinator)
+ 'CRITICAL
+ "error in " event-name " hook processing thread: " key " " args)
+ (backtrace))))
+ #:unwind? #t))))))
+ condvar))
- (lock-mutex mtx)
- (while #t
- (with-exception-handler
- (lambda (exn)
- (with-exception-handler
- (lambda _
- ;; Things are really going wrong if logging about
- ;; the hook processing thread crashing, also raises
- ;; an exception, so just try and sleep and hope
- ;; things go better next time
- (sleep 10))
- (lambda ()
- (log-msg (build-coordinator-logger build-coordinator)
- 'CRITICAL
- "hook processing thread " event-name
- " exception: " exn))
- #:unwind? #t)
- (sleep 10))
- (lambda ()
- (with-throw-handler #t
+ (define (thread-pool-process-events event-name handler thread-count)
+ (let ((thread-pool
+ (make-thread-pool thread-count))
+ (reusable-condition
+ (make-reusable-condition))
+ (coordination-channel
+ (make-channel)))
+
+ (spawn-fiber
+ (lambda ()
+ (let loop ((running-ids '()))
+ (match (get-message coordination-channel)
+ (('process id event arguments)
+ (if (member id running-ids)
+ ;; Ignore already running jobs
+ (loop running-ids)
+ (begin
+ (spawn-fiber
+ (lambda ()
+ (call-with-thread
+ thread-pool
+ (lambda ()
+ (process-event id event arguments handler))
+ ;; TODO Make this the default through knots
+ #:timeout #f)
+ (put-message coordination-channel
+ (list 'finished id))))
+ (loop (cons id running-ids)))))
+ (('finished id)
+ (when (< (length running-ids)
+ (* 2 thread-count))
+ (signal-reusable-condition! reusable-condition))
+ (loop (delete id running-ids)))
+ (('count-running reply)
+ (let ((count (length running-ids)))
+ (spawn-fiber
(lambda ()
- (while #t
- (match (datastore-list-unprocessed-hook-events datastore event-name 1)
- (()
- (wait-condition-variable condvar mtx))
- (((id event arguments))
- (process-event id event arguments handler)))))
- (lambda (key . args)
- (log-msg
- (build-coordinator-logger build-coordinator)
- 'CRITICAL
- "error in " event-name " hook processing thread: " key " " args)
- (backtrace))))
- #:unwind? #t))))
- condvar))
+ (put-message reply count))))
+ (loop running-ids))))))
- (define (work-queue-process-events event-name handler thread-count)
- (let-values (((pool-mutex job-available count-threads list-jobs)
- (create-thread-pool
- (lambda ()
- (max
- 1
- (length
- (datastore-list-unprocessed-hook-events
- datastore
- event-name
- thread-count))))
- (lambda (running-jobs)
- (let* ((in-progress-ids
- (map car running-jobs))
- (potential-jobs
- (datastore-list-unprocessed-hook-events
- datastore
- event-name
- (+ 1 (length in-progress-ids))))
- (job
- (find
- (match-lambda
- ((id rest ...)
- (not (member id in-progress-ids))))
- potential-jobs)))
- (log-msg
- (build-coordinator-logger build-coordinator)
- 'DEBUG
- event-name " work queue, got job " job)
- job))
- (lambda (id event arguments)
- (process-event id event arguments handler))
- #:name (symbol->string event-name))))
- job-available))
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (let ((count
+ (let ((channel (make-channel)))
+ (put-message coordination-channel
+ (list 'count-running channel))
+ (get-message channel))))
+ (when (< count (* 2 thread-count))
+ (log-msg (build-coordinator-logger build-coordinator)
+ 'DEBUG "submitting batch of " event-name " hook events")
+ (for-each
+ (match-lambda
+ ((id event arguments)
+ (put-message coordination-channel
+ (list 'process id event arguments))))
+ (datastore-list-unprocessed-hook-events
+ datastore
+ event-name
+ (* 20 thread-count)))))
+ (reusable-condition-wait reusable-condition
+ #:timeout 60))))
+
+ reusable-condition))
(map
(match-lambda
@@ -1541,9 +1581,9 @@
(or (and=>
(assq-ref parallel-hooks event-name)
(lambda (thread-count)
- (work-queue-process-events event-name
- handler
- thread-count)))
+ (thread-pool-process-events event-name
+ handler
+ thread-count)))
(single-thread-process-events event-name
handler)))))
(build-coordinator-hooks build-coordinator)))