diff options
-rw-r--r-- | guix-build-coordinator/agent.scm | 159 | ||||
-rw-r--r-- | guix-build-coordinator/utils.scm | 47 |
2 files changed, 107 insertions, 99 deletions
diff --git a/guix-build-coordinator/agent.scm b/guix-build-coordinator/agent.scm index 51f0c1d..20bd74a 100644 --- a/guix-build-coordinator/agent.scm +++ b/guix-build-coordinator/agent.scm @@ -35,113 +35,76 @@ #:use-module (guix-build-coordinator agent-messaging http) #:export (run-agent)) -(define (process-jobs-concurrently fetch-new-jobs - process-job - max-processes) - (define futures - (make-hash-table)) - - (define (futures-count) - (hash-count (const #t) futures)) - - (define (handle-finished-futures) - (for-each (lambda (job-future) - (when (atomic-box-ref (hashq-ref futures job-future)) - (touch job-future) - (hashq-remove! futures job-future))) - (hash-map->list - (lambda (key value) key) - futures))) - - (define (process-job-in-future job-args) - (let* ((result-box (make-atomic-box #f)) - (job-future - (make-future - (lambda () - (catch - #t - (lambda () - (apply process-job job-args) - (atomic-box-set! result-box #t)) - (lambda (key args) - (atomic-box-set! result-box #t))))))) - (hashq-set! futures job-future result-box))) - - (while #t - (handle-finished-futures) - (when (< (futures-count) max-processes) - (match (fetch-new-jobs (- max-processes (futures-count))) - (() - ;; No new jobs available - (sleep 30)) - ((jobs ...) - (for-each - (lambda (job-args) - (process-job-in-future (list job-args))) - jobs)))) - (sleep 5))) - (define (run-agent uuid coordinator-uri password max-parallel-builds derivation-substitute-urls non-derivation-substitute-urls) - (let ((initial-builds #f)) - - (define (fetch-new-jobs count) - (if initial-builds - (let ((builds initial-builds)) - (set! initial-builds #f) - builds) - (let ((received-builds - (fetch-builds-for-agent coordinator-uri uuid password - #:count count))) - (simple-format #t "requested ~A builds, received ~A\n" - count (length received-builds)) - received-builds))) + (define (fetch-new-jobs count) + (let ((received-builds + (fetch-builds-for-agent coordinator-uri uuid password + #:count count))) + (simple-format #t "requested ~A builds, received ~A\n" + count (length received-builds)) + received-builds)) - (define (process-job build) - (simple-format #t "processing build: ~A\n" - (assoc-ref build "uuid")) - (let ((derivation-name (assoc-ref build "derivation-name"))) - (simple-format #t "setting up to build: ~A\n" - derivation-name) - (let ((pre-build-status (pre-build-process - derivation-substitute-urls - non-derivation-substitute-urls - derivation-name))) - (if (eq? (assq-ref pre-build-status 'result) 'success) - (begin - (simple-format #t "setup successful, building: ~A\n" - derivation-name) - (let ((result (perform-build derivation-name))) - (and=> (derivation-log-file derivation-name) - (lambda (log-file) - (simple-format #t "uploading log file ~A\n" - log-file) - (submit-log-file - coordinator-uri uuid password - (assoc-ref build "uuid") - log-file))) - ((if result - post-build-success - post-build-failure) - uuid coordinator-uri password - (assoc-ref build "uuid") - derivation-name))) - (begin - (simple-format #t "failure: ~A\n" pre-build-status) - (report-setup-failure coordinator-uri uuid password - (assoc-ref build "uuid") - pre-build-status)))))) + (define (process-job build) + (simple-format #t "processing build: ~A\n" + (assoc-ref build "uuid")) + (let ((derivation-name (assoc-ref build "derivation-name"))) + (simple-format #t "setting up to build: ~A\n" + derivation-name) + (let ((pre-build-status (pre-build-process + derivation-substitute-urls + non-derivation-substitute-urls + derivation-name))) + (if (eq? (assq-ref pre-build-status 'result) 'success) + (begin + (simple-format #t "setup successful, building: ~A\n" + derivation-name) + (let ((result (perform-build derivation-name))) + (and=> (derivation-log-file derivation-name) + (lambda (log-file) + (simple-format #t "uploading log file ~A\n" + log-file) + (submit-log-file + coordinator-uri uuid password + (assoc-ref build "uuid") + log-file))) + ((if result + post-build-success + post-build-failure) + uuid coordinator-uri password + (assoc-ref build "uuid") + derivation-name))) + (begin + (simple-format #t "failure: ~A\n" pre-build-status) + (report-setup-failure coordinator-uri uuid password + (assoc-ref build "uuid") + pre-build-status)))))) + (let-values (((process-job-with-queue count-running-jobs) + (create-work-queue max-parallel-builds + process-job))) (let ((details (submit-status coordinator-uri uuid password 'idle))) (let ((builds (vector->list (assoc-ref details "builds")))) - (unless (null? builds) - (set! initial-builds builds)))) + (for-each + (lambda (job-args) + (process-job-with-queue job-args)) + builds))) - (process-jobs-concurrently fetch-new-jobs - process-job - max-parallel-builds))) + (while #t + (let ((running-jobs (count-running-jobs))) + (when (< running-jobs max-parallel-builds) + (match (fetch-new-jobs (- max-parallel-builds running-jobs)) + (() + ;; No new jobs available + (sleep 30)) + ((jobs ...) + (for-each + (lambda (job-args) + (process-job-with-queue job-args)) + jobs))))) + (sleep 5)))) (define (pre-build-process derivation-substitute-urls non-derivation-substitute-urls diff --git a/guix-build-coordinator/utils.scm b/guix-build-coordinator/utils.scm index 43a1db7..f605f7f 100644 --- a/guix-build-coordinator/utils.scm +++ b/guix-build-coordinator/utils.scm @@ -3,6 +3,7 @@ #:use-module (srfi srfi-19) #:use-module (srfi srfi-26) #:use-module (srfi srfi-60) + #:use-module (ice-9 q) #:use-module (ice-9 ftw) #:use-module (ice-9 popen) #:use-module (ice-9 match) @@ -51,7 +52,9 @@ s3-list-objects s3-cp - with-time-logging)) + with-time-logging + + create-work-queue)) (define %worker-thread-args @@ -515,3 +518,45 @@ References: ~a~%" (define-syntax-rule (with-time-logging name exp ...) "Log under NAME the time taken to evaluate EXP." (call-with-time-logging name (lambda () exp ...))) + +(define (create-work-queue thread-count proc) + (let ((queue (make-q)) + (queue-mutex (make-mutex)) + (job-available (make-condition-variable)) + (running-jobs-count 0)) + + (define (process-job . args) + (with-mutex queue-mutex + (enq! queue args) + (signal-condition-variable job-available))) + + (define (count-running-jobs) + (with-mutex queue-mutex + running-jobs-count)) + + (for-each + (lambda (i) + (call-with-new-thread + (lambda () + (let loop () + (lock-mutex queue-mutex) + (when (q-empty? queue) + (wait-condition-variable job-available + queue-mutex)) + (let ((job-args (deq! queue))) + (set! running-jobs-count (+ 1 running-jobs-count)) + (unlock-mutex queue-mutex) + (with-exception-handler + (lambda (exn) + (simple-format (current-error-port) + "exception when handling job: ~A\n" + exn)) + (lambda () + (apply proc job-args)) + #:unwind? #t) + (with-mutex queue-mutex + (set! running-jobs-count (- running-jobs-count 1)))) + (loop))))) + (iota thread-count)) + + (values process-job count-running-jobs))) |