diff options
Diffstat (limited to 'nar-herder/storage.scm')
-rw-r--r-- | nar-herder/storage.scm | 724 |
1 files changed, 526 insertions, 198 deletions
diff --git a/nar-herder/storage.scm b/nar-herder/storage.scm index c017685..df8ec4d 100644 --- a/nar-herder/storage.scm +++ b/nar-herder/storage.scm @@ -18,6 +18,7 @@ (define-module (nar-herder storage) #:use-module (srfi srfi-1) + #:use-module (srfi srfi-71) #:use-module (ice-9 ftw) #:use-module (ice-9 match) #:use-module (ice-9 threads) @@ -25,21 +26,28 @@ #:use-module (web uri) #:use-module (web client) #:use-module (web response) + #:use-module (fibers) + #:use-module (fibers channels) #:use-module (logging logger) #:use-module (logging port-log) #:use-module (prometheus) #:use-module (json) #:use-module ((guix build utils) #:select (dump-port mkdir-p)) #:use-module ((guix store) #:select (store-path-hash-part)) + #:use-module (guix progress) #:use-module (nar-herder utils) #:use-module (nar-herder database) #:export (store-item-in-local-storage? remove-nar-files-by-hash - get-nar-files + initialise-storage-metrics + update-nar-files-metric + check-storage - start-nar-removal-thread - start-mirroring-thread)) + removal-channel-remove-nar-from-storage + + start-nar-removal-fiber + start-mirroring-fiber)) (define (store-item-in-local-storage? database storage-root hash) (let ((narinfo-files (database-select-narinfo-files database hash))) @@ -52,17 +60,12 @@ (assq-ref file 'url))))) narinfo-files))) -(define (remove-nar-files-by-hash database storage-root metrics-registry - hash) - (define nar-files-metric - (or (metrics-registry-fetch-metric metrics-registry - "nar_files_total") - (make-gauge-metric metrics-registry - "nar_files_total" - #:labels '(stored)))) - +(define* (remove-nar-files-by-hash database storage-root metrics-registry + hash + #:key (error-unless-files-to-remove? #t)) (let ((narinfo-files (database-select-narinfo-files database hash))) - (when (null? narinfo-files) + (when (and (null? narinfo-files) + error-unless-files-to-remove?) (error "no narinfo files")) (for-each (lambda (file) @@ -76,14 +79,21 @@ (remove-nar-from-storage storage-root (assq-ref file 'url))) - (metric-decrement nar-files-metric - #:label-values - `((stored . ,(if exists? "true" "false")))))) + (and=> (metrics-registry-fetch-metric metrics-registry + "nar_files_total") + (lambda (metric) + ;; Just update this metric if it exists, since if it + ;; does, it should be set to a value + (metric-decrement + metric + #:label-values `((stored . ,(if exists? "true" "false")))))))) narinfo-files))) (define (get-storage-size storage-root) (define enter? (const #t)) (define (leaf name stat result) + ;; Allow other fibers to run + (sleep 0) (+ result (or (and=> (stat:blocks stat) (lambda (blocks) @@ -159,50 +169,228 @@ (unrecognised-files . ,(hash-map->list (lambda (key _) key) files-hash))))) -(define* (get-nar-files database storage-root metrics-registry - #:key stored?) - (define nar-files-metric - (or (metrics-registry-fetch-metric metrics-registry - "nar_files_total") - (make-gauge-metric metrics-registry - "nar_files_total" - #:labels '(stored)))) - - (let* ((index (index-storage database storage-root)) - (selected-files - (filter - (lambda (file) - (eq? (assq-ref file 'stored?) stored?)) - (assq-ref index 'narinfo-files)))) - - (let ((selected-files-count - (length selected-files)) - (all-files-count - (length (assq-ref index 'narinfo-files)))) - - (metric-set nar-files-metric - selected-files-count - #:label-values `((stored . ,(if stored? "true" "false")))) - (metric-set nar-files-metric - (- all-files-count selected-files-count) - #:label-values `((stored . ,(if stored? "false" "true"))))) - - selected-files)) - -(define (start-nar-removal-thread database - storage-root storage-limit - metrics-registry - nar-removal-criteria) +(define* (fold-nar-files database storage-root + proc init + #:key stored?) + (define stored-files-count 0) + (define not-stored-files-count 0) + + (let ((result + (database-fold-all-narinfo-files + database + (lambda (nar result) + (let* ((url + (uri-decode + (assq-ref nar 'url))) + (nar-stored? + (if storage-root + (file-exists? + (string-append storage-root url)) + #f))) + + (if nar-stored? + (set! stored-files-count (1+ stored-files-count)) + (set! not-stored-files-count (1+ not-stored-files-count))) + + (if (or (eq? stored? 'both) + (and stored? nar-stored?) + (and (not stored?) + (not nar-stored?))) + (proc nar result) + result))) + init))) + + (values result + `((stored . ,stored-files-count) + (not-stored . ,not-stored-files-count))))) + +(define* (update-nar-files-metric metrics-registry + nar-file-counts + #:key fetched-count removed-count + not-stored-addition-count) + + ;; Avoid incrementing or decrementing the metric if it hasn't been + ;; set yet + (when (or (metrics-registry-fetch-metric metrics-registry + "nar_files_total") + (= (length nar-file-counts) 2)) + + (let ((nar-files-metric + (or (metrics-registry-fetch-metric metrics-registry + "nar_files_total") + (make-gauge-metric metrics-registry + "nar_files_total" + #:labels '(stored))))) + + ;; Set the values if the counts are known + (and=> + (assq-ref nar-file-counts 'stored) + (lambda (stored-count) + (metric-set nar-files-metric + stored-count + #:label-values '((stored . "true"))))) + (and=> + (assq-ref nar-file-counts 'not-stored) + (lambda (not-stored-count) + (metric-set nar-files-metric + not-stored-count + #:label-values '((stored . "false"))))) + + ;; Then adjust by the fetched or removed counts + (when fetched-count + (metric-increment nar-files-metric + #:by fetched-count + #:label-values '((stored . "true"))) + (metric-decrement nar-files-metric + #:by fetched-count + #:label-values '((stored . "false")))) + (when removed-count + (metric-decrement nar-files-metric + #:by removed-count + #:label-values '((stored . "true"))) + (metric-increment nar-files-metric + #:by removed-count + #:label-values '((stored . "false")))) + + (when not-stored-addition-count + (metric-increment nar-files-metric + #:by not-stored-addition-count + #:label-values '((stored . "false"))))))) + +(define (initialise-storage-metrics database storage-root metrics-registry) + ;; Use a database transaction to block changes + (database-call-with-transaction + database + (lambda _ + (log-msg 'INFO "starting to initialise storage metrics") + (let ((_ + counts + (fold-nar-files + database + storage-root + (const #f) + #f + #:stored? 'both))) + (update-nar-files-metric + metrics-registry + counts)) + (log-msg 'INFO "finished initialising storage metrics")))) + +(define (check-storage database storage-root metrics-registry) + (define files-count + (database-count-narinfo-files database)) + + (call-with-progress-reporter + (progress-reporter/bar files-count + (simple-format #f "checking ~A files" files-count) + (current-error-port)) + (lambda (report) + (fold-nar-files + database + storage-root + (lambda (file _) + (let* ((full-filename + (string-append storage-root + (uri-decode (assq-ref file 'url)))) + (file-size + (stat:size (stat full-filename))) + (database-size + (assq-ref file 'size))) + (report) + (unless (= file-size database-size) + (newline) + (log-msg 'WARN "file " full-filename + " has inconsistent size (database: " + database-size ", file: " file-size ")")) + #f)) + #f + #:stored? 'both)))) + +(define (at-most max-length lst) + "If LST is shorter than MAX-LENGTH, return it and the empty list; otherwise +return its MAX-LENGTH first elements and its tail." + (let loop ((len 0) + (lst lst) + (result '())) + (match lst + (() + (values (reverse result) '())) + ((head . tail) + (if (>= len max-length) + (values (reverse result) lst) + (loop (+ 1 len) tail (cons head result))))))) + +(define %max-cached-connections + ;; Maximum number of connections kept in cache by + ;; 'open-connection-for-uri/cached'. + 16) + +(define open-socket-for-uri/cached + (let ((cache '())) + (lambda* (uri #:key fresh? verify-certificate?) + "Return a connection for URI, possibly reusing a cached connection. +When FRESH? is true, delete any cached connections for URI and open a new one. +Return #f if URI's scheme is 'file' or #f. + +When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates." + (define host (uri-host uri)) + (define scheme (uri-scheme uri)) + (define key (list host scheme (uri-port uri))) + + (and (not (memq scheme '(file #f))) + (match (assoc-ref cache key) + (#f + ;; Open a new connection to URI and evict old entries from + ;; CACHE, if any. + (let ((socket + (open-socket-for-uri* + uri + #:verify-certificate? verify-certificate?)) + (new-cache evicted + (at-most (- %max-cached-connections 1) cache))) + (for-each (match-lambda + ((_ . port) + (false-if-exception (close-port port)))) + evicted) + (set! cache (alist-cons key socket new-cache)) + socket)) + (socket + (if (or fresh? (port-closed? socket)) + (begin + (false-if-exception (close-port socket)) + (set! cache (alist-delete key cache)) + (open-socket-for-uri/cached uri + #:verify-certificate? + verify-certificate?)) + (begin + ;; Drain input left from the previous use. + (drain-input socket) + socket)))))))) + +(define (call-with-cached-connection uri proc) + (let ((port (open-socket-for-uri/cached uri))) + (with-throw-handler #t + (lambda () + (proc port)) + (lambda _ + (close-port port))))) + +(define (removal-channel-remove-nar-from-storage + channel file) + (let ((reply (make-channel))) + (put-message channel (list 'remove-from-storage reply file)) + (get-message reply))) + +(define (start-nar-removal-fiber database + storage-root storage-limit + metrics-registry + nar-removal-criteria) (define storage-size-metric (make-gauge-metric metrics-registry "storage_size_bytes")) - (define nar-files-metric - (or (metrics-registry-fetch-metric metrics-registry - "nar_files_total") - (make-gauge-metric metrics-registry - "nar_files_total" - #:labels '(stored)))) + (define removal-channel + (make-channel)) (define (check-removal-criteria nar criteria) (define narinfo @@ -222,21 +410,29 @@ (store-path-hash-part (assq-ref narinfo 'store-path)) ".narinfo/info")))) - (call-with-values - (lambda () - (retry-on-error + (with-port-timeouts + (lambda () + (call-with-values (lambda () - (http-get uri #:decode-body? #f)) - #:times 3 - #:delay 5)) - (lambda (response body) - (and (= (response-code response) - 200) - - (let ((json-body (json-string->scm - (utf8->string body)))) - (eq? (assoc-ref json-body "stored") - #t))))))))) + (retry-on-error + (lambda () + (call-with-cached-connection uri + (lambda (port) + (http-get uri + #:port port + #:decode-body? #f + #:keep-alive? #t + #:streaming? #t)))) + #:times 3 + #:delay 5)) + (lambda (response body) + (and (= (response-code response) + 200) + + (let ((json-body (json->scm body))) + (eq? (assoc-ref json-body "stored") + #t)))))) + #:timeout 30))))) (define (nar-can-be-removed? nar) (any (lambda (criteria) @@ -252,43 +448,97 @@ (metric-set storage-size-metric initial-storage-size) - (let loop ((storage-size - initial-storage-size) - (stored-nar-files - (with-time-logging "getting stored nar files" - (get-nar-files database storage-root metrics-registry - #:stored? #t)))) - ;; Look through items in local storage, check if the removal - ;; criteria have been met, and if so, delete it - - (unless (null? stored-nar-files) - (let ((nar-to-consider (car stored-nar-files))) - (if (nar-can-be-removed? nar-to-consider) - (begin - (remove-nar-from-storage - storage-root - (uri-decode - (assq-ref nar-to-consider 'url))) - - (metric-decrement nar-files-metric - #:label-values '((stored . "true"))) - (metric-increment nar-files-metric - #:label-values '((stored . "false"))) - - (let ((storage-size-estimate - (- storage-size - (assq-ref nar-to-consider 'size)))) - (when (> storage-size-estimate storage-limit) - (loop storage-size-estimate - (cdr stored-nar-files))))) - (loop storage-size - (cdr stored-nar-files))))))) - (log-msg 'INFO "finished looking for nars to remove")) + ;; Look through items in local storage, check if the removal + ;; criteria have been met, and if so, delete it + (let ((result + nar-file-counts + (fold-nar-files + database + storage-root + (lambda (nar result) + (match result + ((storage-size . removed-count) + (if (and (> storage-size storage-limit) + (nar-can-be-removed? nar)) + (let ((response + (removal-channel-remove-nar-from-storage + removal-channel + (assq-ref nar 'url)))) + + (if (eq? response 'removed) + (let ((storage-size-estimate + (- storage-size + (assq-ref nar 'size)))) + (cons storage-size-estimate + (+ removed-count 1))) + (cons storage-size + removed-count))) + (cons storage-size + removed-count))))) + (cons initial-storage-size 0) + #:stored? #t))) + + (match result + ((storage-size . removed-count) + + (log-msg 'INFO "finished looking for nars to remove, removed " + removed-count " files")))))) (when (null? nar-removal-criteria) (error "must be some removal criteria")) - (call-with-new-thread + (spawn-fiber + (lambda () + (while #t + (match (get-message removal-channel) + (('remove-from-storage reply file) + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "nar remove from storage failed (" + file "): " exn) + (put-message reply + (cons 'exn exn))) + (lambda () + (with-throw-handler #t + (lambda () + (cond + ((not (file-exists? + (string-append storage-root + (uri-decode file)))) + (put-message reply 'does-not-exist)) + ((not (nar-can-be-removed? + `((url . ,file)))) + (put-message reply + 'removal-criteria-not-met)) + (else + (remove-nar-from-storage + storage-root + (uri-decode file)) + + (update-nar-files-metric + metrics-registry + '() + #:removed-count 1) + + (put-message reply 'removed)))) + (lambda _ + (backtrace)))) + #:unwind? #t)) + (('remove file) + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "failed to remove " file ": " exn)) + (lambda () + ;; TODO: Do more checking at this point + (remove-nar-from-storage + storage-root + (uri-decode file)) + (update-nar-files-metric metrics-registry + '() + #:removed-count 1)) + #:unwind? #t)))))) + + (spawn-fiber (lambda () (while #t (with-exception-handler @@ -296,11 +546,12 @@ (log-msg 'ERROR "nar removal pass failed " exn)) run-removal-pass #:unwind? #t) + (sleep (* 60 60 24))))) - (sleep 300))))) + removal-channel) -(define (start-mirroring-thread database mirror storage-limit storage-root - metrics-registry) +(define (start-mirroring-fiber database mirror storage-limit storage-root + metrics-registry) (define no-storage-limit? (not (integer? storage-limit))) @@ -309,13 +560,6 @@ (make-gauge-metric metrics-registry "storage_size_bytes")) - (define nar-files-metric - (or (metrics-registry-fetch-metric metrics-registry - "nar_files_total") - (make-gauge-metric metrics-registry - "nar_files_total" - #:labels '(stored)))) - (define (fetch-file file) (let* ((string-url (string-append mirror file)) @@ -333,83 +577,147 @@ (when (file-exists? tmp-file-name) (delete-file tmp-file-name)) - (call-with-values - (lambda () - (http-get uri - #:decode-body? #f - #:streaming? #t)) - (lambda (response body) - (unless (= (response-code response) - 200) - (error "unknown response code")) - - (call-with-output-file tmp-file-name - (lambda (output-port) - (dump-port body output-port))) - (rename-file tmp-file-name - destination-file-name) - - (metric-increment nar-files-metric - #:label-values '((stored . "true"))) - (metric-decrement nar-files-metric - #:label-values '((stored . "false"))))))) + (with-exception-handler + (lambda (exn) + (when (file-exists? tmp-file-name) + (delete-file tmp-file-name)) + + (raise-exception exn)) + (lambda () + (with-port-timeouts + (lambda () + (call-with-values + (lambda () + (let ((port + socket + (open-socket-for-uri* uri))) + (http-get uri + #:port port + #:decode-body? #f + #:streaming? #t))) + (lambda (response body) + (unless (= (response-code response) + 200) + (error "unknown response code" + (response-code response))) + + (call-with-output-file tmp-file-name + (lambda (output-port) + (dump-port body output-port)))))) + #:timeout 30)) + #:unwind? #t) + + (rename-file tmp-file-name + destination-file-name) + + (update-nar-files-metric + metrics-registry + '() + #:fetched-count 1))) (define (download-nars initial-storage-size) ;; If there's free space, then consider downloading missing nars - (when (< initial-storage-size storage-limit) - (let loop ((storage-size initial-storage-size) - (missing-nar-files (get-nar-files - database storage-root metrics-registry - #:stored? #f))) - (unless (null? missing-nar-files) - (let ((file (car missing-nar-files))) - (log-msg 'DEBUG "considering " - (assq-ref file 'url)) - (let ((file-bytes (assq-ref file 'size))) - (if (or no-storage-limit? - (< (+ storage-size file-bytes) - storage-limit)) - (let ((success? - (with-exception-handler - (lambda (exn) - (log-msg 'ERROR "failed to fetch " - (assq-ref file 'url) - ": " exn) - #f) - (lambda () - (fetch-file (assq-ref file 'url)) - #t) - #:unwind? #t))) - (loop (if success? - (+ storage-size file-bytes) - storage-size) - (cdr missing-nar-files))) - ;; This file won't fit, so try the next one - (loop storage-size - (cdr missing-nar-files))))))))) + (if (< initial-storage-size storage-limit) + (let ((result + nar-file-counts + (fold-nar-files + database + storage-root + (lambda (file result) + (log-msg 'DEBUG "considering " + (assq-ref file 'url)) + (match result + ((storage-size . fetched-count) + (let ((file-bytes (assq-ref file 'size))) + (if (or no-storage-limit? + (< (+ storage-size file-bytes) + storage-limit)) + (let ((success? + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "failed to fetch " + (assq-ref file 'url) + ": " exn) + #f) + (lambda () + (with-throw-handler #t + (lambda () + (retry-on-error + (lambda () + (fetch-file (assq-ref file 'url))) + #:times 3 + #:delay 5)) + (lambda _ + (backtrace))) + #t) + #:unwind? #t))) + (if success? + (cons (+ storage-size file-bytes) + (1+ fetched-count)) + result)) + ;; This file won't fit, so try the next one + result))))) + initial-storage-size + #:stored? #f))) + + (match result + ((storage-size . fetched-count) + fetched-count))) + 0)) (define (fast-download-nars) (define parallelism 3) - (let ((missing-nar-files (get-nar-files - database storage-root metrics-registry - #:stored? #f))) - (n-par-for-each - parallelism - (lambda (file) - (log-msg 'DEBUG "considering " - (assq-ref file 'url)) - (with-exception-handler - (lambda (exn) - (log-msg 'ERROR "failed to fetch " - (assq-ref file 'url) - ": " exn) - #f) - (lambda () - (fetch-file (assq-ref file 'url)) - #t) - #:unwind? #t)) - missing-nar-files))) + (let ((channel (make-channel))) + (for-each + (lambda _ + (spawn-fiber + (lambda () + (let loop ((fetched-count 0)) + (match (get-message channel) + (('finished . reply) + (put-message reply fetched-count)) + (url + (log-msg 'DEBUG "considering " url) + (loop + (+ fetched-count + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "failed to fetch " url ": " exn) + 0) + (lambda () + (retry-on-error + (lambda () + (fetch-file url)) + #:times 3 + #:delay 5) + 1) + #:unwind? #t))))))))) + (iota parallelism)) + + (let ((result + nar-file-counts + (fold-nar-files + database + storage-root + (lambda (nar _) + (put-message channel + (assq-ref nar 'url)) + #f) + #f + #:stored? #f))) + + (let* ((reply-channel (make-channel)) + (fetched-count + (apply + + + (map + (lambda _ + (put-message channel + (cons 'finished reply-channel)) + (get-message reply-channel)) + (iota parallelism))))) + fetched-count)))) (define (run-mirror-pass) (log-msg 'DEBUG "running mirror pass") @@ -417,18 +725,38 @@ (get-storage-size storage-root)))) (metric-set storage-size-metric initial-storage-size) - (if no-storage-limit? - (fast-download-nars) - (download-nars initial-storage-size))) - (log-msg 'DEBUG "finished mirror pass")) - - (call-with-new-thread - (lambda () - (while #t - (with-exception-handler - (lambda (exn) - (log-msg 'ERROR "mirror pass failed " exn)) - run-mirror-pass - #:unwind? #t) - - (sleep 300))))) + (let ((fetched-count + (if no-storage-limit? + (fast-download-nars) + (download-nars initial-storage-size)))) + (log-msg 'DEBUG "finished mirror pass (fetched " fetched-count " nars)")))) + + (let ((channel (make-channel))) + (spawn-fiber + (lambda () + (while #t + (match (get-message channel) + ('full-pass + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "mirror pass failed " exn)) + run-mirror-pass + #:unwind? #t)) + (('fetch file) + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "failed to mirror " file ": " exn)) + (lambda () + (unless (file-exists? + (string-append storage-root + (uri-decode file))) + (fetch-file file))) + #:unwind? #t)))))) + + (spawn-fiber + (lambda () + (while #t + (put-message channel 'full-pass) + (sleep (* 60 60 24))))) + + channel)) |