diff options
-rw-r--r-- | guix-build-coordinator/agent-messaging/http.scm | 6 | ||||
-rw-r--r-- | guix-build-coordinator/utils.scm | 68 |
2 files changed, 70 insertions, 4 deletions
diff --git a/guix-build-coordinator/agent-messaging/http.scm b/guix-build-coordinator/agent-messaging/http.scm index a561b18..75689c2 100644 --- a/guix-build-coordinator/agent-messaging/http.scm +++ b/guix-build-coordinator/agent-messaging/http.scm @@ -56,9 +56,9 @@ if there was no request body." (cond ((member '(chunked) (request-transfer-encoding r)) - (make-chunked-input-port (request-port r) - ;; closing the port is handled elsewhere - #:keep-alive? #t)) + (make-chunked-input-port* (request-port r) + ;; closing the port is handled elsewhere + #:keep-alive? #t)) (else (let ((nbytes (request-content-length r))) (and nbytes diff --git a/guix-build-coordinator/utils.scm b/guix-build-coordinator/utils.scm index 975853c..49bcfe5 100644 --- a/guix-build-coordinator/utils.scm +++ b/guix-build-coordinator/utils.scm @@ -4,6 +4,7 @@ #:use-module (ice-9 match) #:use-module (ice-9 format) #:use-module (ice-9 threads) + #:use-module (ice-9 rdelim) #:use-module (ice-9 binary-ports) #:use-module (rnrs bytevectors) #:use-module (web uri) @@ -27,6 +28,7 @@ make-base64-output-port call-with-streaming-http-request + make-chunked-input-port* substitute-derivation @@ -170,6 +172,68 @@ If already in the worker thread, call PROC immediately." #f close)) +;; Chunked Responses +(define (read-chunk-header port) + "Read a chunk header from PORT and return the size in bytes of the +upcoming chunk." + (match (read-line port) + ((? eof-object?) + ;; Connection closed prematurely: there's nothing left to read. + 0) + (str + (let ((extension-start (string-index str + (lambda (c) + (or (char=? c #\;) + (char=? c #\return)))))) + (string->number (if extension-start ; unnecessary? + (substring str 0 extension-start) + str) + 16))))) + +(define* (make-chunked-input-port* port #:key (keep-alive? #f)) + (define (close) + (unless keep-alive? + (close-port port))) + + (define chunk-size 0) ;size of the current chunk + (define remaining 0) ;number of bytes left from the current chunk + (define finished? #f) ;did we get all the chunks? + + (define (read! bv idx to-read) + (define (loop to-read num-read) + (cond ((or finished? (zero? to-read)) + num-read) + ((zero? remaining) ;get a new chunk + (let ((size (read-chunk-header port))) + (set! chunk-size size) + (set! remaining size) + (cond + ((zero? size) + (set! finished? #t) + (get-bytevector-n port 2) ; \r\n follows the last chunk + num-read) + (else + (loop to-read num-read))))) + (else ;read from the current chunk + (let* ((ask-for (min to-read remaining)) + (read (get-bytevector-n! port bv (+ idx num-read) + ask-for))) + (cond + ((eof-object? read) ;premature termination + (set! finished? #t) + num-read) + (else + (let ((left (- remaining read))) + (set! remaining left) + (when (zero? left) + ;; We're done with this chunk; read CR and LF. + (get-u8 port) (get-u8 port)) + (loop (- to-read read) + (+ num-read read))))))))) + (loop to-read 0)) + + (make-custom-binary-input-port "chunked input port" read! #f #f close)) + (define* (call-with-streaming-http-request uri callback #:key (headers '())) (let* ((port (open-socket-for-uri uri)) @@ -196,7 +260,9 @@ If already in the worker thread, call PROC immediately." (make-base64-output-port chunked-output-port))) (callback base64-output-port) (close-port base64-output-port) - (close-port chunked-output-port)) + (close-port chunked-output-port) + (display "\r\n" port) + (force-output port)) (let ((response (read-response port))) (let ((body (read-response-body response))) |