;;; Nar Herder
;;;
;;; Copyright © 2022, 2023 Christopher Baines
;;;
;;; This program is free software: you can redistribute it and/or
;;; modify it under the terms of the GNU Affero General Public License
;;; as published by the Free Software Foundation, either version 3 of
;;; the License, or (at your option) any later version.
;;;
;;; This program is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;;; Affero General Public License for more details.
;;;
;;; You should have received a copy of the GNU Affero General Public
;;; License along with this program.  If not, see
;;; <http://www.gnu.org/licenses/>.

(define-module (nar-herder cached-compression)
  #:use-module (srfi srfi-1)
  #:use-module (srfi srfi-19)
  #:use-module (srfi srfi-26)
  #:use-module (srfi srfi-71)
  #:use-module (ice-9 ftw)
  #:use-module (ice-9 match)
  #:use-module (ice-9 atomic)
  #:use-module (ice-9 threads)
  #:use-module (logging logger)
  #:use-module (prometheus)
  #:use-module (fibers)
  #:use-module (fibers timers)
  #:use-module (fibers channels)
  #:use-module (fibers operations)
  #:use-module (web uri)
  #:use-module (web client)
  #:use-module (web response)
  #:use-module (guix store)
  #:use-module ((guix utils)
                #:select (compressed-file?
                          call-with-decompressed-port))
  #:use-module ((guix build utils)
                #:select (dump-port mkdir-p))
  #:use-module (nar-herder utils)
  #:use-module (nar-herder database)
  #:export (start-cached-compression-management-fiber
            start-cached-compression-removal-fiber
            start-cached-compression-schedule-removal-fiber))

;; Nar caching overview
;;
;; On start
;;  - Compute the size of each cached compression directory
;;  - Remove database entries if they're missing from the directory
;;  - Remove files if they're missing from the database
;;
;; On nar usage
;;  - Bump count
;;  - If count is sufficient
;;    - Trigger generation of the cached nar
;;      - Skip if the work queue already includes this job
;;    - At the start of the job, check for a database entry and exit
;;      early if one exists bumping the atime of the file
;;    - If the file doesn't exist, check if there's free space
;;      - If there's free space, create the file
;;
;; Nar removal fiber
;;  - Periodically remove nars that have been scheduled for removal
;;    (when the scheduled time passes)
;;
;; Nar schedule removal fiber
;;  - Periodically check for nars that haven't been accessed in some
;;    time and schedule them for removal

