From aac5a14ef1ac4ab86d023efbef786816a37f6ccb Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Tue, 8 Jun 2021 19:24:26 +0100 Subject: WIP --- guix-build-coordinator/agent-messaging/http.scm | 8 ++- guix-build-coordinator/agent.scm | 85 ++++++++++++++++++++++--- guix-build-coordinator/utils.scm | 11 +++- 3 files changed, 90 insertions(+), 14 deletions(-) diff --git a/guix-build-coordinator/agent-messaging/http.scm b/guix-build-coordinator/agent-messaging/http.scm index 38ccb70..5e753eb 100644 --- a/guix-build-coordinator/agent-messaging/http.scm +++ b/guix-build-coordinator/agent-messaging/http.scm @@ -244,7 +244,10 @@ . args) (apply - (lambda* (build-id output-name file #:key (log default-log)) + (lambda* (build-id output-name file + #:key + (log default-log) + report-bytes-sent) (define auth-value (string-append "Basic " @@ -272,7 +275,8 @@ (lambda (port) (write-file file port)) #:level 9)) - #:headers `((Authorization . ,auth-value)))) + #:headers `((Authorization . ,auth-value)) + #:report-bytes-sent report-bytes-sent)) #:times 48 #:delay (random 15))) args)) diff --git a/guix-build-coordinator/agent.scm b/guix-build-coordinator/agent.scm index 424d808..e5ba2c5 100644 --- a/guix-build-coordinator/agent.scm +++ b/guix-build-coordinator/agent.scm @@ -20,6 +20,7 @@ (define-module (guix-build-coordinator agent) #:use-module (srfi srfi-1) + #:use-module (srfi srfi-9) #:use-module (srfi srfi-11) #:use-module (srfi srfi-19) #:use-module (ice-9 match) @@ -43,6 +44,11 @@ #:use-module (guix-build-coordinator agent-messaging abstract) #:export (run-agent)) +(define-record-type + (make-upload-progress bytes-sent) + upload-progress? + (bytes-sent upload-progress-bytes-sent set-upload-progress-bytes-sent!)) + (define (run-agent uuid coordinator-interface systems @@ -95,6 +101,61 @@ (write-textfile metrics-registry metrics-file))) + (define upload-slots + (make-vector 2 #f)) + + (define queued-uploads + '()) + + (define uploads-mutex + (make-mutex)) + (define uploads-condition-variable + (make-condition-variable)) + + (define (with-upload-slot lgr file p) + (define (report-bytes-sent bytes) + (peek "UPLOAD SLOTS" upload-slots) + (peek "QUEUED UPLOADS" queued-uploads) + (display + (simple-format #f "bytes sent: ~A\n" bytes))) + + (lock-mutex uploads-mutex) + + (set! queued-uploads + (cons file queued-uploads)) + + (let loop () + (let ((free-index + (any (lambda (index) + (eq? (vector-ref upload-slots index) + #f)) + (iota (vector-length upload-slots) + 0)))) + + (if free-index + (begin + (vector-set! upload-slots + free-index + (make-upload-progress 0)) + + (set! queued-uploads + (delete file queued-uploads string=?)) + + (unlock-mutex uploads-mutex) + + (call-with-values (lambda () (p report-bytes-sent)) + (lambda vals + (signal-condition-variable uploads-condition-variable) + + (apply values vals)))) + (begin + (wait-condition-variable + uploads-condition-variable + uploads-mutex + (+ 15 (time-second (current-time)))) + + (loop)))))) + (define (process-job build) (let ((build-id (assoc-ref build "uuid")) (derivation-name (or (assoc-ref build "derivation_name") @@ -143,7 +204,8 @@ build-id derivation-name end-time - submit-outputs?) + submit-outputs? + with-upload-slot) (post-build-failure lgr coordinator-interface build-id @@ -579,7 +641,8 @@ but the guix-daemon claims it's unavailable" (define (post-build-success lgr coordinator-interface build-id derivation end-time - submit-outputs?) + submit-outputs? + with-upload-slot) (define output-details (map (match-lambda @@ -670,13 +733,17 @@ but the guix-daemon claims it's unavailable" #:unwind? #t)) (define (submit-one-output output-name output) - (log-msg lgr 'INFO - build-id ": submitting output " - (derivation-output-path output)) - (submit-output coordinator-interface - build-id output-name - (derivation-output-path output) - #:log (build-log-procedure lgr build-id))) + (with-upload-slot + lgr + (derivation-output-path output) + (lambda (report-bytes-sent) + (log-msg lgr 'INFO + build-id ": submitting output " + (derivation-output-path output)) + (submit-output coordinator-interface + build-id output-name + (derivation-output-path output) + #:log (build-log-procedure lgr build-id))))) (if submit-outputs? (begin diff --git a/guix-build-coordinator/utils.scm b/guix-build-coordinator/utils.scm index c833183..bc18081 100644 --- a/guix-build-coordinator/utils.scm +++ b/guix-build-coordinator/utils.scm @@ -261,7 +261,8 @@ upcoming chunk." gc-enable)) (define* (make-chunked-output-port* port #:key (keep-alive? #f) - (buffering 1200)) + (buffering 1200) + report-bytes-sent) (define heap-allocated-limit (expt 2 20)) ;; 1MiB @@ -276,6 +277,8 @@ upcoming chunk." (put-bytevector port bv) (put-string port "\r\n"))) + (when report-bytes-sent + (report-bytes-sent length)) (let* ((stats (gc-stats)) (initial-gc-times (assq-ref stats 'gc-times))) @@ -304,7 +307,8 @@ upcoming chunk." ret)) (define* (call-with-streaming-http-request uri callback - #:key (headers '())) + #:key (headers '()) + report-bytes-sent) (let* ((port (open-socket-for-uri uri)) (request (build-request @@ -330,7 +334,8 @@ upcoming chunk." (make-chunked-output-port* port #:buffering (expt 2 12) - #:keep-alive? #t))) + #:keep-alive? #t + #:report-bytes-sent report-bytes-sent))) ;; A SIGPIPE will kill Guile, so ignore it (sigaction SIGPIPE -- cgit v1.2.3