From 93a0b224cc06ec358cab63a9a88c5eab44d4f4fd Mon Sep 17 00:00:00 2001
From: Christopher Baines
Date: Tue, 8 Jun 2021 21:31:35 +0100
Subject: Limit the parallelism of output uploads

And report the progress periodically. This can be a bottleneck if the
upload speed is slow, and the machine is fast at building things.
---
 guix-build-coordinator/agent.scm | 154 ++++++++++++++++++++++++++++++++++++---
 1 file changed, 144 insertions(+), 10 deletions(-)

Review notes (below the "---" marker, so not part of the commit message):

What the patch adds
  * <upload-progress>: an SRFI-9 record with an immutable `file' field and
    a mutable `bytes-sent' field.
  * upload-slots: a 3-element vector, initially all #f.  A non-#f element
    is the <upload-progress> record of an upload that currently owns that
    slot, so at most 3 uploads run at once.
  * queued-uploads: list of files that have asked for a slot and not yet
    been given one.
  * (with-upload-slot lgr file p)
      - Takes uploads-mutex and pushes FILE onto queued-uploads.
      - Loops looking for the first #f slot.  When none is free it waits on
        uploads-condition-variable with a deadline of the current time
        plus 15 seconds, then looks again.
      - Once a slot is found: stores the progress record in it, removes
        FILE from queued-uploads, releases the mutex and calls
        (p report-bytes-sent).
      - The slot is freed both when P returns and when P raises; the
        exception is re-raised, otherwise P's values are returned.
  * (report-bytes-sent bytes), passed to P: adds BYTES to the record's
    bytes-sent.  Each time the total has grown by more than 20000000
    since the last report it logs, at INFO level, the number of occupied
    slots, the number of queued uploads, and the MB sent for every
    occupied slot.
  * post-build-success takes an extra `with-upload-slot' argument, and
    submit-one-output wraps its log-msg + submit-output in it, passing the
    reporter on as #:report-bytes-sent.  submit-output is defined outside
    this patch; presumably it calls the reporter with byte increments --
    confirm against its definition.

Fixed in this copy
  * The define-record-type form had lost its type name (SRFI-9 requires
    one before the constructor spec); restored as <upload-progress>.
  * Line breaks were restored so the hunks are readable.  Indentation and
    the exact split of some context lines were reconstructed and may not
    match the target file byte-for-byte; if the patch does not apply
    cleanly, try `git apply --ignore-whitespace'.

NOTE(review)
  * (peek "FREEING SLOT" index) and (peek "USING SLOT" free-index) look
    like leftover debug output -- confirm whether they should be removed.
  * The new #:thread-start-delay is built only from constants:
    (max 20 (- 121 (* 100 (/ 120 64)))) is (max 20 -133/2), i.e. always
    20.  Presumably one of the literals was meant to be a variable --
    confirm.
  * report-bytes-sent reads upload-slots and queued-uploads without
    holding uploads-mutex, so the logged counts are only a snapshot.
  * (delete file queued-uploads string=?) removes every entry equal to
    FILE, so two queued uploads of the same path would both be dropped
    from the queued count when the first one gets a slot.

