diff options
author | Christopher Baines <mail@cbaines.net> | 2021-11-14 14:29:01 +0000 |
---|---|---|
committer | Christopher Baines <mail@cbaines.net> | 2021-11-14 14:29:01 +0000 |
commit | e4f6b6061c7a04944c7d7ba0bc0fb0a878b207ca (patch) | |
tree | 89e70cbdaf63fb53863f700c9a71cc14b5ee7c85 /guix-build-coordinator/agent-messaging | |
parent | 898a87c9fa4089fc3648ffe721df15f7719e02fe (diff) | |
download | build-coordinator-e4f6b6061c7a04944c7d7ba0bc0fb0a878b207ca.tar build-coordinator-e4f6b6061c7a04944c7d7ba0bc0fb0a878b207ca.tar.gz |
Implement initial support for resuming HTTP uploads
This means agents reattempting uploads don't have to start from scratch, and
can instead pick up from what's already been uploaded to the coordinator.
Diffstat (limited to 'guix-build-coordinator/agent-messaging')
-rw-r--r-- | guix-build-coordinator/agent-messaging/http.scm | 62 | ||||
-rw-r--r-- | guix-build-coordinator/agent-messaging/http/server.scm | 121 |
2 files changed, 163 insertions, 20 deletions
diff --git a/guix-build-coordinator/agent-messaging/http.scm b/guix-build-coordinator/agent-messaging/http.scm index a29c908..2252293 100644 --- a/guix-build-coordinator/agent-messaging/http.scm +++ b/guix-build-coordinator/agent-messaging/http.scm @@ -253,6 +253,20 @@ #:key (log default-log) report-bytes-sent) + (define (get-partial-upload-bytes) + (let-values (((body response) + (coordinator-http-request + log + interface + (string-append "/build/" build-id + "/output/" output-name + "/partial") + #:method 'HEAD))) + (if (eq? (response-code response) + 404) + #f + (response-content-length response)))) + (define auth-value (string-append "Basic " @@ -262,10 +276,11 @@ ":" (slot-ref interface 'password)))))) - (define uri + (define* (uri #:key resume?) (coordinator-uri-for-path (slot-ref interface 'coordinator-uri) - (string-append "/build/" build-id "/output/" output-name))) + (string-append "/build/" build-id "/output/" output-name + (if resume? "/partial" "")))) (define path-info (with-store store @@ -287,24 +302,31 @@ (lambda () (call-with-input-file template (lambda (file-port) - (let-values (((response body) - (call-with-streaming-http-request - uri - (lambda (port) - (with-time-logging - (simple-format #f "sending ~A" file) - (dump-port file-port port - #:buffer-size 65536))) - #:headers `((Authorization . ,auth-value))))) - (when (>= (response-code response) 400) - (raise-exception - (make-exception-with-message - (coordinator-handle-failed-request log - 'PUT - (uri-path uri) - response - body)))))))) - #:times 12 + (let ((bytes (get-partial-upload-bytes))) + (when bytes + (seek file-port bytes SEEK_SET) + (log 'INFO "resuming upload from byte " bytes)) + + (let-values (((response body) + (call-with-streaming-http-request + (uri #:resume? (integer? bytes)) + (lambda (port) + (with-time-logging + (simple-format #f "sending ~A" file) + (dump-port file-port port + #:buffer-size 65536))) + #:headers `((Authorization . ,auth-value)) + #:method (if bytes 'POST 'PUT) + #:report-bytes-sent (lambda (bytes) (when (eq? bytes 65536) (error "FOO")))))) + (when (>= (response-code response) 400) + (raise-exception + (make-exception-with-message + (coordinator-handle-failed-request log + 'PUT + (uri-path uri) + response + body))))))))) + #:times 100 #:delay (random 15)) (delete-file template))) diff --git a/guix-build-coordinator/agent-messaging/http/server.scm b/guix-build-coordinator/agent-messaging/http/server.scm index a21ae80..1f4bdd5 100644 --- a/guix-build-coordinator/agent-messaging/http/server.scm +++ b/guix-build-coordinator/agent-messaging/http/server.scm @@ -492,6 +492,127 @@ port. Also, the port used can be changed by passing the --port option.\n" (render-json '(("error" . "access denied")) #:code 403)))) + (('HEAD "build" uuid "output" output-name "partial") + (let ((agent-id-for-build + (datastore-agent-for-build datastore uuid))) + (if (authenticated? agent-id-for-build request) + (let* ((output-file-name + (build-output-file-location datastore uuid output-name)) + (tmp-output-file-name + (string-append output-file-name ".tmp"))) + (if (file-exists? tmp-output-file-name) + (let ((bytes (stat:size (stat tmp-output-file-name)))) + (list (build-response + #:code 200 + #:headers `((content-length . ,bytes))) + #f)) + (render-json + '((error . "no partial content")) + #:code 404))) + (render-json + '(("error" . "access denied")) + #:code 403)))) + (('POST "build" uuid "output" output-name "partial") + (let ((agent-id-for-build + (datastore-agent-for-build datastore uuid))) + + (define (receive-file output-file-name tmp-output-file-name) + (call-with-worker-thread + chunked-request-channel + (lambda () + (let ((output-port (open-file tmp-output-file-name "a"))) + (let ((start-time (current-time time-utc))) + (let loop ((bv (get-bytevector-some body)) + (bytes-read 0) + (last-progress-update-bytes-read 0)) + (if (eof-object? bv) + (let* ((end-time (current-time time-utc)) + (elapsed (time-difference end-time + start-time)) + (seconds-elapsed + (+ (time-second elapsed) + (/ (time-nanosecond elapsed) 1e9)))) + (display + (simple-format + #f + "received ~A + took ~A seconds + data transfered: ~AMB (~A bytes) + speed (MB/s): ~A +" + (basename output-file-name) + seconds-elapsed + (rationalize (exact->inexact (/ bytes-read 1000000)) + 0.1) + bytes-read + (rationalize (/ (/ bytes-read 1000000) + seconds-elapsed) + 0.1)))) + (begin + (put-bytevector output-port bv) + (loop (get-bytevector-some body) + (+ bytes-read + (bytevector-length bv)) + (if (> (- bytes-read + last-progress-update-bytes-read) + 50000000) ; ~50MB + (begin + (display + (simple-format + #f + "receiving ~A + ~AMB read so far... +" + (basename output-file-name) + (rationalize (exact->inexact + (/ bytes-read + 1000000)) + 0.1))) + bytes-read) + last-progress-update-bytes-read)))))) + (close-port output-port)) + + ;; Compute the hash of the file + (let ((hash (bytevector->nix-base32-string + (call-with-input-file tmp-output-file-name + (lambda (compressed-port) + (call-with-lzip-input-port compressed-port + port-sha256)))))) + (rename-file tmp-output-file-name + output-file-name) + hash)))) + + (if (authenticated? agent-id-for-build request) + (let* ((output-file-name + (build-output-file-location datastore uuid output-name)) + (tmp-output-file-name + (string-append output-file-name ".tmp"))) + (let ((hash + (if (bytevector? body) + (let ((output-port (open-file tmp-output-file-name "a"))) + (put-bytevector output-port body) + (close-port output-port) + + (let ((hash + (bytevector->nix-base32-string + (call-with-input-file tmp-output-file-name + (lambda (compressed-port) + (call-with-lzip-input-port compressed-port + port-sha256)))))) + (rename-file tmp-output-file-name + output-file-name) + hash)) + (receive-file output-file-name + tmp-output-file-name)))) + + (call-with-output-file (string-append output-file-name ".hash") + (lambda (port) + (simple-format port "~A\n" hash))) + + (no-content))) + (render-json + '(("error" . "access denied")) + #:code 403)))) (('GET "metrics") (update-managed-metrics!) (list (build-response |