diff options
Diffstat (limited to 'nar-herder/cached-compression.scm')
-rw-r--r-- | nar-herder/cached-compression.scm | 753 |
1 files changed, 753 insertions, 0 deletions
;;; Nar Herder
;;;
;;; Copyright © 2022, 2023 Christopher Baines <mail@cbaines.net>
;;;
;;; This program is free software: you can redistribute it and/or
;;; modify it under the terms of the GNU Affero General Public License
;;; as published by the Free Software Foundation, either version 3 of
;;; the License, or (at your option) any later version.
;;;
;;; This program is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;;; Affero General Public License for more details.
;;;
;;; You should have received a copy of the GNU Affero General Public
;;; License along with this program.  If not, see
;;; <http://www.gnu.org/licenses/>.

(define-module (nar-herder cached-compression)
  #:use-module (srfi srfi-1)
  #:use-module (srfi srfi-19)
  #:use-module (srfi srfi-26)
  #:use-module (srfi srfi-71)
  #:use-module (ice-9 ftw)
  #:use-module (ice-9 match)
  #:use-module (ice-9 atomic)
  #:use-module (ice-9 threads)
  #:use-module (logging logger)
  #:use-module (prometheus)
  #:use-module (fibers)
  #:use-module (fibers timers)
  #:use-module (fibers channels)
  #:use-module (fibers operations)
  #:use-module (web uri)
  #:use-module (web client)
  #:use-module (web response)
  #:use-module (guix store)
  #:use-module ((guix utils) #:select (compressed-file?
                                       call-with-decompressed-port))
  #:use-module ((guix build utils)
                #:select (dump-port mkdir-p))
  #:use-module (nar-herder utils)
  #:use-module (nar-herder database)
  #:export (start-cached-compression-management-fiber
            start-cached-compression-removal-fiber
            start-cached-compression-schedule-removal-fiber))

;; Nar caching overview
;;
;; On start
;;  - Compute the size of each cached compression directory
;;  - Remove database entries if they're missing from the directory
;;  - Remove files if they're missing from the database
;;
;; On nar usage
;;  - Bump count
;;  - If count is sufficient
;;    - Trigger generation of the cached nar
;;      - Skip if the work queue already includes this job
;;      - At the start of the job, check for a database entry and exit
;;        early if one exists bumping the atime of the file
;;      - If the file doesn't exist, check if there's free space
;;        - If there's free space, create the file
;;
;; Nar removal fiber
;;  - Periodically remove nars that have been scheduled for removal
;;    (when the scheduled time passes)
;;
;; Nar schedule removal fiber
;;  - Periodically check for nars that haven't been accessed in some
;;    time and schedule them for removal

