aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2020-05-17 22:05:48 +0100
committerChristopher Baines <mail@cbaines.net>2020-05-17 22:05:48 +0100
commitff0d12fb5cbedd09b368815b6bd433104e145454 (patch)
treef5f7154dcc4b9b1d1fcea58b08b2008e68a039e5
parent137d39440e78454cef83dd4be5701290d00e771e (diff)
downloadbuild-coordinator-ff0d12fb5cbedd09b368815b6bd433104e145454.tar
build-coordinator-ff0d12fb5cbedd09b368815b6bd433104e145454.tar.gz
Support agents processing builds in parallel
-rw-r--r--guix-build-coordinator/agent-messaging/http.scm8
-rw-r--r--guix-build-coordinator/agent.scm167
-rw-r--r--guix-build-coordinator/coordinator.scm4
-rw-r--r--scripts/guix-build-coordinator-agent.in9
4 files changed, 133 insertions, 55 deletions
diff --git a/guix-build-coordinator/agent-messaging/http.scm b/guix-build-coordinator/agent-messaging/http.scm
index d22cf5c..7e1b540 100644
--- a/guix-build-coordinator/agent-messaging/http.scm
+++ b/guix-build-coordinator/agent-messaging/http.scm
@@ -259,7 +259,9 @@ port. Also, the port used can be changed by passing the --port option.\n"
#:code 403)))
(('POST "agent" uuid "fetch-builds")
(if (authenticated? uuid request)
- (let ((builds (fetch-builds build-coordinator uuid)))
+ (let* ((json-body (json-string->scm (utf8->string body)))
+ (count (assoc-ref json-body "count"))
+ (builds (fetch-builds build-coordinator uuid count)))
(render-json
`((builds . ,(list->vector builds)))))
(render-json
@@ -623,10 +625,12 @@ port. Also, the port used can be changed by passing the --port option.\n"
#:body report
#:succeed-on-access-denied-retry? #t))
-(define (fetch-builds-for-agent coordinator-uri agent-uuid password)
+(define* (fetch-builds-for-agent coordinator-uri agent-uuid password
+ #:key (count 1))
(vector->list
(assoc-ref (coordinator-http-request
coordinator-uri agent-uuid password
(string-append "/agent/" agent-uuid "/fetch-builds")
+ #:body `((count . ,count))
#:method 'POST)
"builds")))
diff --git a/guix-build-coordinator/agent.scm b/guix-build-coordinator/agent.scm
index 65f2400..92104f2 100644
--- a/guix-build-coordinator/agent.scm
+++ b/guix-build-coordinator/agent.scm
@@ -32,59 +32,126 @@
#:use-module (guix-build-coordinator agent-messaging http)
#:export (run-agent))
+(define (process-jobs-concurrently fetch-new-jobs
+ process-job
+ max-processes)
+ (define processes
+ (make-hash-table))
+
+ (define (wait-on-processes)
+ (catch
+ #t
+ (lambda ()
+ (match (waitpid WAIT_ANY WNOHANG)
+ ((0 . status)
+ ;; No process to wait for
+ #f)
+ ((pid . status)
+ (hashv-remove! processes pid)
+ (simple-format (current-error-port)
+ "pid ~A failed with status ~A\n"
+ pid status)
+
+ ;; Recurse, to check for other finished processes.
+ (wait-on-processes)
+ #t)))
+ (lambda (key . args)
+ (simple-format #t "wait-on-processes: ~A: ~A\n"
+ key args)
+ #f)))
+
+ (define (fork-and-process-job job-args)
+ (match (primitive-fork)
+ (0
+ (dynamic-wind
+ (const #t)
+ (lambda ()
+ (apply process-job job-args))
+ (lambda ()
+ (primitive-exit 127))))
+ (pid
+ (hashv-set! processes pid
+ (list (current-time) job-args))
+ #t)))
+
+ (while #t
+ (unless (wait-on-processes)
+ (unless (eq? 0 (sleep 5))
+ (exit 0)))
+ (let ((current-processes (hash-count (const #t) processes)))
+ (when (< current-processes max-processes)
+ (match (fetch-new-jobs (- max-processes current-processes))
+ (()
+ ;; Nothing to do
+ #f)
+ ((jobs ...)
+ (for-each
+ (lambda (job-args)
+ (fork-and-process-job (list job-args)))
+ jobs)))))))
+
(define (run-agent uuid coordinator-uri password
+ max-parallel-builds
derivation-substitute-urls
non-derivation-substitute-urls)
- (while #t
- (let* ((details (submit-status coordinator-uri uuid password
- 'idle))
- (builds (let ((already-allocated-builds
- (vector->list
- (assoc-ref details "builds"))))
- (if (null? already-allocated-builds)
- (fetch-builds-for-agent
- coordinator-uri uuid password)
- already-allocated-builds))))
- (simple-format #t "received allocated builds (~A)\n"
- (length builds))
- (for-each (lambda (build)
- (simple-format #t "processing build: ~A\n"
- (assoc-ref build "uuid"))
- (let ((derivation-name (assoc-ref build "derivation-name")))
- (simple-format #t "setting up to build: ~A\n"
- derivation-name)
- (let ((pre-build-status (pre-build-process
- derivation-substitute-urls
- non-derivation-substitute-urls
- derivation-name)))
- (if (eq? (assq-ref pre-build-status 'result) 'success)
- (begin
- (simple-format #t "setup successful, building: ~A\n"
- derivation-name)
- (let ((result (perform-build derivation-name)))
- (and=> (derivation-log-file derivation-name)
- (lambda (log-file)
- (simple-format #t "uploading log file ~A\n"
- log-file)
- (submit-log-file
- coordinator-uri uuid password
- (assoc-ref build "uuid")
- log-file)))
- ((if result
- post-build-success
- post-build-failure)
- uuid coordinator-uri password
- (assoc-ref build "uuid")
- derivation-name)))
- (begin
- (simple-format #t "failure: ~A\n" pre-build-status)
- (report-setup-failure coordinator-uri uuid password
- (assoc-ref build "uuid")
- pre-build-status))))))
- builds)
-
- (when (null? builds)
- (sleep 5)))))
+ (let ((initial-builds #f))
+
+ (define (fetch-new-jobs count)
+ (if initial-builds
+ (let ((builds initial-builds))
+ (set! initial-builds #f)
+ builds)
+ (let ((received-builds
+ (fetch-builds-for-agent coordinator-uri uuid password
+ #:count count)))
+ (simple-format #t "requested ~A builds, received ~A\n"
+ count (length received-builds))
+ received-builds)))
+
+ (define (process-job build)
+ (simple-format #t "processing build: ~A\n"
+ (assoc-ref build "uuid"))
+ (let ((derivation-name (assoc-ref build "derivation-name")))
+ (simple-format #t "setting up to build: ~A\n"
+ derivation-name)
+ (let ((pre-build-status (pre-build-process
+ derivation-substitute-urls
+ non-derivation-substitute-urls
+ derivation-name)))
+ (if (eq? (assq-ref pre-build-status 'result) 'success)
+ (begin
+ (simple-format #t "setup successful, building: ~A\n"
+ derivation-name)
+ (let ((result (perform-build derivation-name)))
+ (and=> (derivation-log-file derivation-name)
+ (lambda (log-file)
+ (simple-format #t "uploading log file ~A\n"
+ log-file)
+ (submit-log-file
+ coordinator-uri uuid password
+ (assoc-ref build "uuid")
+ log-file)))
+ ((if result
+ post-build-success
+ post-build-failure)
+ uuid coordinator-uri password
+ (assoc-ref build "uuid")
+ derivation-name)))
+ (begin
+ (simple-format #t "failure: ~A\n" pre-build-status)
+ (report-setup-failure coordinator-uri uuid password
+ (assoc-ref build "uuid")
+ pre-build-status)))))
+ (exit 0))
+
+ (let ((details (submit-status coordinator-uri uuid password 'idle)))
+ (let ((builds (vector->list (assoc-ref details "builds"))))
+ (unless (null? builds)
+ (set! initial-builds builds))))
+
+ (process-jobs-concurrently fetch-new-jobs
+ process-job
+ max-parallel-builds)))
(define (pre-build-process derivation-substitute-urls
non-derivation-substitute-urls
diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm
index 0eb8bd0..9815565 100644
--- a/guix-build-coordinator/coordinator.scm
+++ b/guix-build-coordinator/coordinator.scm
@@ -268,7 +268,7 @@
#f))))))))
#t)
-(define (fetch-builds build-coordinator agent)
+(define (fetch-builds build-coordinator agent count)
(call-with-duration-metric
(build-coordinator-metrics-registry build-coordinator)
"guixbuildcoordinator_coordinator_fetch_builds_duration_seconds"
@@ -276,7 +276,7 @@
(let ((builds (datastore-list-allocation-plan-builds
(build-coordinator-datastore build-coordinator)
agent
- 1)))
+ count)))
(unless (null? builds)
(datastore-allocate-builds-to-agent
(build-coordinator-datastore build-coordinator)
diff --git a/scripts/guix-build-coordinator-agent.in b/scripts/guix-build-coordinator-agent.in
index d75b66c..5c8549e 100644
--- a/scripts/guix-build-coordinator-agent.in
+++ b/scripts/guix-build-coordinator-agent.in
@@ -43,6 +43,11 @@
(alist-cons 'password
arg
result)))
+ (option '("max-parallel-builds") #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'max-parallel-builds
+ (string->number arg)
+ result)))
(option '("substitute-urls") #t #f
(lambda (opt name arg result)
(alist-cons 'substitute-urls
@@ -61,7 +66,8 @@
(define %option-defaults
;; Alist of default option values
- `((coordinator . "http://localhost:8745")))
+ `((coordinator . "http://localhost:8745")
+ (max-parallel-builds . 1)))
(define (parse-options options defaults args)
(args-fold
@@ -81,6 +87,7 @@
(run-agent (assq-ref opts 'uuid)
(assq-ref opts 'coordinator)
(assq-ref opts 'password)
+ (assq-ref opts 'max-parallel-builds)
(or (assq-ref opts 'derivation-substitute-urls)
(assq-ref opts 'substitute-urls))
(or (assq-ref opts 'non-derivation-substitute-urls)