aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2022-06-30 18:47:10 +0100
committerChristopher Baines <mail@cbaines.net>2022-06-30 20:50:19 +0100
commit23504e0f01dc1eae05b307e313ba70faaad84be8 (patch)
tree0831e28cd26a1bcb46da02580305901154b21471
parent70c83f81db4a6dd9ad94d4ec2b3b3be62bd0f467 (diff)
downloadbuild-coordinator-23504e0f01dc1eae05b307e313ba70faaad84be8.tar
build-coordinator-23504e0f01dc1eae05b307e313ba70faaad84be8.tar.gz
Support processing hook events in parallel
Forcing hooks to be sequential simplifies them, and the implementation, but it doesn't always scale well. I'm particularly thinking about the build-submitted hook and built-success hooks, the processing of which can back up if there's lots of builds being submitted or finishing successfully. This new functionality allows hooks to be processed in parallel, which should allow to manage this more effectively.
-rw-r--r--guix-build-coordinator/coordinator.scm141
-rw-r--r--guix-build-coordinator/datastore.scm1
-rw-r--r--guix-build-coordinator/datastore/sqlite.scm32
-rw-r--r--scripts/guix-build-coordinator.in29
4 files changed, 158 insertions, 45 deletions
diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm
index 08eecf3..800ae1a 100644
--- a/guix-build-coordinator/coordinator.scm
+++ b/guix-build-coordinator/coordinator.scm
@@ -21,6 +21,7 @@
(define-module (guix-build-coordinator coordinator)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-9)
+ #:use-module (srfi srfi-11)
#:use-module (srfi srfi-19)
#:use-module (srfi srfi-26)
#:use-module (ice-9 ftw)
@@ -169,7 +170,8 @@
#:key
(update-datastore? #t)
(pid-file #f)
- (trigger-build-allocation? #t))
+ (trigger-build-allocation? #t)
+ (parallel-hooks '()))
(when update-datastore?
(datastore-update (build-coordinator-datastore build-coordinator)))
@@ -184,7 +186,8 @@
(set-build-coordinator-hook-condvars!
build-coordinator
- (start-hook-processing-threads build-coordinator))
+ (start-hook-processing-threads build-coordinator
+ parallel-hooks))
(when trigger-build-allocation?
(trigger-build-allocation build-coordinator)))
@@ -198,11 +201,13 @@
(pid-file #f)
(agent-communication-uri %default-agent-uri)
(client-communication-uri %default-client-uri)
- secret-key-base)
+ secret-key-base
+ (parallel-hooks '()))
(perform-coordinator-service-startup
build-coordinator
#:update-datastore? update-datastore?
- #:pid-file pid-file)
+ #:pid-file pid-file
+ #:parallel-hooks parallel-hooks)
;; Create some worker thread channels, which need to be created prior
;; to run-fibers being called.
@@ -674,7 +679,7 @@
#:unwind? #t)))
#:parallel? #t))
-(define (start-hook-processing-threads build-coordinator)
+(define (start-hook-processing-threads build-coordinator parallel-hooks)
(define wait-timeout-seconds (* 60 5))
(define datastore
@@ -734,42 +739,106 @@
#:label-values
`((event . ,event))))))
+ (define (single-thread-process-events mtx condvar event-name handler)
+ (call-with-new-thread
+ (lambda ()
+ (lock-mutex mtx)
+ (while #t
+ (with-exception-handler
+ (lambda (exn)
+ (with-exception-handler
+ (lambda _
+ ;; Things are really going wrong if logging about
+ ;; the hook processing thread crashing, also raises
+ ;; an exception, so just try and sleep and hope
+ ;; things go better next time
+ (sleep 10))
+ (lambda ()
+ (log-msg (build-coordinator-logger build-coordinator)
+ 'CRITICAL
+ "hook processing thread " event-name
+ " exception: " exn))
+ #:unwind? #t))
+ (lambda ()
+ (while #t
+ (match (datastore-list-unprocessed-hook-events datastore event-name 1)
+ (()
+ (wait-condition-variable condvar
+ mtx
+ (+ (time-second (current-time))
+ wait-timeout-seconds)))
+ (((id event arguments))
+ (process-event id event arguments handler)))))
+ #:unwind? #t)
+ (sleep 10)))))
+
+ (define (work-queue-process-events mtx condvar event-name handler thread-count)
+ (let-values (((process-job count-jobs
+ count-threads
+ list-jobs)
+ (create-work-queue
+ thread-count
+ (lambda (id)
+ (match (datastore-find-unprocessed-hook-event
+ datastore
+ id)
+ (#f #f) ; already processed
+ ((event arguments)
+ (process-event id event arguments handler)))))))
+
+ (call-with-new-thread
+ (lambda ()
+ (lock-mutex mtx)
+ (while #t
+ (match (datastore-list-unprocessed-hook-events
+ datastore
+ event-name
+ 100000)
+ (()
+ (wait-condition-variable condvar
+ mtx
+ (+ (time-second (current-time))
+ 10)))
+ (job-args
+ (let* ((jobs
+ (list-jobs))
+ (running-ids
+ (make-hash-table (length jobs))))
+ (for-each (match-lambda
+ ((id _ _)
+ (hash-set! running-ids
+ id
+ #t)))
+ jobs)
+
+ (for-each (match-lambda
+ ((id _ _)
+ (unless (hash-ref running-ids id)
+ (process-job id))))
+ job-args))
+ (wait-condition-variable condvar
+ mtx
+ (+ (time-second (current-time))
+ 10)))))))))
+
(map
(match-lambda
((event-name . handler)
(let ((mtx (make-mutex))
(condvar (make-condition-variable)))
- (call-with-new-thread
- (lambda ()
- (lock-mutex mtx)
- (while #t
- (with-exception-handler
- (lambda (exn)
- (with-exception-handler
- (lambda _
- ;; Things are really going wrong if logging about
- ;; the hook processing thread crashing, also raises
- ;; an exception, so just try and sleep and hope
- ;; things go better next time
- (sleep 10))
- (lambda ()
- (log-msg (build-coordinator-logger build-coordinator)
- 'CRITICAL
- "hook processing thread " event-name
- " exception: " exn))
- #:unwind? #t))
- (lambda ()
- (while #t
- (match (datastore-list-unprocessed-hook-events datastore event-name 1)
- (()
- (wait-condition-variable condvar
- mtx
- (+ (time-second (current-time))
- wait-timeout-seconds)))
- (((id event arguments))
- (process-event id event arguments handler)))))
- #:unwind? #t)
- (sleep 10))))
+ (or (and=>
+ (assq-ref parallel-hooks event-name)
+ (lambda (thread-count)
+ (work-queue-process-events mtx
+ condvar
+ event-name
+ handler
+ thread-count)))
+ (single-thread-process-events mtx
+ condvar
+ event-name
+ handler))
+
(cons event-name condvar))))
(build-coordinator-hooks build-coordinator)))
diff --git a/guix-build-coordinator/datastore.scm b/guix-build-coordinator/datastore.scm
index 4aa2981..156e637 100644
--- a/guix-build-coordinator/datastore.scm
+++ b/guix-build-coordinator/datastore.scm
@@ -66,6 +66,7 @@
(re-export datastore-insert-unprocessed-hook-event)
(re-export datastore-count-unprocessed-hook-events)
(re-export datastore-list-unprocessed-hook-events)
+(re-export datastore-find-unprocessed-hook-event)
(re-export datastore-delete-unprocessed-hook-event)
(re-export datastore-list-agent-builds)
(re-export datastore-find-derivation)
diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm
index bbd9c28..a6c93f3 100644
--- a/guix-build-coordinator/datastore/sqlite.scm
+++ b/guix-build-coordinator/datastore/sqlite.scm
@@ -80,6 +80,7 @@
datastore-insert-unprocessed-hook-event
datastore-count-unprocessed-hook-events
datastore-list-unprocessed-hook-events
+ datastore-find-unprocessed-hook-event
datastore-delete-unprocessed-hook-event
datastore-list-agent-builds
datastore-agent-for-build
@@ -2480,6 +2481,37 @@ LIMIT :limit"
events)))))
+(define-method (datastore-find-unprocessed-hook-event
+ (datastore <sqlite-datastore>)
+ id)
+ (call-with-worker-thread
+ (slot-ref datastore 'worker-reader-thread-channel)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT event, arguments
+FROM unprocessed_hook_events
+WHERE id = :id"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:id id)
+
+ (let ((result
+ (match (sqlite-step statement)
+ (#f #f)
+ (#(event arguments)
+ (list (string->symbol event)
+ (call-with-input-string arguments
+ (lambda (port)
+ (read port))))))))
+ (sqlite-reset statement)
+
+ result)))))
+
(define-method (datastore-delete-unprocessed-hook-event
(datastore <sqlite-datastore>)
id)
diff --git a/scripts/guix-build-coordinator.in b/scripts/guix-build-coordinator.in
index 1fbcad3..c9e7ee9 100644
--- a/scripts/guix-build-coordinator.in
+++ b/scripts/guix-build-coordinator.in
@@ -327,14 +327,24 @@
arg)
(exit 1)))
result))))
- (map (lambda (hook)
- (option (list (simple-format #f "~A-hook" hook)) #t #f
- (lambda (opt name arg result)
- (alist-cons (symbol-append hook '-hook)
- (read/eval arg)
- (alist-delete (symbol-append hook '-hook)
- result)))))
- %known-hooks)))
+ (append-map
+ (lambda (hook)
+ (list
+ (option (list (simple-format #f "~A-hook" hook)) #t #f
+ (lambda (opt name arg result)
+ (alist-cons (symbol-append hook '-hook)
+ (read/eval arg)
+ (alist-delete (symbol-append hook '-hook)
+ result))))
+ (option (list (simple-format #f "~A-hook-threads" hook)) #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'parallel-hooks
+ `((,hook . ,(string->number arg))
+ ,@(or (assq-ref result 'parallel-hooks)
+ '()))
+ (alist-delete 'parallel-hooks
+ result))))))
+ %known-hooks)))
(define %service-option-defaults
;; Alist of default option values
@@ -851,4 +861,5 @@ tags:
#:update-datastore? (assoc-ref opts 'update-database)
#:pid-file (assq-ref opts 'pid-file)
#:agent-communication-uri (assq-ref opts 'agent-communication)
- #:client-communication-uri (assq-ref opts 'client-communication)))))))
+ #:client-communication-uri (assq-ref opts 'client-communication)
+ #:parallel-hooks (assq-ref opts 'parallel-hooks)))))))