aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--guix-build-coordinator/coordinator.scm141
-rw-r--r--guix-build-coordinator/datastore.scm1
-rw-r--r--guix-build-coordinator/datastore/sqlite.scm32
-rw-r--r--scripts/guix-build-coordinator.in29
4 files changed, 158 insertions, 45 deletions
diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm
index 08eecf3..800ae1a 100644
--- a/guix-build-coordinator/coordinator.scm
+++ b/guix-build-coordinator/coordinator.scm
@@ -21,6 +21,7 @@
(define-module (guix-build-coordinator coordinator)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-9)
+ #:use-module (srfi srfi-11)
#:use-module (srfi srfi-19)
#:use-module (srfi srfi-26)
#:use-module (ice-9 ftw)
@@ -169,7 +170,8 @@
#:key
(update-datastore? #t)
(pid-file #f)
- (trigger-build-allocation? #t))
+ (trigger-build-allocation? #t)
+ (parallel-hooks '()))
(when update-datastore?
(datastore-update (build-coordinator-datastore build-coordinator)))
@@ -184,7 +186,8 @@
(set-build-coordinator-hook-condvars!
build-coordinator
- (start-hook-processing-threads build-coordinator))
+ (start-hook-processing-threads build-coordinator
+ parallel-hooks))
(when trigger-build-allocation?
(trigger-build-allocation build-coordinator)))
@@ -198,11 +201,13 @@
(pid-file #f)
(agent-communication-uri %default-agent-uri)
(client-communication-uri %default-client-uri)
- secret-key-base)
+ secret-key-base
+ (parallel-hooks '()))
(perform-coordinator-service-startup
build-coordinator
#:update-datastore? update-datastore?
- #:pid-file pid-file)
+ #:pid-file pid-file
+ #:parallel-hooks parallel-hooks)
;; Create some worker thread channels, which need to be created prior
;; to run-fibers being called.
@@ -674,7 +679,7 @@
#:unwind? #t)))
#:parallel? #t))
-(define (start-hook-processing-threads build-coordinator)
+(define (start-hook-processing-threads build-coordinator parallel-hooks)
(define wait-timeout-seconds (* 60 5))
(define datastore
@@ -734,42 +739,106 @@
#:label-values
`((event . ,event))))))
+ (define (single-thread-process-events mtx condvar event-name handler)
+ (call-with-new-thread
+ (lambda ()
+ (lock-mutex mtx)
+ (while #t
+ (with-exception-handler
+ (lambda (exn)
+ (with-exception-handler
+ (lambda _
+ ;; Things are really going wrong if logging about
+ ;; the hook processing thread crashing, also raises
+ ;; an exception, so just try and sleep and hope
+ ;; things go better next time
+ (sleep 10))
+ (lambda ()
+ (log-msg (build-coordinator-logger build-coordinator)
+ 'CRITICAL
+ "hook processing thread " event-name
+ " exception: " exn))
+ #:unwind? #t))
+ (lambda ()
+ (while #t
+ (match (datastore-list-unprocessed-hook-events datastore event-name 1)
+ (()
+ (wait-condition-variable condvar
+ mtx
+ (+ (time-second (current-time))
+ wait-timeout-seconds)))
+ (((id event arguments))
+ (process-event id event arguments handler)))))
+ #:unwind? #t)
+ (sleep 10)))))
+
+ (define (work-queue-process-events mtx condvar event-name handler thread-count)
+ (let-values (((process-job count-jobs
+ count-threads
+ list-jobs)
+ (create-work-queue
+ thread-count
+ (lambda (id)
+ (match (datastore-find-unprocessed-hook-event
+ datastore
+ id)
+ (#f #f) ; already processed
+ ((event arguments)
+ (process-event id event arguments handler)))))))
+
+ (call-with-new-thread
+ (lambda ()
+ (lock-mutex mtx)
+ (while #t
+ (match (datastore-list-unprocessed-hook-events
+ datastore
+ event-name
+ 100000)
+ (()
+ (wait-condition-variable condvar
+ mtx
+ (+ (time-second (current-time))
+ 10)))
+ (job-args
+ (let* ((jobs
+ (list-jobs))
+ (running-ids
+ (make-hash-table (length jobs))))
+ (for-each (match-lambda
+ ((id _ _)
+ (hash-set! running-ids
+ id
+ #t)))
+ jobs)
+
+ (for-each (match-lambda
+ ((id _ _)
+ (unless (hash-ref running-ids id)
+ (process-job id))))
+ job-args))
+ (wait-condition-variable condvar
+ mtx
+ (+ (time-second (current-time))
+ 10)))))))))
+
(map
(match-lambda
((event-name . handler)
(let ((mtx (make-mutex))
(condvar (make-condition-variable)))
- (call-with-new-thread
- (lambda ()
- (lock-mutex mtx)
- (while #t
- (with-exception-handler
- (lambda (exn)
- (with-exception-handler
- (lambda _
- ;; Things are really going wrong if logging about
- ;; the hook processing thread crashing, also raises
- ;; an exception, so just try and sleep and hope
- ;; things go better next time
- (sleep 10))
- (lambda ()
- (log-msg (build-coordinator-logger build-coordinator)
- 'CRITICAL
- "hook processing thread " event-name
- " exception: " exn))
- #:unwind? #t))
- (lambda ()
- (while #t
- (match (datastore-list-unprocessed-hook-events datastore event-name 1)
- (()
- (wait-condition-variable condvar
- mtx
- (+ (time-second (current-time))
- wait-timeout-seconds)))
- (((id event arguments))
- (process-event id event arguments handler)))))
- #:unwind? #t)
- (sleep 10))))
+ (or (and=>
+ (assq-ref parallel-hooks event-name)
+ (lambda (thread-count)
+ (work-queue-process-events mtx
+ condvar
+ event-name
+ handler
+ thread-count)))
+ (single-thread-process-events mtx
+ condvar
+ event-name
+ handler))
+
(cons event-name condvar))))
(build-coordinator-hooks build-coordinator)))
diff --git a/guix-build-coordinator/datastore.scm b/guix-build-coordinator/datastore.scm
index 4aa2981..156e637 100644
--- a/guix-build-coordinator/datastore.scm
+++ b/guix-build-coordinator/datastore.scm
@@ -66,6 +66,7 @@
(re-export datastore-insert-unprocessed-hook-event)
(re-export datastore-count-unprocessed-hook-events)
(re-export datastore-list-unprocessed-hook-events)
+(re-export datastore-find-unprocessed-hook-event)
(re-export datastore-delete-unprocessed-hook-event)
(re-export datastore-list-agent-builds)
(re-export datastore-find-derivation)
diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm
index bbd9c28..a6c93f3 100644
--- a/guix-build-coordinator/datastore/sqlite.scm
+++ b/guix-build-coordinator/datastore/sqlite.scm
@@ -80,6 +80,7 @@
datastore-insert-unprocessed-hook-event
datastore-count-unprocessed-hook-events
datastore-list-unprocessed-hook-events
+ datastore-find-unprocessed-hook-event
datastore-delete-unprocessed-hook-event
datastore-list-agent-builds
datastore-agent-for-build
@@ -2480,6 +2481,37 @@ LIMIT :limit"
events)))))
+(define-method (datastore-find-unprocessed-hook-event
+ (datastore <sqlite-datastore>)
+ id)
+ (call-with-worker-thread
+ (slot-ref datastore 'worker-reader-thread-channel)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT event, arguments
+FROM unprocessed_hook_events
+WHERE id = :id"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:id id)
+
+ (let ((result
+ (match (sqlite-step statement)
+ (#f #f)
+ (#(event arguments)
+ (list (string->symbol event)
+ (call-with-input-string arguments
+ (lambda (port)
+ (read port))))))))
+ (sqlite-reset statement)
+
+ result)))))
+
(define-method (datastore-delete-unprocessed-hook-event
(datastore <sqlite-datastore>)
id)
diff --git a/scripts/guix-build-coordinator.in b/scripts/guix-build-coordinator.in
index 1fbcad3..c9e7ee9 100644
--- a/scripts/guix-build-coordinator.in
+++ b/scripts/guix-build-coordinator.in
@@ -327,14 +327,24 @@
arg)
(exit 1)))
result))))
- (map (lambda (hook)
- (option (list (simple-format #f "~A-hook" hook)) #t #f
- (lambda (opt name arg result)
- (alist-cons (symbol-append hook '-hook)
- (read/eval arg)
- (alist-delete (symbol-append hook '-hook)
- result)))))
- %known-hooks)))
+ (append-map
+ (lambda (hook)
+ (list
+ (option (list (simple-format #f "~A-hook" hook)) #t #f
+ (lambda (opt name arg result)
+ (alist-cons (symbol-append hook '-hook)
+ (read/eval arg)
+ (alist-delete (symbol-append hook '-hook)
+ result))))
+ (option (list (simple-format #f "~A-hook-threads" hook)) #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'parallel-hooks
+ `((,hook . ,(string->number arg))
+ ,@(or (assq-ref result 'parallel-hooks)
+ '()))
+ (alist-delete 'parallel-hooks
+ result))))))
+ %known-hooks)))
(define %service-option-defaults
;; Alist of default option values
@@ -851,4 +861,5 @@ tags:
#:update-datastore? (assoc-ref opts 'update-database)
#:pid-file (assq-ref opts 'pid-file)
#:agent-communication-uri (assq-ref opts 'agent-communication)
- #:client-communication-uri (assq-ref opts 'client-communication)))))))
+ #:client-communication-uri (assq-ref opts 'client-communication)
+ #:parallel-hooks (assq-ref opts 'parallel-hooks)))))))