diff options
Diffstat (limited to 'guix-build-coordinator/agent.scm')
-rw-r--r-- | guix-build-coordinator/agent.scm | 167 |
1 files changed, 117 insertions, 50 deletions
diff --git a/guix-build-coordinator/agent.scm b/guix-build-coordinator/agent.scm index 65f2400..92104f2 100644 --- a/guix-build-coordinator/agent.scm +++ b/guix-build-coordinator/agent.scm @@ -32,59 +32,126 @@ #:use-module (guix-build-coordinator agent-messaging http) #:export (run-agent)) +(define (process-jobs-concurrently fetch-new-jobs + process-job + max-processes) + (define processes + (make-hash-table)) + + (define (wait-on-processes) + (catch + #t + (lambda () + (match (waitpid WAIT_ANY WNOHANG) + ((0 . status) + ;; No process to wait for + #f) + ((pid . status) + (hashv-remove! processes pid) + (simple-format (current-error-port) + "pid ~A failed with status ~A\n" + pid status) + + ;; Recurse, to check for other finished processes. + (wait-on-processes) + #t))) + (lambda (key . args) + (simple-format #t "wait-on-processes: ~A: ~A\n" + key args) + #f))) + + (define (fork-and-process-job job-args) + (match (primitive-fork) + (0 + (dynamic-wind + (const #t) + (lambda () + (apply process-job job-args)) + (lambda () + (primitive-exit 127)))) + (pid + (hashv-set! processes pid + (list (current-time) job-args)) + #t))) + + (while #t + (unless (wait-on-processes) + (unless (eq? 0 (sleep 5)) + (exit 0))) + (let ((current-processes (hash-count (const #t) processes))) + (when (< current-processes max-processes) + (match (fetch-new-jobs (- max-processes current-processes)) + (() + ;; Nothing to do + #f) + ((jobs ...) + (for-each + (lambda (job-args) + (fork-and-process-job (list job-args))) + jobs))))))) + (define (run-agent uuid coordinator-uri password + max-parallel-builds derivation-substitute-urls non-derivation-substitute-urls) - (while #t - (let* ((details (submit-status coordinator-uri uuid password - 'idle)) - (builds (let ((already-allocated-builds - (vector->list - (assoc-ref details "builds")))) - (if (null? already-allocated-builds) - (fetch-builds-for-agent - coordinator-uri uuid password) - already-allocated-builds)))) - (simple-format #t "received allocated builds (~A)\n" - (length builds)) - (for-each (lambda (build) - (simple-format #t "processing build: ~A\n" - (assoc-ref build "uuid")) - (let ((derivation-name (assoc-ref build "derivation-name"))) - (simple-format #t "setting up to build: ~A\n" - derivation-name) - (let ((pre-build-status (pre-build-process - derivation-substitute-urls - non-derivation-substitute-urls - derivation-name))) - (if (eq? (assq-ref pre-build-status 'result) 'success) - (begin - (simple-format #t "setup successful, building: ~A\n" - derivation-name) - (let ((result (perform-build derivation-name))) - (and=> (derivation-log-file derivation-name) - (lambda (log-file) - (simple-format #t "uploading log file ~A\n" - log-file) - (submit-log-file - coordinator-uri uuid password - (assoc-ref build "uuid") - log-file))) - ((if result - post-build-success - post-build-failure) - uuid coordinator-uri password - (assoc-ref build "uuid") - derivation-name))) - (begin - (simple-format #t "failure: ~A\n" pre-build-status) - (report-setup-failure coordinator-uri uuid password - (assoc-ref build "uuid") - pre-build-status)))))) - builds) - - (when (null? builds) - (sleep 5))))) + (let ((initial-builds #f)) + + (define (fetch-new-jobs count) + (if initial-builds + (let ((builds initial-builds)) + (set! initial-builds #f) + builds) + (let ((received-builds + (fetch-builds-for-agent coordinator-uri uuid password + #:count count))) + (simple-format #t "requested ~A builds, received ~A\n" + count (length received-builds)) + received-builds))) + + (define (process-job build) + (simple-format #t "processing build: ~A\n" + (assoc-ref build "uuid")) + (let ((derivation-name (assoc-ref build "derivation-name"))) + (simple-format #t "setting up to build: ~A\n" + derivation-name) + (let ((pre-build-status (pre-build-process + derivation-substitute-urls + non-derivation-substitute-urls + derivation-name))) + (if (eq? (assq-ref pre-build-status 'result) 'success) + (begin + (simple-format #t "setup successful, building: ~A\n" + derivation-name) + (let ((result (perform-build derivation-name))) + (and=> (derivation-log-file derivation-name) + (lambda (log-file) + (simple-format #t "uploading log file ~A\n" + log-file) + (submit-log-file + coordinator-uri uuid password + (assoc-ref build "uuid") + log-file))) + ((if result + post-build-success + post-build-failure) + uuid coordinator-uri password + (assoc-ref build "uuid") + derivation-name))) + (begin + (simple-format #t "failure: ~A\n" pre-build-status) + (report-setup-failure coordinator-uri uuid password + (assoc-ref build "uuid") + pre-build-status))))) + (exit 0)) + + (let ((details (submit-status coordinator-uri uuid password 'idle))) + (let ((builds (vector->list (assoc-ref details "builds")))) + (unless (null? builds) + (set! initial-builds builds)))) + + (process-jobs-concurrently fetch-new-jobs + process-job + max-parallel-builds))) (define (pre-build-process derivation-substitute-urls non-derivation-substitute-urls |