diff options
-rw-r--r-- | guix-build-coordinator/coordinator.scm | 141 | ||||
-rw-r--r-- | guix-build-coordinator/datastore.scm | 1 | ||||
-rw-r--r-- | guix-build-coordinator/datastore/sqlite.scm | 32 | ||||
-rw-r--r-- | scripts/guix-build-coordinator.in | 29 |
4 files changed, 158 insertions, 45 deletions
diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm index 08eecf3..800ae1a 100644 --- a/guix-build-coordinator/coordinator.scm +++ b/guix-build-coordinator/coordinator.scm @@ -21,6 +21,7 @@ (define-module (guix-build-coordinator coordinator) #:use-module (srfi srfi-1) #:use-module (srfi srfi-9) + #:use-module (srfi srfi-11) #:use-module (srfi srfi-19) #:use-module (srfi srfi-26) #:use-module (ice-9 ftw) @@ -169,7 +170,8 @@ #:key (update-datastore? #t) (pid-file #f) - (trigger-build-allocation? #t)) + (trigger-build-allocation? #t) + (parallel-hooks '())) (when update-datastore? (datastore-update (build-coordinator-datastore build-coordinator))) @@ -184,7 +186,8 @@ (set-build-coordinator-hook-condvars! build-coordinator - (start-hook-processing-threads build-coordinator)) + (start-hook-processing-threads build-coordinator + parallel-hooks)) (when trigger-build-allocation? (trigger-build-allocation build-coordinator))) @@ -198,11 +201,13 @@ (pid-file #f) (agent-communication-uri %default-agent-uri) (client-communication-uri %default-client-uri) - secret-key-base) + secret-key-base + (parallel-hooks '())) (perform-coordinator-service-startup build-coordinator #:update-datastore? update-datastore? - #:pid-file pid-file) + #:pid-file pid-file + #:parallel-hooks parallel-hooks) ;; Create some worker thread channels, which need to be created prior ;; to run-fibers being called. @@ -674,7 +679,7 @@ #:unwind? #t))) #:parallel? #t)) -(define (start-hook-processing-threads build-coordinator) +(define (start-hook-processing-threads build-coordinator parallel-hooks) (define wait-timeout-seconds (* 60 5)) (define datastore @@ -734,42 +739,106 @@ #:label-values `((event . ,event)))))) + (define (single-thread-process-events mtx condvar event-name handler) + (call-with-new-thread + (lambda () + (lock-mutex mtx) + (while #t + (with-exception-handler + (lambda (exn) + (with-exception-handler + (lambda _ + ;; Things are really going wrong if logging about + ;; the hook processing thread crashing, also raises + ;; an exception, so just try and sleep and hope + ;; things go better next time + (sleep 10)) + (lambda () + (log-msg (build-coordinator-logger build-coordinator) + 'CRITICAL + "hook processing thread " event-name + " exception: " exn)) + #:unwind? #t)) + (lambda () + (while #t + (match (datastore-list-unprocessed-hook-events datastore event-name 1) + (() + (wait-condition-variable condvar + mtx + (+ (time-second (current-time)) + wait-timeout-seconds))) + (((id event arguments)) + (process-event id event arguments handler))))) + #:unwind? #t) + (sleep 10))))) + + (define (work-queue-process-events mtx condvar event-name handler thread-count) + (let-values (((process-job count-jobs + count-threads + list-jobs) + (create-work-queue + thread-count + (lambda (id) + (match (datastore-find-unprocessed-hook-event + datastore + id) + (#f #f) ; already processed + ((event arguments) + (process-event id event arguments handler))))))) + + (call-with-new-thread + (lambda () + (lock-mutex mtx) + (while #t + (match (datastore-list-unprocessed-hook-events + datastore + event-name + 100000) + (() + (wait-condition-variable condvar + mtx + (+ (time-second (current-time)) + 10))) + (job-args + (let* ((jobs + (list-jobs)) + (running-ids + (make-hash-table (length jobs)))) + (for-each (match-lambda + ((id _ _) + (hash-set! running-ids + id + #t))) + jobs) + + (for-each (match-lambda + ((id _ _) + (unless (hash-ref running-ids id) + (process-job id)))) + job-args)) + (wait-condition-variable condvar + mtx + (+ (time-second (current-time)) + 10))))))))) + (map (match-lambda ((event-name . handler) (let ((mtx (make-mutex)) (condvar (make-condition-variable))) - (call-with-new-thread - (lambda () - (lock-mutex mtx) - (while #t - (with-exception-handler - (lambda (exn) - (with-exception-handler - (lambda _ - ;; Things are really going wrong if logging about - ;; the hook processing thread crashing, also raises - ;; an exception, so just try and sleep and hope - ;; things go better next time - (sleep 10)) - (lambda () - (log-msg (build-coordinator-logger build-coordinator) - 'CRITICAL - "hook processing thread " event-name - " exception: " exn)) - #:unwind? #t)) - (lambda () - (while #t - (match (datastore-list-unprocessed-hook-events datastore event-name 1) - (() - (wait-condition-variable condvar - mtx - (+ (time-second (current-time)) - wait-timeout-seconds))) - (((id event arguments)) - (process-event id event arguments handler))))) - #:unwind? #t) - (sleep 10)))) + (or (and=> + (assq-ref parallel-hooks event-name) + (lambda (thread-count) + (work-queue-process-events mtx + condvar + event-name + handler + thread-count))) + (single-thread-process-events mtx + condvar + event-name + handler)) + (cons event-name condvar)))) (build-coordinator-hooks build-coordinator))) diff --git a/guix-build-coordinator/datastore.scm b/guix-build-coordinator/datastore.scm index 4aa2981..156e637 100644 --- a/guix-build-coordinator/datastore.scm +++ b/guix-build-coordinator/datastore.scm @@ -66,6 +66,7 @@ (re-export datastore-insert-unprocessed-hook-event) (re-export datastore-count-unprocessed-hook-events) (re-export datastore-list-unprocessed-hook-events) +(re-export datastore-find-unprocessed-hook-event) (re-export datastore-delete-unprocessed-hook-event) (re-export datastore-list-agent-builds) (re-export datastore-find-derivation) diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm index bbd9c28..a6c93f3 100644 --- a/guix-build-coordinator/datastore/sqlite.scm +++ b/guix-build-coordinator/datastore/sqlite.scm @@ -80,6 +80,7 @@ datastore-insert-unprocessed-hook-event datastore-count-unprocessed-hook-events datastore-list-unprocessed-hook-events + datastore-find-unprocessed-hook-event datastore-delete-unprocessed-hook-event datastore-list-agent-builds datastore-agent-for-build @@ -2480,6 +2481,37 @@ LIMIT :limit" events))))) +(define-method (datastore-find-unprocessed-hook-event + (datastore <sqlite-datastore>) + id) + (call-with-worker-thread + (slot-ref datastore 'worker-reader-thread-channel) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT event, arguments +FROM unprocessed_hook_events +WHERE id = :id" + #:cache? #t))) + + (sqlite-bind-arguments + statement + #:id id) + + (let ((result + (match (sqlite-step statement) + (#f #f) + (#(event arguments) + (list (string->symbol event) + (call-with-input-string arguments + (lambda (port) + (read port)))))))) + (sqlite-reset statement) + + result))))) + (define-method (datastore-delete-unprocessed-hook-event (datastore <sqlite-datastore>) id) diff --git a/scripts/guix-build-coordinator.in b/scripts/guix-build-coordinator.in index 1fbcad3..c9e7ee9 100644 --- a/scripts/guix-build-coordinator.in +++ b/scripts/guix-build-coordinator.in @@ -327,14 +327,24 @@ arg) (exit 1))) result)))) - (map (lambda (hook) - (option (list (simple-format #f "~A-hook" hook)) #t #f - (lambda (opt name arg result) - (alist-cons (symbol-append hook '-hook) - (read/eval arg) - (alist-delete (symbol-append hook '-hook) - result))))) - %known-hooks))) + (append-map + (lambda (hook) + (list + (option (list (simple-format #f "~A-hook" hook)) #t #f + (lambda (opt name arg result) + (alist-cons (symbol-append hook '-hook) + (read/eval arg) + (alist-delete (symbol-append hook '-hook) + result)))) + (option (list (simple-format #f "~A-hook-threads" hook)) #t #f + (lambda (opt name arg result) + (alist-cons 'parallel-hooks + `((,hook . ,(string->number arg)) + ,@(or (assq-ref result 'parallel-hooks) + '())) + (alist-delete 'parallel-hooks + result)))))) + %known-hooks))) (define %service-option-defaults ;; Alist of default option values @@ -851,4 +861,5 @@ tags: #:update-datastore? (assoc-ref opts 'update-database) #:pid-file (assq-ref opts 'pid-file) #:agent-communication-uri (assq-ref opts 'agent-communication) - #:client-communication-uri (assq-ref opts 'client-communication))))))) + #:client-communication-uri (assq-ref opts 'client-communication) + #:parallel-hooks (assq-ref opts 'parallel-hooks))))))) |