diff options
author | Christopher Baines <mail@cbaines.net> | 2025-02-10 12:03:29 +0000 |
---|---|---|
committer | Christopher Baines <mail@cbaines.net> | 2025-02-10 12:04:35 +0000 |
commit | f5f50341ae37b98fb7ffbbf77873da921cbd6d1d (patch) | |
tree | eee6396ab2d5c554c53722135ce0088f0ca953bc | |
parent | 68968ce66d2900dceaaba9e4a62867c8a232ff7b (diff) | |
download | build-coordinator-f5f50341ae37b98fb7ffbbf77873da921cbd6d1d.tar build-coordinator-f5f50341ae37b98fb7ffbbf77873da921cbd6d1d.tar.gz |
Alter how parallel hooks are implemented
Use fibers and a knots thread pool, rather than a custom thread pool. I'm
hoping this'll be more efficient and easier to debug when it's not working.
-rw-r--r-- | guix-build-coordinator/coordinator.scm | 206 |
1 files changed, 123 insertions, 83 deletions
diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm index a47f61a..4086621 100644 --- a/guix-build-coordinator/coordinator.scm +++ b/guix-build-coordinator/coordinator.scm @@ -479,11 +479,6 @@ build-coordinator (make-build-allocator-thread build-coordinator)) - (set-build-coordinator-hook-condvars! - build-coordinator - (start-hook-processing-threads build-coordinator - parallel-hooks)) - (when trigger-build-allocation? (trigger-build-allocation build-coordinator))) @@ -562,6 +557,11 @@ (datastore-spawn-fibers (build-coordinator-datastore build-coordinator)) + (set-build-coordinator-hook-condvars! + build-coordinator + (start-hook-processing-threads build-coordinator + parallel-hooks)) + (set-build-coordinator-background-job-conditions! build-coordinator (start-background-job-processing-fibers build-coordinator)) @@ -1042,7 +1042,15 @@ (and=> (assoc-ref (build-coordinator-hook-condvars build-coordinator) event-name) (lambda (condvar) - (signal-condition-variable condvar) + (cond + ((condition-variable? condvar) + (signal-condition-variable condvar)) + ((reusable-condition? condvar) + (signal-reusable-condition! 
condvar)) + (else + (error + (simple-format #f "unrecognised condvar ~A" + condvar)))) #t))) (define (update-build-allocation-plan build-coordinator) @@ -1455,84 +1463,116 @@ (define (single-thread-process-events event-name handler) (let ((mtx (make-mutex)) (condvar (make-condition-variable))) - (call-with-new-thread + (call-with-default-io-waiters (lambda () - (catch 'system-error - (lambda () - (set-thread-name - (symbol->string event-name))) - (const #t)) + (call-with-new-thread + (lambda () + (catch 'system-error + (lambda () + (set-thread-name + (symbol->string event-name))) + (const #t)) + + (lock-mutex mtx) + (while #t + (with-exception-handler + (lambda (exn) + (with-exception-handler + (lambda _ + ;; Things are really going wrong if logging about + ;; the hook processing thread crashing, also raises + ;; an exception, so just try and sleep and hope + ;; things go better next time + (sleep 10)) + (lambda () + (log-msg (build-coordinator-logger build-coordinator) + 'CRITICAL + "hook processing thread " event-name + " exception: " exn)) + #:unwind? #t) + (sleep 10)) + (lambda () + (with-throw-handler #t + (lambda () + (while #t + (match (datastore-list-unprocessed-hook-events datastore event-name 1) + (() + (wait-condition-variable condvar mtx)) + (((id event arguments)) + (process-event id event arguments handler))))) + (lambda (key . args) + (log-msg + (build-coordinator-logger build-coordinator) + 'CRITICAL + "error in " event-name " hook processing thread: " key " " args) + (backtrace)))) + #:unwind? 
#t)))))) + condvar)) - (lock-mutex mtx) - (while #t - (with-exception-handler - (lambda (exn) - (with-exception-handler - (lambda _ - ;; Things are really going wrong if logging about - ;; the hook processing thread crashing, also raises - ;; an exception, so just try and sleep and hope - ;; things go better next time - (sleep 10)) - (lambda () - (log-msg (build-coordinator-logger build-coordinator) - 'CRITICAL - "hook processing thread " event-name - " exception: " exn)) - #:unwind? #t) - (sleep 10)) - (lambda () - (with-throw-handler #t + (define (thread-pool-process-events event-name handler thread-count) + (let ((thread-pool + (make-thread-pool thread-count)) + (reusable-condition + (make-reusable-condition)) + (coordination-channel + (make-channel))) + + (spawn-fiber + (lambda () + (let loop ((running-ids '())) + (match (get-message coordination-channel) + (('process id event arguments) + (if (member id running-ids) + ;; Ignore already running jobs + (loop running-ids) + (begin + (spawn-fiber + (lambda () + (call-with-thread + thread-pool + (lambda () + (process-event id event arguments handler)) + ;; TODO Make this the default through knots + #:timeout #f) + (put-message coordination-channel + (list 'finished id)))) + (loop (cons id running-ids))))) + (('finished id) + (when (< (length running-ids) + (* 2 thread-count)) + (signal-reusable-condition! reusable-condition)) + (loop (delete id running-ids))) + (('count-running reply) + (let ((count (length running-ids))) + (spawn-fiber (lambda () - (while #t - (match (datastore-list-unprocessed-hook-events datastore event-name 1) - (() - (wait-condition-variable condvar mtx)) - (((id event arguments)) - (process-event id event arguments handler))))) - (lambda (key . args) - (log-msg - (build-coordinator-logger build-coordinator) - 'CRITICAL - "error in " event-name " hook processing thread: " key " " args) - (backtrace)))) - #:unwind? 
#t)))) - condvar)) + (put-message reply count)))) + (loop running-ids)))))) - (define (work-queue-process-events event-name handler thread-count) - (let-values (((pool-mutex job-available count-threads list-jobs) - (create-thread-pool - (lambda () - (max - 1 - (length - (datastore-list-unprocessed-hook-events - datastore - event-name - thread-count)))) - (lambda (running-jobs) - (let* ((in-progress-ids - (map car running-jobs)) - (potential-jobs - (datastore-list-unprocessed-hook-events - datastore - event-name - (+ 1 (length in-progress-ids)))) - (job - (find - (match-lambda - ((id rest ...) - (not (member id in-progress-ids)))) - potential-jobs))) - (log-msg - (build-coordinator-logger build-coordinator) - 'DEBUG - event-name " work queue, got job " job) - job)) - (lambda (id event arguments) - (process-event id event arguments handler)) - #:name (symbol->string event-name)))) - job-available)) + (spawn-fiber + (lambda () + (while #t + (let ((count + (let ((channel (make-channel))) + (put-message coordination-channel + (list 'count-running channel)) + (get-message channel)))) + (when (< count (* 2 thread-count)) + (log-msg (build-coordinator-logger build-coordinator) + 'DEBUG "submitting batch of " event-name " hook events") + (for-each + (match-lambda + ((id event arguments) + (put-message coordination-channel + (list 'process id event arguments)))) + (datastore-list-unprocessed-hook-events + datastore + event-name + (* 20 thread-count))))) + (reusable-condition-wait reusable-condition + #:timeout 60)))) + + reusable-condition)) (map (match-lambda @@ -1541,9 +1581,9 @@ (or (and=> (assq-ref parallel-hooks event-name) (lambda (thread-count) - (work-queue-process-events event-name - handler - thread-count))) + (thread-pool-process-events event-name + handler + thread-count))) (single-thread-process-events event-name handler))))) (build-coordinator-hooks build-coordinator))) |