aboutsummaryrefslogtreecommitdiff
path: root/guix-build-coordinator/agent.scm
diff options
context:
space:
mode:
author Christopher Baines <mail@cbaines.net> 2020-05-17 22:05:48 +0100
committer Christopher Baines <mail@cbaines.net> 2020-05-17 22:05:48 +0100
commit ff0d12fb5cbedd09b368815b6bd433104e145454 (patch)
tree f5f7154dcc4b9b1d1fcea58b08b2008e68a039e5 /guix-build-coordinator/agent.scm
parent 137d39440e78454cef83dd4be5701290d00e771e (diff)
download build-coordinator-ff0d12fb5cbedd09b368815b6bd433104e145454.tar
build-coordinator-ff0d12fb5cbedd09b368815b6bd433104e145454.tar.gz
Support agents processing builds in parallel
Diffstat (limited to 'guix-build-coordinator/agent.scm')
-rw-r--r-- guix-build-coordinator/agent.scm 167
1 files changed, 117 insertions, 50 deletions
diff --git a/guix-build-coordinator/agent.scm b/guix-build-coordinator/agent.scm
index 65f2400..92104f2 100644
--- a/guix-build-coordinator/agent.scm
+++ b/guix-build-coordinator/agent.scm
@@ -32,59 +32,126 @@
#:use-module (guix-build-coordinator agent-messaging http)
#:export (run-agent))
+;; Run jobs in forked child processes, keeping at most MAX-PROCESSES
+;; children running at once.  This procedure loops until the process
+;; exits; it never returns normally.
+;;
+;; FETCH-NEW-JOBS is called with the number of free slots and must return
+;; a list of jobs (possibly empty).  PROCESS-JOB is called in a forked
+;; child with a single job as its only argument.
+(define (process-jobs-concurrently fetch-new-jobs
+                                   process-job
+                                   max-processes)
+  ;; Maps the pid of each running child to (start-time job-args).
+  (define processes
+    (make-hash-table))
+
+  (define (running-process-count)
+    (hash-count (const #t) processes))
+
+  ;; Reap every child that has finished, without blocking.  Returns #t if
+  ;; at least one child was reaped, #f otherwise.
+  (define (wait-on-processes)
+    (catch
+      #t
+      (lambda ()
+        (if (zero? (running-process-count))
+            ;; With no children, waitpid raises a system-error (ECHILD),
+            ;; which would be logged on every iteration of the main loop.
+            #f
+            (match (waitpid WAIT_ANY WNOHANG)
+              ((0 . status)
+               ;; Children exist, but none has finished yet.
+               #f)
+              ((pid . status)
+               (hashv-remove! processes pid)
+               ;; Only report children that did not exit cleanly.
+               ;; status:exit-val is #f when the child was killed by a
+               ;; signal, which is also reported as a failure.
+               (unless (eqv? 0 (status:exit-val status))
+                 (simple-format (current-error-port)
+                                "pid ~A failed with status ~A\n"
+                                pid status))
+
+               ;; Recurse, to check for other finished processes.
+               (wait-on-processes)
+               #t))))
+      (lambda (key . args)
+        (simple-format #t "wait-on-processes: ~A: ~A\n"
+                       key args)
+        #f)))
+
+  ;; Translate the arguments of a `quit' throw (as raised by `exit') into
+  ;; a process exit status.
+  (define (quit-args->exit-status args)
+    (match args
+      (() 0)
+      ((#t . _) 0)
+      (((? integer? status) . _) status)
+      (_ 1)))
+
+  ;; Fork a child that applies PROCESS-JOB to JOB-ARGS, and record the
+  ;; child's pid in PROCESSES.  Returns #t in the parent.
+  (define (fork-and-process-job job-args)
+    (match (primitive-fork)
+      (0
+       ;; In the child.  Always leave through primitive-exit, so the child
+       ;; never returns into the parent's main loop.  An `exit' call in
+       ;; PROCESS-JOB is a `quit' throw; a dynamic-wind exit guard calling
+       ;; (primitive-exit 127) would run during that unwind and turn even
+       ;; (exit 0) into status 127, so the throw is handled explicitly.
+       (catch
+         #t
+         (lambda ()
+           (apply process-job job-args)
+           (primitive-exit 0))
+         (lambda (key . args)
+           (if (eq? key 'quit)
+               (primitive-exit (quit-args->exit-status args))
+               (begin
+                 (simple-format (current-error-port)
+                                "exception when processing job: ~A: ~A\n"
+                                key args)
+                 (primitive-exit 127))))))
+      (pid
+       (hashv-set! processes pid
+                   (list (current-time) job-args))
+       #t)))
+
+  (while #t
+    (unless (wait-on-processes)
+      ;; Nothing was reaped, so pause before polling again.  sleep returns
+      ;; the remaining time when interrupted by a signal; treat that as a
+      ;; request to stop.
+      (unless (eq? 0 (sleep 5))
+        (exit 0)))
+    (let ((current-processes (running-process-count)))
+      (when (< current-processes max-processes)
+        (match (fetch-new-jobs (- max-processes current-processes))
+          (()
+           ;; Nothing to do
+           #f)
+          ((jobs ...)
+           (for-each
+            (lambda (job-args)
+              (fork-and-process-job (list job-args)))
+            jobs)))))))
+
+;; Run the agent identified by UUID against the coordinator at
+;; COORDINATOR-URI, processing up to MAX-PARALLEL-BUILDS builds at once.
+;; Builds reported as already allocated when the agent announces itself
+;; are processed first; further builds are then fetched from the
+;; coordinator as slots become free.
(define (run-agent uuid coordinator-uri password
+                   max-parallel-builds
derivation-substitute-urls
non-derivation-substitute-urls)
- (while #t
- (let* ((details (submit-status coordinator-uri uuid password
- 'idle))
- (builds (let ((already-allocated-builds
- (vector->list
- (assoc-ref details "builds"))))
- (if (null? already-allocated-builds)
- (fetch-builds-for-agent
- coordinator-uri uuid password)
- already-allocated-builds))))
- (simple-format #t "received allocated builds (~A)\n"
- (length builds))
- (for-each (lambda (build)
- (simple-format #t "processing build: ~A\n"
- (assoc-ref build "uuid"))
- (let ((derivation-name (assoc-ref build "derivation-name")))
- (simple-format #t "setting up to build: ~A\n"
- derivation-name)
- (let ((pre-build-status (pre-build-process
- derivation-substitute-urls
- non-derivation-substitute-urls
- derivation-name)))
- (if (eq? (assq-ref pre-build-status 'result) 'success)
- (begin
- (simple-format #t "setup successful, building: ~A\n"
- derivation-name)
- (let ((result (perform-build derivation-name)))
- (and=> (derivation-log-file derivation-name)
- (lambda (log-file)
- (simple-format #t "uploading log file ~A\n"
- log-file)
- (submit-log-file
- coordinator-uri uuid password
- (assoc-ref build "uuid")
- log-file)))
- ((if result
- post-build-success
- post-build-failure)
- uuid coordinator-uri password
- (assoc-ref build "uuid")
- derivation-name)))
- (begin
- (simple-format #t "failure: ~A\n" pre-build-status)
- (report-setup-failure coordinator-uri uuid password
- (assoc-ref build "uuid")
- pre-build-status))))))
- builds)
-
- (when (null? builds)
- (sleep 5)))))
+  ;; INITIAL-BUILDS holds the builds that were already allocated to this
+  ;; agent at startup and have not been handed out yet, or #f when there
+  ;; are none left.
+  (let ((initial-builds #f))
+
+    ;; Return a list of at most COUNT builds to process.  Builds already
+    ;; allocated at startup are handed out first, COUNT at a time, so the
+    ;; caller's parallelism limit is respected; once they are used up, new
+    ;; builds are requested from the coordinator.
+    (define (fetch-new-jobs count)
+      (if initial-builds
+          (let* ((take-count (min count (length initial-builds)))
+                 (builds (list-head initial-builds take-count))
+                 (remaining (list-tail initial-builds take-count)))
+            (set! initial-builds
+                  (if (null? remaining) #f remaining))
+            builds)
+          (let ((received-builds
+                 (fetch-builds-for-agent coordinator-uri uuid password
+                                         #:count count)))
+            (simple-format #t "requested ~A builds, received ~A\n"
+                           count (length received-builds))
+            received-builds)))
+
+    ;; Set up, perform and report on a single BUILD, then exit the current
+    ;; process.  This is passed to process-jobs-concurrently, which calls
+    ;; it in a forked child.
+    (define (process-job build)
+      (simple-format #t "processing build: ~A\n"
+                     (assoc-ref build "uuid"))
+      (let ((derivation-name (assoc-ref build "derivation-name")))
+        (simple-format #t "setting up to build: ~A\n"
+                       derivation-name)
+        (let ((pre-build-status (pre-build-process
+                                 derivation-substitute-urls
+                                 non-derivation-substitute-urls
+                                 derivation-name)))
+          (if (eq? (assq-ref pre-build-status 'result) 'success)
+              (begin
+                (simple-format #t "setup successful, building: ~A\n"
+                               derivation-name)
+                (let ((result (perform-build derivation-name)))
+                  ;; Upload the log, if there is one, before reporting
+                  ;; the outcome of the build.
+                  (and=> (derivation-log-file derivation-name)
+                         (lambda (log-file)
+                           (simple-format #t "uploading log file ~A\n"
+                                          log-file)
+                           (submit-log-file
+                            coordinator-uri uuid password
+                            (assoc-ref build "uuid")
+                            log-file)))
+                  ((if result
+                       post-build-success
+                       post-build-failure)
+                   uuid coordinator-uri password
+                   (assoc-ref build "uuid")
+                   derivation-name)))
+              (begin
+                (simple-format #t "failure: ~A\n" pre-build-status)
+                (report-setup-failure coordinator-uri uuid password
+                                      (assoc-ref build "uuid")
+                                      pre-build-status)))))
+      (exit 0))
+
+    ;; Announce the agent as idle, and remember any builds the coordinator
+    ;; reports as already allocated to it.
+    (let ((details (submit-status coordinator-uri uuid password 'idle)))
+      (let ((builds (vector->list (assoc-ref details "builds"))))
+        (unless (null? builds)
+          (set! initial-builds builds))))
+
+    (process-jobs-concurrently fetch-new-jobs
+                               process-job
+                               max-parallel-builds)))
(define (pre-build-process derivation-substitute-urls
non-derivation-substitute-urls