aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2023-07-27 16:08:00 +0100
committerChristopher Baines <mail@cbaines.net>2023-07-27 16:08:00 +0100
commit7138f044f8b4fc5d002bb03fd9b7ed8a3921696c (patch)
tree3eb75642e7edbf9cf913e66e351d4a485050664c
parent53682fac7e00cd2801406edbd014922c1720c347 (diff)
downloadnar-herder-7138f044f8b4fc5d002bb03fd9b7ed8a3921696c.tar
nar-herder-7138f044f8b4fc5d002bb03fd9b7ed8a3921696c.tar.gz
Throw some connection caching in for info requests
To avoid opening a new connection for each request.
-rw-r--r--nar-herder/storage.scm91
-rw-r--r--nar-herder/utils.scm15
2 files changed, 92 insertions, 14 deletions
diff --git a/nar-herder/storage.scm b/nar-herder/storage.scm
index 560eadd..8822f31 100644
--- a/nar-herder/storage.scm
+++ b/nar-herder/storage.scm
@@ -230,6 +230,79 @@
'()
files))))
+(define (at-most max-length lst)
+ "If LST is shorter than MAX-LENGTH, return it and the empty list; otherwise
+return its MAX-LENGTH first elements and its tail."
+ (let loop ((len 0)
+ (lst lst)
+ (result '()))
+ (match lst
+ (()
+ (values (reverse result) '()))
+ ((head . tail)
+ (if (>= len max-length)
+ (values (reverse result) lst)
+ (loop (+ 1 len) tail (cons head result)))))))
+
+(define %max-cached-connections
+ ;; Maximum number of connections kept in cache by
+ ;; 'open-connection-for-uri/cached'.
+ 16)
+
+(define open-socket-for-uri/cached
+ (let ((cache '()))
+ (lambda* (uri #:key fresh? verify-certificate?)
+ "Return a connection for URI, possibly reusing a cached connection.
+When FRESH? is true, delete any cached connections for URI and open a new one.
+Return #f if URI's scheme is 'file' or #f.
+
+When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates."
+ (define host (uri-host uri))
+ (define scheme (uri-scheme uri))
+ (define key (list host scheme (uri-port uri)))
+
+ (and (not (memq scheme '(file #f)))
+ (match (assoc-ref cache key)
+ (#f
+ ;; Open a new connection to URI and evict old entries from
+ ;; CACHE, if any.
+ (let ((socket
+ (open-socket-for-uri*
+ uri
+ #:verify-certificate? verify-certificate?))
+ (new-cache evicted
+ (at-most (- %max-cached-connections 1) cache)))
+ (for-each (match-lambda
+ ((_ . port)
+ (false-if-exception (close-port port))))
+ evicted)
+ (set! cache (alist-cons key socket new-cache))
+ socket))
+ (socket
+ (if (or fresh? (port-closed? socket))
+ (begin
+ (false-if-exception (close-port socket))
+ (set! cache (alist-delete key cache))
+ (open-socket-for-uri/cached uri
+ #:verify-certificate?
+ verify-certificate?))
+ (begin
+ ;; Drain input left from the previous use.
+ (drain-input socket)
+ socket))))))))
+
+(define (call-with-cached-connection uri proc)
+ (with-exception-handler
+ (lambda (exn)
+ ;; Get a new connection, in case this exception is the result
+ ;; of a connection issue
+ (open-socket-for-uri/cached uri #:fresh? #t)
+
+ (raise-exception exn))
+ (lambda ()
+ (proc (open-socket-for-uri/cached uri)))
+ #:unwind? #t))
+
(define (start-nar-removal-thread database
storage-root storage-limit
metrics-registry
@@ -269,22 +342,20 @@
(lambda ()
(with-port-timeouts
(lambda ()
- (let ((port
- socket
- (open-socket-for-uri* uri)))
- (set-socket-timeout socket #:seconds 30)
-
- (http-get uri
- #:port port
- #:decode-body? #f)))))
+ (call-with-cached-connection uri
+ (lambda (port)
+ (http-get uri
+ #:port port
+ #:decode-body? #f
+ #:keep-alive? #t
+ #:streaming? #t))))))
#:times 3
#:delay 5))
(lambda (response body)
(and (= (response-code response)
200)
- (let ((json-body (json-string->scm
- (utf8->string body))))
+ (let ((json-body (json->scm body)))
(eq? (assoc-ref json-body "stored")
#t)))))))))
diff --git a/nar-herder/utils.scm b/nar-herder/utils.scm
index d31c677..292d308 100644
--- a/nar-herder/utils.scm
+++ b/nar-herder/utils.scm
@@ -605,7 +605,6 @@ If already in the worker thread, call PROC immediately."
(setsockopt port SOL_SOCKET SO_RCVTIMEO `(,seconds . 0))
(setsockopt port SOL_SOCKET SO_SNDTIMEO `(,seconds . 0))))
-;; Returns the port as well as the raw socket
(define* (open-socket-for-uri* uri
#:key (verify-certificate? #t))
(define tls-wrap
@@ -629,9 +628,17 @@ If already in the worker thread, call PROC immediately."
(let ((s (open-socket-for-uri plain-uri)))
(values
(if https?
- (tls-wrap s (uri-host uri)
- #:verify-certificate? verify-certificate?)
- s)
+ (let ((port
+ (tls-wrap s (uri-host uri)
+ #:verify-certificate? verify-certificate?)))
+ ;; Guile/guile-gnutls don't handle the handshake happening on a non
+ ;; blocking socket, so change the behavior here.
+ (let ((flags (fcntl s F_GETFL)))
+ (fcntl s F_SETFL (logior O_NONBLOCK flags)))
+ port)
+ (let ((flags (fcntl s F_GETFL)))
+ (fcntl s F_SETFL (logior O_NONBLOCK flags))
+ s))
s)))
;; Copied from (fibers web server)