diff options
Diffstat (limited to 'nar-herder/storage.scm')
-rw-r--r-- | nar-herder/storage.scm | 91 |
1 files changed, 81 insertions, 10 deletions
diff --git a/nar-herder/storage.scm b/nar-herder/storage.scm index 560eadd..8822f31 100644 --- a/nar-herder/storage.scm +++ b/nar-herder/storage.scm @@ -230,6 +230,79 @@ '() files)))) +(define (at-most max-length lst) + "If LST is shorter than MAX-LENGTH, return it and the empty list; otherwise +return its MAX-LENGTH first elements and its tail." + (let loop ((len 0) + (lst lst) + (result '())) + (match lst + (() + (values (reverse result) '())) + ((head . tail) + (if (>= len max-length) + (values (reverse result) lst) + (loop (+ 1 len) tail (cons head result))))))) + +(define %max-cached-connections + ;; Maximum number of connections kept in cache by + ;; 'open-connection-for-uri/cached'. + 16) + +(define open-socket-for-uri/cached + (let ((cache '())) + (lambda* (uri #:key fresh? verify-certificate?) + "Return a connection for URI, possibly reusing a cached connection. +When FRESH? is true, delete any cached connections for URI and open a new one. +Return #f if URI's scheme is 'file' or #f. + +When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates." + (define host (uri-host uri)) + (define scheme (uri-scheme uri)) + (define key (list host scheme (uri-port uri))) + + (and (not (memq scheme '(file #f))) + (match (assoc-ref cache key) + (#f + ;; Open a new connection to URI and evict old entries from + ;; CACHE, if any. + (let ((socket + (open-socket-for-uri* + uri + #:verify-certificate? verify-certificate?)) + (new-cache evicted + (at-most (- %max-cached-connections 1) cache))) + (for-each (match-lambda + ((_ . port) + (false-if-exception (close-port port)))) + evicted) + (set! cache (alist-cons key socket new-cache)) + socket)) + (socket + (if (or fresh? (port-closed? socket)) + (begin + (false-if-exception (close-port socket)) + (set! cache (alist-delete key cache)) + (open-socket-for-uri/cached uri + #:verify-certificate? + verify-certificate?)) + (begin + ;; Drain input left from the previous use. + (drain-input socket) + socket)))))))) + +(define (call-with-cached-connection uri proc) + (with-exception-handler + (lambda (exn) + ;; Get a new connection, in case this exception is the result + ;; of a connection issue + (open-socket-for-uri/cached uri #:fresh? #t) + + (raise-exception exn)) + (lambda () + (proc (open-socket-for-uri/cached uri))) + #:unwind? #t)) + (define (start-nar-removal-thread database storage-root storage-limit metrics-registry @@ -269,22 +342,20 @@ (lambda () (with-port-timeouts (lambda () - (let ((port - socket - (open-socket-for-uri* uri))) - (set-socket-timeout socket #:seconds 30) - - (http-get uri - #:port port - #:decode-body? #f))))) + (call-with-cached-connection uri + (lambda (port) + (http-get uri + #:port port + #:decode-body? #f + #:keep-alive? #t + #:streaming? #t)))))) #:times 3 #:delay 5)) (lambda (response body) (and (= (response-code response) 200) - (let ((json-body (json-string->scm - (utf8->string body)))) + (let ((json-body (json->scm body))) (eq? (assoc-ref json-body "stored") #t))))))))) |