;; Reconcile the cached narinfo file entries in DATABASE with the
;; files found in the directory of each entry in
;; ENABLED-CACHED-COMPRESSIONS (an alist of compression to details,
;; where the details hold at least a 'directory).
;;
;; Files without a database entry are deleted, database entries
;; without a file are removed, and the NAR-CACHE-FILES metric is
;; incremented once per entry that has a file.
;;
;; Returns an alist of compression to the total of the 'size values of
;; the database entries that have a file on disk.
(define (perform-cached-compression-startup database
                                            enabled-cached-compressions
                                            nar-cache-files)
  (let* ((database-entries-missing-files
          ;; List tracking entries in the database for cached files,
          ;; where the file is missing from the disk.
          ;;
          ;; These database entries will be removed at the end of the
          ;; startup process.
          '())

         ;; alist of compression to hash table of files
         ;;
         ;; Entries from the hash tables will be removed when the
         ;; database entry is processed below, so these hash tables
         ;; will be left reflecting files in the directories, but with
         ;; no entry in the database.
         ;;
         ;; These files will be deleted at the end of the startup
         ;; process.
         (files-by-compression
          (map
           (match-lambda
             ((compression . details)
              (let ((result (make-hash-table)))
                (for-each
                 (lambda (file)
                   (hash-set! result file #t))
                 ;; scandir returns #f when the directory can't be
                 ;; read, for example when it hasn't been created yet,
                 ;; so treat that the same as an empty directory
                 (or (scandir (assq-ref details 'directory)
                              (negate (cut member <> '("." ".."))))
                     '()))
                (cons compression
                      result))))
           enabled-cached-compressions))

         (cached-bytes-by-compression
          (database-fold-cached-narinfo-files
           database
           (lambda (details result)
             (let* ((compression
                     (assq-ref details 'compression))
                    (filename
                     (store-path-base
                      (assq-ref details 'store-path)))
                    (files-hash
                     (assq-ref files-by-compression compression)))
               (cond
                ((not files-hash)
                 ;; The entry is for a compression that isn't in
                 ;; ENABLED-CACHED-COMPRESSIONS, so there's no
                 ;; directory listing to check it against.
                 ;;
                 ;; NOTE(review): the entry is left untouched here
                 ;; rather than removed, confirm this is the desired
                 ;; handling for compressions that have been disabled
                 result)

                ((hash-ref files-hash filename)
                 ;; Database entry and file both present
                 (hash-remove! files-hash filename)

                 (metric-increment nar-cache-files
                                   #:label-values
                                   `((compression . ,compression)))

                 `((,compression . ,(+ (assq-ref details 'size)
                                       (or (assq-ref result compression)
                                           0)))
                   ,@(alist-delete compression result)))

                (else
                 ;; Database entry, but file missing
                 (set! database-entries-missing-files
                       (cons details
                             database-entries-missing-files))
                 result))))
           (map
            (lambda (compression)
              (cons compression 0))
            (map car enabled-cached-compressions)))))

    ;; Delete cached files with no database entries
    (for-each
     (match-lambda
       ((compression . hash-table)
        (let ((count (hash-count (const #t)
                                 hash-table)))
          (unless (= 0 count)
            (let ((directory
                   (assq-ref
                    (assq-ref enabled-cached-compressions compression)
                    'directory)))
              (simple-format #t "deleting ~A cached files from ~A\n"
                             count
                             directory)
              (hash-for-each
               (lambda (filename _)
                 (delete-file (string-append directory "/" filename)))
               hash-table))))))
     files-by-compression)

    ;; Delete database entries where the file is missing
    (let ((count (length database-entries-missing-files)))
      (unless (= 0 count)
        (simple-format
         #t
         "deleting ~A cached_narinfo_files entries due to missing files\n"
         count)

        (for-each
         (lambda (details)
           (database-remove-cached-narinfo-file database
                                                (assq-ref details 'narinfo-id)
                                                (symbol->string
                                                 (assq-ref details 'compression))))
         database-entries-missing-files)))

    cached-bytes-by-compression))

;; This fiber manages metadata around cached compressions, and
;; delegates tasks to the thread pool to generate newly compressed
;; nars
;;
;; Returns the channel the fiber reads from.  The messages handled
;; are:
;;
;;  ('narinfo-id . NARINFO-ID)
;;    Record a use of the narinfo, and generate cached nars for it
;;    once it has been used more than CACHED-COMPRESSION-MIN-USES
;;    times.
;;
;;  ('cached-narinfo-added NARINFO-ID COMPRESSION SIZE REPLY)
;;  ('cached-narinfo-removed NARINFO-ID COMPRESSION SIZE REPLY)
;;    Update the metrics, the database and the running total of bytes
;;    for COMPRESSION.  When REPLY is a channel rather than #f, #t is
;;    sent to it once the database has been updated.
(define* (start-cached-compression-management-fiber
          database
          metrics-registry
          nar-source
          enabled-cached-compressions
          cached-compression-min-uses
          #:key (cached-compression-workers 2)
          scheduler)

  (define nar-cache-bytes-metric
    (make-gauge-metric metrics-registry
                       "nar_cache_size_bytes"
                       #:labels '(compression)))

  (define nar-cache-files
    (make-gauge-metric metrics-registry
                       "nar_cache_files"
                       #:labels '(compression)))

  (define channel
    (make-channel))

  (let ((process-job
         count-jobs
         count-threads
         list-jobs
         (create-work-queue cached-compression-workers
                            (lambda (thunk)
                              (thunk))
                            #:name "cached compression")))

    ;; Record a use of NARINFO-ID in USAGE-HASH-TABLE, and when it's
    ;; been used enough, submit jobs to generate the enabled
    ;; compressions it's missing, provided the relevant directories
    ;; have space according to CACHED-BYTES-BY-COMPRESSION.
    ;;
    ;; NOTE(review): nothing here checks list-jobs for an existing job
    ;; for the same narinfo and compression, so further uses arriving
    ;; before a job finishes look like they'll submit duplicates,
    ;; confirm against create-work-queue
    (define (consider-narinfo cached-bytes-by-compression
                              usage-hash-table
                              narinfo-id)
      (let* ((existing-cached-files
              ;; This is important both to avoid trying to create
              ;; files twice, but also in the case where additional
              ;; compressions are enabled and more files need to be
              ;; generated
              (database-select-cached-narinfo-files-by-narinfo-id
               database
               narinfo-id))
             (existing-compressions
              (map (lambda (details)
                     (assq-ref details 'compression))
                   existing-cached-files))
             (missing-compressions
              (lset-difference eq?
                               (map car enabled-cached-compressions)
                               existing-compressions))
             (narinfo-files
              (database-select-narinfo-files-by-narinfo-id
               database
               narinfo-id))

             (compress-file?
              (let ((url (assq-ref (first narinfo-files) 'url)))
                ;; TODO: Maybe this should be configurable?
                (not (compressed-file? url)))))

        (when (and compress-file?
                   (not (null? missing-compressions)))
          (let ((new-count
                 (let ((val (+ 1
                               (or (hash-ref usage-hash-table narinfo-id)
                                   0))))
                   (hash-set! usage-hash-table
                              narinfo-id
                              val)
                   val)))

            ;; The comparison is strict, so the count has to exceed
            ;; cached-compression-min-uses
            (when (> new-count
                     cached-compression-min-uses)
              (let* ((narinfo-details
                      (database-select-narinfo database narinfo-id))
                     (nar-size
                      (assq-ref narinfo-details 'nar-size))
                     (compressions-with-space
                      (filter
                       (lambda (compression)
                         (let ((directory-max-size
                                (assq-ref
                                 (assq-ref enabled-cached-compressions
                                           compression)
                                 'directory-max-size))
                               (current-directory-size
                                (assq-ref cached-bytes-by-compression
                                          compression)))

                           (unless current-directory-size
                             (log-msg 'ERROR "current-directory-size unset: "
                                      current-directory-size))
                           (unless nar-size
                             (log-msg 'ERROR "nar-size unset: "
                                      nar-size))

                           (if directory-max-size
                               (< (+ current-directory-size
                                     ;; Assume the compressed nar could be
                                     ;; as big as the uncompressed nar
                                     nar-size)
                                  directory-max-size)
                               #t)))
                       missing-compressions)))
                (for-each
                 (lambda (compression)
                   (spawn-fiber
                    (lambda ()
                      (process-job
                       (lambda ()
                         (let ((new-bytes
                                (make-compressed-nar
                                 narinfo-files
                                 nar-source
                                 enabled-cached-compressions
                                 compression
                                 #:level (assq-ref
                                          (assq-ref enabled-cached-compressions
                                                    compression)
                                          'level))))
                           (put-message channel
                                        (list 'cached-narinfo-added
                                              narinfo-id
                                              compression
                                              new-bytes
                                              #f))))))))
                 compressions-with-space)))))))

    (spawn-fiber
     (lambda ()
       (let ((initial-cached-bytes-by-compression
              (perform-cached-compression-startup
               database
               enabled-cached-compressions
               nar-cache-files))
             (nar-cached-compression-usage-hash-table
              (make-hash-table 65536)))

         (for-each
          (match-lambda
            ((compression . bytes)
             (metric-set
              nar-cache-bytes-metric
              bytes
              #:label-values `((compression . ,compression)))))
          initial-cached-bytes-by-compression)

         (let loop ((cached-bytes-by-compression
                     initial-cached-bytes-by-compression))
           (match (get-message channel)
             (('narinfo-id . narinfo-id)
              (with-exception-handler
                  (lambda (exn)
                    (log-msg 'ERROR "exception considering narinfo ("
                             narinfo-id "): " exn)
                    #f)
                (lambda ()
                  (consider-narinfo cached-bytes-by-compression
                                    nar-cached-compression-usage-hash-table
                                    narinfo-id))
                #:unwind? #t)
              (loop cached-bytes-by-compression))

             (((and (or 'cached-narinfo-added 'cached-narinfo-removed)
                    action)
               narinfo-id compression size reply)
              (let ((updated-bytes
                     ((if (eq? action 'cached-narinfo-added)
                          +
                          -)
                      (or (assq-ref cached-bytes-by-compression
                                    compression)
                          0)
                      size)))

                (metric-set
                 nar-cache-bytes-metric
                 updated-bytes
                 #:label-values `((compression . ,compression)))

                ((if (eq? action 'cached-narinfo-added)
                     metric-increment
                     metric-decrement)
                 nar-cache-files
                 #:label-values `((compression . ,compression)))

                ;; Use an explicit transaction as it handles the
                ;; database being busy,
                (database-call-with-transaction
                 database
                 (lambda _
                   (if (eq? action 'cached-narinfo-added)
                       (database-insert-cached-narinfo-file
                        database
                        narinfo-id
                        size
                        compression)
                       (let ((cached-narinfo-details
                              (database-select-cached-narinfo-file-by-narinfo-id-and-compression
                               database
                               narinfo-id
                               compression)))

                         ;; It might not have been scheduled for
                         ;; removal, but remove any schedule that
                         ;; exists
                         (let ((schedule-removed?
                                (database-delete-scheduled-cached-narinfo-removal
                                 database
                                 (assq-ref cached-narinfo-details 'id))))
                           (when schedule-removed?
                             (metric-decrement
                              (metrics-registry-fetch-metric
                               metrics-registry
                               "database_scheduled_cached_narinfo_removal_total"))))

                         ;; Remove all the database entries first, as
                         ;; that'll stop these files appearing in narinfos
                         (database-remove-cached-narinfo-file
                          database
                          narinfo-id
                          (symbol->string compression))))))

                (hash-remove! nar-cached-compression-usage-hash-table
                              narinfo-id)

                (when reply
                  (put-message reply #t))

                (loop (alist-cons
                       compression
                       updated-bytes
                       (alist-delete compression
                                     cached-bytes-by-compression)))))

             (unexpected-message
              ;; Without this clause, match would raise an error for a
              ;; message it doesn't recognise, which would stop this
              ;; fiber processing any further messages
              (log-msg 'ERROR "cached compression management fiber: "
                       "ignoring unexpected message: "
                       unexpected-message)
              (loop cached-bytes-by-compression))))))
     scheduler)

    channel))

;; Periodically check for nars that haven't been accessed in some time
;; and schedule them for removal
;;
;; For each entry in ENABLED-CACHED-COMPRESSIONS, files in the
;; 'directory with an atime older than the 'unused-removal-duration
;; get a scheduled removal inserted in to DATABASE, unless one already
;; exists.  The removal time is the current time plus the 'ttl from
;; the compression details, or BASE-TTL if that's unset.
;;
;; After each compression is processed, #t is sent to
;; CACHED-COMPRESSION-REMOVAL-FIBER-WAKEUP-CHANNEL.
;; CACHED-COMPRESSION-MANAGEMENT-CHANNEL isn't used by this procedure.
(define (start-cached-compression-schedule-removal-fiber
         database
         metrics-registry
         cached-compression-management-channel
         enabled-cached-compressions
         cached-compression-removal-fiber-wakeup-channel
         base-ttl)

  ;; Return the names of the files in the directory for
  ;; COMPRESSION-DETAILS that were last accessed before the unused
  ;; removal threshold
  (define (files-to-schedule-for-removal compression-details)
    (let* ((directory (assq-ref compression-details 'directory))
           (unused-removal-duration
            (assq-ref compression-details 'unused-removal-duration))
           (atime-threshold
            (time-second
             (subtract-duration (current-time)
                                unused-removal-duration))))
      ;; scandir returns #f when the directory can't be read, in which
      ;; case there's nothing to schedule
      (or (scandir
           directory
           (lambda (filename)
             (and
              ;; Check this first to avoid the stat call for these
              ;; entries
              (not (member filename '("." "..")))
              (< (stat:atime (stat (string-append directory "/" filename)))
                 atime-threshold))))
          '())))

  (define (schedule-removal compression compression-details)
    (let ((files
           (let ((files
                  (with-time-logging "files-to-schedule-for-removal"
                    (files-to-schedule-for-removal compression-details))))
             (log-msg 'INFO "cached-compression-schedule-removal-fiber "
                      "looking at " (length files) " files")
             files))
          (count-metric
           (metrics-registry-fetch-metric
            metrics-registry
            "database_scheduled_cached_narinfo_removal_total")))

      (with-time-logging "inserting scheduled-cached-narinfo-removals"
        (for-each
         (lambda (file)
           (let ((cached-narinfo-file-details
                  ;; Names shorter than the hash part can't match a
                  ;; database entry, and string-take would raise an
                  ;; error for them
                  (and (>= (string-length file) 32)
                       (database-select-cached-narinfo-file-by-hash
                        database
                        (string-take file 32) ; hash part
                        compression))))
             (if cached-narinfo-file-details
                 (let ((existing-scheduled-removal
                        (database-select-scheduled-cached-narinfo-removal
                         database
                         (assq-ref cached-narinfo-file-details 'id))))
                   (unless existing-scheduled-removal
                     (let ((removal-time
                            ;; The earliest this can be removed is the current
                            ;; time, plus the TTL
                            (add-duration
                             (current-time)
                             (make-time time-duration
                                        0
                                        (or (assq-ref compression-details 'ttl)
                                            base-ttl)))))
                       (database-insert-scheduled-cached-narinfo-removal
                        database
                        (assq-ref cached-narinfo-file-details 'id)
                        removal-time)

                       (metric-increment count-metric))))
                 ;; Skip files without a database entry, rather than
                 ;; letting one such file stop the remaining files
                 ;; being scheduled.
                 ;;
                 ;; NOTE(review): this presumes the database lookup
                 ;; returns #f when there's no matching entry, confirm
                 ;; against (nar-herder database)
                 (log-msg 'DEBUG "cached-compression-schedule-removal-fiber: "
                          "no cached narinfo file entry for " file
                          ", skipping"))))
         files))

      ;; Wake the cached compression removal fiber in case one of
      ;; the new scheduled removals is before it's scheduled to wake
      ;; up
      (put-message cached-compression-removal-fiber-wakeup-channel
                   #t)))

  (spawn-fiber
   (lambda ()
     (let ((sleep-duration
            (max
             (* 60 60 6) ; Maybe this should be configurable
             (apply min
                    (map (match-lambda
                           ((_ . compression-details)
                            (/ (time-second
                                (assq-ref compression-details
                                          'unused-removal-duration))
                               4)))
                         enabled-cached-compressions))))
           (start-count
            (database-count-scheduled-cached-narinfo-removal
             database)))

       (metric-set
        (or (metrics-registry-fetch-metric
             metrics-registry
             "database_scheduled_cached_narinfo_removal_total")
            (make-gauge-metric
             metrics-registry
             "database_scheduled_cached_narinfo_removal_total"))
        start-count)

       (while #t
         (log-msg 'INFO "cached-compression-schedule-removal-fiber starting pass")

         (for-each
          (match-lambda
            ((compression . details)
             (with-exception-handler
                 (lambda (exn)
                   (log-msg 'ERROR "cached-compression-schedule-removal-fiber: "
                            "exception: " exn))
               (lambda ()
                 (schedule-removal compression details))
               #:unwind? #t)))
          enabled-cached-compressions)

         (log-msg 'INFO "cached-compression-schedule-removal-fiber sleeping for "
                  sleep-duration)
         (sleep sleep-duration))))))

;; Process the scheduled removals for cached nars
;;
;; Each pass looks at the oldest scheduled removal in DATABASE.  If
;; it's due, a 'cached-narinfo-removed message is sent to
;; CACHED-COMPRESSION-MANAGEMENT-CHANNEL, and once that's been
;; acknowledged the file is deleted from the 'directory for the
;; compression in ENABLED-CACHED-COMPRESSIONS.  If it's not due, the
;; fiber waits until it is, or until a message arrives on the wakeup
;; channel.  With nothing scheduled, the fiber waits for a message on
;; the wakeup channel.
;;
;; Returns the wakeup channel.
(define (start-cached-compression-removal-fiber
         database
         cached-compression-management-channel
         enabled-cached-compressions)

  (define wakeup-channel
    (make-channel))

  (define (make-pass)
    (log-msg 'INFO "cached-compression-removal-fiber starting pass")

    (let ((scheduled-cached-narinfo-removal
           (database-select-oldest-scheduled-cached-narinfo-removal
            database)))

      (log-msg 'INFO "scheduled removal: " scheduled-cached-narinfo-removal)

      (cond
       ((not scheduled-cached-narinfo-removal)
        ;; Sleep until woken
        (get-message wakeup-channel))

       ((time<=? (assq-ref scheduled-cached-narinfo-removal
                           'scheduled-removal-time)
                 (current-time))
        (let ((narinfo-id (assq-ref scheduled-cached-narinfo-removal
                                    'narinfo-id))
              (compression (assq-ref scheduled-cached-narinfo-removal
                                     'compression))
              (size (assq-ref scheduled-cached-narinfo-removal
                              'size))
              (store-path (assq-ref scheduled-cached-narinfo-removal
                                    'store-path)))
          (let ((reply (make-channel)))
            (put-message
             cached-compression-management-channel
             (list 'cached-narinfo-removed
                   narinfo-id
                   compression
                   size
                   reply))

            ;; Wait for the management fiber to delete the
            ;; database entry before removing the file.
            (get-message reply))

          (let* ((directory
                  (assq-ref (assq-ref enabled-cached-compressions
                                      compression)
                            'directory))
                 (filename
                  (string-append
                   directory "/"
                   (basename store-path))))
            (log-msg 'INFO "deleting " filename)
            (delete-file filename))))

       (else
        ;; The oldest removal isn't due yet, so wait until just after
        ;; it is, or until woken, whichever happens first
        (let ((duration
               (time-difference
                (assq-ref scheduled-cached-narinfo-removal
                          'scheduled-removal-time)
                (current-time))))
          (perform-operation
           (choice-operation
            (sleep-operation (max 1 (+ 1 (time-second duration))))
            (get-operation wakeup-channel))))))))

  (spawn-fiber
   (lambda ()
     (while #t
       (with-exception-handler
           (lambda (exn)
             (log-msg 'ERROR "exception in cached-compression-removal-fiber: "
                      exn))
         (lambda ()
           (with-throw-handler #t
             make-pass
             (lambda _
               (backtrace))))
         #:unwind? #t))))

  wakeup-channel)

;; Create a TARGET-COMPRESSION compressed copy of the nar described by
;; the first of NARINFO-FILES, in the 'directory for
;; TARGET-COMPRESSION from ENABLED-CACHED-COMPRESSIONS.
;;
;; NAR-SOURCE is either a string starting with "http", in which case
;; the source nar is downloaded to a temporary file, or a string
;; starting with "/", in which case the source nar is read from below
;; that directory.  An error is raised for any other NAR-SOURCE.
;;
;; LEVEL, when not #f, is passed as #:level to the procedure writing
;; the compressed data.
;;
;; The data is written to a ".tmp" file which is then renamed in to
;; place.  Returns the size in bytes of the file created.
(define* (make-compressed-nar narinfo-files
                              nar-source
                              enabled-cached-compressions
                              target-compression
                              #:key level)
  (define cached-compression-details
    (assq-ref enabled-cached-compressions target-compression))

  (define downloaded-source?
    (string-prefix? "http" nar-source))

  (log-msg 'INFO "making " target-compression " for "
           (uri-decode
            (basename
             (assq-ref (first narinfo-files) 'url))))

  (let* ((source-narinfo-file
          ;; There's no specific logic to this, it should be possible
          ;; to use any file
          (first narinfo-files))
         (source-filename
          (cond
           (downloaded-source?
            (let* ((output-port (mkstemp "/tmp/nar-herder-source-nar-XXXXXX"))
                   (filename
                    (port-filename output-port))
                   (uri
                    (string->uri
                     (string-append nar-source
                                    (assq-ref source-narinfo-file 'url)))))

              (log-msg 'DEBUG "downloading " (uri->string uri))
              (with-exception-handler
                  (lambda (exn)
                    (close-port output-port)
                    (delete-file filename)
                    (raise-exception exn))
                (lambda ()
                  (with-port-timeouts
                   (lambda ()
                     (call-with-values
                         (lambda ()
                           (let ((port
                                  socket
                                  (open-socket-for-uri* uri)))
                             (http-get uri
                                       #:port port
                                       #:decode-body? #f
                                       #:streaming? #t)))
                       (lambda (response body)
                         (unless (= (response-code response)
                                    200)
                           ;; Don't leave the connection open when
                           ;; giving up on the response
                           (when (port? body)
                             (close-port body))
                           (error "unknown response code"
                                  (response-code response)))

                         (dump-port body output-port)
                         ;; The body has been read in full, so the
                         ;; connection is no longer needed
                         (when (port? body)
                           (close-port body))))
                     (close-port output-port))
                   #:timeout 30))
                #:unwind? #t)

              filename))
           ((string-prefix? "/" nar-source)
            (string-append
             ;; If it's a filename, then it's the canonical path to
             ;; the storage directory
             nar-source
             (uri-decode (assq-ref source-narinfo-file 'url))))
           (else
            (error "unknown nar source")))))

    (with-exception-handler
        (lambda (exn)
          ;; Don't leave the downloaded copy of the source nar behind
          ;; when creating the compressed nar fails
          (when (and downloaded-source?
                     (file-exists? source-filename))
            (delete-file source-filename))
          (raise-exception exn))
      (lambda ()
        (let* ((dest-directory
                (assq-ref cached-compression-details
                          'directory))
               (dest-filename
                (string-append
                 dest-directory
                 "/"
                 (uri-decode
                  (basename
                   (assq-ref source-narinfo-file 'url)))))
               (tmp-dest-filename
                (string-append dest-filename ".tmp")))

          (when (file-exists? tmp-dest-filename)
            (delete-file tmp-dest-filename))
          (when (file-exists? dest-filename)
            (delete-file dest-filename))

          (mkdir-p dest-directory)

          (call-with-input-file
              source-filename
            (lambda (source-port)
              (call-with-decompressed-port
                  (string->symbol
                   (assq-ref source-narinfo-file
                             'compression))
                  source-port
                (lambda (decompressed-source-port)
                  (let ((call-with-compressed-output-port*
                         (match target-compression
                           ('gzip
                            (@ (zlib) call-with-gzip-output-port))
                           ('lzip
                            (@ (lzlib) call-with-lzip-output-port))
                           ('zstd
                            (@ (zstd) call-with-zstd-output-port))
                           ('none
                            ;; Accept and ignore #:level, so this can
                            ;; be called like the procedures above.
                            ;;
                            ;; The port is closed here so the data is
                            ;; flushed to the file before it's renamed
                            ;; and its size is read.
                            (lambda* (port proc #:key level)
                              (proc port)
                              (close-port port))))))
                    (apply
                     call-with-compressed-output-port*
                     (open-output-file tmp-dest-filename)
                     (lambda (compressed-port)
                       (dump-port decompressed-source-port
                                  compressed-port))
                     (if level
                         `(#:level ,level)
                         '())))))))
          (rename-file
           tmp-dest-filename
           dest-filename)

          (when downloaded-source?
            (log-msg 'DEBUG "deleting temporary file " source-filename)
            (delete-file source-filename))

          (let ((bytes
                 (stat:size (stat dest-filename))))
            (log-msg 'INFO "created " dest-filename)

            bytes)))
      #:unwind? #t)))