aboutsummaryrefslogtreecommitdiff
path: root/nar-herder/storage.scm
diff options
context:
space:
mode:
Diffstat (limited to 'nar-herder/storage.scm')
-rw-r--r--nar-herder/storage.scm724
1 files changed, 526 insertions, 198 deletions
diff --git a/nar-herder/storage.scm b/nar-herder/storage.scm
index c017685..df8ec4d 100644
--- a/nar-herder/storage.scm
+++ b/nar-herder/storage.scm
@@ -18,6 +18,7 @@
(define-module (nar-herder storage)
#:use-module (srfi srfi-1)
+ #:use-module (srfi srfi-71)
#:use-module (ice-9 ftw)
#:use-module (ice-9 match)
#:use-module (ice-9 threads)
@@ -25,21 +26,28 @@
#:use-module (web uri)
#:use-module (web client)
#:use-module (web response)
+ #:use-module (fibers)
+ #:use-module (fibers channels)
#:use-module (logging logger)
#:use-module (logging port-log)
#:use-module (prometheus)
#:use-module (json)
#:use-module ((guix build utils) #:select (dump-port mkdir-p))
#:use-module ((guix store) #:select (store-path-hash-part))
+ #:use-module (guix progress)
#:use-module (nar-herder utils)
#:use-module (nar-herder database)
#:export (store-item-in-local-storage?
remove-nar-files-by-hash
- get-nar-files
+ initialise-storage-metrics
+ update-nar-files-metric
+ check-storage
- start-nar-removal-thread
- start-mirroring-thread))
+ removal-channel-remove-nar-from-storage
+
+ start-nar-removal-fiber
+ start-mirroring-fiber))
(define (store-item-in-local-storage? database storage-root hash)
(let ((narinfo-files (database-select-narinfo-files database hash)))
@@ -52,17 +60,12 @@
(assq-ref file 'url)))))
narinfo-files)))
-(define (remove-nar-files-by-hash database storage-root metrics-registry
- hash)
- (define nar-files-metric
- (or (metrics-registry-fetch-metric metrics-registry
- "nar_files_total")
- (make-gauge-metric metrics-registry
- "nar_files_total"
- #:labels '(stored))))
-
+(define* (remove-nar-files-by-hash database storage-root metrics-registry
+ hash
+ #:key (error-unless-files-to-remove? #t))
(let ((narinfo-files (database-select-narinfo-files database hash)))
- (when (null? narinfo-files)
+ (when (and (null? narinfo-files)
+ error-unless-files-to-remove?)
(error "no narinfo files"))
(for-each
(lambda (file)
@@ -76,14 +79,21 @@
(remove-nar-from-storage storage-root
(assq-ref file 'url)))
- (metric-decrement nar-files-metric
- #:label-values
- `((stored . ,(if exists? "true" "false"))))))
+ (and=> (metrics-registry-fetch-metric metrics-registry
+ "nar_files_total")
+ (lambda (metric)
+ ;; Just update this metric if it exists, since if it
+ ;; does, it should be set to a value
+ (metric-decrement
+ metric
+ #:label-values `((stored . ,(if exists? "true" "false"))))))))
narinfo-files)))
(define (get-storage-size storage-root)
(define enter? (const #t))
(define (leaf name stat result)
+ ;; Allow other fibers to run
+ (sleep 0)
(+ result
(or (and=> (stat:blocks stat)
(lambda (blocks)
@@ -159,50 +169,228 @@
(unrecognised-files . ,(hash-map->list (lambda (key _) key)
files-hash)))))
-(define* (get-nar-files database storage-root metrics-registry
- #:key stored?)
- (define nar-files-metric
- (or (metrics-registry-fetch-metric metrics-registry
- "nar_files_total")
- (make-gauge-metric metrics-registry
- "nar_files_total"
- #:labels '(stored))))
-
- (let* ((index (index-storage database storage-root))
- (selected-files
- (filter
- (lambda (file)
- (eq? (assq-ref file 'stored?) stored?))
- (assq-ref index 'narinfo-files))))
-
- (let ((selected-files-count
- (length selected-files))
- (all-files-count
- (length (assq-ref index 'narinfo-files))))
-
- (metric-set nar-files-metric
- selected-files-count
- #:label-values `((stored . ,(if stored? "true" "false"))))
- (metric-set nar-files-metric
- (- all-files-count selected-files-count)
- #:label-values `((stored . ,(if stored? "false" "true")))))
-
- selected-files))
-
-(define (start-nar-removal-thread database
- storage-root storage-limit
- metrics-registry
- nar-removal-criteria)
+(define* (fold-nar-files database storage-root
+ proc init
+ #:key stored?)
+ (define stored-files-count 0)
+ (define not-stored-files-count 0)
+
+ (let ((result
+ (database-fold-all-narinfo-files
+ database
+ (lambda (nar result)
+ (let* ((url
+ (uri-decode
+ (assq-ref nar 'url)))
+ (nar-stored?
+ (if storage-root
+ (file-exists?
+ (string-append storage-root url))
+ #f)))
+
+ (if nar-stored?
+ (set! stored-files-count (1+ stored-files-count))
+ (set! not-stored-files-count (1+ not-stored-files-count)))
+
+ (if (or (eq? stored? 'both)
+ (and stored? nar-stored?)
+ (and (not stored?)
+ (not nar-stored?)))
+ (proc nar result)
+ result)))
+ init)))
+
+ (values result
+ `((stored . ,stored-files-count)
+ (not-stored . ,not-stored-files-count)))))
+
+(define* (update-nar-files-metric metrics-registry
+ nar-file-counts
+ #:key fetched-count removed-count
+ not-stored-addition-count)
+
+ ;; Avoid incrementing or decrementing the metric if it hasn't been
+ ;; set yet
+ (when (or (metrics-registry-fetch-metric metrics-registry
+ "nar_files_total")
+ (= (length nar-file-counts) 2))
+
+ (let ((nar-files-metric
+ (or (metrics-registry-fetch-metric metrics-registry
+ "nar_files_total")
+ (make-gauge-metric metrics-registry
+ "nar_files_total"
+ #:labels '(stored)))))
+
+ ;; Set the values if the counts are known
+ (and=>
+ (assq-ref nar-file-counts 'stored)
+ (lambda (stored-count)
+ (metric-set nar-files-metric
+ stored-count
+ #:label-values '((stored . "true")))))
+ (and=>
+ (assq-ref nar-file-counts 'not-stored)
+ (lambda (not-stored-count)
+ (metric-set nar-files-metric
+ not-stored-count
+ #:label-values '((stored . "false")))))
+
+ ;; Then adjust by the fetched or removed counts
+ (when fetched-count
+ (metric-increment nar-files-metric
+ #:by fetched-count
+ #:label-values '((stored . "true")))
+ (metric-decrement nar-files-metric
+ #:by fetched-count
+ #:label-values '((stored . "false"))))
+ (when removed-count
+ (metric-decrement nar-files-metric
+ #:by removed-count
+ #:label-values '((stored . "true")))
+ (metric-increment nar-files-metric
+ #:by removed-count
+ #:label-values '((stored . "false"))))
+
+ (when not-stored-addition-count
+ (metric-increment nar-files-metric
+ #:by not-stored-addition-count
+ #:label-values '((stored . "false")))))))
+
+(define (initialise-storage-metrics database storage-root metrics-registry)
+ ;; Use a database transaction to block changes
+ (database-call-with-transaction
+ database
+ (lambda _
+ (log-msg 'INFO "starting to initialise storage metrics")
+ (let ((_
+ counts
+ (fold-nar-files
+ database
+ storage-root
+ (const #f)
+ #f
+ #:stored? 'both)))
+ (update-nar-files-metric
+ metrics-registry
+ counts))
+ (log-msg 'INFO "finished initialising storage metrics"))))
+
+(define (check-storage database storage-root metrics-registry)
+ (define files-count
+ (database-count-narinfo-files database))
+
+ (call-with-progress-reporter
+ (progress-reporter/bar files-count
+ (simple-format #f "checking ~A files" files-count)
+ (current-error-port))
+ (lambda (report)
+ (fold-nar-files
+ database
+ storage-root
+ (lambda (file _)
+ (let* ((full-filename
+ (string-append storage-root
+ (uri-decode (assq-ref file 'url))))
+ (file-size
+ (stat:size (stat full-filename)))
+ (database-size
+ (assq-ref file 'size)))
+ (report)
+ (unless (= file-size database-size)
+ (newline)
+ (log-msg 'WARN "file " full-filename
+ " has inconsistent size (database: "
+ database-size ", file: " file-size ")"))
+ #f))
+ #f
+ #:stored? 'both))))
+
+(define (at-most max-length lst)
+ "If LST is shorter than MAX-LENGTH, return it and the empty list; otherwise
+return its MAX-LENGTH first elements and its tail."
+ (let loop ((len 0)
+ (lst lst)
+ (result '()))
+ (match lst
+ (()
+ (values (reverse result) '()))
+ ((head . tail)
+ (if (>= len max-length)
+ (values (reverse result) lst)
+ (loop (+ 1 len) tail (cons head result)))))))
+
+(define %max-cached-connections
+ ;; Maximum number of connections kept in cache by
+ ;; 'open-connection-for-uri/cached'.
+ 16)
+
+(define open-socket-for-uri/cached
+ (let ((cache '()))
+ (lambda* (uri #:key fresh? verify-certificate?)
+ "Return a connection for URI, possibly reusing a cached connection.
+When FRESH? is true, delete any cached connections for URI and open a new one.
+Return #f if URI's scheme is 'file' or #f.
+
+When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates."
+ (define host (uri-host uri))
+ (define scheme (uri-scheme uri))
+ (define key (list host scheme (uri-port uri)))
+
+ (and (not (memq scheme '(file #f)))
+ (match (assoc-ref cache key)
+ (#f
+ ;; Open a new connection to URI and evict old entries from
+ ;; CACHE, if any.
+ (let ((socket
+ (open-socket-for-uri*
+ uri
+ #:verify-certificate? verify-certificate?))
+ (new-cache evicted
+ (at-most (- %max-cached-connections 1) cache)))
+ (for-each (match-lambda
+ ((_ . port)
+ (false-if-exception (close-port port))))
+ evicted)
+ (set! cache (alist-cons key socket new-cache))
+ socket))
+ (socket
+ (if (or fresh? (port-closed? socket))
+ (begin
+ (false-if-exception (close-port socket))
+ (set! cache (alist-delete key cache))
+ (open-socket-for-uri/cached uri
+ #:verify-certificate?
+ verify-certificate?))
+ (begin
+ ;; Drain input left from the previous use.
+ (drain-input socket)
+ socket))))))))
+
+(define (call-with-cached-connection uri proc)
+ (let ((port (open-socket-for-uri/cached uri)))
+ (with-throw-handler #t
+ (lambda ()
+ (proc port))
+ (lambda _
+ (close-port port)))))
+
+(define (removal-channel-remove-nar-from-storage
+ channel file)
+ (let ((reply (make-channel)))
+ (put-message channel (list 'remove-from-storage reply file))
+ (get-message reply)))
+
+(define (start-nar-removal-fiber database
+ storage-root storage-limit
+ metrics-registry
+ nar-removal-criteria)
(define storage-size-metric
(make-gauge-metric metrics-registry
"storage_size_bytes"))
- (define nar-files-metric
- (or (metrics-registry-fetch-metric metrics-registry
- "nar_files_total")
- (make-gauge-metric metrics-registry
- "nar_files_total"
- #:labels '(stored))))
+ (define removal-channel
+ (make-channel))
(define (check-removal-criteria nar criteria)
(define narinfo
@@ -222,21 +410,29 @@
(store-path-hash-part
(assq-ref narinfo 'store-path))
".narinfo/info"))))
- (call-with-values
- (lambda ()
- (retry-on-error
+ (with-port-timeouts
+ (lambda ()
+ (call-with-values
(lambda ()
- (http-get uri #:decode-body? #f))
- #:times 3
- #:delay 5))
- (lambda (response body)
- (and (= (response-code response)
- 200)
-
- (let ((json-body (json-string->scm
- (utf8->string body))))
- (eq? (assoc-ref json-body "stored")
- #t)))))))))
+ (retry-on-error
+ (lambda ()
+ (call-with-cached-connection uri
+ (lambda (port)
+ (http-get uri
+ #:port port
+ #:decode-body? #f
+ #:keep-alive? #t
+ #:streaming? #t))))
+ #:times 3
+ #:delay 5))
+ (lambda (response body)
+ (and (= (response-code response)
+ 200)
+
+ (let ((json-body (json->scm body)))
+ (eq? (assoc-ref json-body "stored")
+ #t))))))
+ #:timeout 30)))))
(define (nar-can-be-removed? nar)
(any (lambda (criteria)
@@ -252,43 +448,97 @@
(metric-set storage-size-metric
initial-storage-size)
- (let loop ((storage-size
- initial-storage-size)
- (stored-nar-files
- (with-time-logging "getting stored nar files"
- (get-nar-files database storage-root metrics-registry
- #:stored? #t))))
- ;; Look through items in local storage, check if the removal
- ;; criteria have been met, and if so, delete it
-
- (unless (null? stored-nar-files)
- (let ((nar-to-consider (car stored-nar-files)))
- (if (nar-can-be-removed? nar-to-consider)
- (begin
- (remove-nar-from-storage
- storage-root
- (uri-decode
- (assq-ref nar-to-consider 'url)))
-
- (metric-decrement nar-files-metric
- #:label-values '((stored . "true")))
- (metric-increment nar-files-metric
- #:label-values '((stored . "false")))
-
- (let ((storage-size-estimate
- (- storage-size
- (assq-ref nar-to-consider 'size))))
- (when (> storage-size-estimate storage-limit)
- (loop storage-size-estimate
- (cdr stored-nar-files)))))
- (loop storage-size
- (cdr stored-nar-files)))))))
- (log-msg 'INFO "finished looking for nars to remove"))
+ ;; Look through items in local storage, check if the removal
+ ;; criteria have been met, and if so, delete it
+ (let ((result
+ nar-file-counts
+ (fold-nar-files
+ database
+ storage-root
+ (lambda (nar result)
+ (match result
+ ((storage-size . removed-count)
+ (if (and (> storage-size storage-limit)
+ (nar-can-be-removed? nar))
+ (let ((response
+ (removal-channel-remove-nar-from-storage
+ removal-channel
+ (assq-ref nar 'url))))
+
+ (if (eq? response 'removed)
+ (let ((storage-size-estimate
+ (- storage-size
+ (assq-ref nar 'size))))
+ (cons storage-size-estimate
+ (+ removed-count 1)))
+ (cons storage-size
+ removed-count)))
+ (cons storage-size
+ removed-count)))))
+ (cons initial-storage-size 0)
+ #:stored? #t)))
+
+ (match result
+ ((storage-size . removed-count)
+
+ (log-msg 'INFO "finished looking for nars to remove, removed "
+ removed-count " files"))))))
(when (null? nar-removal-criteria)
(error "must be some removal criteria"))
- (call-with-new-thread
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (match (get-message removal-channel)
+ (('remove-from-storage reply file)
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "nar remove from storage failed ("
+ file "): " exn)
+ (put-message reply
+ (cons 'exn exn)))
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (cond
+ ((not (file-exists?
+ (string-append storage-root
+ (uri-decode file))))
+ (put-message reply 'does-not-exist))
+ ((not (nar-can-be-removed?
+ `((url . ,file))))
+ (put-message reply
+ 'removal-criteria-not-met))
+ (else
+ (remove-nar-from-storage
+ storage-root
+ (uri-decode file))
+
+ (update-nar-files-metric
+ metrics-registry
+ '()
+ #:removed-count 1)
+
+ (put-message reply 'removed))))
+ (lambda _
+ (backtrace))))
+ #:unwind? #t))
+ (('remove file)
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "failed to remove " file ": " exn))
+ (lambda ()
+ ;; TODO: Do more checking at this point
+ (remove-nar-from-storage
+ storage-root
+ (uri-decode file))
+ (update-nar-files-metric metrics-registry
+ '()
+ #:removed-count 1))
+ #:unwind? #t))))))
+
+ (spawn-fiber
(lambda ()
(while #t
(with-exception-handler
@@ -296,11 +546,12 @@
(log-msg 'ERROR "nar removal pass failed " exn))
run-removal-pass
#:unwind? #t)
+ (sleep (* 60 60 24)))))
- (sleep 300)))))
+ removal-channel)
-(define (start-mirroring-thread database mirror storage-limit storage-root
- metrics-registry)
+(define (start-mirroring-fiber database mirror storage-limit storage-root
+ metrics-registry)
(define no-storage-limit?
(not (integer? storage-limit)))
@@ -309,13 +560,6 @@
(make-gauge-metric metrics-registry
"storage_size_bytes"))
- (define nar-files-metric
- (or (metrics-registry-fetch-metric metrics-registry
- "nar_files_total")
- (make-gauge-metric metrics-registry
- "nar_files_total"
- #:labels '(stored))))
-
(define (fetch-file file)
(let* ((string-url
(string-append mirror file))
@@ -333,83 +577,147 @@
(when (file-exists? tmp-file-name)
(delete-file tmp-file-name))
- (call-with-values
- (lambda ()
- (http-get uri
- #:decode-body? #f
- #:streaming? #t))
- (lambda (response body)
- (unless (= (response-code response)
- 200)
- (error "unknown response code"))
-
- (call-with-output-file tmp-file-name
- (lambda (output-port)
- (dump-port body output-port)))
- (rename-file tmp-file-name
- destination-file-name)
-
- (metric-increment nar-files-metric
- #:label-values '((stored . "true")))
- (metric-decrement nar-files-metric
- #:label-values '((stored . "false")))))))
+ (with-exception-handler
+ (lambda (exn)
+ (when (file-exists? tmp-file-name)
+ (delete-file tmp-file-name))
+
+ (raise-exception exn))
+ (lambda ()
+ (with-port-timeouts
+ (lambda ()
+ (call-with-values
+ (lambda ()
+ (let ((port
+ socket
+ (open-socket-for-uri* uri)))
+ (http-get uri
+ #:port port
+ #:decode-body? #f
+ #:streaming? #t)))
+ (lambda (response body)
+ (unless (= (response-code response)
+ 200)
+ (error "unknown response code"
+ (response-code response)))
+
+ (call-with-output-file tmp-file-name
+ (lambda (output-port)
+ (dump-port body output-port))))))
+ #:timeout 30))
+ #:unwind? #t)
+
+ (rename-file tmp-file-name
+ destination-file-name)
+
+ (update-nar-files-metric
+ metrics-registry
+ '()
+ #:fetched-count 1)))
(define (download-nars initial-storage-size)
;; If there's free space, then consider downloading missing nars
- (when (< initial-storage-size storage-limit)
- (let loop ((storage-size initial-storage-size)
- (missing-nar-files (get-nar-files
- database storage-root metrics-registry
- #:stored? #f)))
- (unless (null? missing-nar-files)
- (let ((file (car missing-nar-files)))
- (log-msg 'DEBUG "considering "
- (assq-ref file 'url))
- (let ((file-bytes (assq-ref file 'size)))
- (if (or no-storage-limit?
- (< (+ storage-size file-bytes)
- storage-limit))
- (let ((success?
- (with-exception-handler
- (lambda (exn)
- (log-msg 'ERROR "failed to fetch "
- (assq-ref file 'url)
- ": " exn)
- #f)
- (lambda ()
- (fetch-file (assq-ref file 'url))
- #t)
- #:unwind? #t)))
- (loop (if success?
- (+ storage-size file-bytes)
- storage-size)
- (cdr missing-nar-files)))
- ;; This file won't fit, so try the next one
- (loop storage-size
- (cdr missing-nar-files)))))))))
+ (if (< initial-storage-size storage-limit)
+ (let ((result
+ nar-file-counts
+ (fold-nar-files
+ database
+ storage-root
+ (lambda (file result)
+ (log-msg 'DEBUG "considering "
+ (assq-ref file 'url))
+ (match result
+ ((storage-size . fetched-count)
+ (let ((file-bytes (assq-ref file 'size)))
+ (if (or no-storage-limit?
+ (< (+ storage-size file-bytes)
+ storage-limit))
+ (let ((success?
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "failed to fetch "
+ (assq-ref file 'url)
+ ": " exn)
+ #f)
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (retry-on-error
+ (lambda ()
+ (fetch-file (assq-ref file 'url)))
+ #:times 3
+ #:delay 5))
+ (lambda _
+ (backtrace)))
+ #t)
+ #:unwind? #t)))
+ (if success?
+ (cons (+ storage-size file-bytes)
+ (1+ fetched-count))
+ result))
+ ;; This file won't fit, so try the next one
+ result)))))
+ initial-storage-size
+ #:stored? #f)))
+
+ (match result
+ ((storage-size . fetched-count)
+ fetched-count)))
+ 0))
(define (fast-download-nars)
(define parallelism 3)
- (let ((missing-nar-files (get-nar-files
- database storage-root metrics-registry
- #:stored? #f)))
- (n-par-for-each
- parallelism
- (lambda (file)
- (log-msg 'DEBUG "considering "
- (assq-ref file 'url))
- (with-exception-handler
- (lambda (exn)
- (log-msg 'ERROR "failed to fetch "
- (assq-ref file 'url)
- ": " exn)
- #f)
- (lambda ()
- (fetch-file (assq-ref file 'url))
- #t)
- #:unwind? #t))
- missing-nar-files)))
+ (let ((channel (make-channel)))
+ (for-each
+ (lambda _
+ (spawn-fiber
+ (lambda ()
+ (let loop ((fetched-count 0))
+ (match (get-message channel)
+ (('finished . reply)
+ (put-message reply fetched-count))
+ (url
+ (log-msg 'DEBUG "considering " url)
+ (loop
+ (+ fetched-count
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "failed to fetch " url ": " exn)
+ 0)
+ (lambda ()
+ (retry-on-error
+ (lambda ()
+ (fetch-file url))
+ #:times 3
+ #:delay 5)
+ 1)
+ #:unwind? #t)))))))))
+ (iota parallelism))
+
+ (let ((result
+ nar-file-counts
+ (fold-nar-files
+ database
+ storage-root
+ (lambda (nar _)
+ (put-message channel
+ (assq-ref nar 'url))
+ #f)
+ #f
+ #:stored? #f)))
+
+ (let* ((reply-channel (make-channel))
+ (fetched-count
+ (apply
+ +
+ (map
+ (lambda _
+ (put-message channel
+ (cons 'finished reply-channel))
+ (get-message reply-channel))
+ (iota parallelism)))))
+ fetched-count))))
(define (run-mirror-pass)
(log-msg 'DEBUG "running mirror pass")
@@ -417,18 +725,38 @@
(get-storage-size storage-root))))
(metric-set storage-size-metric
initial-storage-size)
- (if no-storage-limit?
- (fast-download-nars)
- (download-nars initial-storage-size)))
- (log-msg 'DEBUG "finished mirror pass"))
-
- (call-with-new-thread
- (lambda ()
- (while #t
- (with-exception-handler
- (lambda (exn)
- (log-msg 'ERROR "mirror pass failed " exn))
- run-mirror-pass
- #:unwind? #t)
-
- (sleep 300)))))
+ (let ((fetched-count
+ (if no-storage-limit?
+ (fast-download-nars)
+ (download-nars initial-storage-size))))
+ (log-msg 'DEBUG "finished mirror pass (fetched " fetched-count " nars)"))))
+
+ (let ((channel (make-channel)))
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (match (get-message channel)
+ ('full-pass
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "mirror pass failed " exn))
+ run-mirror-pass
+ #:unwind? #t))
+ (('fetch file)
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "failed to mirror " file ": " exn))
+ (lambda ()
+ (unless (file-exists?
+ (string-append storage-root
+ (uri-decode file)))
+ (fetch-file file)))
+ #:unwind? #t))))))
+
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (put-message channel 'full-pass)
+ (sleep (* 60 60 24)))))
+
+ channel))