aboutsummaryrefslogtreecommitdiff
path: root/guix-build-coordinator/agent-messaging
diff options
context:
space:
mode:
author: Christopher Baines <mail@cbaines.net>, 2021-11-14 14:29:01 +0000
committer: Christopher Baines <mail@cbaines.net>, 2021-11-14 14:29:01 +0000
commit: e4f6b6061c7a04944c7d7ba0bc0fb0a878b207ca (patch)
tree: 89e70cbdaf63fb53863f700c9a71cc14b5ee7c85 /guix-build-coordinator/agent-messaging
parent: 898a87c9fa4089fc3648ffe721df15f7719e02fe (diff)
download: build-coordinator-e4f6b6061c7a04944c7d7ba0bc0fb0a878b207ca.tar
download: build-coordinator-e4f6b6061c7a04944c7d7ba0bc0fb0a878b207ca.tar.gz
Implement initial support for resuming HTTP uploads
This means agents reattempting uploads don't have to start from scratch, and can instead pick up from what's already been uploaded to the coordinator.
Diffstat (limited to 'guix-build-coordinator/agent-messaging')
-rw-r--r-- guix-build-coordinator/agent-messaging/http.scm | 62
-rw-r--r-- guix-build-coordinator/agent-messaging/http/server.scm | 121
2 files changed, 163 insertions, 20 deletions
diff --git a/guix-build-coordinator/agent-messaging/http.scm b/guix-build-coordinator/agent-messaging/http.scm
index a29c908..2252293 100644
--- a/guix-build-coordinator/agent-messaging/http.scm
+++ b/guix-build-coordinator/agent-messaging/http.scm
@@ -253,6 +253,20 @@
#:key
(log default-log)
report-bytes-sent)
+  ;; Ask the coordinator how much of this output it already holds from an
+  ;; earlier, interrupted upload.  Issues a HEAD request against the
+  ;; output's "/partial" path and returns the response's Content-Length,
+  ;; or #f when there is nothing usable to resume from.
+  (define (get-partial-upload-bytes)
+    (let-values (((body response)
+                  (coordinator-http-request
+                   log
+                   interface
+                   (string-append "/build/" build-id
+                                  "/output/" output-name
+                                  "/partial")
+                   #:method 'HEAD)))
+      ;; Only trust Content-Length on a 200.  For any other status the
+      ;; length would describe an error body rather than the partial
+      ;; file, and seeking to it would corrupt the upload.  The status is
+      ;; a number, so compare it with `=' rather than `eq?'.
+      (and (= (response-code response) 200)
+           (response-content-length response))))
+
(define auth-value
(string-append
"Basic "
@@ -262,10 +276,11 @@
":"
(slot-ref interface 'password))))))
- (define uri
+ ;; Build the coordinator URI for uploading this output.  This is a
+ ;; procedure rather than a value: call it, passing #:resume? #t to
+ ;; target the "/partial" path instead of the plain output path.
+ (define* (uri #:key resume?)
(coordinator-uri-for-path
(slot-ref interface 'coordinator-uri)
- (string-append "/build/" build-id "/output/" output-name)))
+ (string-append "/build/" build-id "/output/" output-name
+ (if resume? "/partial" ""))))
(define path-info
(with-store store
@@ -287,24 +302,31 @@
(lambda ()
(call-with-input-file template
(lambda (file-port)
- (let-values (((response body)
- (call-with-streaming-http-request
- uri
- (lambda (port)
- (with-time-logging
- (simple-format #f "sending ~A" file)
- (dump-port file-port port
- #:buffer-size 65536)))
- #:headers `((Authorization . ,auth-value)))))
- (when (>= (response-code response) 400)
- (raise-exception
- (make-exception-with-message
- (coordinator-handle-failed-request log
- 'PUT
- (uri-path uri)
- response
- body))))))))
- #:times 12
+          ;; Upload the file, resuming from whatever the coordinator already
+          ;; holds.  The method and URI are resolved once so that the request
+          ;; and any failure report describe the same thing.
+          (let* ((bytes (get-partial-upload-bytes))
+                 (resume? (integer? bytes))
+                 (method (if resume? 'POST 'PUT))
+                 (request-uri (uri #:resume? resume?)))
+            (when resume?
+              ;; Skip the part of the file the coordinator has already stored.
+              (seek file-port bytes SEEK_SET)
+              (log 'INFO "resuming upload from byte " bytes))
+
+            (let-values (((response body)
+                          (call-with-streaming-http-request
+                           request-uri
+                           (lambda (port)
+                             (with-time-logging
+                                 (simple-format #f "sending ~A" file)
+                               (dump-port file-port port
+                                          #:buffer-size 65536)))
+                           #:headers `((Authorization . ,auth-value))
+                           #:method method
+                           ;; Forward the caller's progress callback; fall
+                           ;; back to a no-op when none was supplied.
+                           #:report-bytes-sent (or report-bytes-sent
+                                                   (const #t)))))
+              (when (>= (response-code response) 400)
+                (raise-exception
+                 (make-exception-with-message
+                  (coordinator-handle-failed-request log
+                                                     method
+                                                     (uri-path request-uri)
+                                                     response
+                                                     body)))))))))
+ #:times 100
#:delay (random 15))
(delete-file template)))
diff --git a/guix-build-coordinator/agent-messaging/http/server.scm b/guix-build-coordinator/agent-messaging/http/server.scm
index a21ae80..1f4bdd5 100644
--- a/guix-build-coordinator/agent-messaging/http/server.scm
+++ b/guix-build-coordinator/agent-messaging/http/server.scm
@@ -492,6 +492,127 @@ port. Also, the port used can be changed by passing the --port option.\n"
(render-json
'(("error" . "access denied"))
#:code 403))))
+    (('HEAD "build" uuid "output" output-name "partial")
+     ;; Tell an authenticated agent how many bytes of this output have
+     ;; been stored so far in the ".tmp" file: 200 with that size as the
+     ;; Content-Length, 404 when no such file exists, 403 when the request
+     ;; is not authenticated for the build's agent.
+     (let ((agent-id-for-build
+            (datastore-agent-for-build datastore uuid)))
+       (cond
+        ((not (authenticated? agent-id-for-build request))
+         (render-json
+          '(("error" . "access denied"))
+          #:code 403))
+        (else
+         (let ((partial-file
+                (string-append
+                 (build-output-file-location datastore uuid output-name)
+                 ".tmp")))
+           (if (file-exists? partial-file)
+               (list (build-response
+                      #:code 200
+                      #:headers
+                      `((content-length . ,(stat:size (stat partial-file)))))
+                     #f)
+               (render-json
+                '((error . "no partial content"))
+                #:code 404)))))))
+    (('POST "build" uuid "output" output-name "partial")
+     ;; Resume an interrupted upload: append the request body to the
+     ;; output's ".tmp" file, then hash the lzip-decompressed contents,
+     ;; rename the file into place and record the hash in a ".hash" file.
+     ;; Responds 403 when the request is not authenticated for the
+     ;; build's agent.
+     (let ((agent-id-for-build
+            (datastore-agent-for-build datastore uuid)))
+
+       ;; Call PROC with a port appending to FILE-NAME, closing the port
+       ;; afterwards.  The port is also closed when PROC raises, so a
+       ;; transfer that fails part-way does not leak the descriptor or
+       ;; leave buffered bytes unflushed.
+       (define (call-with-append-port file-name proc)
+         (let ((port (open-file file-name "a")))
+           (with-exception-handler
+               (lambda (exn)
+                 ;; Don't let a failing close mask the original error.
+                 (false-if-exception (close-port port))
+                 (raise-exception exn))
+             (lambda ()
+               (proc port))
+             #:unwind? #t)
+           (close-port port)))
+
+       ;; Hash the decompressed contents of the completed upload, move it
+       ;; to its final location, and return the hash as a string.
+       (define (finalize-upload output-file-name tmp-output-file-name)
+         (let ((hash (bytevector->nix-base32-string
+                      (call-with-input-file tmp-output-file-name
+                        (lambda (compressed-port)
+                          (call-with-lzip-input-port compressed-port
+                            port-sha256))))))
+           (rename-file tmp-output-file-name
+                        output-file-name)
+           hash))
+
+       ;; Stream the request body into the temporary file on a worker
+       ;; thread, printing progress roughly every 50MB and a summary at
+       ;; the end, then finalize the upload and return its hash.
+       (define (receive-file output-file-name tmp-output-file-name)
+         (call-with-worker-thread
+          chunked-request-channel
+          (lambda ()
+            (call-with-append-port
+             tmp-output-file-name
+             (lambda (output-port)
+               (let ((start-time (current-time time-utc)))
+                 (let loop ((bv (get-bytevector-some body))
+                            (bytes-read 0)
+                            (last-progress-update-bytes-read 0))
+                   (if (eof-object? bv)
+                       (let* ((end-time (current-time time-utc))
+                              (elapsed (time-difference end-time
+                                                        start-time))
+                              (seconds-elapsed
+                               (+ (time-second elapsed)
+                                  (/ (time-nanosecond elapsed) 1e9))))
+                         (display
+                          (simple-format
+                           #f
+                           "received ~A
+ took ~A seconds
+ data transfered: ~AMB (~A bytes)
+ speed (MB/s): ~A
+"
+                           (basename output-file-name)
+                           seconds-elapsed
+                           (rationalize (exact->inexact (/ bytes-read 1000000))
+                                        0.1)
+                           bytes-read
+                           (rationalize (/ (/ bytes-read 1000000)
+                                           seconds-elapsed)
+                                        0.1))))
+                       (begin
+                         (put-bytevector output-port bv)
+                         (loop (get-bytevector-some body)
+                               (+ bytes-read
+                                  (bytevector-length bv))
+                               (if (> (- bytes-read
+                                         last-progress-update-bytes-read)
+                                      50000000) ; ~50MB
+                                   (begin
+                                     (display
+                                      (simple-format
+                                       #f
+                                       "receiving ~A
+ ~AMB read so far...
+"
+                                       (basename output-file-name)
+                                       (rationalize (exact->inexact
+                                                     (/ bytes-read
+                                                        1000000))
+                                                    0.1)))
+                                     bytes-read)
+                                   last-progress-update-bytes-read))))))))
+
+            (finalize-upload output-file-name tmp-output-file-name))))
+
+       (if (authenticated? agent-id-for-build request)
+           (let* ((output-file-name
+                   (build-output-file-location datastore uuid output-name))
+                  (tmp-output-file-name
+                   (string-append output-file-name ".tmp"))
+                  (hash
+                   (if (bytevector? body)
+                       ;; The whole body is already in memory: append it
+                       ;; directly rather than going via a worker thread.
+                       (begin
+                         (call-with-append-port
+                          tmp-output-file-name
+                          (lambda (output-port)
+                            (put-bytevector output-port body)))
+                         (finalize-upload output-file-name
+                                          tmp-output-file-name))
+                       (receive-file output-file-name
+                                     tmp-output-file-name))))
+
+             (call-with-output-file (string-append output-file-name ".hash")
+               (lambda (port)
+                 (simple-format port "~A\n" hash)))
+
+             (no-content))
+           (render-json
+            '(("error" . "access denied"))
+            #:code 403))))
(('GET "metrics")
(update-managed-metrics!)
(list (build-response