diff --git a/guix-build-coordinator/agent.scm b/guix-build-coordinator/agent.scm
index 424d808..b89c1f8 100644
--- a/guix-build-coordinator/agent.scm
+++ b/guix-build-coordinator/agent.scm
@@ -20,8 +20,10 @@
 
 (define-module (guix-build-coordinator agent)
   #:use-module (srfi srfi-1)
+  #:use-module (srfi srfi-9)
   #:use-module (srfi srfi-11)
   #:use-module (srfi srfi-19)
+  #:use-module (srfi srfi-43)
   #:use-module (ice-9 match)
   #:use-module (ice-9 format)
   #:use-module (ice-9 futures)
@@ -43,6 +45,12 @@
   #:use-module (guix-build-coordinator agent-messaging abstract)
   #:export (run-agent))
 
+(define-record-type <upload-progress>
+  (make-upload-progress file bytes-sent)
+  upload-progress?
+  (file upload-progress-file)
+  (bytes-sent upload-progress-bytes-sent set-upload-progress-bytes-sent!))
+
 (define (run-agent uuid
                    coordinator-interface
                    systems
@@ -95,6 +103,121 @@
       (write-textfile metrics-registry
                       metrics-file)))
 
+  (define upload-slots
+    (make-vector 3 #f))
+
+  (define queued-uploads
+    '())
+
+  (define uploads-mutex
+    (make-mutex))
+  (define uploads-condition-variable
+    (make-condition-variable))
+
+  (define (with-upload-slot lgr file p)
+    (define upload-progress-record
+      (make-upload-progress file 0))
+
+    (define last-progress-update-bytes-sent 0)
+
+    (define (report-bytes-sent bytes)
+      (let ((bytes-now-sent
+             (+ (upload-progress-bytes-sent upload-progress-record)
+                bytes)))
+
+        (set-upload-progress-bytes-sent!
+         upload-progress-record
+         bytes-now-sent)
+
+        (when (> bytes-now-sent
+                 (+ last-progress-update-bytes-sent 20000000))
+          (set! last-progress-update-bytes-sent
+                bytes-now-sent)
+
+          (let ((uploads-count
+                 (vector-count (lambda (_ slot)
+                                 (not (eq? #f slot)))
+                               upload-slots))
+                (queued-uploads-count
+                 (length queued-uploads)))
+
+            (log-msg lgr 'INFO
+                     uploads-count " uploads in progress, "
+                     queued-uploads-count " queued")
+
+            (vector-for-each
+             (lambda (_ upload-progress)
+               (when upload-progress
+                 (log-msg lgr 'INFO
+                          (upload-progress-file upload-progress)
+                          ": "
+                          (rationalize (exact->inexact
+                                        (/ (upload-progress-bytes-sent
+                                            upload-progress)
+                                           1000000))
+                                       0.1)
+                          "MB sent")))
+             upload-slots)))))
+
+    (define (free-slot index)
+      (with-mutex uploads-mutex
+        (peek "FREEING SLOT" index)
+        (vector-set! upload-slots
+                     index
+                     #f))
+
+      (signal-condition-variable uploads-condition-variable))
+
+    (lock-mutex uploads-mutex)
+
+    (set! queued-uploads
+          (cons file queued-uploads))
+
+    (let loop ()
+      (let ((free-index
+             (any (lambda (index)
+                    (if (eq? (vector-ref upload-slots index)
+                             #f)
+                        index
+                        #f))
+                  (iota (vector-length upload-slots)
+                        0))))
+
+        (if free-index
+            (begin
+              (vector-set! upload-slots
+                           free-index
+                           upload-progress-record)
+
+              (peek "USING SLOT" free-index)
+
+              (set! queued-uploads
+                    (delete file queued-uploads string=?))
+
+              (unlock-mutex uploads-mutex)
+
+              (call-with-values
+                  (lambda ()
+                    (with-exception-handler
+                        (lambda (exn)
+                          (free-slot free-index)
+
+                          (raise-exception exn))
+                      (lambda ()
+                        (p report-bytes-sent))
+                      #:unwind? #t))
+                (lambda vals
+                  (free-slot free-index)
+
+                  (apply values vals))))
+            (begin
+              (wait-condition-variable
+               uploads-condition-variable
+               uploads-mutex
+               (+ 15 (time-second (current-time))))
+
+              (loop))))))
+
   (define (process-job build)
     (let ((build-id (assoc-ref build "uuid"))
           (derivation-name (or (assoc-ref build "derivation_name")
@@ -143,7 +266,8 @@
                                       build-id
                                       derivation-name
                                       end-time
-                                      submit-outputs?)
+                                      submit-outputs?
+                                      with-upload-slot)
                   (post-build-failure lgr
                                       coordinator-interface
                                       build-id
@@ -180,7 +304,11 @@
          (create-work-queue current-max-builds
                             process-job
                             #:thread-start-delay
-                            (make-time time-duration 0 90))))
+                            (make-time time-duration
+                                       0
+                                       (max 20
+                                            (- 121 (* 100
+                                                      (/ 120 64))))))))
 
   (define (display-info)
     (display (simple-format
@@ -579,7 +707,8 @@ but the guix-daemon claims it's unavailable"
 
 (define (post-build-success lgr coordinator-interface build-id
                             derivation end-time
-                            submit-outputs?)
+                            submit-outputs?
+                            with-upload-slot)
   (define output-details
     (map
      (match-lambda
@@ -670,13 +799,18 @@ but the guix-daemon claims it's unavailable"
       #:unwind? #t))
 
   (define (submit-one-output output-name output)
-    (log-msg lgr 'INFO
-             build-id ": submitting output "
-             (derivation-output-path output))
-    (submit-output coordinator-interface
-                   build-id output-name
-                   (derivation-output-path output)
-                   #:log (build-log-procedure lgr build-id)))
+    (with-upload-slot
+     lgr
+     (derivation-output-path output)
+     (lambda (report-bytes-sent)
+       (log-msg lgr 'INFO
+                build-id ": submitting output "
+                (derivation-output-path output))
+       (submit-output coordinator-interface
+                      build-id output-name
+                      (derivation-output-path output)
+                      #:log (build-log-procedure lgr build-id)
+                      #:report-bytes-sent report-bytes-sent))))
 
   (if submit-outputs?
       (begin
-- 
cgit v1.2.3