diff options
author | Christopher Baines <mail@cbaines.net> | 2023-05-08 16:05:05 +0100 |
---|---|---|
committer | Christopher Baines <mail@cbaines.net> | 2023-05-08 19:47:30 +0100 |
commit | 0ee9ce1b3755706cf5e283e4612b68581c4be37e (patch) | |
tree | e142898ddc5e47a7a20c346ccee08623e0a6c58a | |
parent | dd68c838e992075f338d349d413c8b98a4395c7d (diff) | |
download | build-coordinator-0ee9ce1b3755706cf5e283e4612b68581c4be37e.tar build-coordinator-0ee9ce1b3755706cf5e283e4612b68581c4be37e.tar.gz |
Stop using chunked transfers for file uploads
As the amount of data to upload is known, this is unnecessary complexity and
overhead.
-rw-r--r-- | guix-build-coordinator/agent-messaging/http.scm | 130 | ||||
-rw-r--r-- | guix-build-coordinator/agent-messaging/http/server.scm | 394 | ||||
-rw-r--r-- | guix-build-coordinator/utils.scm | 33 |
3 files changed, 252 insertions, 305 deletions
diff --git a/guix-build-coordinator/agent-messaging/http.scm b/guix-build-coordinator/agent-messaging/http.scm index 7a56520..2703c91 100644 --- a/guix-build-coordinator/agent-messaging/http.scm +++ b/guix-build-coordinator/agent-messaging/http.scm @@ -21,6 +21,7 @@ (define-module (guix-build-coordinator agent-messaging http) #:use-module (srfi srfi-11) #:use-module (srfi srfi-19) + #:use-module (srfi srfi-71) #:use-module (ice-9 match) #:use-module (ice-9 format) #:use-module (ice-9 threads) @@ -370,57 +371,58 @@ (seek file-port bytes SEEK_SET) (log 'INFO "resuming upload from byte " bytes)) - (let ((upload-uri - (coordinator-uri-for-path - (slot-ref interface 'coordinator-uri) - (string-append "/build/" build-id "/output/" output-name - (if (integer? bytes) - "/partial" - ""))))) - - (let-values (((response body) - ;; TODO This chunks the transfer, but it - ;; doesn't need to since the length of the - ;; body is known. The chunking is useful - ;; however, as it matches up with the - ;; read-request-body hack to avoid reading - ;; the entire request body in to memory. - (call-with-streaming-http-request - upload-uri - (lambda (port) - (with-time-logging - (simple-format #f "sending ~A" file) - (dump-port* file-port port - #:reporter reporter))) - #:headers `((Authorization . ,auth-value)) - #:method (if bytes 'POST 'PUT)))) - (log 'DEBUG "perform upload " file ", response code: " - (response-code response)) - - (when (>= (response-code response) 400) - (raise-exception - (make-exception-with-message - (coordinator-handle-failed-request - log - 'PUT - (uri-path upload-uri) - response - body)))))))))))) - - (unless (and=> - (get-completed-upload-bytes) - (lambda (uploaded-bytes) - (= uploaded-bytes file-size))) - (retry-on-error perform-upload - #:times 100 - #:delay 15 - #:error-hook - (lambda _ - (log 'DEBUG - "perform-upload " file - " (bytes: " file-size ", " - "md5: " (force file-md5-hash-promise) - ")"))))) + (let* ((upload-uri + (coordinator-uri-for-path + (slot-ref interface 'coordinator-uri) + (string-append "/build/" build-id "/output/" output-name + (if (integer? bytes) + "/partial" + "")))) + (bytes-to-send + (if bytes + (- file-size bytes) + file-size)) + (response + body + (call-with-streaming-http-request + upload-uri + bytes-to-send + (lambda (port) + (with-time-logging + (simple-format #f "sending ~A" file) + (dump-port* file-port port + #:reporter reporter))) + #:headers `((Authorization . ,auth-value)) + #:method (if bytes 'POST 'PUT)))) + + (log 'DEBUG "perform upload " file ", response code: " + (response-code response)) + + (when (>= (response-code response) 400) + (raise-exception + (make-exception-with-message + (coordinator-handle-failed-request + log + 'PUT + (uri-path upload-uri) + response + body)))))) + #:binary #t))))) + + (unless (and=> + (get-completed-upload-bytes) + (lambda (uploaded-bytes) + (= uploaded-bytes file-size))) + (retry-on-error perform-upload + #:times 100 + #:delay 15 + #:error-hook + (lambda _ + (log 'DEBUG + "perform-upload " file + " (bytes: " file-size ", " + "md5: " (force file-md5-hash-promise) + ")"))))) args)) (define-method (submit-log-file @@ -452,21 +454,17 @@ (retry-on-error (lambda () - (let-values (((response body) - ;; TODO This chunks the transfer, but it doesn't need to - ;; since the length of the body is known. The chunking - ;; is useful however, as it matches up with the - ;; read-request-body hack to avoid reading the entire - ;; request body in to memory. - (call-with-streaming-http-request - uri - (lambda (request-port) - (call-with-input-file file - (lambda (file-port) - (dump-port file-port request-port - #:buffer-size (expt 2 20))) - #:binary #t)) - #:headers `((Authorization . ,auth-value))))) + (let ((response + body + (call-with-streaming-http-request + uri + (stat:size (stat file)) + (lambda (request-port) + (call-with-input-file file + (lambda (file-port) + (dump-port file-port request-port)) + #:binary #t)) + #:headers `((Authorization . ,auth-value))))) (if (>= (response-code response) 400) (raise-exception (make-exception-with-message diff --git a/guix-build-coordinator/agent-messaging/http/server.scm b/guix-build-coordinator/agent-messaging/http/server.scm index 4cf1e97..f2b6bab 100644 --- a/guix-build-coordinator/agent-messaging/http/server.scm +++ b/guix-build-coordinator/agent-messaging/http/server.scm @@ -55,7 +55,7 @@ (define (bad-request message . args) (throw 'bad-request message args)) -(define (fixed/read-request-body r) +(define (read-request-body/patch r) "Reads the request body from R, as a bytevector. Return â#fâ if there was no request body." (cond @@ -64,6 +64,12 @@ if there was no request body." (make-chunked-input-port (request-port r) ;; closing the port is handled elsewhere #:keep-alive? #t)) + ;; Since the guile-fibers webserver/Guile force reading the request body as + ;; a bytevector, patching read-request-body in this way to detect this + ;; custom header allows handling some request bodies without reading it in + ;; to a bytevector. This is used when handling file uploads. + ((assq-ref (request-headers r) 'stream-body) + (request-port r)) (else (let ((nbytes (request-content-length r))) (and nbytes @@ -75,7 +81,7 @@ if there was no request body." (module-set! (resolve-module '(web request)) 'read-request-body - fixed/read-request-body) + read-request-body/patch) (define* (port-hash* algorithm port #:key (reporter progress-reporter/silent)) @@ -308,6 +314,69 @@ port. Also, the port used can be changed by passing the --port option.\n" #:binary #t)) #:unwind? #t))) +(define* (receive-file body + length + output-file-name + #:key append?) + (define body-port + (if (bytevector? body) + ;; If the Stream-Body header isn't set, then it's possible + ;; that the body will be a port + (open-bytevector-input-port body) + body)) + + (call-with-port + (if append? + (open-file output-file-name "a") + (open-output-file output-file-name #:binary #t)) + (lambda (output-port) + (let ((start-time (current-time time-utc))) + + (define output-progress + (rate-limited + (lambda (bytes-read) + (display + (simple-format + #f + "receiving ~A + ~AMB read so far... +" + (basename output-file-name) + (format #f "~2,2f" + (/ bytes-read + 1000000))))) + (make-time time-duration 0 20))) + + (dump-port + body-port + output-port + length + #:progress + (lambda (bytes-transfered continue-thunk) + (output-progress bytes-transfered) + (continue-thunk))) + + (let* ((end-time (current-time time-utc)) + (elapsed (time-difference end-time + start-time)) + (seconds-elapsed + (+ (time-second elapsed) + (/ (time-nanosecond elapsed) 1e9)))) + (display + (simple-format + #f + "received ~A + took ~A seconds + data transfered: ~AMB (~A bytes) + speed (MB/s): ~A +" + (basename output-file-name) + seconds-elapsed + (format #f "~2,2f" (/ length 1000000)) + length + (format #f "~2,2f" (/ (/ length 1000000) + seconds-elapsed))))))))) + (define (controller request method-and-path-components body @@ -483,19 +552,26 @@ port. Also, the port used can be changed by passing the --port option.\n" (lambda (file) (not (member file '("." "..")))))) - (if (bytevector? body) - (call-with-output-file tmp-output-file-name - (lambda (output-port) - (put-bytevector output-port body))) - (call-with-worker-thread - chunked-request-channel - (lambda () - (call-with-output-file tmp-output-file-name - (lambda (output-port) - (let loop ((bv (get-bytevector-some body))) - (unless (eof-object? bv) - (put-bytevector output-port bv) - (loop (get-bytevector-some body))))))))) + (let ((body-port + (if (bytevector? body) + ;; If the Stream-Body header isn't set, then it's possible + ;; that the body will be a port + (open-bytevector-input-port body) + body))) + (call-with-output-file tmp-output-file-name + (lambda (output-port) + ;; Older agents may still attempt to use chunked encoding + ;; for this request + (if (member '(chunked) (request-transfer-encoding request)) + (call-with-worker-thread + chunked-request-channel + (lambda () + (dump-port body-port + output-port + (request-content-length request)))) + (dump-port body-port + output-port + (request-content-length request)))))) (rename-file tmp-output-file-name output-file-name) (no-content)) @@ -522,59 +598,6 @@ port. Also, the port used can be changed by passing the --port option.\n" (let ((agent-id-for-build (datastore-agent-for-build datastore uuid))) - (define (receive-file output-file-name tmp-output-file-name) - (call-with-worker-thread - chunked-request-channel - (lambda () - (call-with-output-file tmp-output-file-name - (lambda (output-port) - (let ((start-time (current-time time-utc))) - (let loop ((bv (get-bytevector-some body)) - (bytes-read 0) - (last-progress-update-bytes-read 0)) - (if (eof-object? bv) - (let* ((end-time (current-time time-utc)) - (elapsed (time-difference end-time - start-time)) - (seconds-elapsed - (+ (time-second elapsed) - (/ (time-nanosecond elapsed) 1e9)))) - (display - (simple-format - #f - "received ~A - took ~A seconds - data transfered: ~AMB (~A bytes) - speed (MB/s): ~A -" - (basename output-file-name) - seconds-elapsed - (format #f "~2,2f" (/ bytes-read 1000000)) - bytes-read - (format #f "~2,2f" (/ (/ bytes-read 1000000) - seconds-elapsed))))) - (begin - (put-bytevector output-port bv) - (loop (get-bytevector-some body) - (+ bytes-read - (bytevector-length bv)) - (if (> (- bytes-read - last-progress-update-bytes-read) - 50000000) ; ~50MB - (begin - (display - (simple-format - #f - "receiving ~A - ~AMB read so far... -" - (basename output-file-name) - (format #f "~2,2f" - (/ bytes-read - 1000000)))) - bytes-read) - last-progress-update-bytes-read))))))))))) - (if (authenticated? agent-id-for-build request) (let* ((output-file-name (build-output-file-location datastore uuid output-name)) @@ -595,56 +618,46 @@ port. Also, the port used can be changed by passing the --port option.\n" "deleting " tmp-output-file-name) (delete-file tmp-output-file-name)) - (if (bytevector? body) - (begin - (call-with-output-file tmp-output-file-name - (lambda (output-port) - (put-bytevector output-port body))) - - (let ((hash (compute-hash-of-uploaded-output - logger - tmp-output-file-name))) - (log-msg logger - 'DEBUG - "computed the hash of " tmp-output-file-name - ", renaming") - - (rename-file tmp-output-file-name - output-file-name) - - (call-with-output-file (string-append output-file-name ".hash") - (lambda (port) - (simple-format port "~A\n" hash)))) - - (no-content)) - (begin - (receive-file output-file-name - tmp-output-file-name) - - (list - (build-response - #:code 200 - #:headers '((content-type . (text/plain)) - (Transfer-Encoding . "chunked"))) - (lambda (response-port) - ;; Compute the hash of the file - (let* ((reporter (progress-reporter/hash - (stat:size (stat tmp-output-file-name)) - response-port)) - (hash (compute-hash-of-uploaded-output - logger - tmp-output-file-name - #:reporter reporter))) - (log-msg logger - 'DEBUG - "computed the hash of " tmp-output-file-name ", renaming") - - (call-with-output-file (string-append output-file-name ".hash") - (lambda (port) - (simple-format port "~A\n" hash)))) - - (rename-file tmp-output-file-name - output-file-name)))))) + (if (member '(chunked) (request-transfer-encoding request)) + ;; Older agents may use chunked encoding for this request + (call-with-worker-thread + chunked-request-channel + (lambda () + (receive-file body + (request-content-length request) + tmp-output-file-name))) + (receive-file body + (request-content-length request) + tmp-output-file-name)) + + (list + (build-response + #:code 200 + #:headers '((content-type . (text/plain)) + (Transfer-Encoding . "chunked"))) + (lambda (response-port) + ;; Compute the hash of the file + (let* ((reporter (progress-reporter/hash + (stat:size (stat tmp-output-file-name)) + response-port)) + (hash + (call-with-worker-thread + chunked-request-channel + (lambda () + (compute-hash-of-uploaded-output + logger + tmp-output-file-name + #:reporter reporter))))) + (log-msg logger + 'DEBUG + "computed the hash of " tmp-output-file-name ", renaming") + + (call-with-output-file (string-append output-file-name ".hash") + (lambda (port) + (simple-format port "~A\n" hash)))) + + (rename-file tmp-output-file-name + output-file-name)))) (render-json '(("error" . "access denied")) #:code 403)))) @@ -670,60 +683,6 @@ port. Also, the port used can be changed by passing the --port option.\n" (let ((agent-id-for-build (datastore-agent-for-build datastore uuid))) - (define (receive-file output-file-name tmp-output-file-name) - (call-with-worker-thread - chunked-request-channel - (lambda () - (let ((output-port (open-file tmp-output-file-name "a"))) - (let ((start-time (current-time time-utc))) - (let loop ((bv (get-bytevector-some body)) - (bytes-read 0) - (last-progress-update-bytes-read 0)) - (if (eof-object? bv) - (let* ((end-time (current-time time-utc)) - (elapsed (time-difference end-time - start-time)) - (seconds-elapsed - (+ (time-second elapsed) - (/ (time-nanosecond elapsed) 1e9)))) - (display - (simple-format - #f - "received ~A - took ~A seconds - data transfered: ~AMB (~A bytes) - speed (MB/s): ~A -" - (basename output-file-name) - seconds-elapsed - (format #f "~2,2f" - (/ bytes-read 1000000)) - bytes-read - (format #f "~2,2f" (/ (/ bytes-read 1000000) - seconds-elapsed))))) - (begin - (put-bytevector output-port bv) - (loop (get-bytevector-some body) - (+ bytes-read - (bytevector-length bv)) - (if (> (- bytes-read - last-progress-update-bytes-read) - 50000000) ; ~50MB - (begin - (display - (simple-format - #f - "receiving ~A - ~AMB read so far... -" - (basename output-file-name) - (format #f "~2,2f" - (/ bytes-read - 1000000)))) - bytes-read) - last-progress-update-bytes-read)))))) - (close-port output-port))))) - (if (authenticated? agent-id-for-build request) (let* ((output-file-name (build-output-file-location datastore uuid output-name)) @@ -739,55 +698,48 @@ port. Also, the port used can be changed by passing the --port option.\n" "deleting " output-file-name) (delete-file output-file-name)) - (if (bytevector? body) - (let ((output-port (open-file tmp-output-file-name "a"))) - (put-bytevector output-port body) - (close-port output-port) - - (let ((hash (compute-hash-of-uploaded-output - logger - tmp-output-file-name))) - (log-msg logger - 'DEBUG - "computed the hash of " tmp-output-file-name - ", renaming") - - (rename-file tmp-output-file-name - output-file-name) - - (call-with-output-file (string-append output-file-name ".hash") - (lambda (port) - (simple-format port "~A\n" hash)))) - - (no-content)) - (begin - (receive-file output-file-name - tmp-output-file-name) - - (list - (build-response - #:code 200 - #:headers '((content-type . (text/plain)) - (Transfer-Encoding . "chunked"))) - (lambda (response-port) - ;; Compute the hash of the file - (let* ((reporter (progress-reporter/hash - (stat:size (stat tmp-output-file-name)) - response-port)) - (hash (compute-hash-of-uploaded-output - logger - tmp-output-file-name - #:reporter reporter))) - (log-msg logger - 'DEBUG - "computed the hash of " tmp-output-file-name ", renaming") - - (call-with-output-file (string-append output-file-name ".hash") - (lambda (port) - (simple-format port "~A\n" hash)))) - - (rename-file tmp-output-file-name - output-file-name)))))) + (if (member '(chunked) (request-transfer-encoding request)) + ;; Older agents may use chunked encoding for this request + (call-with-worker-thread + chunked-request-channel + (lambda () + (receive-file body + (request-content-length request) + tmp-output-file-name + #:append? #t))) + (receive-file body + (request-content-length request) + tmp-output-file-name + #:append? #t)) + + (list + (build-response + #:code 200 + #:headers '((content-type . (text/plain)) + (Transfer-Encoding . "chunked"))) + (lambda (response-port) + ;; Compute the hash of the file + (let* ((reporter (progress-reporter/hash + (stat:size (stat tmp-output-file-name)) + response-port)) + (hash + (call-with-worker-thread + chunked-request-channel + (lambda () + (compute-hash-of-uploaded-output + logger + tmp-output-file-name + #:reporter reporter))))) + (log-msg logger + 'DEBUG + "computed the hash of " tmp-output-file-name ", renaming") + + (call-with-output-file (string-append output-file-name ".hash") + (lambda (port) + (simple-format port "~A\n" hash)))) + + (rename-file tmp-output-file-name + output-file-name)))) (render-json '(("error" . "access denied")) #:code 403)))) diff --git a/guix-build-coordinator/utils.scm b/guix-build-coordinator/utils.scm index 2fe7948..7f4bf0c 100644 --- a/guix-build-coordinator/utils.scm +++ b/guix-build-coordinator/utils.scm @@ -304,7 +304,9 @@ (setvbuf ret 'block buffering) ret)) -(define* (call-with-streaming-http-request uri callback +(define* (call-with-streaming-http-request uri + content-length + callback #:key (headers '()) (method 'PUT)) (with-port-timeouts @@ -316,8 +318,10 @@ #:method method #:version '(1 . 1) #:headers `((connection close) - (Transfer-Encoding . "chunked") + (content-length . ,content-length) (Content-Type . "application/octet-stream") + ;; read-request-body/patch looks for this header + (Stream-Body . "true") ,@headers) #:port port))) @@ -325,26 +329,19 @@ (setvbuf port 'block (expt 2 13)) (with-exception-handler (lambda (exp) - (simple-format #t "error: PUT ~A: ~A\n" (uri-path uri) exp) + (simple-format #t "error: ~A ~A: ~A\n" + method (uri-path uri) exp) (close-port port) (raise-exception exp)) (lambda () (let ((request (write-request request port))) - (let* ((chunked-output-port - (make-chunked-output-port* - port - #:buffering (expt 2 12) - #:keep-alive? #t))) - - (set-port-encoding! chunked-output-port "ISO-8859-1") - (callback chunked-output-port) - (close-port chunked-output-port) - - (let ((response (read-response port))) - (let ((body (read-response-body response))) - (close-port port) - (values response - body))))))))))) + (callback port) + + (let ((response (read-response port))) + (let ((body (read-response-body response))) + (close-port port) + (values response + body)))))))))) (define (find-missing-substitutes-for-output store substitute-urls output) (if (valid-path? store output) |