aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--guix-build-coordinator/agent-messaging/http.scm64
-rw-r--r--guix-build-coordinator/utils.scm67
2 files changed, 48 insertions, 83 deletions
diff --git a/guix-build-coordinator/agent-messaging/http.scm b/guix-build-coordinator/agent-messaging/http.scm
index 9e1205e..3f539c4 100644
--- a/guix-build-coordinator/agent-messaging/http.scm
+++ b/guix-build-coordinator/agent-messaging/http.scm
@@ -263,60 +263,18 @@
(with-store store
(query-path-info store file)))
- ;; For small outputs, compress while sending the data, but for bigger
- ;; store items, do all the compression up front to hopefully reduce the
- ;; time to send them.
- (if (< (path-info-nar-size path-info)
- 1000000) ; 1MB
- (retry-on-error
- (lambda ()
- (call-with-streaming-http-request
- uri
- (lambda (port)
- (call-with-lzip-output-port port
- (lambda (port)
- (write-file file port))
- #:level 9))
- #:headers `((Authorization . ,auth-value))))
- #:times 6
- #:delay 15)
- (let* ((directory (or (getenv "TMPDIR") "/tmp"))
- (template (string-append directory
- "/guix-build-coordinator-file.XXXXXX"))
- (out (mkstemp! template)))
- (log 'INFO "compressing " file " -> " template " prior to sending")
- (call-with-lzip-output-port out
+ (retry-on-error
+ (lambda ()
+ (call-with-streaming-http-request
+ uri
+ (lambda (port)
+ (call-with-lzip-output-port port
(lambda (port)
(write-file file port))
- #:level 9)
- (close-port out)
-
- (log 'INFO "finished compressing " file ", now sending")
- (retry-on-error
- (lambda ()
- (call-with-input-file template
- (lambda (file-port)
- (let-values (((response body)
- (call-with-streaming-http-request
- uri
- (lambda (port)
- (with-time-logging
- (simple-format #f "sending ~A" file)
- (dump-port file-port port
- #:buffer-size 65536)))
- #:headers `((Authorization . ,auth-value)))))
- (when (>= (response-code response) 400)
- (raise-exception
- (make-exception-with-message
- (coordinator-handle-failed-request log
- 'PUT
- (uri-path uri)
- response
- body))))))))
- #:times 12
- #:delay (random 15))
-
- (delete-file template))))
+ #:level 9))
+ #:headers `((Authorization . ,auth-value))))
+ #:times 12
+ #:delay (random 15)))
args))
(define-method (submit-log-file
@@ -355,7 +313,7 @@
(call-with-input-file file
(lambda (file-port)
(dump-port file-port request-port
- #:buffer-size 65536))
+ #:buffer-size (expt 2 20)))
#:binary #t))
#:headers `((Authorization . ,auth-value)))))
(if (>= (response-code response) 400)
diff --git a/guix-build-coordinator/utils.scm b/guix-build-coordinator/utils.scm
index b221199..9887d05 100644
--- a/guix-build-coordinator/utils.scm
+++ b/guix-build-coordinator/utils.scm
@@ -6,6 +6,7 @@
#:use-module (ice-9 q)
#:use-module (ice-9 ftw)
#:use-module (ice-9 popen)
+ #:use-module (ice-9 iconv)
#:use-module (ice-9 match)
#:use-module (ice-9 format)
#:use-module (ice-9 threads)
@@ -254,30 +255,41 @@ upcoming chunk."
'())))
(define (with-gc-protection thunk)
- (monitor
- (dynamic-wind
- gc-disable
- thunk
- gc-enable)))
-
-(define (make-gc-guard-port port)
- (define (%put-char c)
- (write c port))
+ (dynamic-wind
+ gc-disable
+ thunk
+ gc-enable))
+(define* (make-chunked-output-port* port #:key (keep-alive? #f)
+ (buffering 1200))
(define (%put-string s)
- (display s port))
+ (let ((length (string-length s)))
+ (unless (eq? length 0)
+ (with-gc-protection
+ (lambda ()
+ (put-string port (number->string length 16))
+ (put-string port "\r\n")
+ (write s port)
+ (put-string port "\r\n"))))))
+ (define (%put-char c)
+ (%put-string (list->string (list c))))
(define (flush)
+ (force-output port))
+ (define (safe-flush)
+ (with-gc-protection flush))
+ (define (close)
(with-gc-protection
(lambda ()
- (force-output port))))
-
- (define (close)
- (close-port port))
-
- (make-soft-port
- (vector %put-char %put-string flush #f close)
- "w"))
+ (flush)
+ (put-string port "0\r\n\r\n")
+ (force-output port)
+ (unless keep-alive?
+ (close-port port)))))
+ (let ((ret (make-soft-port
+ (vector %put-char %put-string safe-flush #f close) "w")))
+ (setvbuf ret 'block buffering)
+ ret))
(define* (call-with-streaming-http-request uri callback
#:key (headers '()))
@@ -294,7 +306,7 @@ upcoming chunk."
#:port port)))
(set-port-encoding! port "ISO-8859-1")
- (setvbuf port 'block 65536)
+ (setvbuf port 'block (expt 2 13))
(with-exception-handler
(lambda (exp)
(simple-format #t "error: PUT ~A: ~A\n" (uri-path uri) exp)
@@ -302,14 +314,11 @@ upcoming chunk."
(raise-exception exp))
(lambda ()
(let ((request (write-request request port)))
- (let ((chunked-output-port
- (make-gc-guard-port
- (make-chunked-output-port
+ (let* ((chunked-output-port
+ (make-chunked-output-port*
port
- #:buffering 65536
- #:keep-alive? #t))))
-
- (setvbuf chunked-output-port 'block 1048576)
+ #:buffering (expt 2 12)
+ #:keep-alive? #t)))
;; A SIGPIPE will kill Guile, so ignore it
(sigaction SIGPIPE
@@ -318,12 +327,10 @@ upcoming chunk."
(set-port-encoding! chunked-output-port "ISO-8859-1")
(callback chunked-output-port)
+ (close-port chunked-output-port)
+
(with-gc-protection
(lambda ()
- (close-port chunked-output-port)
- (display "\r\n" port)
- (force-output port)
-
(let ((response (read-response port)))
(let ((body (read-response-body response)))
(close-port port)