From 23504e0f01dc1eae05b307e313ba70faaad84be8 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Thu, 30 Jun 2022 18:47:10 +0100 Subject: Support processing hook events in parallel Forcing hooks to be sequential simplifies them, and the implementation, but it doesn't always scale well. I'm particularly thinking about the build-submitted hook and built-success hooks, the processing of which can back up if there's lots of builds being submitted or finishing successfully. This new functionality allows hooks to be processed in parallel, which should allow to manage this more effectively. --- guix-build-coordinator/coordinator.scm | 141 +++++++++++++++++++++------- guix-build-coordinator/datastore.scm | 1 + guix-build-coordinator/datastore/sqlite.scm | 32 +++++++ 3 files changed, 138 insertions(+), 36 deletions(-) (limited to 'guix-build-coordinator') diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm index 08eecf3..800ae1a 100644 --- a/guix-build-coordinator/coordinator.scm +++ b/guix-build-coordinator/coordinator.scm @@ -21,6 +21,7 @@ (define-module (guix-build-coordinator coordinator) #:use-module (srfi srfi-1) #:use-module (srfi srfi-9) + #:use-module (srfi srfi-11) #:use-module (srfi srfi-19) #:use-module (srfi srfi-26) #:use-module (ice-9 ftw) @@ -169,7 +170,8 @@ #:key (update-datastore? #t) (pid-file #f) - (trigger-build-allocation? #t)) + (trigger-build-allocation? #t) + (parallel-hooks '())) (when update-datastore? (datastore-update (build-coordinator-datastore build-coordinator))) @@ -184,7 +186,8 @@ (set-build-coordinator-hook-condvars! build-coordinator - (start-hook-processing-threads build-coordinator)) + (start-hook-processing-threads build-coordinator + parallel-hooks)) (when trigger-build-allocation? (trigger-build-allocation build-coordinator))) @@ -198,11 +201,13 @@ (pid-file #f) (agent-communication-uri %default-agent-uri) (client-communication-uri %default-client-uri) - secret-key-base) + secret-key-base + (parallel-hooks '())) (perform-coordinator-service-startup build-coordinator #:update-datastore? update-datastore? - #:pid-file pid-file) + #:pid-file pid-file + #:parallel-hooks parallel-hooks) ;; Create some worker thread channels, which need to be created prior ;; to run-fibers being called. @@ -674,7 +679,7 @@ #:unwind? #t))) #:parallel? #t)) -(define (start-hook-processing-threads build-coordinator) +(define (start-hook-processing-threads build-coordinator parallel-hooks) (define wait-timeout-seconds (* 60 5)) (define datastore @@ -734,42 +739,106 @@ #:label-values `((event . ,event)))))) + (define (single-thread-process-events mtx condvar event-name handler) + (call-with-new-thread + (lambda () + (lock-mutex mtx) + (while #t + (with-exception-handler + (lambda (exn) + (with-exception-handler + (lambda _ + ;; Things are really going wrong if logging about + ;; the hook processing thread crashing, also raises + ;; an exception, so just try and sleep and hope + ;; things go better next time + (sleep 10)) + (lambda () + (log-msg (build-coordinator-logger build-coordinator) + 'CRITICAL + "hook processing thread " event-name + " exception: " exn)) + #:unwind? #t)) + (lambda () + (while #t + (match (datastore-list-unprocessed-hook-events datastore event-name 1) + (() + (wait-condition-variable condvar + mtx + (+ (time-second (current-time)) + wait-timeout-seconds))) + (((id event arguments)) + (process-event id event arguments handler))))) + #:unwind? #t) + (sleep 10))))) + + (define (work-queue-process-events mtx condvar event-name handler thread-count) + (let-values (((process-job count-jobs + count-threads + list-jobs) + (create-work-queue + thread-count + (lambda (id) + (match (datastore-find-unprocessed-hook-event + datastore + id) + (#f #f) ; already processed + ((event arguments) + (process-event id event arguments handler))))))) + + (call-with-new-thread + (lambda () + (lock-mutex mtx) + (while #t + (match (datastore-list-unprocessed-hook-events + datastore + event-name + 100000) + (() + (wait-condition-variable condvar + mtx + (+ (time-second (current-time)) + 10))) + (job-args + (let* ((jobs + (list-jobs)) + (running-ids + (make-hash-table (length jobs)))) + (for-each (match-lambda + ((id _ _) + (hash-set! running-ids + id + #t))) + jobs) + + (for-each (match-lambda + ((id _ _) + (unless (hash-ref running-ids id) + (process-job id)))) + job-args)) + (wait-condition-variable condvar + mtx + (+ (time-second (current-time)) + 10))))))))) + (map (match-lambda ((event-name . handler) (let ((mtx (make-mutex)) (condvar (make-condition-variable))) - (call-with-new-thread - (lambda () - (lock-mutex mtx) - (while #t - (with-exception-handler - (lambda (exn) - (with-exception-handler - (lambda _ - ;; Things are really going wrong if logging about - ;; the hook processing thread crashing, also raises - ;; an exception, so just try and sleep and hope - ;; things go better next time - (sleep 10)) - (lambda () - (log-msg (build-coordinator-logger build-coordinator) - 'CRITICAL - "hook processing thread " event-name - " exception: " exn)) - #:unwind? #t)) - (lambda () - (while #t - (match (datastore-list-unprocessed-hook-events datastore event-name 1) - (() - (wait-condition-variable condvar - mtx - (+ (time-second (current-time)) - wait-timeout-seconds))) - (((id event arguments)) - (process-event id event arguments handler))))) - #:unwind? #t) - (sleep 10)))) + (or (and=> + (assq-ref parallel-hooks event-name) + (lambda (thread-count) + (work-queue-process-events mtx + condvar + event-name + handler + thread-count))) + (single-thread-process-events mtx + condvar + event-name + handler)) + (cons event-name condvar)))) (build-coordinator-hooks build-coordinator))) diff --git a/guix-build-coordinator/datastore.scm b/guix-build-coordinator/datastore.scm index 4aa2981..156e637 100644 --- a/guix-build-coordinator/datastore.scm +++ b/guix-build-coordinator/datastore.scm @@ -66,6 +66,7 @@ (re-export datastore-insert-unprocessed-hook-event) (re-export datastore-count-unprocessed-hook-events) (re-export datastore-list-unprocessed-hook-events) +(re-export datastore-find-unprocessed-hook-event) (re-export datastore-delete-unprocessed-hook-event) (re-export datastore-list-agent-builds) (re-export datastore-find-derivation) diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm index bbd9c28..a6c93f3 100644 --- a/guix-build-coordinator/datastore/sqlite.scm +++ b/guix-build-coordinator/datastore/sqlite.scm @@ -80,6 +80,7 @@ datastore-insert-unprocessed-hook-event datastore-count-unprocessed-hook-events datastore-list-unprocessed-hook-events + datastore-find-unprocessed-hook-event datastore-delete-unprocessed-hook-event datastore-list-agent-builds datastore-agent-for-build @@ -2480,6 +2481,37 @@ LIMIT :limit" events))))) +(define-method (datastore-find-unprocessed-hook-event + (datastore ) + id) + (call-with-worker-thread + (slot-ref datastore 'worker-reader-thread-channel) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT event, arguments +FROM unprocessed_hook_events +WHERE id = :id" + #:cache? #t))) + + (sqlite-bind-arguments + statement + #:id id) + + (let ((result + (match (sqlite-step statement) + (#f #f) + (#(event arguments) + (list (string->symbol event) + (call-with-input-string arguments + (lambda (port) + (read port)))))))) + (sqlite-reset statement) + + result))))) + (define-method (datastore-delete-unprocessed-hook-event (datastore ) id) -- cgit v1.2.3