;; Reconcile the cached compression directories with the database.
;;
;; ENABLED-CACHED-COMPRESSIONS is an alist of compression symbol to an
;; alist of details (the 'directory entry is read here).
;; NAR-CACHE-FILES is the metric incremented once per cached file that
;; is present both on disk and in the database.
;;
;; Side effects: files on disk with no database entry are deleted, and
;; database entries with no file on disk are removed.
;;
;; Returns an alist of compression symbol to the total size (as
;; recorded in the database) of the files found for that compression.
(define (perform-cached-compression-startup database
                                            enabled-cached-compressions
                                            nar-cache-files)
  (let* ((database-entries-missing-files
          ;; List tracking entries in the database for cached files,
          ;; where the file is missing from the disk.
          ;;
          ;; These database entries will be removed at the end of the
          ;; startup process.
          '())

         ;; alist of compression to hash table of files
         ;;
         ;; Entries from the hash tables will be removed when the
         ;; database entry is processed below, so these hash tables
         ;; will be left reflecting files in the directories, but with
         ;; no entry in the database.
         ;;
         ;; These files will be deleted at the end of the startup
         ;; process.
         (files-by-compression
          (map
           (match-lambda
             ((compression . details)
              (let ((result (make-hash-table)))
                (for-each
                 (lambda (file)
                   (hash-set! result file #t))
                 ;; scandir returns #f when the directory is missing
                 ;; or unreadable (e.g. before the first cached nar
                 ;; has been created), so treat that as no files
                 (or (scandir (assq-ref details 'directory)
                              (negate (cut member <> '("." ".."))))
                     '()))
                (cons compression result))))
           enabled-cached-compressions))

         (cached-bytes-by-compression
          (database-fold-cached-narinfo-files
           database
           (lambda (details result)
             (let ((compression
                    (assq-ref details 'compression))
                   (filename
                    (store-path-base
                     (assq-ref details 'store-path))))

               ;; NOTE(review): files-hash is #f when the database
               ;; holds an entry for a compression that isn't in
               ;; ENABLED-CACHED-COMPRESSIONS, and hash-ref then
               ;; raises.  Left as is, since it's unclear what
               ;; should happen to such entries.
               (let ((files-hash
                      (assq-ref files-by-compression compression)))
                 (if (hash-ref files-hash filename)
                     (begin
                       (hash-remove! files-hash filename)

                       (metric-increment nar-cache-files
                                         #:label-values
                                         `((compression . ,compression)))

                       `((,compression . ,(+ (assq-ref details 'size)
                                             (or (assq-ref result compression)
                                                 0)))
                         ,@(alist-delete compression result)))

                     ;; Database entry, but file missing
                     (begin
                       (set! database-entries-missing-files
                             (cons details
                                   database-entries-missing-files))
                       result)))))
           (map
            (lambda (compression)
              (cons compression 0))
            (map car enabled-cached-compressions)))))

    ;; Delete cached files with no database entries
    (for-each
     (match-lambda
       ((compression . hash-table)
        (let ((count (hash-count (const #t) hash-table)))
          (unless (= 0 count)
            (let ((directory
                   (assq-ref
                    (assq-ref enabled-cached-compressions compression)
                    'directory)))
              (simple-format #t "deleting ~A cached files from ~A\n"
                             count
                             directory)
              (hash-for-each
               (lambda (filename _)
                 (delete-file (string-append directory "/" filename)))
               hash-table))))))
     files-by-compression)

    ;; Delete database entries where the file is missing
    (let ((count (length database-entries-missing-files)))
      (unless (= 0 count)
        (simple-format
         #t
         "deleting ~A cached_narinfo_files entries due to missing files\n"
         count)

        (for-each
         (lambda (details)
           (database-remove-cached-narinfo-file
            database
            (assq-ref details 'narinfo-id)
            (symbol->string (assq-ref details 'compression))))
         database-entries-missing-files)))

    cached-bytes-by-compression))

;; This fiber manages metadata around cached compressions, and
;; delegates tasks to the thread pool to generate newly compressed
;; nars
;;
;; Returns the channel the fiber listens on.  The messages handled are:
;;
;;  ('narinfo-id . NARINFO-ID)
;;    a use of the narinfo, which may trigger creating cached nars
;;
;;  ('cached-narinfo-added NARINFO-ID COMPRESSION SIZE REPLY)
;;  ('cached-narinfo-removed NARINFO-ID COMPRESSION SIZE REPLY)
;;    update the metrics and database for a created/removed cached
;;    nar, REPLY is either #f or a channel that #t is put on once the
;;    update has been made
(define* (start-cached-compression-management-fiber
          database
          metrics-registry
          nar-source
          enabled-cached-compressions
          cached-compression-min-uses
          #:key (cached-compression-workers 2)
          scheduler)

  (define nar-cache-bytes-metric
    (make-gauge-metric metrics-registry
                       "nar_cache_size_bytes"
                       #:labels '(compression)))

  (define nar-cache-files
    (make-gauge-metric metrics-registry
                       "nar_cache_files"
                       #:labels '(compression)))

  (define channel
    (make-channel))

  (let ((process-job
         count-jobs
         count-threads
         list-jobs
         (create-work-queue cached-compression-workers
                            (lambda (thunk)
                              (thunk))
                            #:name "cached compression")))

    ;; Record a use of NARINFO-ID in USAGE-HASH-TABLE, and once it's
    ;; been used more than CACHED-COMPRESSION-MIN-USES times, queue
    ;; jobs to create the cached nars that are missing and for which
    ;; there's space in the relevant directory.
    (define (consider-narinfo cached-bytes-by-compression
                              usage-hash-table
                              narinfo-id)
      (let* ((existing-cached-files
              ;; This is important both to avoid trying to create
              ;; files twice, but also in the case where additional
              ;; compressions are enabled and more files need to be
              ;; generated
              (database-select-cached-narinfo-files-by-narinfo-id
               database
               narinfo-id))
             (existing-compressions
              (map (lambda (details)
                     (assq-ref details 'compression))
                   existing-cached-files))
             (missing-compressions
              (lset-difference eq?
                               (map car enabled-cached-compressions)
                               existing-compressions))
             (narinfo-files
              (database-select-narinfo-files-by-narinfo-id
               database
               narinfo-id))

             (compress-file?
              (let ((url (assq-ref (first narinfo-files) 'url)))
                ;; TODO: Maybe this should be configurable?
                (not (compressed-file? url)))))

        (when (and compress-file?
                   (not (null? missing-compressions)))
          (let ((new-count
                 (let ((val (+ 1
                               (or (hash-ref usage-hash-table narinfo-id)
                                   0))))
                   (hash-set! usage-hash-table
                              narinfo-id
                              val)
                   val)))

            (when (> new-count cached-compression-min-uses)
              (let* ((narinfo-details
                      (database-select-narinfo database narinfo-id))
                     (nar-size
                      (assq-ref narinfo-details 'nar-size))
                     (compressions-with-space
                      (filter
                       (lambda (compression)
                         (let ((directory-max-size
                                (assq-ref
                                 (assq-ref enabled-cached-compressions
                                           compression)
                                 'directory-max-size))
                               (current-directory-size
                                (assq-ref cached-bytes-by-compression
                                          compression)))

                           (unless current-directory-size
                             (log-msg 'ERROR "current-directory-size unset: "
                                      current-directory-size))
                           (unless nar-size
                             (log-msg 'ERROR "nar-size unset: "
                                      nar-size))

                           ;; No maximum size means there's always
                           ;; considered to be space
                           (if directory-max-size
                               (< (+ current-directory-size
                                     ;; Assume the compressed nar could be
                                     ;; as big as the uncompressed nar
                                     nar-size)
                                  directory-max-size)
                               #t)))
                       missing-compressions)))
                (for-each
                 (lambda (compression)
                   (spawn-fiber
                    (lambda ()
                      (process-job
                       (lambda ()
                         (let ((new-bytes
                                (make-compressed-nar
                                 narinfo-files
                                 nar-source
                                 enabled-cached-compressions
                                 compression
                                 #:level (assq-ref
                                          (assq-ref enabled-cached-compressions
                                                    compression)
                                          'level))))
                           ;; Let the management fiber know about the
                           ;; new file, no reply is needed
                           (put-message channel
                                        (list 'cached-narinfo-added
                                              narinfo-id
                                              compression
                                              new-bytes
                                              #f))))))))
                 compressions-with-space)))))))

    (spawn-fiber
     (lambda ()
       (let ((initial-cached-bytes-by-compression
              (perform-cached-compression-startup
               database
               enabled-cached-compressions
               nar-cache-files))
             (nar-cached-compression-usage-hash-table
              (make-hash-table 65536)))

         (for-each
          (match-lambda
            ((compression . bytes)
             (metric-set
              nar-cache-bytes-metric
              bytes
              #:label-values `((compression . ,compression)))))
          initial-cached-bytes-by-compression)

         (let loop ((cached-bytes-by-compression
                     initial-cached-bytes-by-compression))
           (match (get-message channel)
             (('narinfo-id . narinfo-id)
              ;; Log and carry on if considering the narinfo fails,
              ;; so that the fiber keeps running
              (with-exception-handler
                  (lambda (exn)
                    (log-msg 'ERROR "exception considering narinfo ("
                             narinfo-id "): " exn)
                    #f)
                (lambda ()
                  (consider-narinfo cached-bytes-by-compression
                                    nar-cached-compression-usage-hash-table
                                    narinfo-id))
                #:unwind? #t)
              (loop cached-bytes-by-compression))

             (((and (or 'cached-narinfo-added 'cached-narinfo-removed)
                    action)
               narinfo-id compression size reply)
              (let ((updated-bytes
                     ((if (eq? action 'cached-narinfo-added)
                          +
                          -)
                      (or (assq-ref cached-bytes-by-compression
                                    compression)
                          0)
                      size)))

                (metric-set
                 nar-cache-bytes-metric
                 updated-bytes
                 #:label-values `((compression . ,compression)))

                ((if (eq? action 'cached-narinfo-added)
                     metric-increment
                     metric-decrement)
                 nar-cache-files
                 #:label-values `((compression . ,compression)))

                ;; Use an explicit transaction as it handles the
                ;; database being busy,
                (database-call-with-transaction
                 database
                 (lambda _
                   (if (eq? action 'cached-narinfo-added)
                       (database-insert-cached-narinfo-file
                        database
                        narinfo-id
                        size
                        compression)
                       (let ((cached-narinfo-details
                              (database-select-cached-narinfo-file-by-narinfo-id-and-compression
                               database
                               narinfo-id
                               compression)))

                         ;; It might not have been scheduled for
                         ;; removal, but remove any schedule that
                         ;; exists
                         (let ((schedule-removed?
                                (database-delete-scheduled-cached-narinfo-removal
                                 database
                                 (assq-ref cached-narinfo-details 'id))))
                           (when schedule-removed?
                             (metric-decrement
                              (metrics-registry-fetch-metric
                               metrics-registry
                               "database_scheduled_cached_narinfo_removal_total"))))

                         ;; Remove all the database entries first, as
                         ;; that'll stop these files appearing in narinfos
                         (database-remove-cached-narinfo-file
                          database
                          narinfo-id
                          (symbol->string compression))))))

                ;; Reset the usage count for this narinfo
                (hash-remove! nar-cached-compression-usage-hash-table
                              narinfo-id)

                (when reply
                  (put-message reply #t))

                (loop (alist-cons
                       compression
                       updated-bytes
                       (alist-delete compression
                                     cached-bytes-by-compression)))))))))
     scheduler)

    channel))

;; Periodically check for nars that haven't been accessed in some time
;; and schedule them for removal
;;
;; ENABLED-CACHED-COMPRESSIONS is an alist of compression symbol to an
;; alist of details, from which 'directory, 'unused-removal-duration
;; and 'ttl are read.  BASE-TTL is used when a compression has no
;; 'ttl.  #t is put on
;; CACHED-COMPRESSION-REMOVAL-FIBER-WAKEUP-CHANNEL after each
;; compression has been processed.
(define (start-cached-compression-schedule-removal-fiber
         database
         metrics-registry
         cached-compression-management-channel
         enabled-cached-compressions
         cached-compression-removal-fiber-wakeup-channel
         base-ttl)

  ;; Return the names of the files in the directory for
  ;; COMPRESSION-DETAILS with an atime older than the
  ;; 'unused-removal-duration for that compression.
  (define (files-to-schedule-for-removal compression-details)
    (let* ((directory
            (assq-ref compression-details 'directory))
           (unused-removal-duration
            (assq-ref compression-details 'unused-removal-duration))
           (atime-threshold
            (time-second
             (subtract-duration (current-time)
                                unused-removal-duration))))
      ;; scandir returns #f when the directory is missing or
      ;; unreadable, treat that as there being no files
      (or (scandir
           directory
           (lambda (filename)
             ;; Check the name first, to avoid a needless stat of
             ;; "." and ".."
             (and (not (member filename '("." "..")))
                  (< (stat:atime
                      (stat (string-append directory "/" filename)))
                     atime-threshold))))
          '())))

  ;; Insert a scheduled removal for each unused file for COMPRESSION
  ;; that doesn't already have one, then wake the removal fiber.
  (define (schedule-removal compression compression-details)
    (let ((files
           (let ((files
                  (with-time-logging "files-to-schedule-for-removal"
                    (files-to-schedule-for-removal compression-details))))
             (log-msg 'INFO "cached-compression-schedule-removal-fiber "
                      "looking at " (length files) " files")
             files))
          (count-metric
           (metrics-registry-fetch-metric
            metrics-registry
            "database_scheduled_cached_narinfo_removal_total")))

      (with-time-logging "inserting scheduled-cached-narinfo-removals"
        (for-each
         (lambda (file)
           (let* ((cached-narinfo-file-details
                   ;; Names too short to contain the hash part can't
                   ;; be cached nars, so don't look them up
                   (and (>= (string-length file) 32)
                        (database-select-cached-narinfo-file-by-hash
                         database
                         (string-take file 32) ; hash part
                         compression)))
                  (cached-narinfo-file-id
                   (and cached-narinfo-file-details
                        (assq-ref cached-narinfo-file-details 'id))))
             (if cached-narinfo-file-id
                 (let ((existing-scheduled-removal
                        (database-select-scheduled-cached-narinfo-removal
                         database
                         cached-narinfo-file-id)))
                   (unless existing-scheduled-removal
                     (let ((removal-time
                            ;; The earliest this can be removed is the
                            ;; current time, plus the TTL
                            (add-duration
                             (current-time)
                             (make-time time-duration
                                        0
                                        (or (assq-ref compression-details
                                                      'ttl)
                                            base-ttl)))))
                       (database-insert-scheduled-cached-narinfo-removal
                        database
                        cached-narinfo-file-id
                        removal-time)
                       (metric-increment count-metric))))
                 ;; There's nothing in the database to schedule the
                 ;; removal of, so skip this file rather than failing
                 ;; the whole pass
                 (log-msg 'INFO "cached-compression-schedule-removal-fiber "
                          "skipping file with no database entry: " file))))
         files))

      ;; Wake the cached compression removal fiber in case one of
      ;; the new scheduled removals is before it's scheduled to wake
      ;; up
      (put-message cached-compression-removal-fiber-wakeup-channel
                   #t)))

  (spawn-fiber
   (lambda ()
     (let ((sleep-duration
            ;; The larger of 6 hours, and a quarter of the smallest
            ;; unused-removal-duration
            (max
             (* 60 60 6)  ; Maybe this should be configurable
             (apply min
                    (map (match-lambda
                           ;; ENABLED-CACHED-COMPRESSIONS entries are
                           ;; (compression . details) pairs
                           ((_ . compression-details)
                            (/ (time-second
                                (assq-ref compression-details
                                          'unused-removal-duration))
                               4)))
                         enabled-cached-compressions))))
           (start-count
            (database-count-scheduled-cached-narinfo-removal
             database)))

       (metric-set
        (or (metrics-registry-fetch-metric
             metrics-registry
             "database_scheduled_cached_narinfo_removal_total")
            (make-gauge-metric
             metrics-registry
             "database_scheduled_cached_narinfo_removal_total"))
        start-count)

       (while #t
         (log-msg 'INFO "cached-compression-schedule-removal-fiber starting pass")

         (for-each
          (match-lambda
            ((compression . details)
             ;; Log and carry on, so that a failure for one
             ;; compression doesn't stop the others being processed
             (with-exception-handler
                 (lambda (exn)
                   (log-msg 'ERROR "cached-compression-schedule-removal-fiber: "
                            "exception: " exn))
               (lambda ()
                 (schedule-removal compression details))
               #:unwind? #t)))
          enabled-cached-compressions)

         (log-msg 'INFO "cached-compression-schedule-removal-fiber sleeping for "
                  sleep-duration)
         (sleep sleep-duration))))))

;; Process the scheduled removals for cached nars
;;
;; Returns the wakeup channel, which the fiber waits on when there's
;; nothing to remove yet.
(define (start-cached-compression-removal-fiber
         database
         cached-compression-management-channel
         enabled-cached-compressions)

  (define wakeup-channel
    (make-channel))

  ;; Look at the oldest scheduled removal, and either perform it if
  ;; it's due, wait until it is due (or the fiber is woken), or wait
  ;; to be woken if there's no scheduled removal.
  (define (make-pass)
    (log-msg 'INFO "cached-compression-removal-fiber starting pass")

    (let ((scheduled-cached-narinfo-removal
           (database-select-oldest-scheduled-cached-narinfo-removal
            database)))

      (log-msg 'INFO "scheduled removal: " scheduled-cached-narinfo-removal)

      (if scheduled-cached-narinfo-removal
          (if (time<=? (assq-ref scheduled-cached-narinfo-removal
                                 'scheduled-removal-time)
                       (current-time))
              (let ((narinfo-id
                     (assq-ref scheduled-cached-narinfo-removal 'narinfo-id))
                    (compression
                     (assq-ref scheduled-cached-narinfo-removal 'compression))
                    (size
                     (assq-ref scheduled-cached-narinfo-removal 'size))
                    (store-path
                     (assq-ref scheduled-cached-narinfo-removal 'store-path)))
                (let ((reply (make-channel)))
                  (put-message
                   cached-compression-management-channel
                   (list 'cached-narinfo-removed
                         narinfo-id
                         compression
                         size
                         reply))

                  ;; Wait for the management fiber to delete the
                  ;; database entry before removing the file.
                  (get-message reply))

                (let ((directory
                       (assq-ref
                        (assq-ref enabled-cached-compressions compression)
                        'directory)))
                  (let ((filename
                         (string-append directory "/"
                                        (basename store-path))))
                    (log-msg 'INFO "deleting " filename)
                    (delete-file filename))))

              ;; Not due yet, so wait until just after the scheduled
              ;; time, or until woken
              (let ((duration
                     (time-difference
                      (assq-ref scheduled-cached-narinfo-removal
                                'scheduled-removal-time)
                      (current-time))))
                (perform-operation
                 (choice-operation
                  (sleep-operation (max 1 (+ 1 (time-second duration))))
                  (get-operation wakeup-channel)))))

          ;; Sleep until woken
          (get-message wakeup-channel))))

  (spawn-fiber
   (lambda ()
     (while #t
       (with-exception-handler
           (lambda (exn)
             (log-msg 'ERROR "exception in cached-compression-removal-fiber: "
                      exn))
         (lambda ()
           ;; Print a backtrace from where the exception was thrown
           (with-throw-handler #t
             make-pass
             (lambda _
               (backtrace))))
         #:unwind? #t))))

  wakeup-channel)

;; Create the TARGET-COMPRESSION version of the nar described by the
;; first of NARINFO-FILES, in the 'directory for TARGET-COMPRESSION
;; from ENABLED-CACHED-COMPRESSIONS.
;;
;; NAR-SOURCE is either a URL prefix (a string starting with "http"),
;; in which case the nar is downloaded to a temporary file first, or
;; a directory name (a string starting with "/").  An error is raised
;; for anything else, and for a response code other than 200.
;;
;; LEVEL, if given, is passed as #:level to the compression procedure.
;;
;; The file is written with a ".tmp" suffix, then renamed in to place.
;; Returns the size in bytes of the created file.
;;
;; NOTE(review): if compressing fails, the downloaded temporary file
;; is left behind.
(define* (make-compressed-nar narinfo-files
                              nar-source
                              enabled-cached-compressions
                              target-compression
                              #:key level)
  (define cached-compression-details
    (assq-ref enabled-cached-compressions target-compression))

  (log-msg 'INFO "making " target-compression " for "
           (uri-decode
            (basename
             (assq-ref (first narinfo-files) 'url))))

  (let* ((source-narinfo-file
          ;; There's no specific logic to this, it should be possible
          ;; to use any file
          (first narinfo-files))
         (source-filename
          (cond
           ((string-prefix? "http" nar-source)
            (let* ((output-port
                    (mkstemp "/tmp/nar-herder-source-nar-XXXXXX"))
                   (filename
                    (port-filename output-port))
                   (uri
                    (string->uri
                     (string-append nar-source
                                    (assq-ref source-narinfo-file 'url)))))

              (log-msg 'DEBUG "downloading " (uri->string uri))
              (with-exception-handler
                  (lambda (exn)
                    ;; Don't leave a partial download behind
                    (close-port output-port)
                    (delete-file filename)
                    (raise-exception exn))
                (lambda ()
                  (with-port-timeouts
                   (lambda ()
                     (let ((port
                            socket
                            (open-socket-for-uri* uri)))
                       (call-with-values
                           (lambda ()
                             (http-get uri
                                       #:port port
                                       #:decode-body? #f
                                       #:streaming? #t))
                         (lambda (response body)
                           (unless (= (response-code response)
                                      200)
                             (error "unknown response code"
                                    (response-code response)))

                           (dump-port body output-port)))
                       ;; The response has been read, so close the
                       ;; connection rather than leaving it open
                       (close-port port))
                     (close-port output-port))
                   #:timeout 30))
                #:unwind? #t)

              filename))
           ((string-prefix? "/" nar-source)
            (string-append
             ;; If it's a filename, then it's the canonical path to
             ;; the storage directory
             nar-source
             (uri-decode (assq-ref source-narinfo-file 'url))))
           (else
            (error "unknown nar source")))))

    (let* ((dest-directory
            (assq-ref cached-compression-details
                      'directory))
           (dest-filename
            (string-append
             dest-directory
             "/"
             (uri-decode
              (basename
               (assq-ref source-narinfo-file 'url)))))
           (tmp-dest-filename
            (string-append dest-filename ".tmp")))

      ;; Clear out anything left from a previous attempt
      (when (file-exists? tmp-dest-filename)
        (delete-file tmp-dest-filename))
      (when (file-exists? dest-filename)
        (delete-file dest-filename))

      (mkdir-p dest-directory)

      (call-with-input-file source-filename
        (lambda (source-port)
          (call-with-decompressed-port
              (string->symbol
               (assq-ref source-narinfo-file
                         'compression))
              source-port
            (lambda (decompressed-source-port)
              (let ((call-with-compressed-output-port*
                     (match target-compression
                       ('gzip
                        (@ (zlib) call-with-gzip-output-port))
                       ('lzip
                        (@ (lzlib) call-with-lzip-output-port))
                       ('zstd
                        (@ (zstd) call-with-zstd-output-port))
                       ('none
                        (lambda (port proc)
                          (proc port)
                          ;; Nothing else closes the port in this
                          ;; case, so close it to make sure
                          ;; everything is written out before the
                          ;; file is renamed and its size read
                          (close-port port))))))
                (apply
                 call-with-compressed-output-port*
                 (open-output-file tmp-dest-filename)
                 (lambda (compressed-port)
                   (dump-port decompressed-source-port
                              compressed-port))
                 (if level
                     `(#:level ,level)
                     '())))))))
      (rename-file
       tmp-dest-filename
       dest-filename)

      (when (string-prefix? "http" nar-source)
        (log-msg 'DEBUG "deleting temporary file " source-filename)
        (delete-file source-filename))

      (let ((bytes
             (stat:size (stat dest-filename))))
        (log-msg 'INFO "created " dest-filename)

        bytes))))