aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--guix-build-coordinator/agent.scm159
-rw-r--r--guix-build-coordinator/utils.scm47
2 files changed, 107 insertions, 99 deletions
diff --git a/guix-build-coordinator/agent.scm b/guix-build-coordinator/agent.scm
index 51f0c1d..20bd74a 100644
--- a/guix-build-coordinator/agent.scm
+++ b/guix-build-coordinator/agent.scm
@@ -35,113 +35,76 @@
#:use-module (guix-build-coordinator agent-messaging http)
#:export (run-agent))
-(define (process-jobs-concurrently fetch-new-jobs
- process-job
- max-processes)
- (define futures
- (make-hash-table))
-
- (define (futures-count)
- (hash-count (const #t) futures))
-
- (define (handle-finished-futures)
- (for-each (lambda (job-future)
- (when (atomic-box-ref (hashq-ref futures job-future))
- (touch job-future)
- (hashq-remove! futures job-future)))
- (hash-map->list
- (lambda (key value) key)
- futures)))
-
- (define (process-job-in-future job-args)
- (let* ((result-box (make-atomic-box #f))
- (job-future
- (make-future
- (lambda ()
- (catch
- #t
- (lambda ()
- (apply process-job job-args)
- (atomic-box-set! result-box #t))
- (lambda (key args)
- (atomic-box-set! result-box #t)))))))
- (hashq-set! futures job-future result-box)))
-
- (while #t
- (handle-finished-futures)
- (when (< (futures-count) max-processes)
- (match (fetch-new-jobs (- max-processes (futures-count)))
- (()
- ;; No new jobs available
- (sleep 30))
- ((jobs ...)
- (for-each
- (lambda (job-args)
- (process-job-in-future (list job-args)))
- jobs))))
- (sleep 5)))
-
(define (run-agent uuid coordinator-uri password
max-parallel-builds
derivation-substitute-urls
non-derivation-substitute-urls)
- (let ((initial-builds #f))
-
- (define (fetch-new-jobs count)
- (if initial-builds
- (let ((builds initial-builds))
- (set! initial-builds #f)
- builds)
- (let ((received-builds
- (fetch-builds-for-agent coordinator-uri uuid password
- #:count count)))
- (simple-format #t "requested ~A builds, received ~A\n"
- count (length received-builds))
- received-builds)))
+ (define (fetch-new-jobs count)
+ (let ((received-builds
+ (fetch-builds-for-agent coordinator-uri uuid password
+ #:count count)))
+ (simple-format #t "requested ~A builds, received ~A\n"
+ count (length received-builds))
+ received-builds))
- (define (process-job build)
- (simple-format #t "processing build: ~A\n"
- (assoc-ref build "uuid"))
- (let ((derivation-name (assoc-ref build "derivation-name")))
- (simple-format #t "setting up to build: ~A\n"
- derivation-name)
- (let ((pre-build-status (pre-build-process
- derivation-substitute-urls
- non-derivation-substitute-urls
- derivation-name)))
- (if (eq? (assq-ref pre-build-status 'result) 'success)
- (begin
- (simple-format #t "setup successful, building: ~A\n"
- derivation-name)
- (let ((result (perform-build derivation-name)))
- (and=> (derivation-log-file derivation-name)
- (lambda (log-file)
- (simple-format #t "uploading log file ~A\n"
- log-file)
- (submit-log-file
- coordinator-uri uuid password
- (assoc-ref build "uuid")
- log-file)))
- ((if result
- post-build-success
- post-build-failure)
- uuid coordinator-uri password
- (assoc-ref build "uuid")
- derivation-name)))
- (begin
- (simple-format #t "failure: ~A\n" pre-build-status)
- (report-setup-failure coordinator-uri uuid password
- (assoc-ref build "uuid")
- pre-build-status))))))
+ (define (process-job build)
+ (simple-format #t "processing build: ~A\n"
+ (assoc-ref build "uuid"))
+ (let ((derivation-name (assoc-ref build "derivation-name")))
+ (simple-format #t "setting up to build: ~A\n"
+ derivation-name)
+ (let ((pre-build-status (pre-build-process
+ derivation-substitute-urls
+ non-derivation-substitute-urls
+ derivation-name)))
+ (if (eq? (assq-ref pre-build-status 'result) 'success)
+ (begin
+ (simple-format #t "setup successful, building: ~A\n"
+ derivation-name)
+ (let ((result (perform-build derivation-name)))
+ (and=> (derivation-log-file derivation-name)
+ (lambda (log-file)
+ (simple-format #t "uploading log file ~A\n"
+ log-file)
+ (submit-log-file
+ coordinator-uri uuid password
+ (assoc-ref build "uuid")
+ log-file)))
+ ((if result
+ post-build-success
+ post-build-failure)
+ uuid coordinator-uri password
+ (assoc-ref build "uuid")
+ derivation-name)))
+ (begin
+ (simple-format #t "failure: ~A\n" pre-build-status)
+ (report-setup-failure coordinator-uri uuid password
+ (assoc-ref build "uuid")
+ pre-build-status))))))
+ (let-values (((process-job-with-queue count-running-jobs)
+ (create-work-queue max-parallel-builds
+ process-job)))
(let ((details (submit-status coordinator-uri uuid password 'idle)))
(let ((builds (vector->list (assoc-ref details "builds"))))
- (unless (null? builds)
- (set! initial-builds builds))))
+ (for-each
+ (lambda (job-args)
+ (process-job-with-queue job-args))
+ builds)))
- (process-jobs-concurrently fetch-new-jobs
- process-job
- max-parallel-builds)))
+ (while #t
+ (let ((running-jobs (count-running-jobs)))
+ (when (< running-jobs max-parallel-builds)
+ (match (fetch-new-jobs (- max-parallel-builds running-jobs))
+ (()
+ ;; No new jobs available
+ (sleep 30))
+ ((jobs ...)
+ (for-each
+ (lambda (job-args)
+ (process-job-with-queue job-args))
+ jobs)))))
+ (sleep 5))))
(define (pre-build-process derivation-substitute-urls
non-derivation-substitute-urls
diff --git a/guix-build-coordinator/utils.scm b/guix-build-coordinator/utils.scm
index 43a1db7..f605f7f 100644
--- a/guix-build-coordinator/utils.scm
+++ b/guix-build-coordinator/utils.scm
@@ -3,6 +3,7 @@
#:use-module (srfi srfi-19)
#:use-module (srfi srfi-26)
#:use-module (srfi srfi-60)
+ #:use-module (ice-9 q)
#:use-module (ice-9 ftw)
#:use-module (ice-9 popen)
#:use-module (ice-9 match)
@@ -51,7 +52,9 @@
s3-list-objects
s3-cp
- with-time-logging))
+ with-time-logging
+
+ create-work-queue))
(define %worker-thread-args
@@ -515,3 +518,45 @@ References: ~a~%"
(define-syntax-rule (with-time-logging name exp ...)
"Log under NAME the time taken to evaluate EXP."
(call-with-time-logging name (lambda () exp ...)))
+
+(define (create-work-queue thread-count proc)
+ (let ((queue (make-q))
+ (queue-mutex (make-mutex))
+ (job-available (make-condition-variable))
+ (running-jobs-count 0))
+
+ (define (process-job . args)
+ (with-mutex queue-mutex
+ (enq! queue args)
+ (signal-condition-variable job-available)))
+
+ (define (count-running-jobs)
+ (with-mutex queue-mutex
+ running-jobs-count))
+
+ (for-each
+ (lambda (i)
+ (call-with-new-thread
+ (lambda ()
+ (let loop ()
+ (lock-mutex queue-mutex)
+ (when (q-empty? queue)
+ (wait-condition-variable job-available
+ queue-mutex))
+ (let ((job-args (deq! queue)))
+ (set! running-jobs-count (+ 1 running-jobs-count))
+ (unlock-mutex queue-mutex)
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format (current-error-port)
+ "exception when handling job: ~A\n"
+ exn))
+ (lambda ()
+ (apply proc job-args))
+ #:unwind? #t)
+ (with-mutex queue-mutex
+ (set! running-jobs-count (- running-jobs-count 1))))
+ (loop)))))
+ (iota thread-count))
+
+ (values process-job count-running-jobs)))