aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--guix-build-coordinator/agent-messaging/http.scm6
-rw-r--r--guix-build-coordinator/utils.scm68
2 files changed, 70 insertions, 4 deletions
diff --git a/guix-build-coordinator/agent-messaging/http.scm b/guix-build-coordinator/agent-messaging/http.scm
index a561b18..75689c2 100644
--- a/guix-build-coordinator/agent-messaging/http.scm
+++ b/guix-build-coordinator/agent-messaging/http.scm
@@ -56,9 +56,9 @@
if there was no request body."
(cond
((member '(chunked) (request-transfer-encoding r))
- (make-chunked-input-port (request-port r)
- ;; closing the port is handled elsewhere
- #:keep-alive? #t))
+ (make-chunked-input-port* (request-port r)
+ ;; closing the port is handled elsewhere
+ #:keep-alive? #t))
(else
(let ((nbytes (request-content-length r)))
(and nbytes
diff --git a/guix-build-coordinator/utils.scm b/guix-build-coordinator/utils.scm
index 975853c..49bcfe5 100644
--- a/guix-build-coordinator/utils.scm
+++ b/guix-build-coordinator/utils.scm
@@ -4,6 +4,7 @@
#:use-module (ice-9 match)
#:use-module (ice-9 format)
#:use-module (ice-9 threads)
+ #:use-module (ice-9 rdelim)
#:use-module (ice-9 binary-ports)
#:use-module (rnrs bytevectors)
#:use-module (web uri)
@@ -27,6 +28,7 @@
make-base64-output-port
call-with-streaming-http-request
+ make-chunked-input-port*
substitute-derivation
@@ -170,6 +172,68 @@ If already in the worker thread, call PROC immediately."
#f
close))
+;; Chunked Responses
+(define (read-chunk-header port)
+ "Read a chunk header from PORT and return the size in bytes of the
+upcoming chunk."
+ (match (read-line port)
+ ((? eof-object?)
+ ;; Connection closed prematurely: there's nothing left to read.
+ 0)
+ (str
+ (let ((extension-start (string-index str
+ (lambda (c)
+ (or (char=? c #\;)
+ (char=? c #\return))))))
+ (string->number (if extension-start ; unnecessary?
+ (substring str 0 extension-start)
+ str)
+ 16)))))
+
+(define* (make-chunked-input-port* port #:key (keep-alive? #f))
+ (define (close)
+ (unless keep-alive?
+ (close-port port)))
+
+ (define chunk-size 0) ;size of the current chunk
+ (define remaining 0) ;number of bytes left from the current chunk
+ (define finished? #f) ;did we get all the chunks?
+
+ (define (read! bv idx to-read)
+ (define (loop to-read num-read)
+ (cond ((or finished? (zero? to-read))
+ num-read)
+ ((zero? remaining) ;get a new chunk
+ (let ((size (read-chunk-header port)))
+ (set! chunk-size size)
+ (set! remaining size)
+ (cond
+ ((zero? size)
+ (set! finished? #t)
+ (get-bytevector-n port 2) ; \r\n follows the last chunk
+ num-read)
+ (else
+ (loop to-read num-read)))))
+ (else ;read from the current chunk
+ (let* ((ask-for (min to-read remaining))
+ (read (get-bytevector-n! port bv (+ idx num-read)
+ ask-for)))
+ (cond
+ ((eof-object? read) ;premature termination
+ (set! finished? #t)
+ num-read)
+ (else
+ (let ((left (- remaining read)))
+ (set! remaining left)
+ (when (zero? left)
+ ;; We're done with this chunk; read CR and LF.
+ (get-u8 port) (get-u8 port))
+ (loop (- to-read read)
+ (+ num-read read)))))))))
+ (loop to-read 0))
+
+ (make-custom-binary-input-port "chunked input port" read! #f #f close))
+
(define* (call-with-streaming-http-request uri callback
#:key (headers '()))
(let* ((port (open-socket-for-uri uri))
@@ -196,7 +260,9 @@ If already in the worker thread, call PROC immediately."
(make-base64-output-port chunked-output-port)))
(callback base64-output-port)
(close-port base64-output-port)
- (close-port chunked-output-port))
+ (close-port chunked-output-port)
+ (display "\r\n" port)
+ (force-output port))
(let ((response (read-response port)))
(let ((body (read-response-body response)))