aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--guix/http-client.scm12
-rw-r--r--guix/progress.scm8
-rwxr-xr-xguix/scripts/substitute.scm103
-rw-r--r--nix/libstore/build.cc29
4 files changed, 117 insertions, 35 deletions
diff --git a/guix/http-client.scm b/guix/http-client.scm
index a767175d67..553640fe9e 100644
--- a/guix/http-client.scm
+++ b/guix/http-client.scm
@@ -1,5 +1,5 @@
;;; GNU Guix --- Functional package management for GNU
-;;; Copyright © 2012, 2013, 2014, 2015, 2016, 2017, 2018 Ludovic Courtès <ludo@gnu.org>
+;;; Copyright © 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2020 Ludovic Courtès <ludo@gnu.org>
;;; Copyright © 2015 Mark H Weaver <mhw@netris.org>
;;; Copyright © 2012, 2015 Free Software Foundation, Inc.
;;; Copyright © 2017 Tobias Geerinckx-Rice <me@tobias.gr>
@@ -70,6 +70,7 @@
(define* (http-fetch uri #:key port (text? #f) (buffered? #t)
+ (keep-alive? #f)
(verify-certificate? #t)
(headers '((user-agent . "GNU Guile")))
timeout)
@@ -79,6 +80,9 @@ textual. Follow any HTTP redirection. When BUFFERED? is #f, return an
unbuffered port, suitable for use in `filtered-port'. HEADERS is an alist of
extra HTTP headers.
+When KEEP-ALIVE? is true, the connection is marked as 'keep-alive' and PORT is
+not closed upon completion.
+
When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates.
TIMEOUT specifies the timeout in seconds for connection establishment; when
@@ -104,11 +108,7 @@ Raise an '&http-get-error' condition if downloading fails."
(setvbuf port 'none))
(let*-values (((resp data)
(http-get uri #:streaming? #t #:port port
- ;; XXX: When #:keep-alive? is true, if DATA is
- ;; a chunked-encoding port, closing DATA won't
- ;; close PORT, leading to a file descriptor
- ;; leak.
- #:keep-alive? #f
+ #:keep-alive? keep-alive?
#:headers headers))
((code)
(response-code resp)))
diff --git a/guix/progress.scm b/guix/progress.scm
index fec65b424c..cd80ae620a 100644
--- a/guix/progress.scm
+++ b/guix/progress.scm
@@ -337,9 +337,10 @@ should be a <progress-reporter> object."
(report total)
(loop total (get-bytevector-n! in buffer 0 buffer-size))))))))
-(define (progress-report-port reporter port)
+(define* (progress-report-port reporter port #:key (close? #t))
"Return a port that continuously reports the bytes read from PORT using
-REPORTER, which should be a <progress-reporter> object."
+REPORTER, which should be a <progress-reporter> object. When CLOSE? is true,
+PORT is closed when the returned port is closed."
(match reporter
(($ <progress-reporter> start report stop)
(let* ((total 0)
@@ -364,5 +365,6 @@ REPORTER, which should be a <progress-reporter> object."
;; trace.
(unless (zero? total)
(stop))
- (close-port port)))))))
+ (when close?
+ (close-port port))))))))
diff --git a/guix/scripts/substitute.scm b/guix/scripts/substitute.scm
index 73abd3f029..25075eedff 100755
--- a/guix/scripts/substitute.scm
+++ b/guix/scripts/substitute.scm
@@ -188,9 +188,14 @@ again."
(sigaction SIGALRM SIG_DFL)
(apply values result)))))
-(define* (fetch uri #:key (buffered? #t) (timeout? #t))
+(define* (fetch uri #:key (buffered? #t) (timeout? #t)
+ (keep-alive? #f) (port #f))
"Return a binary input port to URI and the number of bytes it's expected to
-provide."
+provide.
+
+When PORT is true, use it as the underlying I/O port for HTTP transfers; when
+PORT is false, open a new connection for URI. When KEEP-ALIVE? is true, the
+connection (typically PORT) is kept open once data has been fetched from URI."
(case (uri-scheme uri)
((file)
(let ((port (open-file (uri-path uri)
@@ -206,7 +211,7 @@ provide."
;; sudo tc qdisc add dev eth0 root netem delay 1500ms
;; and then cancel with:
;; sudo tc qdisc del dev eth0 root
- (let ((port #f))
+ (let ((port port))
(with-timeout (if timeout?
%fetch-timeout
0)
@@ -217,10 +222,11 @@ provide."
(begin
(when (or (not port) (port-closed? port))
(set! port (guix:open-connection-for-uri
- uri #:verify-certificate? #f))
- (unless (or buffered? (not (file-port? port)))
- (setvbuf port 'none)))
+ uri #:verify-certificate? #f)))
+ (unless (or buffered? (not (file-port? port)))
+ (setvbuf port 'none))
(http-fetch uri #:text? #f #:port port
+ #:keep-alive? keep-alive?
#:verify-certificate? #f))))))
(else
(leave (G_ "unsupported substitute URI scheme: ~a~%")
@@ -478,17 +484,17 @@ indicates that PATH is unavailable at CACHE-URL."
(build-request (string->uri url) #:method 'GET #:headers headers)))
(define (at-most max-length lst)
- "If LST is shorter than MAX-LENGTH, return it; otherwise return its
-MAX-LENGTH first elements."
+ "If LST is shorter than MAX-LENGTH, return it and the empty list; otherwise
+return its MAX-LENGTH first elements and its tail."
(let loop ((len 0)
(lst lst)
(result '()))
(match lst
(()
- (reverse result))
+ (values (reverse result) '()))
((head . tail)
(if (>= len max-length)
- (reverse result)
+ (values (reverse result) lst)
(loop (+ 1 len) tail (cons head result)))))))
(define* (http-multiple-get base-uri proc seed requests
@@ -962,6 +968,68 @@ the URI, its compression method (a string), and the compressed file size."
(((uri compression file-size) _ ...)
(values uri compression file-size))))
+(define %max-cached-connections
+ ;; Maximum number of connections kept in cache by
+ ;; 'open-connection-for-uri/cached'.
+ 16)
+
+(define open-connection-for-uri/cached
+ (let ((cache '()))
+ (lambda* (uri #:key fresh?)
+ "Return a connection for URI, possibly reusing a cached connection.
+When FRESH? is true, delete any cached connections for URI and open a new
+one. Return #f if URI's scheme is 'file' or #f."
+ (define host (uri-host uri))
+ (define scheme (uri-scheme uri))
+ (define key (list host scheme (uri-port uri)))
+
+ (and (not (memq scheme '(file #f)))
+ (match (assoc-ref cache key)
+ (#f
+ ;; Open a new connection to URI and evict old entries from
+ ;; CACHE, if any.
+ (let-values (((socket)
+ (guix:open-connection-for-uri
+ uri #:verify-certificate? #f))
+ ((new-cache evicted)
+ (at-most (- %max-cached-connections 1) cache)))
+ (for-each (match-lambda
+ ((_ . port)
+ (false-if-exception (close-port port))))
+ evicted)
+ (set! cache (alist-cons key socket new-cache))
+ socket))
+ (socket
+ (if (or fresh? (port-closed? socket))
+ (begin
+ (false-if-exception (close-port socket))
+ (set! cache (alist-delete key cache))
+ (open-connection-for-uri/cached uri))
+ (begin
+ ;; Drain input left from the previous use.
+ (drain-input socket)
+ socket))))))))
+
+(define (call-with-cached-connection uri proc)
+ (let ((port (open-connection-for-uri/cached uri)))
+ (catch #t
+ (lambda ()
+ (proc port))
+ (lambda (key . args)
+ ;; If PORT was cached and the server closed the connection in the
+ ;; meantime, we get EPIPE. In that case, open a fresh connection and
+ ;; retry. We might also get 'bad-response or a similar exception from
+ ;; (web response) later on, once we've sent the request.
+ (if (or (and (eq? key 'system-error)
+ (= EPIPE (system-error-errno `(,key ,@args))))
+ (memq key '(bad-response bad-header bad-header-component)))
+ (proc (open-connection-for-uri/cached uri #:fresh? #t))
+ (apply throw key args))))))
+
+(define-syntax-rule (with-cached-connection uri port exp ...)
+ "Bind PORT with EXP... to a socket connected to URI."
+ (call-with-cached-connection uri (lambda (port) exp ...)))
+
(define* (process-substitution store-item destination
#:key cache-urls acl print-build-trace?)
"Substitute STORE-ITEM (a store file name) from CACHE-URLS, and write it to
@@ -984,10 +1052,12 @@ DESTINATION as a nar file. Verify the substitute against ACL."
(G_ "Downloading ~a...~%") (uri->string uri)))
(let*-values (((raw download-size)
- ;; Note that Hydra currently generates Nars on the fly
- ;; and doesn't specify a Content-Length, so
- ;; DOWNLOAD-SIZE is #f in practice.
- (fetch uri #:buffered? #f #:timeout? #f))
+ ;; 'guix publish' without '--cache' doesn't specify a
+ ;; Content-Length, so DOWNLOAD-SIZE is #f in this case.
+ (with-cached-connection uri port
+ (fetch uri #:buffered? #f #:timeout? #f
+ #:port port
+ #:keep-alive? #t)))
((progress)
(let* ((dl-size (or download-size
(and (equal? compression "none")
@@ -1001,7 +1071,9 @@ DESTINATION as a nar file. Verify the substitute against ACL."
(uri->string uri) dl-size
(current-error-port)
#:abbreviation nar-uri-abbreviation))))
- (progress-report-port reporter raw)))
+ ;; Keep RAW open upon completion so we can later reuse
+ ;; the underlying connection.
+ (progress-report-port reporter raw #:close? #f)))
((input pids)
;; NOTE: This 'progress' port of current process will be
;; closed here, while the child process doing the
@@ -1218,6 +1290,7 @@ default value."
;;; Local Variables:
;;; eval: (put 'with-timeout 'scheme-indent-function 1)
+;;; eval: (put 'with-cached-connection 'scheme-indent-function 2)
;;; End:
;;; substitute.scm ends here
diff --git a/nix/libstore/build.cc b/nix/libstore/build.cc
index 50d300253d..6cfe7aba7e 100644
--- a/nix/libstore/build.cc
+++ b/nix/libstore/build.cc
@@ -3114,17 +3114,24 @@ void SubstitutionGoal::handleChildOutput(int fd, const string & data)
}
if (fd == substituter->fromAgent.readSide) {
- /* Trim whitespace to the right. */
- size_t end = data.find_last_not_of(" \t\n");
- string trimmed = (end != string::npos) ? data.substr(0, end + 1) : data;
-
- if (expectedHashStr == "") {
- expectedHashStr = trimmed;
- } else if (status == "") {
- status = trimmed;
- worker.wakeUp(shared_from_this());
- } else {
- printMsg(lvlError, format("unexpected substituter message '%1%'") % data);
+ /* DATA may consist of several lines. Process them one by one. */
+ string input = data;
+ while (!input.empty()) {
+ /* Process up to the first newline. */
+ size_t end = input.find_first_of("\n");
+ string trimmed = (end != string::npos) ? input.substr(0, end) : input;
+
+ /* Update the goal's state accordingly. */
+ if (expectedHashStr == "") {
+ expectedHashStr = trimmed;
+ } else if (status == "") {
+ status = trimmed;
+ worker.wakeUp(shared_from_this());
+ } else {
+ printMsg(lvlError, format("unexpected substituter message '%1%'") % input);
+ }
+
+ input = (end != string::npos) ? input.substr(end + 1) : "";
}
}
}