;;; Nar Herder
;;;
;;; Copyright © 2021 Christopher Baines
;;;
;;; This program is free software: you can redistribute it and/or
;;; modify it under the terms of the GNU Affero General Public License
;;; as published by the Free Software Foundation, either version 3 of
;;; the License, or (at your option) any later version.
;;;
;;; This program is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;;; Affero General Public License for more details.
;;;
;;; You should have received a copy of the GNU Affero General Public
;;; License along with this program.  If not, see
;;; <http://www.gnu.org/licenses/>.

(define-module (nar-herder storage)
  #:use-module (srfi srfi-1)
  #:use-module (srfi srfi-71)
  #:use-module (ice-9 ftw)
  #:use-module (ice-9 match)
  #:use-module (ice-9 threads)
  #:use-module (rnrs bytevectors)
  #:use-module (web uri)
  #:use-module (web client)
  #:use-module (web response)
  #:use-module (fibers)
  #:use-module (fibers channels)
  #:use-module (logging logger)
  #:use-module (logging port-log)
  #:use-module (prometheus)
  #:use-module (json)
  #:use-module ((guix build utils) #:select (dump-port mkdir-p))
  #:use-module ((guix store) #:select (store-path-hash-part))
  #:use-module (guix progress)
  #:use-module (nar-herder utils)
  #:use-module (nar-herder database)
  #:export (store-item-in-local-storage?
            remove-nar-files-by-hash

            initialise-storage-metrics
            check-storage

            removal-channel-remove-nar-from-storage

            start-nar-removal-fiber
            start-mirroring-fiber))

