aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2021-06-08 19:24:26 +0100
committerChristopher Baines <mail@cbaines.net>2021-06-08 19:24:26 +0100
commitaac5a14ef1ac4ab86d023efbef786816a37f6ccb (patch)
treeaaa3464d5b77eb0fba4b54d666290c668487f5be
parent4a474176aa7401fa6da1de84d2df5816e33f65b8 (diff)
downloadbuild-coordinator-aac5a14ef1ac4ab86d023efbef786816a37f6ccb.tar
build-coordinator-aac5a14ef1ac4ab86d023efbef786816a37f6ccb.tar.gz
WIP
-rw-r--r--guix-build-coordinator/agent-messaging/http.scm8
-rw-r--r--guix-build-coordinator/agent.scm85
-rw-r--r--guix-build-coordinator/utils.scm11
3 files changed, 90 insertions, 14 deletions
diff --git a/guix-build-coordinator/agent-messaging/http.scm b/guix-build-coordinator/agent-messaging/http.scm
index 38ccb70..5e753eb 100644
--- a/guix-build-coordinator/agent-messaging/http.scm
+++ b/guix-build-coordinator/agent-messaging/http.scm
@@ -244,7 +244,10 @@
.
args)
(apply
- (lambda* (build-id output-name file #:key (log default-log))
+ (lambda* (build-id output-name file
+ #:key
+ (log default-log)
+ report-bytes-sent)
(define auth-value
(string-append
"Basic "
@@ -272,7 +275,8 @@
(lambda (port)
(write-file file port))
#:level 9))
- #:headers `((Authorization . ,auth-value))))
+ #:headers `((Authorization . ,auth-value))
+ #:report-bytes-sent report-bytes-sent))
#:times 48
#:delay (random 15)))
args))
diff --git a/guix-build-coordinator/agent.scm b/guix-build-coordinator/agent.scm
index 424d808..e5ba2c5 100644
--- a/guix-build-coordinator/agent.scm
+++ b/guix-build-coordinator/agent.scm
@@ -20,6 +20,7 @@
(define-module (guix-build-coordinator agent)
#:use-module (srfi srfi-1)
+ #:use-module (srfi srfi-9)
#:use-module (srfi srfi-11)
#:use-module (srfi srfi-19)
#:use-module (ice-9 match)
@@ -43,6 +44,11 @@
#:use-module (guix-build-coordinator agent-messaging abstract)
#:export (run-agent))
+(define-record-type <upload-progress>
+ (make-upload-progress bytes-sent)
+ upload-progress?
+ (bytes-sent upload-progress-bytes-sent set-upload-progress-bytes-sent!))
+
(define (run-agent uuid
coordinator-interface
systems
@@ -95,6 +101,61 @@
(write-textfile metrics-registry
metrics-file)))
+ (define upload-slots
+ (make-vector 2 #f))
+
+ (define queued-uploads
+ '())
+
+ (define uploads-mutex
+ (make-mutex))
+ (define uploads-condition-variable
+ (make-condition-variable))
+
+ (define (with-upload-slot lgr file p)
+ (define (report-bytes-sent bytes)
+ (peek "UPLOAD SLOTS" upload-slots)
+ (peek "QUEUED UPLOADS" queued-uploads)
+ (display
+ (simple-format #f "bytes sent: ~A\n" bytes)))
+
+ (lock-mutex uploads-mutex)
+
+ (set! queued-uploads
+ (cons file queued-uploads))
+
+ (let loop ()
+ (let ((free-index
+ (any (lambda (index)
+ (eq? (vector-ref upload-slots index)
+ #f))
+ (iota (vector-length upload-slots)
+ 0))))
+
+ (if free-index
+ (begin
+ (vector-set! upload-slots
+ free-index
+ (make-upload-progress 0))
+
+ (set! queued-uploads
+ (delete file queued-uploads string=?))
+
+ (unlock-mutex uploads-mutex)
+
+ (call-with-values (lambda () (p report-bytes-sent))
+ (lambda vals
+ (signal-condition-variable uploads-condition-variable)
+
+ (apply values vals))))
+ (begin
+ (wait-condition-variable
+ uploads-condition-variable
+ uploads-mutex
+ (+ 15 (time-second (current-time))))
+
+ (loop))))))
+
(define (process-job build)
(let ((build-id (assoc-ref build "uuid"))
(derivation-name (or (assoc-ref build "derivation_name")
@@ -143,7 +204,8 @@
build-id
derivation-name
end-time
- submit-outputs?)
+ submit-outputs?
+ with-upload-slot)
(post-build-failure lgr
coordinator-interface
build-id
@@ -579,7 +641,8 @@ but the guix-daemon claims it's unavailable"
(define (post-build-success lgr coordinator-interface
build-id derivation end-time
- submit-outputs?)
+ submit-outputs?
+ with-upload-slot)
(define output-details
(map
(match-lambda
@@ -670,13 +733,17 @@ but the guix-daemon claims it's unavailable"
#:unwind? #t))
(define (submit-one-output output-name output)
- (log-msg lgr 'INFO
- build-id ": submitting output "
- (derivation-output-path output))
- (submit-output coordinator-interface
- build-id output-name
- (derivation-output-path output)
- #:log (build-log-procedure lgr build-id)))
+ (with-upload-slot
+ lgr
+ (derivation-output-path output)
+ (lambda (report-bytes-sent)
+ (log-msg lgr 'INFO
+ build-id ": submitting output "
+ (derivation-output-path output))
+ (submit-output coordinator-interface
+ build-id output-name
+ (derivation-output-path output)
+ #:log (build-log-procedure lgr build-id)))))
(if submit-outputs?
(begin
diff --git a/guix-build-coordinator/utils.scm b/guix-build-coordinator/utils.scm
index c833183..bc18081 100644
--- a/guix-build-coordinator/utils.scm
+++ b/guix-build-coordinator/utils.scm
@@ -261,7 +261,8 @@ upcoming chunk."
gc-enable))
(define* (make-chunked-output-port* port #:key (keep-alive? #f)
- (buffering 1200))
+ (buffering 1200)
+ report-bytes-sent)
(define heap-allocated-limit
(expt 2 20)) ;; 1MiB
@@ -276,6 +277,8 @@ upcoming chunk."
(put-bytevector port bv)
(put-string port "\r\n")))
+ (when report-bytes-sent
+ (report-bytes-sent length))
(let* ((stats (gc-stats))
(initial-gc-times
(assq-ref stats 'gc-times)))
@@ -304,7 +307,8 @@ upcoming chunk."
ret))
(define* (call-with-streaming-http-request uri callback
- #:key (headers '()))
+ #:key (headers '())
+ report-bytes-sent)
(let* ((port (open-socket-for-uri uri))
(request
(build-request
@@ -330,7 +334,8 @@ upcoming chunk."
(make-chunked-output-port*
port
#:buffering (expt 2 12)
- #:keep-alive? #t)))
+ #:keep-alive? #t
+ #:report-bytes-sent report-bytes-sent)))
;; A SIGPIPE will kill Guile, so ignore it
(sigaction SIGPIPE