diff options
| Field | Value | Date |
|---|---|---|
| author | Christopher Baines <mail@cbaines.net> | 2020-05-17 22:05:48 +0100 |
| committer | Christopher Baines <mail@cbaines.net> | 2020-05-17 22:05:48 +0100 |
| commit | ff0d12fb5cbedd09b368815b6bd433104e145454 (patch) | |
| tree | f5f7154dcc4b9b1d1fcea58b08b2008e68a039e5 | |
| parent | 137d39440e78454cef83dd4be5701290d00e771e (diff) | |
| download | build-coordinator-ff0d12fb5cbedd09b368815b6bd433104e145454.tar build-coordinator-ff0d12fb5cbedd09b368815b6bd433104e145454.tar.gz | |

Support agents processing builds in parallel

| Mode | File | Lines changed |
|---|---|---|
| -rw-r--r-- | guix-build-coordinator/agent-messaging/http.scm | 8 |
| -rw-r--r-- | guix-build-coordinator/agent.scm | 167 |
| -rw-r--r-- | guix-build-coordinator/coordinator.scm | 4 |
| -rw-r--r-- | scripts/guix-build-coordinator-agent.in | 9 |

4 files changed, 133 insertions, 55 deletions
diff --git a/guix-build-coordinator/agent-messaging/http.scm b/guix-build-coordinator/agent-messaging/http.scm index d22cf5c..7e1b540 100644 --- a/guix-build-coordinator/agent-messaging/http.scm +++ b/guix-build-coordinator/agent-messaging/http.scm @@ -259,7 +259,9 @@ port. Also, the port used can be changed by passing the --port option.\n" #:code 403))) (('POST "agent" uuid "fetch-builds") (if (authenticated? uuid request) - (let ((builds (fetch-builds build-coordinator uuid))) + (let* ((json-body (json-string->scm (utf8->string body))) + (count (assoc-ref json-body "count")) + (builds (fetch-builds build-coordinator uuid count))) (render-json `((builds . ,(list->vector builds))))) (render-json @@ -623,10 +625,12 @@ port. Also, the port used can be changed by passing the --port option.\n" #:body report #:succeed-on-access-denied-retry? #t)) -(define (fetch-builds-for-agent coordinator-uri agent-uuid password) +(define* (fetch-builds-for-agent coordinator-uri agent-uuid password + #:key (count 1)) (vector->list (assoc-ref (coordinator-http-request coordinator-uri agent-uuid password (string-append "/agent/" agent-uuid "/fetch-builds") + #:body `((count . ,count)) #:method 'POST) "builds"))) diff --git a/guix-build-coordinator/agent.scm b/guix-build-coordinator/agent.scm index 65f2400..92104f2 100644 --- a/guix-build-coordinator/agent.scm +++ b/guix-build-coordinator/agent.scm @@ -32,59 +32,126 @@ #:use-module (guix-build-coordinator agent-messaging http) #:export (run-agent)) +(define (process-jobs-concurrently fetch-new-jobs + process-job + max-processes) + (define processes + (make-hash-table)) + + (define (wait-on-processes) + (catch + #t + (lambda () + (match (waitpid WAIT_ANY WNOHANG) + ((0 . status) + ;; No process to wait for + #f) + ((pid . status) + (hashv-remove! processes pid) + (simple-format (current-error-port) + "pid ~A failed with status ~A\n" + pid status) + + ;; Recurse, to check for other finished processes. 
+ (wait-on-processes) + #t))) + (lambda (key . args) + (simple-format #t "wait-on-processes: ~A: ~A\n" + key args) + #f))) + + (define (fork-and-process-job job-args) + (match (primitive-fork) + (0 + (dynamic-wind + (const #t) + (lambda () + (apply process-job job-args)) + (lambda () + (primitive-exit 127)))) + (pid + (hashv-set! processes pid + (list (current-time) job-args)) + #t))) + + (while #t + (unless (wait-on-processes) + (unless (eq? 0 (sleep 5)) + (exit 0))) + (let ((current-processes (hash-count (const #t) processes))) + (when (< current-processes max-processes) + (match (fetch-new-jobs (- max-processes current-processes)) + (() + ;; Nothing to do + #f) + ((jobs ...) + (for-each + (lambda (job-args) + (fork-and-process-job (list job-args))) + jobs))))))) + (define (run-agent uuid coordinator-uri password + max-parallel-builds derivation-substitute-urls non-derivation-substitute-urls) - (while #t - (let* ((details (submit-status coordinator-uri uuid password - 'idle)) - (builds (let ((already-allocated-builds - (vector->list - (assoc-ref details "builds")))) - (if (null? already-allocated-builds) - (fetch-builds-for-agent - coordinator-uri uuid password) - already-allocated-builds)))) - (simple-format #t "received allocated builds (~A)\n" - (length builds)) - (for-each (lambda (build) - (simple-format #t "processing build: ~A\n" - (assoc-ref build "uuid")) - (let ((derivation-name (assoc-ref build "derivation-name"))) - (simple-format #t "setting up to build: ~A\n" - derivation-name) - (let ((pre-build-status (pre-build-process - derivation-substitute-urls - non-derivation-substitute-urls - derivation-name))) - (if (eq? 
(assq-ref pre-build-status 'result) 'success) - (begin - (simple-format #t "setup successful, building: ~A\n" - derivation-name) - (let ((result (perform-build derivation-name))) - (and=> (derivation-log-file derivation-name) - (lambda (log-file) - (simple-format #t "uploading log file ~A\n" - log-file) - (submit-log-file - coordinator-uri uuid password - (assoc-ref build "uuid") - log-file))) - ((if result - post-build-success - post-build-failure) - uuid coordinator-uri password - (assoc-ref build "uuid") - derivation-name))) - (begin - (simple-format #t "failure: ~A\n" pre-build-status) - (report-setup-failure coordinator-uri uuid password - (assoc-ref build "uuid") - pre-build-status)))))) - builds) - - (when (null? builds) - (sleep 5))))) + (let ((initial-builds #f)) + + (define (fetch-new-jobs count) + (if initial-builds + (let ((builds initial-builds)) + (set! initial-builds #f) + builds) + (let ((received-builds + (fetch-builds-for-agent coordinator-uri uuid password + #:count count))) + (simple-format #t "requested ~A builds, received ~A\n" + count (length received-builds)) + received-builds))) + + (define (process-job build) + (simple-format #t "processing build: ~A\n" + (assoc-ref build "uuid")) + (let ((derivation-name (assoc-ref build "derivation-name"))) + (simple-format #t "setting up to build: ~A\n" + derivation-name) + (let ((pre-build-status (pre-build-process + derivation-substitute-urls + non-derivation-substitute-urls + derivation-name))) + (if (eq? 
(assq-ref pre-build-status 'result) 'success) + (begin + (simple-format #t "setup successful, building: ~A\n" + derivation-name) + (let ((result (perform-build derivation-name))) + (and=> (derivation-log-file derivation-name) + (lambda (log-file) + (simple-format #t "uploading log file ~A\n" + log-file) + (submit-log-file + coordinator-uri uuid password + (assoc-ref build "uuid") + log-file))) + ((if result + post-build-success + post-build-failure) + uuid coordinator-uri password + (assoc-ref build "uuid") + derivation-name))) + (begin + (simple-format #t "failure: ~A\n" pre-build-status) + (report-setup-failure coordinator-uri uuid password + (assoc-ref build "uuid") + pre-build-status))))) + (exit 0)) + + (let ((details (submit-status coordinator-uri uuid password 'idle))) + (let ((builds (vector->list (assoc-ref details "builds")))) + (unless (null? builds) + (set! initial-builds builds)))) + + (process-jobs-concurrently fetch-new-jobs + process-job + max-parallel-builds))) (define (pre-build-process derivation-substitute-urls non-derivation-substitute-urls diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm index 0eb8bd0..9815565 100644 --- a/guix-build-coordinator/coordinator.scm +++ b/guix-build-coordinator/coordinator.scm @@ -268,7 +268,7 @@ #f)))))))) #t) -(define (fetch-builds build-coordinator agent) +(define (fetch-builds build-coordinator agent count) (call-with-duration-metric (build-coordinator-metrics-registry build-coordinator) "guixbuildcoordinator_coordinator_fetch_builds_duration_seconds" @@ -276,7 +276,7 @@ (let ((builds (datastore-list-allocation-plan-builds (build-coordinator-datastore build-coordinator) agent - 1))) + count))) (unless (null? 
builds) (datastore-allocate-builds-to-agent (build-coordinator-datastore build-coordinator) diff --git a/scripts/guix-build-coordinator-agent.in b/scripts/guix-build-coordinator-agent.in index d75b66c..5c8549e 100644 --- a/scripts/guix-build-coordinator-agent.in +++ b/scripts/guix-build-coordinator-agent.in @@ -43,6 +43,11 @@ (alist-cons 'password arg result))) + (option '("max-parallel-builds") #t #f + (lambda (opt name arg result) + (alist-cons 'max-parallel-builds + (string->number arg) + result))) (option '("substitute-urls") #t #f (lambda (opt name arg result) (alist-cons 'substitute-urls @@ -61,7 +66,8 @@ (define %option-defaults ;; Alist of default option values - `((coordinator . "http://localhost:8745"))) + `((coordinator . "http://localhost:8745") + (max-parallel-builds . 1))) (define (parse-options options defaults args) (args-fold @@ -81,6 +87,7 @@ (run-agent (assq-ref opts 'uuid) (assq-ref opts 'coordinator) (assq-ref opts 'password) + (assq-ref opts 'max-parallel-builds) (or (assq-ref opts 'derivation-substitute-urls) (assq-ref opts 'substitute-urls)) (or (assq-ref opts 'non-derivation-substitute-urls) |