From 0ab4f17e57fb3e02a4f9869fe48b498164caa61d Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Tue, 28 Mar 2023 12:05:26 +0100 Subject: Change how parallel hook processing works The previous approach was inefficient, since there was a thread that just repeatedly tries to queue every unprocessed hook event for processing. Instead, use a thread pool that pulls events from the database. This still involves some work to not process the same event in different threads, but it should hopefully scale better. --- guix-build-coordinator/coordinator.scm | 161 +++++++++++++++------------------ 1 file changed, 72 insertions(+), 89 deletions(-) diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm index 87c2345..6741446 100644 --- a/guix-build-coordinator/coordinator.scm +++ b/guix-build-coordinator/coordinator.scm @@ -1037,105 +1037,88 @@ `((event . ,event)))) #:unwind? #t)) - (define (single-thread-process-events mtx condvar event-name handler) - (call-with-new-thread - (lambda () - (lock-mutex mtx) - (while #t - (with-exception-handler - (lambda (exn) - (with-exception-handler - (lambda _ - ;; Things are really going wrong if logging about - ;; the hook processing thread crashing, also raises - ;; an exception, so just try and sleep and hope - ;; things go better next time - (sleep 10)) - (lambda () - (log-msg (build-coordinator-logger build-coordinator) - 'CRITICAL - "hook processing thread " event-name - " exception: " exn)) - #:unwind? #t)) - (lambda () - (while #t - (match (datastore-list-unprocessed-hook-events datastore event-name 1) - (() - (wait-condition-variable condvar - mtx - (+ (time-second (current-time)) - wait-timeout-seconds))) - (((id event arguments)) - (process-event id event arguments handler))))) - #:unwind? #t))))) - - (define (work-queue-process-events mtx condvar event-name handler thread-count) - (let-values (((process-job count-jobs - count-threads - list-jobs) - (create-work-queue - thread-count + (define (single-thread-process-events event-name handler) + (let ((mtx (make-mutex)) + (condvar (make-condition-variable))) + (call-with-new-thread + (lambda () + (lock-mutex mtx) + (while #t + (with-exception-handler + (lambda (exn) + (with-exception-handler + (lambda _ + ;; Things are really going wrong if logging about + ;; the hook processing thread crashing, also raises + ;; an exception, so just try and sleep and hope + ;; things go better next time + (sleep 10)) + (lambda () + (log-msg (build-coordinator-logger build-coordinator) + 'CRITICAL + "hook processing thread " event-name + " exception: " exn)) + #:unwind? #t)) + (lambda () + (while #t + (match (datastore-list-unprocessed-hook-events datastore event-name 1) + (() + (wait-condition-variable condvar + mtx + (+ (time-second (current-time)) + wait-timeout-seconds))) + (((id event arguments)) + (process-event id event arguments handler))))) + #:unwind? #t)))) + condvar)) + + (define (work-queue-process-events event-name handler thread-count) + (let-values (((pool-mutex job-available count-threads list-jobs) + (create-thread-pool + (lambda () + (length + (datastore-list-unprocessed-hook-events + datastore + event-name + thread-count))) + (lambda (running-jobs) + (let* ((in-progress-ids + (map car running-jobs)) + (potential-jobs + (map + (match-lambda + ((id _ _) (list id))) + (datastore-list-unprocessed-hook-events + datastore + event-name + (+ 1 (length in-progress-ids)))))) + (find + (match-lambda + ((id) + (not (member id in-progress-ids)))) + potential-jobs))) (lambda (id) (match (datastore-find-unprocessed-hook-event datastore id) (#f #f) ; already processed ((event arguments) - (process-event id event arguments handler))))))) - - (call-with-new-thread - (lambda () - (lock-mutex mtx) - (while #t - (match (datastore-list-unprocessed-hook-events - datastore - event-name - 100000) - (() - (wait-condition-variable condvar - mtx - (+ (time-second (current-time)) - 10))) - (job-args - (let* ((jobs - (list-jobs)) - (running-ids - (make-hash-table (length jobs)))) - (for-each (lambda (id) - (hash-set! running-ids - id - #t)) - (map car jobs)) - - (for-each (match-lambda - ((id _ _) - (unless (hash-ref running-ids id) - (process-job id)))) - job-args)) - (wait-condition-variable condvar - mtx - (+ (time-second (current-time)) - 10))))))))) + (process-event id event arguments handler)))) + #:name (symbol->string event-name)))) + job-available)) (map (match-lambda ((event-name . handler) - (let ((mtx (make-mutex)) - (condvar (make-condition-variable))) - (or (and=> - (assq-ref parallel-hooks event-name) - (lambda (thread-count) - (work-queue-process-events mtx - condvar - event-name - handler - thread-count))) - (single-thread-process-events mtx - condvar - event-name - handler)) - - (cons event-name condvar)))) + (cons event-name + (or (and=> + (assq-ref parallel-hooks event-name) + (lambda (thread-count) + (work-queue-process-events event-name + handler + thread-count))) + (single-thread-process-events event-name + handler))))) (build-coordinator-hooks build-coordinator))) (define (fetch-builds build-coordinator agent systems -- cgit v1.2.3