;; Return #t if every nar file the database lists for HASH exists
;; under STORAGE-ROOT.  Raises an error when the database lists no
;; files for HASH.
(define (store-item-in-local-storage? database storage-root hash)
  (let ((narinfo-files (database-select-narinfo-files database hash)))
    (when (null? narinfo-files)
      (error "no narinfo files"))
    (every (lambda (file)
             (file-exists?
              (string-append storage-root
                             (uri-decode (assq-ref file 'url)))))
           narinfo-files)))

;; Delete from STORAGE-ROOT the nar files the database lists for HASH,
;; and decrement the "nar_files_total" gauge for each listed file (if
;; that metric has been registered).  Unless
;; ERROR-UNLESS-FILES-TO-REMOVE? is #f, raise an error when the
;; database lists no files for HASH.
(define* (remove-nar-files-by-hash database storage-root metrics-registry
                                   hash
                                   #:key (error-unless-files-to-remove? #t))
  (let ((narinfo-files (database-select-narinfo-files database hash)))
    (when (and (null? narinfo-files)
               error-unless-files-to-remove?)
      (error "no narinfo files"))
    (for-each
     (lambda (file)
       ;; The same decoded URL must be used for the existence check and
       ;; for the deletion, otherwise a URL containing percent-encoded
       ;; characters is found on disk but a different path is deleted.
       (let* ((decoded-url (uri-decode (assq-ref file 'url)))
              (filename (string-append storage-root decoded-url))
              (exists? (file-exists? filename)))
         (when exists?
           (remove-nar-from-storage storage-root decoded-url))

         (and=> (metrics-registry-fetch-metric metrics-registry
                                               "nar_files_total")
                (lambda (metric)
                  ;; Just update this metric if it exists, since if it
                  ;; does, it should be set to a value
                  (metric-decrement
                   metric
                   #:label-values
                   `((stored . ,(if exists? "true" "false"))))))))
     narinfo-files)))

;; Walk STORAGE-ROOT and return the sum of the sizes of the files found
;; there: 512 bytes per block when the block count is available,
;; otherwise the file size from stat.
(define (get-storage-size storage-root)
  (define enter? (const #t))
  (define (leaf name stat result)
    (+ result
       (or (and=> (stat:blocks stat)
                  (lambda (blocks)
                    (* blocks 512)))
           (stat:size stat))))

  (define (down name stat result) result)
  (define (up name stat result) result)
  (define (skip name stat result) result)

  ;; Warn about unreadable entries, but keep going
  (define (error name stat errno result)
    (format (current-error-port) "warning: ~a: ~a~%"
            name (strerror errno))
    result)

  (file-system-fold enter? leaf down up skip error
                    0                   ; Start counting at 0
                    storage-root))

;; Delete NAR-FILE (a path relative to STORAGE-ROOT) and return #t.
(define (remove-nar-from-storage storage-root nar-file)
  (let ((filename (string-append storage-root "/" nar-file)))
    (log-msg 'INFO "removing nar " nar-file)
    (delete-file filename))
  #t)

;; Compare the files under STORAGE-ROOT with the nar files known to the
;; database.  Return an alist with two entries: narinfo-files (each
;; database entry extended with a stored? field) and unrecognised-files
;; (paths found on disk that match no database entry).  When
;; STORAGE-ROOT is #f, no files are considered stored.
(define (index-storage database storage-root)
  (define (get-files-hash)
    (define storage-root-length (string-length storage-root))

    (define enter? (const #t))
    (define (leaf name stat result)
      ;; Key the table by the path relative to the storage root
      (hash-set! result
                 (string-drop name storage-root-length)
                 #t)
      result)

    (define (down name stat result) result)
    (define (up name stat result) result)
    (define (skip name stat result) result)

    (define (error name stat errno result)
      (format (current-error-port) "warning: ~a: ~a~%"
              name (strerror errno))
      result)

    (file-system-fold enter? leaf down up skip error
                      (make-hash-table (expt 2 19))
                      storage-root))

  (let* ((files-hash
          (if storage-root
              (get-files-hash)
              (make-hash-table)))
         (narinfo-files
          (database-map-all-narinfo-files
           database
           (lambda (file)
             (let* ((url (uri-decode (assq-ref file 'url)))
                    (stored? (hash-ref files-hash url)))
               (when stored?
                 ;; Delete the hash entry, so that the hash at the end
                 ;; will just contain the unrecognised files
                 (hash-remove! files-hash url))

               `(,@file
                 (stored? . ,stored?)))))))
    `((narinfo-files . ,narinfo-files)
      (unrecognised-files . ,(hash-map->list (lambda (key _) key)
                                             files-hash)))))

;; Fold PROC over the nar files in the database, starting from INIT.
;; STORED? selects which files PROC sees: #t for files present under
;; STORAGE-ROOT, #f (the default) for files that are missing, and the
;; symbol both for every file.  Returns two values: the fold result,
;; and an alist of the stored and not-stored counts, which cover all
;; files regardless of STORED?.
;;
;; TODO Maybe remove the metrics-registry argument?
(define* (fold-nar-files database storage-root metrics-registry
                         proc init
                         #:key stored?)
  (define stored-files-count 0)
  (define not-stored-files-count 0)

  (let ((result
         (database-fold-all-narinfo-files
          database
          (lambda (nar result)
            (let* ((url (uri-decode (assq-ref nar 'url)))
                   (nar-stored?
                    (file-exists? (string-append storage-root url))))
              (if nar-stored?
                  (set! stored-files-count (1+ stored-files-count))
                  (set! not-stored-files-count
                        (1+ not-stored-files-count)))

              (if (or (eq? stored? 'both)
                      (and stored? nar-stored?)
                      (and (not stored?) (not nar-stored?)))
                  (proc nar result)
                  result)))
          init)))
    (values result
            `((stored . ,stored-files-count)
              (not-stored . ,not-stored-files-count)))))

;; Update the "nar_files_total" gauge.  NAR-FILE-COUNTS is an alist
;; that may contain stored and not-stored counts, which are set as
;; absolute values.  FETCHED-COUNT and REMOVED-COUNT then move that
;; many files between the stored "true" and "false" labels.
(define* (update-nar-files-metric metrics-registry
                                  nar-file-counts
                                  #:key fetched-count removed-count)
  ;; Avoid incrementing or decrementing the metric if it hasn't been
  ;; set yet
  (when (or (metrics-registry-fetch-metric metrics-registry
                                           "nar_files_total")
            (= (length nar-file-counts) 2))
    (let ((nar-files-metric
           (or (metrics-registry-fetch-metric metrics-registry
                                              "nar_files_total")
               (make-gauge-metric metrics-registry
                                  "nar_files_total"
                                  #:labels '(stored)))))
      ;; Set the values if the counts are known
      (and=> (assq-ref nar-file-counts 'stored)
             (lambda (stored-count)
               (metric-set nar-files-metric
                           stored-count
                           #:label-values '((stored . "true")))))
      (and=> (assq-ref nar-file-counts 'not-stored)
             (lambda (not-stored-count)
               (metric-set nar-files-metric
                           not-stored-count
                           #:label-values '((stored . "false")))))

      ;; Then adjust by the fetched or removed counts
      (when fetched-count
        (metric-increment nar-files-metric
                          #:by fetched-count
                          #:label-values '((stored . "true")))
        (metric-decrement nar-files-metric
                          #:by fetched-count
                          #:label-values '((stored . "false"))))
      (when removed-count
        (metric-decrement nar-files-metric
                          #:by removed-count
                          #:label-values '((stored . "true")))
        (metric-increment nar-files-metric
                          #:by removed-count
                          #:label-values '((stored . "false")))))))

;; Count the stored and not-stored nar files and set the
;; "nar_files_total" gauge accordingly.
(define (initialise-storage-metrics database storage-root metrics-registry)
  ;; Use a database transaction to block changes
  (database-call-with-transaction
   database
   (lambda _
     (log-msg 'INFO "starting to initialise storage metrics")
     (let ((_ counts
              (fold-nar-files database storage-root metrics-registry
                              (const #f)
                              #f
                              #:stored? 'both)))
       (update-nar-files-metric metrics-registry counts))
     (log-msg 'INFO "finished initialising storage metrics"))))

;; Compare the on-disk size of each nar file with the size recorded in
;; the database, logging a warning for each mismatch and reporting
;; progress on the current error port.
;;
;; NOTE(review): the fold uses #:stored? 'both, so `stat' is also
;; called for files that are not in local storage; presumably that
;; raises for a missing file and aborts the check -- confirm whether
;; that is intended.
(define (check-storage database storage-root metrics-registry)
  (define files-count
    (database-count-narinfo-files database))

  (call-with-progress-reporter
      (progress-reporter/bar files-count
                             (simple-format #f "checking ~A files"
                                            files-count)
                             (current-error-port))
    (lambda (report)
      (fold-nar-files
       database
       storage-root
       metrics-registry
       (lambda (file _)
         (let* ((full-filename
                 (string-append storage-root
                                (uri-decode (assq-ref file 'url))))
                (file-size
                 (stat:size (stat full-filename)))
                (database-size
                 (assq-ref file 'size)))
           (report)
           (unless (= file-size database-size)
             (newline)
             (log-msg 'WARN "file " full-filename
                      " has inconsistent size (database: "
                      database-size ", file: " file-size ")"))
           #f))
       #f
       #:stored? 'both))))

(define (at-most max-length lst)
  "If LST is shorter than MAX-LENGTH, return it and the empty list; otherwise
return its MAX-LENGTH first elements and its tail."
  (let loop ((len 0)
             (lst lst)
             (result '()))
    (match lst
      (()
       (values (reverse result) '()))
      ((head . tail)
       (if (>= len max-length)
           (values (reverse result) lst)
           (loop (+ 1 len) tail (cons head result)))))))

(define %max-cached-connections
  ;; Maximum number of connections kept in cache by
  ;; 'open-socket-for-uri/cached'.
  16)

(define open-socket-for-uri/cached
  ;; CACHE is an alist from (host scheme port) keys to open sockets,
  ;; most recently added first.
  (let ((cache '()))
    (lambda* (uri #:key fresh? verify-certificate?)
      "Return a connection for URI, possibly reusing a cached connection.
When FRESH? is true, delete any cached connections for URI and open a new one.
Return #f if URI's scheme is 'file' or #f.

When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates."
      (define host (uri-host uri))
      (define scheme (uri-scheme uri))
      (define key (list host scheme (uri-port uri)))

      (and (not (memq scheme '(file #f)))
           (match (assoc-ref cache key)
             (#f
              ;; Open a new connection to URI and evict old entries from
              ;; CACHE, if any.
              (let ((socket
                     (open-socket-for-uri*
                      uri
                      #:verify-certificate? verify-certificate?))
                    (new-cache evicted
                               (at-most (- %max-cached-connections 1)
                                        cache)))
                (for-each (match-lambda
                            ((_ . port)
                             (false-if-exception (close-port port))))
                          evicted)
                (set! cache (alist-cons key socket new-cache))
                socket))
             (socket
              (if (or fresh? (port-closed? socket))
                  (begin
                    (false-if-exception (close-port socket))
                    (set! cache (alist-delete key cache))
                    (open-socket-for-uri/cached
                     uri
                     #:verify-certificate? verify-certificate?))
                  (begin
                    ;; Drain input left from the previous use.
                    (drain-input socket)
                    socket))))))))

;; Call PROC with a cached connection for URI.  If PROC throws, close
;; the connection before the exception propagates.
(define (call-with-cached-connection uri proc)
  (let ((port (open-socket-for-uri/cached uri)))
    (with-throw-handler #t
      (lambda ()
        (proc port))
      (lambda _
        (close-port port)))))

;; Ask the removal fiber listening on CHANNEL to remove FILE from
;; storage, and return its reply.  The fiber replies with one of the
;; symbols removed, does-not-exist or removal-criteria-not-met, or with
;; a pair (exn . EXCEPTION) if the removal failed.
(define (removal-channel-remove-nar-from-storage channel file)
  (let ((reply (make-channel)))
    (put-message channel (list 'remove-from-storage reply file))
    (get-message reply)))

;; Spawn two fibers: one that services removal requests sent over a
;; channel, and one that runs a removal pass and then sleeps for 24
;; hours, repeatedly.  A removal pass removes stored nars that meet
;; NAR-REMOVAL-CRITERIA while the estimated storage size exceeds
;; STORAGE-LIMIT.  Returns the removal channel.  Raises an error when
;; NAR-REMOVAL-CRITERIA is empty.
(define (start-nar-removal-fiber database
                                 storage-root storage-limit
                                 metrics-registry
                                 nar-removal-criteria)
  (define storage-size-metric
    ;; Reuse the gauge if it is already registered, in the same way
    ;; update-nar-files-metric does for "nar_files_total"
    (or (metrics-registry-fetch-metric metrics-registry
                                       "storage_size_bytes")
        (make-gauge-metric metrics-registry "storage_size_bytes")))

  (define removal-channel
    (make-channel))

  ;; Return true if NAR satisfies CRITERIA, which is either
  ;; (and (CRITERIA ...)) or (stored-on URL).  For stored-on, the
  ;; server at URL is asked about the nar's store path, and the
  ;; criteria is met when it answers 200 with "stored" set to true.
  (define (check-removal-criteria nar criteria)
    (define narinfo
      (database-select-narinfo-for-file database (assq-ref nar 'url)))

    (match criteria
      (('and and-criteria)
       (every (lambda (c)
                (check-removal-criteria nar c))
              and-criteria))
      (('stored-on url)
       (let ((uri (string->uri
                   (string-append (if (symbol? url)
                                      (symbol->string url)
                                      url)
                                  "/"
                                  (store-path-hash-part
                                   (assq-ref narinfo 'store-path))
                                  ".narinfo/info"))))
         (with-port-timeouts
          (lambda ()
            (call-with-values
                (lambda ()
                  (retry-on-error
                   (lambda ()
                     (call-with-cached-connection uri
                       (lambda (port)
                         (http-get uri
                                   #:port port
                                   #:decode-body? #f
                                   #:keep-alive? #t
                                   #:streaming? #t))))
                   #:times 3
                   #:delay 5))
              (lambda (response body)
                (and (= (response-code response)
                        200)

                     (let ((json-body (json->scm body)))
                       (eq? (assoc-ref json-body "stored") #t))))))
          #:timeout 30)))))

  ;; A nar can be removed when it meets any one of the criteria
  (define (nar-can-be-removed? nar)
    (any (lambda (criteria)
           (check-removal-criteria nar criteria))
         nar-removal-criteria))

  (define (run-removal-pass)
    (log-msg 'INFO "looking for nars to remove")
    (let ((initial-storage-size
           (with-time-logging "getting storage size"
             (get-storage-size storage-root))))
      (log-msg 'DEBUG "initial storage size " initial-storage-size)
      (metric-set storage-size-metric
                  initial-storage-size)

      ;; Look through items in local storage, check if the removal
      ;; criteria have been met, and if so, delete it
      (let ((result
             nar-file-counts
             (fold-nar-files
              database
              storage-root
              metrics-registry
              (lambda (nar result)
                (match result
                  ((storage-size . removed-count)
                   (if (and (> storage-size storage-limit)
                            (nar-can-be-removed? nar))
                       (let ((response
                              (removal-channel-remove-nar-from-storage
                               removal-channel
                               (assq-ref nar 'url))))
                         (if (eq? response 'removed)
                             ;; Estimate the new storage size from the
                             ;; size recorded in the database
                             (let ((storage-size-estimate
                                    (- storage-size
                                       (assq-ref nar 'size))))
                               (cons storage-size-estimate
                                     (+ removed-count 1)))
                             (cons storage-size
                                   removed-count)))
                       (cons storage-size
                             removed-count)))))
              (cons initial-storage-size 0)
              #:stored? #t)))

        (match result
          ((storage-size . removed-count)
           (log-msg 'INFO "finished looking for nars to remove, removed "
                    removed-count " files"))))))

  (when (null? nar-removal-criteria)
    (error "must be some removal criteria"))

  ;; Fiber servicing removal requests
  (spawn-fiber
   (lambda ()
     (while #t
       (match (get-message removal-channel)
         (('remove-from-storage reply file)
          (with-exception-handler
              (lambda (exn)
                (log-msg 'ERROR "nar remove from storage failed ("
                         file "): " exn)
                (put-message reply (cons 'exn exn)))
            (lambda ()
              (with-throw-handler #t
                (lambda ()
                  (cond
                   ((not (file-exists?
                          (string-append storage-root
                                         (uri-decode file))))
                    (put-message reply 'does-not-exist))
                   ((not (nar-can-be-removed?
                          `((url . ,file))))
                    (put-message reply
                                 'removal-criteria-not-met))
                   (else
                    (remove-nar-from-storage
                     storage-root
                     (uri-decode file))

                    (update-nar-files-metric
                     metrics-registry
                     '()
                     #:removed-count 1)

                    (put-message reply 'removed))))
                (lambda _
                  (backtrace))))
            #:unwind? #t))
         (('remove file)
          (with-exception-handler
              (lambda (exn)
                (log-msg 'ERROR "failed to remove " file ": " exn))
            (lambda ()
              ;; TODO: Do more checking at this point
              (remove-nar-from-storage
               storage-root
               (uri-decode file))
              (update-nar-files-metric metrics-registry
                                       '()
                                       #:removed-count 1))
            #:unwind? #t))))))

  ;; Fiber running a removal pass every 24 hours
  (spawn-fiber
   (lambda ()
     (while #t
       (with-exception-handler
           (lambda (exn)
             (log-msg 'ERROR "nar removal pass failed " exn))
         run-removal-pass
         #:unwind? #t)

       (sleep (* 60 60 24)))))

  removal-channel)

;; Spawn two fibers: one that services messages sent over a channel
;; (the symbol full-pass, or (fetch FILE)), and one that requests a
;; full pass and then sleeps for 24 hours, repeatedly.  A full pass
;; downloads nars missing from STORAGE-ROOT from MIRROR.  When
;; STORAGE-LIMIT is an integer, downloads are sequential and stop
;; being attempted for files that would take the estimated storage
;; size past the limit; otherwise every missing nar is fetched, using
;; several fibers.  Returns the channel.
(define (start-mirroring-fiber database mirror storage-limit storage-root
                               metrics-registry)

  (define no-storage-limit?
    (not (integer? storage-limit)))

  (define storage-size-metric
    ;; Reuse the gauge if it is already registered, in the same way
    ;; update-nar-files-metric does for "nar_files_total"
    (or (metrics-registry-fetch-metric metrics-registry
                                       "storage_size_bytes")
        (make-gauge-metric metrics-registry "storage_size_bytes")))

  ;; Download FILE (a URL path) from MIRROR in to STORAGE-ROOT, and
  ;; record one fetched file in the "nar_files_total" metric.  The
  ;; data is written to a "-tmp" file which is renamed in to place once
  ;; the download completes, and deleted if the download fails.
  (define (fetch-file file)
    (let* ((string-url
            (string-append mirror file))
           (uri
            (string->uri string-url))
           (destination-file-name
            (string-append storage-root
                           (uri-decode file)))
           (tmp-file-name
            (string-append destination-file-name "-tmp")))
      (log-msg 'INFO "fetching " string-url)
      (mkdir-p (dirname destination-file-name))

      ;; Clear out anything left behind by an earlier attempt
      (when (file-exists? tmp-file-name)
        (delete-file tmp-file-name))

      (with-exception-handler
          (lambda (exn)
            (when (file-exists? tmp-file-name)
              (delete-file tmp-file-name))

            (raise-exception exn))
        (lambda ()
          (with-port-timeouts
           (lambda ()
             (call-with-values
                 (lambda ()
                   (let ((port
                          socket
                          (open-socket-for-uri* uri)))
                     (http-get uri
                               #:port port
                               #:decode-body? #f
                               #:streaming? #t)))
               (lambda (response body)
                 (unless (= (response-code response)
                            200)
                   (error "unknown response code"
                          (response-code response)))

                 (call-with-output-file tmp-file-name
                   (lambda (output-port)
                     (dump-port body output-port))))))
           #:timeout 30))
        #:unwind? #t)

      (rename-file tmp-file-name
                   destination-file-name)

      (update-nar-files-metric
       metrics-registry
       '()
       #:fetched-count 1)))

  ;; Sequentially fetch missing nars while they fit within
  ;; STORAGE-LIMIT, and return the number of nars fetched.
  (define (download-nars initial-storage-size)
    ;; If there's free space, then consider downloading missing nars
    (if (< initial-storage-size storage-limit)
        (let ((result
               nar-file-counts
               (fold-nar-files
                database
                storage-root
                metrics-registry
                (lambda (file result)
                  (log-msg 'DEBUG "considering "
                           (assq-ref file 'url))
                  (match result
                    ((storage-size . fetched-count)
                     (let ((file-bytes (assq-ref file 'size)))
                       (if (or no-storage-limit?
                               (< (+ storage-size file-bytes)
                                  storage-limit))
                           (let ((success?
                                  (with-exception-handler
                                      (lambda (exn)
                                        (log-msg 'ERROR "failed to fetch "
                                                 (assq-ref file 'url)
                                                 ": " exn)
                                        #f)
                                    (lambda ()
                                      (with-throw-handler #t
                                        (lambda ()
                                          (retry-on-error
                                           (lambda ()
                                             (fetch-file
                                              (assq-ref file 'url)))
                                           #:times 3
                                           #:delay 5))
                                        (lambda _
                                          (backtrace)))
                                      #t)
                                    #:unwind? #t)))
                             (if success?
                                 (cons (+ storage-size file-bytes)
                                       (1+ fetched-count))
                                 result))
                           ;; This file won't fit, so try the next one
                           result)))))
                ;; The accumulator is a (storage-size . fetched-count)
                ;; pair, so the initial value must be a pair too
                (cons initial-storage-size 0)
                #:stored? #f)))

          (match result
            ((storage-size . fetched-count)
             fetched-count)))
        0))

  ;; Fetch every missing nar using PARALLELISM worker fibers, and
  ;; return the total number of nars fetched.
  (define (fast-download-nars)
    (define parallelism 3)

    (let ((channel (make-channel)))
      ;; Each worker fetches the URLs it receives, until it gets a
      ;; (finished . REPLY) message, at which point it sends the number
      ;; of files it fetched to REPLY and exits
      (for-each
       (lambda _
         (spawn-fiber
          (lambda ()
            (let loop ((fetched-count 0))
              (match (get-message channel)
                (('finished . reply)
                 (put-message reply fetched-count))
                (url
                 (log-msg 'DEBUG "considering " url)
                 (loop
                  (+ fetched-count
                     (with-exception-handler
                         (lambda (exn)
                           (log-msg 'ERROR "failed to fetch " url ": " exn)
                           0)
                       (lambda ()
                         (retry-on-error
                          (lambda ()
                            (fetch-file url))
                          #:times 3
                          #:delay 5)
                         1)
                       #:unwind? #t)))))))))
       (iota parallelism))

      (let ((result
             nar-file-counts
             (fold-nar-files
              database
              storage-root
              metrics-registry
              (lambda (nar _)
                (put-message channel
                             (assq-ref nar 'url))
                #f)
              #f
              #:stored? #f)))

        ;; Stop each worker in turn, summing the counts they report
        (let* ((reply-channel (make-channel))
               (fetched-count
                (apply
                 +
                 (map
                  (lambda _
                    (put-message channel
                                 (cons 'finished reply-channel))
                    (get-message reply-channel))
                  (iota parallelism)))))
          fetched-count))))

  (define (run-mirror-pass)
    (log-msg 'DEBUG "running mirror pass")
    (let ((initial-storage-size
           (with-time-logging "getting storage size"
             (get-storage-size storage-root))))
      (metric-set storage-size-metric
                  initial-storage-size)
      (let ((fetched-count
             (if no-storage-limit?
                 (fast-download-nars)
                 (download-nars initial-storage-size))))
        (log-msg 'DEBUG "finished mirror pass (fetched "
                 fetched-count " nars)"))))

  (let ((channel (make-channel)))
    ;; Fiber servicing mirroring requests
    (spawn-fiber
     (lambda ()
       (while #t
         (match (get-message channel)
           ('full-pass
            (with-exception-handler
                (lambda (exn)
                  (log-msg 'ERROR "mirror pass failed " exn))
              run-mirror-pass
              #:unwind? #t))
           (('fetch file)
            (with-exception-handler
                (lambda (exn)
                  (log-msg 'ERROR "failed to mirror " file ": " exn))
              (lambda ()
                ;; fetch-file updates the nar files metric itself, so
                ;; it must not be updated again here
                (fetch-file file))
              #:unwind? #t))))))

    ;; Fiber requesting a full pass every 24 hours
    (spawn-fiber
     (lambda ()
       (while #t
         (put-message channel 'full-pass)
         (sleep (* 60 60 24)))))

    channel))