diff options
author | Christopher Baines <mail@cbaines.net> | 2022-06-30 18:47:10 +0100 |
---|---|---|
committer | Christopher Baines <mail@cbaines.net> | 2022-06-30 20:50:19 +0100 |
commit | 23504e0f01dc1eae05b307e313ba70faaad84be8 (patch) | |
tree | 0831e28cd26a1bcb46da02580305901154b21471 | |
parent | 70c83f81db4a6dd9ad94d4ec2b3b3be62bd0f467 (diff) | |
download | build-coordinator-23504e0f01dc1eae05b307e313ba70faaad84be8.tar build-coordinator-23504e0f01dc1eae05b307e313ba70faaad84be8.tar.gz |
Support processing hook events in parallel
Forcing hooks to be sequential simplifies them, and the implementation, but it
doesn't always scale well. I'm particularly thinking about the build-submitted
hook and built-success hooks, the processing of which can back up if there's
lots of builds being submitted or finishing successfully.
This new functionality allows hooks to be processed in parallel, which should
allow to manage this more effectively.
-rw-r--r-- | guix-build-coordinator/coordinator.scm | 141 | ||||
-rw-r--r-- | guix-build-coordinator/datastore.scm | 1 | ||||
-rw-r--r-- | guix-build-coordinator/datastore/sqlite.scm | 32 | ||||
-rw-r--r-- | scripts/guix-build-coordinator.in | 29 |
4 files changed, 158 insertions, 45 deletions
diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm index 08eecf3..800ae1a 100644 --- a/guix-build-coordinator/coordinator.scm +++ b/guix-build-coordinator/coordinator.scm @@ -21,6 +21,7 @@ (define-module (guix-build-coordinator coordinator) #:use-module (srfi srfi-1) #:use-module (srfi srfi-9) + #:use-module (srfi srfi-11) #:use-module (srfi srfi-19) #:use-module (srfi srfi-26) #:use-module (ice-9 ftw) @@ -169,7 +170,8 @@ #:key (update-datastore? #t) (pid-file #f) - (trigger-build-allocation? #t)) + (trigger-build-allocation? #t) + (parallel-hooks '())) (when update-datastore? (datastore-update (build-coordinator-datastore build-coordinator))) @@ -184,7 +186,8 @@ (set-build-coordinator-hook-condvars! build-coordinator - (start-hook-processing-threads build-coordinator)) + (start-hook-processing-threads build-coordinator + parallel-hooks)) (when trigger-build-allocation? (trigger-build-allocation build-coordinator))) @@ -198,11 +201,13 @@ (pid-file #f) (agent-communication-uri %default-agent-uri) (client-communication-uri %default-client-uri) - secret-key-base) + secret-key-base + (parallel-hooks '())) (perform-coordinator-service-startup build-coordinator #:update-datastore? update-datastore? - #:pid-file pid-file) + #:pid-file pid-file + #:parallel-hooks parallel-hooks) ;; Create some worker thread channels, which need to be created prior ;; to run-fibers being called. @@ -674,7 +679,7 @@ #:unwind? #t))) #:parallel? #t)) -(define (start-hook-processing-threads build-coordinator) +(define (start-hook-processing-threads build-coordinator parallel-hooks) (define wait-timeout-seconds (* 60 5)) (define datastore @@ -734,42 +739,106 @@ #:label-values `((event . ,event)))))) + (define (single-thread-process-events mtx condvar event-name handler) + (call-with-new-thread + (lambda () + (lock-mutex mtx) + (while #t + (with-exception-handler + (lambda (exn) + (with-exception-handler + (lambda _ + ;; Things are really going wrong if logging about + ;; the hook processing thread crashing, also raises + ;; an exception, so just try and sleep and hope + ;; things go better next time + (sleep 10)) + (lambda () + (log-msg (build-coordinator-logger build-coordinator) + 'CRITICAL + "hook processing thread " event-name + " exception: " exn)) + #:unwind? #t)) + (lambda () + (while #t + (match (datastore-list-unprocessed-hook-events datastore event-name 1) + (() + (wait-condition-variable condvar + mtx + (+ (time-second (current-time)) + wait-timeout-seconds))) + (((id event arguments)) + (process-event id event arguments handler))))) + #:unwind? #t) + (sleep 10))))) + + (define (work-queue-process-events mtx condvar event-name handler thread-count) + (let-values (((process-job count-jobs + count-threads + list-jobs) + (create-work-queue + thread-count + (lambda (id) + (match (datastore-find-unprocessed-hook-event + datastore + id) + (#f #f) ; already processed + ((event arguments) + (process-event id event arguments handler))))))) + + (call-with-new-thread + (lambda () + (lock-mutex mtx) + (while #t + (match (datastore-list-unprocessed-hook-events + datastore + event-name + 100000) + (() + (wait-condition-variable condvar + mtx + (+ (time-second (current-time)) + 10))) + (job-args + (let* ((jobs + (list-jobs)) + (running-ids + (make-hash-table (length jobs)))) + (for-each (match-lambda + ((id _ _) + (hash-set! running-ids + id + #t))) + jobs) + + (for-each (match-lambda + ((id _ _) + (unless (hash-ref running-ids id) + (process-job id)))) + job-args)) + (wait-condition-variable condvar + mtx + (+ (time-second (current-time)) + 10))))))))) + (map (match-lambda ((event-name . handler) (let ((mtx (make-mutex)) (condvar (make-condition-variable))) - (call-with-new-thread - (lambda () - (lock-mutex mtx) - (while #t - (with-exception-handler - (lambda (exn) - (with-exception-handler - (lambda _ - ;; Things are really going wrong if logging about - ;; the hook processing thread crashing, also raises - ;; an exception, so just try and sleep and hope - ;; things go better next time - (sleep 10)) - (lambda () - (log-msg (build-coordinator-logger build-coordinator) - 'CRITICAL - "hook processing thread " event-name - " exception: " exn)) - #:unwind? #t)) - (lambda () - (while #t - (match (datastore-list-unprocessed-hook-events datastore event-name 1) - (() - (wait-condition-variable condvar - mtx - (+ (time-second (current-time)) - wait-timeout-seconds))) - (((id event arguments)) - (process-event id event arguments handler))))) - #:unwind? #t) - (sleep 10)))) + (or (and=> + (assq-ref parallel-hooks event-name) + (lambda (thread-count) + (work-queue-process-events mtx + condvar + event-name + handler + thread-count))) + (single-thread-process-events mtx + condvar + event-name + handler)) + (cons event-name condvar)))) (build-coordinator-hooks build-coordinator))) diff --git a/guix-build-coordinator/datastore.scm b/guix-build-coordinator/datastore.scm index 4aa2981..156e637 100644 --- a/guix-build-coordinator/datastore.scm +++ b/guix-build-coordinator/datastore.scm @@ -66,6 +66,7 @@ (re-export datastore-insert-unprocessed-hook-event) (re-export datastore-count-unprocessed-hook-events) (re-export datastore-list-unprocessed-hook-events) +(re-export datastore-find-unprocessed-hook-event) (re-export datastore-delete-unprocessed-hook-event) (re-export datastore-list-agent-builds) (re-export datastore-find-derivation) diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm index bbd9c28..a6c93f3 100644 --- a/guix-build-coordinator/datastore/sqlite.scm +++ b/guix-build-coordinator/datastore/sqlite.scm @@ -80,6 +80,7 @@ datastore-insert-unprocessed-hook-event datastore-count-unprocessed-hook-events datastore-list-unprocessed-hook-events + datastore-find-unprocessed-hook-event datastore-delete-unprocessed-hook-event datastore-list-agent-builds datastore-agent-for-build @@ -2480,6 +2481,37 @@ LIMIT :limit" events))))) +(define-method (datastore-find-unprocessed-hook-event + (datastore <sqlite-datastore>) + id) + (call-with-worker-thread + (slot-ref datastore 'worker-reader-thread-channel) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT event, arguments +FROM unprocessed_hook_events +WHERE id = :id" + #:cache? #t))) + + (sqlite-bind-arguments + statement + #:id id) + + (let ((result + (match (sqlite-step statement) + (#f #f) + (#(event arguments) + (list (string->symbol event) + (call-with-input-string arguments + (lambda (port) + (read port)))))))) + (sqlite-reset statement) + + result))))) + (define-method (datastore-delete-unprocessed-hook-event (datastore <sqlite-datastore>) id) diff --git a/scripts/guix-build-coordinator.in b/scripts/guix-build-coordinator.in index 1fbcad3..c9e7ee9 100644 --- a/scripts/guix-build-coordinator.in +++ b/scripts/guix-build-coordinator.in @@ -327,14 +327,24 @@ arg) (exit 1))) result)))) - (map (lambda (hook) - (option (list (simple-format #f "~A-hook" hook)) #t #f - (lambda (opt name arg result) - (alist-cons (symbol-append hook '-hook) - (read/eval arg) - (alist-delete (symbol-append hook '-hook) - result))))) - %known-hooks))) + (append-map + (lambda (hook) + (list + (option (list (simple-format #f "~A-hook" hook)) #t #f + (lambda (opt name arg result) + (alist-cons (symbol-append hook '-hook) + (read/eval arg) + (alist-delete (symbol-append hook '-hook) + result)))) + (option (list (simple-format #f "~A-hook-threads" hook)) #t #f + (lambda (opt name arg result) + (alist-cons 'parallel-hooks + `((,hook . ,(string->number arg)) + ,@(or (assq-ref result 'parallel-hooks) + '())) + (alist-delete 'parallel-hooks + result)))))) + %known-hooks))) (define %service-option-defaults ;; Alist of default option values @@ -851,4 +861,5 @@ tags: #:update-datastore? (assoc-ref opts 'update-database) #:pid-file (assq-ref opts 'pid-file) #:agent-communication-uri (assq-ref opts 'agent-communication) - #:client-communication-uri (assq-ref opts 'client-communication))))))) + #:client-communication-uri (assq-ref opts 'client-communication) + #:parallel-hooks (assq-ref opts 'parallel-hooks))))))) |