aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2023-05-08 16:05:05 +0100
committerChristopher Baines <mail@cbaines.net>2023-05-08 19:47:30 +0100
commit0ee9ce1b3755706cf5e283e4612b68581c4be37e (patch)
treee142898ddc5e47a7a20c346ccee08623e0a6c58a
parentdd68c838e992075f338d349d413c8b98a4395c7d (diff)
downloadbuild-coordinator-0ee9ce1b3755706cf5e283e4612b68581c4be37e.tar
build-coordinator-0ee9ce1b3755706cf5e283e4612b68581c4be37e.tar.gz
Stop using chunked transfers for file uploads
As the amount of data to upload is known, this is unnecessary complexity and overhead.
-rw-r--r--guix-build-coordinator/agent-messaging/http.scm130
-rw-r--r--guix-build-coordinator/agent-messaging/http/server.scm394
-rw-r--r--guix-build-coordinator/utils.scm33
3 files changed, 252 insertions, 305 deletions
diff --git a/guix-build-coordinator/agent-messaging/http.scm b/guix-build-coordinator/agent-messaging/http.scm
index 7a56520..2703c91 100644
--- a/guix-build-coordinator/agent-messaging/http.scm
+++ b/guix-build-coordinator/agent-messaging/http.scm
@@ -21,6 +21,7 @@
(define-module (guix-build-coordinator agent-messaging http)
#:use-module (srfi srfi-11)
#:use-module (srfi srfi-19)
+ #:use-module (srfi srfi-71)
#:use-module (ice-9 match)
#:use-module (ice-9 format)
#:use-module (ice-9 threads)
@@ -370,57 +371,58 @@
(seek file-port bytes SEEK_SET)
(log 'INFO "resuming upload from byte " bytes))
- (let ((upload-uri
- (coordinator-uri-for-path
- (slot-ref interface 'coordinator-uri)
- (string-append "/build/" build-id "/output/" output-name
- (if (integer? bytes)
- "/partial"
- "")))))
-
- (let-values (((response body)
- ;; TODO This chunks the transfer, but it
- ;; doesn't need to since the length of the
- ;; body is known. The chunking is useful
- ;; however, as it matches up with the
- ;; read-request-body hack to avoid reading
- ;; the entire request body in to memory.
- (call-with-streaming-http-request
- upload-uri
- (lambda (port)
- (with-time-logging
- (simple-format #f "sending ~A" file)
- (dump-port* file-port port
- #:reporter reporter)))
- #:headers `((Authorization . ,auth-value))
- #:method (if bytes 'POST 'PUT))))
- (log 'DEBUG "perform upload " file ", response code: "
- (response-code response))
-
- (when (>= (response-code response) 400)
- (raise-exception
- (make-exception-with-message
- (coordinator-handle-failed-request
- log
- 'PUT
- (uri-path upload-uri)
- response
- body))))))))))))
-
- (unless (and=>
- (get-completed-upload-bytes)
- (lambda (uploaded-bytes)
- (= uploaded-bytes file-size)))
- (retry-on-error perform-upload
- #:times 100
- #:delay 15
- #:error-hook
- (lambda _
- (log 'DEBUG
- "perform-upload " file
- " (bytes: " file-size ", "
- "md5: " (force file-md5-hash-promise)
- ")")))))
+ (let* ((upload-uri
+ (coordinator-uri-for-path
+ (slot-ref interface 'coordinator-uri)
+ (string-append "/build/" build-id "/output/" output-name
+ (if (integer? bytes)
+ "/partial"
+ ""))))
+ (bytes-to-send
+ (if bytes
+ (- file-size bytes)
+ file-size))
+ (response
+ body
+ (call-with-streaming-http-request
+ upload-uri
+ bytes-to-send
+ (lambda (port)
+ (with-time-logging
+ (simple-format #f "sending ~A" file)
+ (dump-port* file-port port
+ #:reporter reporter)))
+ #:headers `((Authorization . ,auth-value))
+ #:method (if bytes 'POST 'PUT))))
+
+ (log 'DEBUG "perform upload " file ", response code: "
+ (response-code response))
+
+ (when (>= (response-code response) 400)
+ (raise-exception
+ (make-exception-with-message
+ (coordinator-handle-failed-request
+ log
+ 'PUT
+ (uri-path upload-uri)
+ response
+ body))))))
+ #:binary #t)))))
+
+ (unless (and=>
+ (get-completed-upload-bytes)
+ (lambda (uploaded-bytes)
+ (= uploaded-bytes file-size)))
+ (retry-on-error perform-upload
+ #:times 100
+ #:delay 15
+ #:error-hook
+ (lambda _
+ (log 'DEBUG
+ "perform-upload " file
+ " (bytes: " file-size ", "
+ "md5: " (force file-md5-hash-promise)
+ ")")))))
args))
(define-method (submit-log-file
@@ -452,21 +454,17 @@
(retry-on-error
(lambda ()
- (let-values (((response body)
- ;; TODO This chunks the transfer, but it doesn't need to
- ;; since the length of the body is known. The chunking
- ;; is useful however, as it matches up with the
- ;; read-request-body hack to avoid reading the entire
- ;; request body in to memory.
- (call-with-streaming-http-request
- uri
- (lambda (request-port)
- (call-with-input-file file
- (lambda (file-port)
- (dump-port file-port request-port
- #:buffer-size (expt 2 20)))
- #:binary #t))
- #:headers `((Authorization . ,auth-value)))))
+ (let ((response
+ body
+ (call-with-streaming-http-request
+ uri
+ (stat:size (stat file))
+ (lambda (request-port)
+ (call-with-input-file file
+ (lambda (file-port)
+ (dump-port file-port request-port))
+ #:binary #t))
+ #:headers `((Authorization . ,auth-value)))))
(if (>= (response-code response) 400)
(raise-exception
(make-exception-with-message
diff --git a/guix-build-coordinator/agent-messaging/http/server.scm b/guix-build-coordinator/agent-messaging/http/server.scm
index 4cf1e97..f2b6bab 100644
--- a/guix-build-coordinator/agent-messaging/http/server.scm
+++ b/guix-build-coordinator/agent-messaging/http/server.scm
@@ -55,7 +55,7 @@
(define (bad-request message . args)
(throw 'bad-request message args))
-(define (fixed/read-request-body r)
+(define (read-request-body/patch r)
"Reads the request body from R, as a bytevector. Return ‘#f’
if there was no request body."
(cond
@@ -64,6 +64,12 @@ if there was no request body."
(make-chunked-input-port (request-port r)
;; closing the port is handled elsewhere
#:keep-alive? #t))
+ ;; Since the guile-fibers webserver/Guile force reading the request body as
+ ;; a bytevector, patching read-request-body in this way to detect this
+ ;; custom header allows handling some request bodies without reading it in
+ ;; to a bytevector. This is used when handling file uploads.
+ ((assq-ref (request-headers r) 'stream-body)
+ (request-port r))
(else
(let ((nbytes (request-content-length r)))
(and nbytes
@@ -75,7 +81,7 @@ if there was no request body."
(module-set! (resolve-module '(web request))
'read-request-body
- fixed/read-request-body)
+ read-request-body/patch)
(define* (port-hash* algorithm port
#:key (reporter progress-reporter/silent))
@@ -308,6 +314,69 @@ port. Also, the port used can be changed by passing the --port option.\n"
#:binary #t))
#:unwind? #t)))
+(define* (receive-file body
+                       length
+                       output-file-name
+                       #:key append?)
+  "Copy the request BODY to OUTPUT-FILE-NAME, printing progress while the
+data is copied and a summary once the copy has finished.
+
+BODY is either a bytevector or an input port.  LENGTH is handed to dump-port
+as the number of bytes to copy; callers pass the request's Content-Length,
+which is #f for chunked requests.  When APPEND? is true the data is appended
+to OUTPUT-FILE-NAME, otherwise the file is opened as a fresh binary output
+file."
+  (define body-port
+    (if (bytevector? body)
+        ;; If the Stream-Body header isn't set (and the request isn't
+        ;; chunked), the body will have been read in to a bytevector, so
+        ;; wrap it in a port to handle both cases the same way
+        (open-bytevector-input-port body)
+        body))
+
+  (call-with-port
+      (if append?
+          (open-file output-file-name "a")
+          (open-output-file output-file-name #:binary #t))
+    (lambda (output-port)
+      (let ((start-time (current-time time-utc))
+            ;; Running total passed to the #:progress callback below.  The
+            ;; summary uses this rather than LENGTH, since LENGTH is #f when
+            ;; the request has no Content-Length (chunked uploads from older
+            ;; agents), and dividing #f would raise an error.
+            (bytes-received 0))
+
+        ;; Progress message, wrapped by rate-limited with a duration of 20
+        ;; seconds (presumably so it is printed at most that often).
+        (define output-progress
+          (rate-limited
+           (lambda (bytes-read)
+             (display
+              (simple-format
+               #f
+               "receiving ~A
+ ~AMB read so far...
+"
+               (basename output-file-name)
+               (format #f "~2,2f"
+                       (/ bytes-read
+                          1000000)))))
+           (make-time time-duration 0 20)))
+
+        (dump-port
+         body-port
+         output-port
+         length
+         #:progress
+         (lambda (bytes-transfered continue-thunk)
+           (set! bytes-received bytes-transfered)
+           (output-progress bytes-transfered)
+           (continue-thunk)))
+
+        (let* ((end-time (current-time time-utc))
+               (elapsed (time-difference end-time
+                                         start-time))
+               (seconds-elapsed
+                (+ (time-second elapsed)
+                   (/ (time-nanosecond elapsed) 1e9))))
+          (display
+           (simple-format
+            #f
+            "received ~A
+ took ~A seconds
+ data transfered: ~AMB (~A bytes)
+ speed (MB/s): ~A
+"
+            (basename output-file-name)
+            seconds-elapsed
+            (format #f "~2,2f" (/ bytes-received 1000000))
+            bytes-received
+            (format #f "~2,2f" (/ (/ bytes-received 1000000)
+                                  seconds-elapsed)))))))))
+
(define (controller request
method-and-path-components
body
@@ -483,19 +552,26 @@ port. Also, the port used can be changed by passing the --port option.\n"
(lambda (file)
(not (member file '("." ".."))))))
- (if (bytevector? body)
- (call-with-output-file tmp-output-file-name
- (lambda (output-port)
- (put-bytevector output-port body)))
- (call-with-worker-thread
- chunked-request-channel
- (lambda ()
- (call-with-output-file tmp-output-file-name
- (lambda (output-port)
- (let loop ((bv (get-bytevector-some body)))
- (unless (eof-object? bv)
- (put-bytevector output-port bv)
- (loop (get-bytevector-some body)))))))))
+ (let ((body-port
+ (if (bytevector? body)
+ ;; If the Stream-Body header isn't set, then it's possible
+ ;; that the body will be a port
+ (open-bytevector-input-port body)
+ body)))
+ (call-with-output-file tmp-output-file-name
+ (lambda (output-port)
+ ;; Older agents may still attempt to use chunked encoding
+ ;; for this request
+ (if (member '(chunked) (request-transfer-encoding request))
+ (call-with-worker-thread
+ chunked-request-channel
+ (lambda ()
+ (dump-port body-port
+ output-port
+ (request-content-length request))))
+ (dump-port body-port
+ output-port
+ (request-content-length request))))))
(rename-file tmp-output-file-name
output-file-name)
(no-content))
@@ -522,59 +598,6 @@ port. Also, the port used can be changed by passing the --port option.\n"
(let ((agent-id-for-build
(datastore-agent-for-build datastore uuid)))
- (define (receive-file output-file-name tmp-output-file-name)
- (call-with-worker-thread
- chunked-request-channel
- (lambda ()
- (call-with-output-file tmp-output-file-name
- (lambda (output-port)
- (let ((start-time (current-time time-utc)))
- (let loop ((bv (get-bytevector-some body))
- (bytes-read 0)
- (last-progress-update-bytes-read 0))
- (if (eof-object? bv)
- (let* ((end-time (current-time time-utc))
- (elapsed (time-difference end-time
- start-time))
- (seconds-elapsed
- (+ (time-second elapsed)
- (/ (time-nanosecond elapsed) 1e9))))
- (display
- (simple-format
- #f
- "received ~A
- took ~A seconds
- data transfered: ~AMB (~A bytes)
- speed (MB/s): ~A
-"
- (basename output-file-name)
- seconds-elapsed
- (format #f "~2,2f" (/ bytes-read 1000000))
- bytes-read
- (format #f "~2,2f" (/ (/ bytes-read 1000000)
- seconds-elapsed)))))
- (begin
- (put-bytevector output-port bv)
- (loop (get-bytevector-some body)
- (+ bytes-read
- (bytevector-length bv))
- (if (> (- bytes-read
- last-progress-update-bytes-read)
- 50000000) ; ~50MB
- (begin
- (display
- (simple-format
- #f
- "receiving ~A
- ~AMB read so far...
-"
- (basename output-file-name)
- (format #f "~2,2f"
- (/ bytes-read
- 1000000))))
- bytes-read)
- last-progress-update-bytes-read)))))))))))
-
(if (authenticated? agent-id-for-build request)
(let* ((output-file-name
(build-output-file-location datastore uuid output-name))
@@ -595,56 +618,46 @@ port. Also, the port used can be changed by passing the --port option.\n"
"deleting " tmp-output-file-name)
(delete-file tmp-output-file-name))
- (if (bytevector? body)
- (begin
- (call-with-output-file tmp-output-file-name
- (lambda (output-port)
- (put-bytevector output-port body)))
-
- (let ((hash (compute-hash-of-uploaded-output
- logger
- tmp-output-file-name)))
- (log-msg logger
- 'DEBUG
- "computed the hash of " tmp-output-file-name
- ", renaming")
-
- (rename-file tmp-output-file-name
- output-file-name)
-
- (call-with-output-file (string-append output-file-name ".hash")
- (lambda (port)
- (simple-format port "~A\n" hash))))
-
- (no-content))
- (begin
- (receive-file output-file-name
- tmp-output-file-name)
-
- (list
- (build-response
- #:code 200
- #:headers '((content-type . (text/plain))
- (Transfer-Encoding . "chunked")))
- (lambda (response-port)
- ;; Compute the hash of the file
- (let* ((reporter (progress-reporter/hash
- (stat:size (stat tmp-output-file-name))
- response-port))
- (hash (compute-hash-of-uploaded-output
- logger
- tmp-output-file-name
- #:reporter reporter)))
- (log-msg logger
- 'DEBUG
- "computed the hash of " tmp-output-file-name ", renaming")
-
- (call-with-output-file (string-append output-file-name ".hash")
- (lambda (port)
- (simple-format port "~A\n" hash))))
-
- (rename-file tmp-output-file-name
- output-file-name))))))
+ (if (member '(chunked) (request-transfer-encoding request))
+ ;; Older agents may use chunked encoding for this request
+ (call-with-worker-thread
+ chunked-request-channel
+ (lambda ()
+ (receive-file body
+ (request-content-length request)
+ tmp-output-file-name)))
+ (receive-file body
+ (request-content-length request)
+ tmp-output-file-name))
+
+ (list
+ (build-response
+ #:code 200
+ #:headers '((content-type . (text/plain))
+ (Transfer-Encoding . "chunked")))
+ (lambda (response-port)
+ ;; Compute the hash of the file
+ (let* ((reporter (progress-reporter/hash
+ (stat:size (stat tmp-output-file-name))
+ response-port))
+ (hash
+ (call-with-worker-thread
+ chunked-request-channel
+ (lambda ()
+ (compute-hash-of-uploaded-output
+ logger
+ tmp-output-file-name
+ #:reporter reporter)))))
+ (log-msg logger
+ 'DEBUG
+ "computed the hash of " tmp-output-file-name ", renaming")
+
+ (call-with-output-file (string-append output-file-name ".hash")
+ (lambda (port)
+ (simple-format port "~A\n" hash))))
+
+ (rename-file tmp-output-file-name
+ output-file-name))))
(render-json
'(("error" . "access denied"))
#:code 403))))
@@ -670,60 +683,6 @@ port. Also, the port used can be changed by passing the --port option.\n"
(let ((agent-id-for-build
(datastore-agent-for-build datastore uuid)))
- (define (receive-file output-file-name tmp-output-file-name)
- (call-with-worker-thread
- chunked-request-channel
- (lambda ()
- (let ((output-port (open-file tmp-output-file-name "a")))
- (let ((start-time (current-time time-utc)))
- (let loop ((bv (get-bytevector-some body))
- (bytes-read 0)
- (last-progress-update-bytes-read 0))
- (if (eof-object? bv)
- (let* ((end-time (current-time time-utc))
- (elapsed (time-difference end-time
- start-time))
- (seconds-elapsed
- (+ (time-second elapsed)
- (/ (time-nanosecond elapsed) 1e9))))
- (display
- (simple-format
- #f
- "received ~A
- took ~A seconds
- data transfered: ~AMB (~A bytes)
- speed (MB/s): ~A
-"
- (basename output-file-name)
- seconds-elapsed
- (format #f "~2,2f"
- (/ bytes-read 1000000))
- bytes-read
- (format #f "~2,2f" (/ (/ bytes-read 1000000)
- seconds-elapsed)))))
- (begin
- (put-bytevector output-port bv)
- (loop (get-bytevector-some body)
- (+ bytes-read
- (bytevector-length bv))
- (if (> (- bytes-read
- last-progress-update-bytes-read)
- 50000000) ; ~50MB
- (begin
- (display
- (simple-format
- #f
- "receiving ~A
- ~AMB read so far...
-"
- (basename output-file-name)
- (format #f "~2,2f"
- (/ bytes-read
- 1000000))))
- bytes-read)
- last-progress-update-bytes-read))))))
- (close-port output-port)))))
-
(if (authenticated? agent-id-for-build request)
(let* ((output-file-name
(build-output-file-location datastore uuid output-name))
@@ -739,55 +698,48 @@ port. Also, the port used can be changed by passing the --port option.\n"
"deleting " output-file-name)
(delete-file output-file-name))
- (if (bytevector? body)
- (let ((output-port (open-file tmp-output-file-name "a")))
- (put-bytevector output-port body)
- (close-port output-port)
-
- (let ((hash (compute-hash-of-uploaded-output
- logger
- tmp-output-file-name)))
- (log-msg logger
- 'DEBUG
- "computed the hash of " tmp-output-file-name
- ", renaming")
-
- (rename-file tmp-output-file-name
- output-file-name)
-
- (call-with-output-file (string-append output-file-name ".hash")
- (lambda (port)
- (simple-format port "~A\n" hash))))
-
- (no-content))
- (begin
- (receive-file output-file-name
- tmp-output-file-name)
-
- (list
- (build-response
- #:code 200
- #:headers '((content-type . (text/plain))
- (Transfer-Encoding . "chunked")))
- (lambda (response-port)
- ;; Compute the hash of the file
- (let* ((reporter (progress-reporter/hash
- (stat:size (stat tmp-output-file-name))
- response-port))
- (hash (compute-hash-of-uploaded-output
- logger
- tmp-output-file-name
- #:reporter reporter)))
- (log-msg logger
- 'DEBUG
- "computed the hash of " tmp-output-file-name ", renaming")
-
- (call-with-output-file (string-append output-file-name ".hash")
- (lambda (port)
- (simple-format port "~A\n" hash))))
-
- (rename-file tmp-output-file-name
- output-file-name))))))
+ (if (member '(chunked) (request-transfer-encoding request))
+ ;; Older agents may use chunked encoding for this request
+ (call-with-worker-thread
+ chunked-request-channel
+ (lambda ()
+ (receive-file body
+ (request-content-length request)
+ tmp-output-file-name
+ #:append? #t)))
+ (receive-file body
+ (request-content-length request)
+ tmp-output-file-name
+ #:append? #t))
+
+ (list
+ (build-response
+ #:code 200
+ #:headers '((content-type . (text/plain))
+ (Transfer-Encoding . "chunked")))
+ (lambda (response-port)
+ ;; Compute the hash of the file
+ (let* ((reporter (progress-reporter/hash
+ (stat:size (stat tmp-output-file-name))
+ response-port))
+ (hash
+ (call-with-worker-thread
+ chunked-request-channel
+ (lambda ()
+ (compute-hash-of-uploaded-output
+ logger
+ tmp-output-file-name
+ #:reporter reporter)))))
+ (log-msg logger
+ 'DEBUG
+ "computed the hash of " tmp-output-file-name ", renaming")
+
+ (call-with-output-file (string-append output-file-name ".hash")
+ (lambda (port)
+ (simple-format port "~A\n" hash))))
+
+ (rename-file tmp-output-file-name
+ output-file-name))))
(render-json
'(("error" . "access denied"))
#:code 403))))
diff --git a/guix-build-coordinator/utils.scm b/guix-build-coordinator/utils.scm
index 2fe7948..7f4bf0c 100644
--- a/guix-build-coordinator/utils.scm
+++ b/guix-build-coordinator/utils.scm
@@ -304,7 +304,9 @@
(setvbuf ret 'block buffering)
ret))
-(define* (call-with-streaming-http-request uri callback
+(define* (call-with-streaming-http-request uri
+ content-length
+ callback
#:key (headers '())
(method 'PUT))
(with-port-timeouts
@@ -316,8 +318,10 @@
#:method method
#:version '(1 . 1)
#:headers `((connection close)
- (Transfer-Encoding . "chunked")
+ (content-length . ,content-length)
(Content-Type . "application/octet-stream")
+ ;; read-request-body/patch looks for this header
+ (Stream-Body . "true")
,@headers)
#:port port)))
@@ -325,26 +329,19 @@
(setvbuf port 'block (expt 2 13))
(with-exception-handler
(lambda (exp)
- (simple-format #t "error: PUT ~A: ~A\n" (uri-path uri) exp)
+ (simple-format #t "error: ~A ~A: ~A\n"
+ method (uri-path uri) exp)
(close-port port)
(raise-exception exp))
(lambda ()
(let ((request (write-request request port)))
- (let* ((chunked-output-port
- (make-chunked-output-port*
- port
- #:buffering (expt 2 12)
- #:keep-alive? #t)))
-
- (set-port-encoding! chunked-output-port "ISO-8859-1")
- (callback chunked-output-port)
- (close-port chunked-output-port)
-
- (let ((response (read-response port)))
- (let ((body (read-response-body response)))
- (close-port port)
- (values response
- body)))))))))))
+ (callback port)
+
+ (let ((response (read-response port)))
+ (let ((body (read-response-body response)))
+ (close-port port)
+ (values response
+ body))))))))))
(define (find-missing-substitutes-for-output store substitute-urls output)
(if (valid-path? store output)