aboutsummaryrefslogtreecommitdiff
path: root/nar-herder/cached-compression.scm
diff options
context:
space:
mode:
Diffstat (limited to 'nar-herder/cached-compression.scm')
-rw-r--r--nar-herder/cached-compression.scm753
1 files changed, 753 insertions, 0 deletions
diff --git a/nar-herder/cached-compression.scm b/nar-herder/cached-compression.scm
new file mode 100644
index 0000000..5c257dc
--- /dev/null
+++ b/nar-herder/cached-compression.scm
@@ -0,0 +1,753 @@
+;;; Nar Herder
+;;;
+;;; Copyright © 2022, 2023 Christopher Baines <mail@cbaines.net>
+;;;
+;;; This program is free software: you can redistribute it and/or
+;;; modify it under the terms of the GNU Affero General Public License
+;;; as published by the Free Software Foundation, either version 3 of
+;;; the License, or (at your option) any later version.
+;;;
+;;; This program is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;; Affero General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU Affero General Public
+;;; License along with this program. If not, see
+;;; <http://www.gnu.org/licenses/>.
+
+(define-module (nar-herder cached-compression)
+ #:use-module (srfi srfi-1)
+ #:use-module (srfi srfi-19)
+ #:use-module (srfi srfi-26)
+ #:use-module (srfi srfi-71)
+ #:use-module (ice-9 ftw)
+ #:use-module (ice-9 match)
+ #:use-module (ice-9 atomic)
+ #:use-module (ice-9 threads)
+ #:use-module (logging logger)
+ #:use-module (prometheus)
+ #:use-module (fibers)
+ #:use-module (fibers timers)
+ #:use-module (fibers channels)
+ #:use-module (fibers operations)
+ #:use-module (web uri)
+ #:use-module (web client)
+ #:use-module (web response)
+ #:use-module (guix store)
+ #:use-module ((guix utils) #:select (compressed-file?
+ call-with-decompressed-port))
+ #:use-module ((guix build utils)
+ #:select (dump-port mkdir-p))
+ #:use-module (nar-herder utils)
+ #:use-module (nar-herder database)
+ #:export (start-cached-compression-management-fiber
+ start-cached-compression-removal-fiber
+ start-cached-compression-schedule-removal-fiber))
+
+;; Nar caching overview
+;;
+;; On start
+;; - Compute the size of each cached compression directory
+;; - Remove database entries if they're missing from the directory
+;; - Remove files if they're missing from the database
+;;
+;; On nar usage
+;; - Bump count
+;; - If count is sufficient
+;; - Trigger generation of the cached nar
+;; - Skip if the work queue already includes this job
+;; - At the start of the job, check for a database entry and exit
+;; early if one exists bumping the atime of the file
+;; - If the file doesn't exist, check if there's free space
+;; - If there's free space, create the file
+;;
+;; Nar removal fiber
+;; - Periodically remove nars that have been scheduled for removal
+;; (when the scheduled time passes)
+;;
+;; Nar schedule removal fiber
+;; - Periodically check for nars that haven't been accessed in some
+;; time and schedule them for removal
+
+;; Reconcile the cached compression directories with the database.
+;;
+;; ENABLED-CACHED-COMPRESSIONS is an alist of compression to details
+;; alist, where the details include a 'directory entry.  For each
+;; cached narinfo file entry folded over from DATABASE:
+;;  - if a file with the corresponding name exists in the directory
+;;    for that compression, NAR-CACHE-FILES is incremented for the
+;;    compression and the 'size of the entry is added to the total
+;;  - otherwise the entry is removed from the database
+;;
+;; Files left in the directories with no database entry are deleted.
+;;
+;; Returns an alist of compression to total cached bytes, with an
+;; entry for every enabled compression.
+(define (perform-cached-compression-startup database
+                                            enabled-cached-compressions
+                                            nar-cache-files)
+  (let* ((database-entries-missing-files
+          ;; List tracking entries in the database for cached files,
+          ;; where the file is missing from the disk.
+          ;;
+          ;; These database entries will be removed at the end of the
+          ;; startup process.
+          '())
+
+         ;; alist of compression to hash table of files
+         ;;
+         ;; Entries from the hash tables will be removed when the
+         ;; database entry is processed below, so these hash tables
+         ;; will be left reflecting files in the directories, but with
+         ;; no entry in the database.
+         ;;
+         ;; These files will be deleted at the end of the startup
+         ;; process.
+         (files-by-compression
+          (map
+           (match-lambda
+             ((compression . details)
+              (let ((result (make-hash-table)))
+                (for-each
+                 (lambda (file)
+                   (hash-set! result file #t))
+                 ;; scandir returns #f when the directory is missing
+                 ;; or unreadable (e.g. before the first cached nar
+                 ;; has been created), treat that as an empty
+                 ;; directory
+                 (or (scandir (assq-ref details 'directory)
+                              (negate (cut member <> '("." ".."))))
+                     '()))
+                (cons compression
+                      result))))
+           enabled-cached-compressions))
+
+         (cached-bytes-by-compression
+          (database-fold-cached-narinfo-files
+           database
+           (lambda (details result)
+             (let ((compression
+                    (assq-ref details 'compression))
+                   (filename
+                    (store-path-base
+                     (assq-ref details 'store-path))))
+
+               ;; NOTE(review): if the database holds an entry for a
+               ;; compression that isn't in
+               ;; enabled-cached-compressions, files-hash will be #f
+               ;; and hash-ref will raise an exception - confirm
+               ;; whether that can happen
+               (let ((files-hash
+                      (assq-ref files-by-compression compression)))
+                 (if (hash-ref files-hash filename)
+                     (begin
+                       (hash-remove! files-hash filename)
+
+                       (metric-increment nar-cache-files
+                                         #:label-values
+                                         `((compression . ,compression)))
+
+                       `((,compression . ,(+ (assq-ref details 'size)
+                                             (or (assq-ref result compression)
+                                                 0)))
+                         ,@(alist-delete compression result)))
+
+                     ;; Database entry, but file missing
+                     (begin
+                       (set! database-entries-missing-files
+                             (cons details
+                                   database-entries-missing-files))
+                       result)))))
+           ;; Start the totals at 0 for each enabled compression
+           (map
+            (lambda (compression)
+              (cons compression 0))
+            (map car enabled-cached-compressions)))))
+
+    ;; Delete cached files with no database entries
+    (for-each
+     (match-lambda
+       ((compression . hash-table)
+        (let ((count (hash-count (const #t)
+                                 hash-table)))
+          (unless (= 0 count)
+            (let ((directory
+                   (assq-ref
+                    (assq-ref enabled-cached-compressions compression)
+                    'directory)))
+              (simple-format #t "deleting ~A cached files from ~A\n"
+                             count
+                             directory)
+              (hash-for-each
+               (lambda (filename _)
+                 (delete-file (string-append directory "/" filename)))
+               hash-table))))))
+     files-by-compression)
+
+    ;; Delete database entries where the file is missing
+    (let ((count (length database-entries-missing-files)))
+      (unless (= 0 count)
+        (simple-format
+         #t
+         "deleting ~A cached_narinfo_files entries due to missing files\n"
+         count)
+
+        (for-each
+         (lambda (details)
+           (database-remove-cached-narinfo-file database
+                                                (assq-ref details 'narinfo-id)
+                                                (symbol->string
+                                                 (assq-ref details 'compression))))
+         database-entries-missing-files)))
+
+    cached-bytes-by-compression))
+
+;; This fiber manages metadata around cached compressions, and
+;; delegates tasks to the thread pool to generate newly compressed
+;; nars
+;; Start the management fiber and return the channel used to send it
+;; messages.
+;;
+;; The fiber first runs perform-cached-compression-startup, then loops
+;; handling messages received on the channel:
+;;
+;; ('narinfo-id . NARINFO-ID)
+;; Record a use of the narinfo, and once it has been used more than
+;; CACHED-COMPRESSION-MIN-USES times, queue jobs to generate the
+;; missing cached compressions for which there's space.
+;;
+;; ('cached-narinfo-added NARINFO-ID COMPRESSION SIZE REPLY)
+;; ('cached-narinfo-removed NARINFO-ID COMPRESSION SIZE REPLY)
+;; Update the metrics, the database and the running total of cached
+;; bytes for COMPRESSION. If REPLY is not #f, #t is sent on it
+;; once the database has been updated.
+(define* (start-cached-compression-management-fiber
+ database
+ metrics-registry
+ nar-source
+ enabled-cached-compressions
+ cached-compression-min-uses
+ #:key (cached-compression-workers 2)
+ scheduler)
+
+ (define nar-cache-bytes-metric
+ (make-gauge-metric metrics-registry
+ "nar_cache_size_bytes"
+ #:labels '(compression)))
+
+ (define nar-cache-files
+ (make-gauge-metric metrics-registry
+ "nar_cache_files"
+ #:labels '(compression)))
+
+ ;; Channel the fiber receives messages on, this is the return value
+ (define channel
+ (make-channel))
+
+ ;; create-work-queue returns multiple values, only process-job is
+ ;; used below
+ (let ((process-job
+ count-jobs
+ count-threads
+ list-jobs
+ (create-work-queue cached-compression-workers
+ (lambda (thunk)
+ (thunk))
+ #:name "cached compression")))
+
+ ;; Handle a use of NARINFO-ID: bump the count in USAGE-HASH-TABLE
+ ;; and, if the count is high enough, queue the generation of the
+ ;; cached compressions that are missing and have space available
+ ;; according to CACHED-BYTES-BY-COMPRESSION
+ (define (consider-narinfo cached-bytes-by-compression
+ usage-hash-table
+ narinfo-id)
+ (let* ((existing-cached-files
+ ;; This is important both to avoid trying to create
+ ;; files twice, but also in the case where additional
+ ;; compressions are enabled and more files need to be
+ ;; generated
+ (database-select-cached-narinfo-files-by-narinfo-id
+ database
+ narinfo-id))
+ (existing-compressions
+ (map (lambda (details)
+ (assq-ref details 'compression))
+ existing-cached-files))
+ (missing-compressions
+ (lset-difference eq?
+ (map car enabled-cached-compressions)
+ existing-compressions))
+ (narinfo-files
+ (database-select-narinfo-files-by-narinfo-id
+ database
+ narinfo-id))
+
+ ;; Only the url of the first narinfo file is checked
+ (compress-file?
+ (let ((url (assq-ref (first narinfo-files) 'url)))
+ ;; TODO: Maybe this should be configurable?
+ (not (compressed-file? url)))))
+
+ (when (and compress-file?
+ (not (null? missing-compressions)))
+ ;; Bump the usage count, and keep hold of the new value
+ (let ((new-count
+ (let ((val (+ 1
+ (or (hash-ref usage-hash-table narinfo-id)
+ 0))))
+ (hash-set! usage-hash-table
+ narinfo-id
+ val)
+ val)))
+
+ (when (and (> new-count
+ cached-compression-min-uses))
+ (let* ((narinfo-details
+ (database-select-narinfo database narinfo-id))
+ (nar-size
+ (assq-ref narinfo-details 'nar-size))
+ (compressions-with-space
+ (filter
+ (lambda (compression)
+ (let ((directory-max-size
+ (assq-ref
+ (assq-ref enabled-cached-compressions
+ compression)
+ 'directory-max-size))
+ (current-directory-size
+ (assq-ref cached-bytes-by-compression
+ compression)))
+
+ (unless current-directory-size
+ (log-msg 'ERROR "current-directory-size unset: "
+ current-directory-size))
+ (unless nar-size
+ (log-msg 'ERROR "nar-size unset: "
+ nar-size))
+
+ ;; With no directory-max-size, there's always
+ ;; considered to be space
+ (if directory-max-size
+ (< (+ current-directory-size
+ ;; Assume the compressed nar could be
+ ;; as big as the uncompressed nar
+ nar-size)
+ directory-max-size)
+ #t)))
+ missing-compressions)))
+ (for-each
+ (lambda (compression)
+ (spawn-fiber
+ (lambda ()
+ (process-job
+ (lambda ()
+ (let ((new-bytes
+ (make-compressed-nar
+ narinfo-files
+ nar-source
+ enabled-cached-compressions
+ compression
+ #:level (assq-ref
+ (assq-ref enabled-cached-compressions
+ compression)
+ 'level))))
+ ;; Tell the management fiber about the new
+ ;; file, no reply is requested
+ (put-message channel
+ (list 'cached-narinfo-added
+ narinfo-id
+ compression
+ new-bytes
+ #f))))))))
+ compressions-with-space)))))))
+
+ (spawn-fiber
+ (lambda ()
+ (let ((initial-cached-bytes-by-compression
+ (perform-cached-compression-startup
+ database
+ enabled-cached-compressions
+ nar-cache-files))
+ (nar-cached-compression-usage-hash-table
+ (make-hash-table 65536)))
+
+ ;; Set the size metric for each compression from the totals
+ ;; computed at startup
+ (for-each
+ (match-lambda
+ ((compression . bytes)
+ (metric-set
+ nar-cache-bytes-metric
+ bytes
+ #:label-values `((compression . ,compression)))))
+ initial-cached-bytes-by-compression)
+
+ ;; The alist of compression to cached bytes is the loop state
+ (let loop ((cached-bytes-by-compression
+ initial-cached-bytes-by-compression))
+ (match (get-message channel)
+ (('narinfo-id . narinfo-id)
+ ;; Exceptions are logged, and the loop continues
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "exception considering narinfo ("
+ narinfo-id "): " exn)
+ #f)
+ (lambda ()
+ (consider-narinfo cached-bytes-by-compression
+ nar-cached-compression-usage-hash-table
+ narinfo-id))
+ #:unwind? #t)
+ (loop cached-bytes-by-compression))
+
+ (((and (or 'cached-narinfo-added 'cached-narinfo-removed)
+ action)
+ narinfo-id compression size reply)
+ ;; Add or subtract size from the total for the compression
+ (let ((updated-bytes
+ ((if (eq? action 'cached-narinfo-added)
+ +
+ -)
+ (or (assq-ref cached-bytes-by-compression
+ compression)
+ 0)
+ size)))
+
+ (metric-set
+ nar-cache-bytes-metric
+ updated-bytes
+ #:label-values `((compression . ,compression)))
+
+ ((if (eq? action 'cached-narinfo-added)
+ metric-increment
+ metric-decrement)
+ nar-cache-files
+ #:label-values `((compression . ,compression)))
+
+ ;; Use an explicit transaction as it handles the
+ ;; database being busy,
+ (database-call-with-transaction
+ database
+ (lambda _
+ (if (eq? action 'cached-narinfo-added)
+ (database-insert-cached-narinfo-file
+ database
+ narinfo-id
+ size
+ compression)
+ (let ((cached-narinfo-details
+ (database-select-cached-narinfo-file-by-narinfo-id-and-compression
+ database
+ narinfo-id
+ compression)))
+
+ ;; It might not have been scheduled for
+ ;; removal, but remove any schedule that
+ ;; exists
+ (let ((schedule-removed?
+ (database-delete-scheduled-cached-narinfo-removal
+ database
+ (assq-ref cached-narinfo-details 'id))))
+ (when schedule-removed?
+ (metric-decrement
+ (metrics-registry-fetch-metric
+ metrics-registry
+ "database_scheduled_cached_narinfo_removal_total"))))
+
+ ;; Remove all the database entries first, as
+ ;; that'll stop these files appearing in narinfos
+ (database-remove-cached-narinfo-file
+ database
+ narinfo-id
+ (symbol->string compression))))))
+
+ ;; Reset the usage count for this narinfo
+ (hash-remove! nar-cached-compression-usage-hash-table
+ narinfo-id)
+
+ ;; Let the sender know the database has been updated
+ (when reply
+ (put-message reply #t))
+
+ (loop (alist-cons
+ compression
+ updated-bytes
+ (alist-delete compression
+ cached-bytes-by-compression)))))))))
+ scheduler)
+
+ channel))
+
+;; Periodically check for nars that haven't been accessed in some time
+;; and schedule them for removal
+;; ENABLED-CACHED-COMPRESSIONS is an alist of compression to details
+;; alist, the 'directory, 'unused-removal-duration and (optional) 'ttl
+;; entries are used here.  BASE-TTL (in seconds) is used when the
+;; details for a compression have no 'ttl.
+;;
+;; After the files for a compression have been processed, a message is
+;; sent on CACHED-COMPRESSION-REMOVAL-FIBER-WAKEUP-CHANNEL.
+(define (start-cached-compression-schedule-removal-fiber
+         database
+         metrics-registry
+         cached-compression-management-channel
+         enabled-cached-compressions
+         cached-compression-removal-fiber-wakeup-channel
+         base-ttl)
+
+  ;; Return the names of the files in the directory for the
+  ;; compression with an atime older than the unused removal duration
+  (define (files-to-schedule-for-removal compression-details)
+    (let* ((directory (assq-ref compression-details 'directory))
+           (unused-removal-duration
+            (assq-ref compression-details 'unused-removal-duration))
+           (atime-threshold
+            (time-second
+             (subtract-duration (current-time)
+                                unused-removal-duration))))
+      ;; scandir returns #f when the directory is missing or
+      ;; unreadable, treat that as there being no files
+      (or (scandir
+           directory
+           (lambda (filename)
+             (and
+              ;; Check the name first to avoid calling stat on the
+              ;; "." and ".." entries
+              (not (member filename '("." "..")))
+              (< (stat:atime (stat (string-append directory "/" filename)))
+                 atime-threshold))))
+          '())))
+
+  (define (schedule-removal compression compression-details)
+    (let ((files
+           (let ((files
+                  (with-time-logging "files-to-schedule-for-removal"
+                    (files-to-schedule-for-removal compression-details))))
+             (log-msg 'INFO "cached-compression-schedule-removal-fiber "
+                      "looking at " (length files) " files")
+             files))
+          (count-metric
+           (metrics-registry-fetch-metric
+            metrics-registry
+            "database_scheduled_cached_narinfo_removal_total")))
+
+      (with-time-logging "inserting scheduled-cached-narinfo-removals"
+        (for-each
+         (lambda (file)
+           (let* ((cached-narinfo-file-details
+                   ;; Names shorter than the hash part can't be
+                   ;; cached nars, and string-take would raise an
+                   ;; exception for them
+                   (and (>= (string-length file) 32)
+                        (database-select-cached-narinfo-file-by-hash
+                         database
+                         (string-take file 32) ; hash part
+                         compression)))
+                  (cached-narinfo-file-id
+                   (and (pair? cached-narinfo-file-details)
+                        (assq-ref cached-narinfo-file-details 'id))))
+             (cond
+              ((not cached-narinfo-file-id)
+               ;; Without a database entry there's no id to schedule
+               ;; the removal against, so skip the file rather than
+               ;; inserting a scheduled removal with no id
+               (log-msg 'WARN "cached-compression-schedule-removal-fiber: "
+                        "no database entry for " file ", skipping"))
+              ((database-select-scheduled-cached-narinfo-removal
+                database
+                cached-narinfo-file-id)
+               ;; Already scheduled for removal
+               #t)
+              (else
+               (let ((removal-time
+                      ;; The earliest this can be removed is the current
+                      ;; time, plus the TTL
+                      (add-duration
+                       (current-time)
+                       (make-time time-duration
+                                  0
+                                  (or (assq-ref compression-details 'ttl)
+                                      base-ttl)))))
+                 (database-insert-scheduled-cached-narinfo-removal
+                  database
+                  cached-narinfo-file-id
+                  removal-time)
+
+                 (metric-increment count-metric))))))
+         files))
+
+      ;; Wake the cached compression removal fiber in case one of
+      ;; the new scheduled removals is before it's scheduled to wake
+      ;; up
+      (put-message cached-compression-removal-fiber-wakeup-channel
+                   #t)))
+
+  (spawn-fiber
+   (lambda ()
+     (let ((sleep-duration
+            ;; A quarter of the shortest unused removal duration, but
+            ;; at least 6 hours
+            (max
+             (* 60 60 6) ; Maybe this should be configurable
+             (apply min
+                    (map (match-lambda
+                           ((_ . compression-details)
+                            (/ (time-second
+                                (assq-ref compression-details
+                                          'unused-removal-duration))
+                               4)))
+                         enabled-cached-compressions))))
+           (start-count
+            (database-count-scheduled-cached-narinfo-removal
+             database)))
+
+       ;; Use the existing metric if there is one, otherwise create it
+       (metric-set
+        (or (metrics-registry-fetch-metric
+             metrics-registry
+             "database_scheduled_cached_narinfo_removal_total")
+            (make-gauge-metric
+             metrics-registry
+             "database_scheduled_cached_narinfo_removal_total"))
+        start-count)
+
+       (while #t
+         (log-msg 'INFO "cached-compression-schedule-removal-fiber starting pass")
+
+         (for-each
+          (match-lambda
+            ((compression . details)
+             ;; Log exceptions and carry on with the next compression
+             (with-exception-handler
+                 (lambda (exn)
+                   (log-msg 'ERROR "cached-compression-schedule-removal-fiber: "
+                            "exception: " exn))
+               (lambda ()
+                 (schedule-removal compression details))
+               #:unwind? #t)))
+          enabled-cached-compressions)
+
+         (log-msg 'INFO "cached-compression-schedule-removal-fiber sleeping for "
+                  sleep-duration)
+         (sleep sleep-duration))))))
+
+;; Process the scheduled removals for cached nars
+;; Start the fiber that carries out the scheduled removals, returning
+;; the channel that can be used to wake it up.
+;;
+;; Each pass looks at the oldest scheduled removal in DATABASE.  If
+;; the scheduled time has passed, the management fiber is asked (via
+;; CACHED-COMPRESSION-MANAGEMENT-CHANNEL) to remove the database
+;; entry, and then the file is deleted from the directory given in
+;; ENABLED-CACHED-COMPRESSIONS.  Otherwise the fiber waits until the
+;; scheduled time, or until it's woken.
+(define (start-cached-compression-removal-fiber
+         database
+         cached-compression-management-channel
+         enabled-cached-compressions)
+
+  (define wakeup-channel
+    (make-channel))
+
+  ;; Have the management fiber remove the database entry for REMOVAL,
+  ;; then delete the corresponding file
+  (define (remove-cached-nar removal)
+    (let ((compression (assq-ref removal 'compression))
+          (store-path (assq-ref removal 'store-path)))
+      (let ((reply (make-channel)))
+        (put-message
+         cached-compression-management-channel
+         (list 'cached-narinfo-removed
+               (assq-ref removal 'narinfo-id)
+               compression
+               (assq-ref removal 'size)
+               reply))
+
+        ;; Wait for the management fiber to delete the
+        ;; database entry before removing the file.
+        (get-message reply))
+
+      (let* ((directory
+              (assq-ref (assq-ref enabled-cached-compressions
+                                  compression)
+                        'directory))
+             (filename
+              (string-append directory "/" (basename store-path))))
+        (log-msg 'INFO "deleting " filename)
+        (delete-file filename))))
+
+  ;; Wait until SCHEDULED-TIME (plus a second), or until woken
+  (define (wait-until scheduled-time)
+    (let* ((remaining
+            (time-difference scheduled-time (current-time)))
+           (seconds
+            (max 1 (+ 1 (time-second remaining)))))
+      (perform-operation
+       (choice-operation
+        (sleep-operation seconds)
+        (get-operation wakeup-channel)))))
+
+  (define (make-pass)
+    (log-msg 'INFO "cached-compression-removal-fiber starting pass")
+
+    (let ((next-removal
+           (database-select-oldest-scheduled-cached-narinfo-removal
+            database)))
+
+      (log-msg 'INFO "scheduled removal: " next-removal)
+
+      (cond
+       ((not next-removal)
+        ;; Nothing scheduled, so sleep until woken
+        (get-message wakeup-channel))
+       ((time<=? (assq-ref next-removal 'scheduled-removal-time)
+                 (current-time))
+        (remove-cached-nar next-removal))
+       (else
+        (wait-until
+         (assq-ref next-removal 'scheduled-removal-time))))))
+
+  (define (log-exception exn)
+    (log-msg 'ERROR "exception in cached-compression-removal-fiber: "
+             exn))
+
+  (define (make-pass-with-backtrace)
+    (with-throw-handler #t
+      make-pass
+      (lambda _
+        (backtrace))))
+
+  (spawn-fiber
+   (lambda ()
+     (while #t
+       (with-exception-handler
+           log-exception
+         make-pass-with-backtrace
+         #:unwind? #t))))
+
+  wakeup-channel)
+
+;; Create the TARGET-COMPRESSION version of a nar, in the directory
+;; given for that compression in ENABLED-CACHED-COMPRESSIONS.
+;;
+;; The first entry in NARINFO-FILES is used as the source.  When
+;; NAR-SOURCE starts with "http", the source nar is downloaded to a
+;; temporary file (which is deleted once the compressed nar has been
+;; created), and when it starts with "/", the nar is read from that
+;; directory.  An error is raised for any other NAR-SOURCE.
+;;
+;; LEVEL, if given, is passed as #:level to the procedure doing the
+;; compression.
+;;
+;; The nar is written to a ".tmp" file, which is then renamed.
+;; Returns the size in bytes of the resulting file.
+(define* (make-compressed-nar narinfo-files
+                              nar-source
+                              enabled-cached-compressions
+                              target-compression
+                              #:key level)
+  (define cached-compression-details
+    (assq-ref enabled-cached-compressions target-compression))
+
+  (log-msg 'INFO "making " target-compression " for "
+           (uri-decode
+            (basename
+             (assq-ref (first narinfo-files) 'url))))
+
+  (let* ((source-narinfo-file
+          ;; There's no specific logic to this, it should be possible
+          ;; to use any file
+          (first narinfo-files))
+         (source-filename
+          (cond
+           ((string-prefix? "http" nar-source)
+            (let* ((output-port (mkstemp "/tmp/nar-herder-source-nar-XXXXXX"))
+                   (filename
+                    (port-filename output-port))
+                   (uri
+                    (string->uri
+                     (string-append nar-source
+                                    (assq-ref source-narinfo-file 'url)))))
+
+              (log-msg 'DEBUG "downloading " (uri->string uri))
+              (with-exception-handler
+                  (lambda (exn)
+                    ;; Don't leave a partial download around
+                    (close-port output-port)
+                    (delete-file filename)
+                    (raise-exception exn))
+                (lambda ()
+                  (with-port-timeouts
+                   (lambda ()
+                     (call-with-values
+                         (lambda ()
+                           (let ((port
+                                  socket
+                                  (open-socket-for-uri* uri)))
+                             (http-get uri
+                                       #:port port
+                                       #:decode-body? #f
+                                       #:streaming? #t)))
+                       (lambda (response body)
+                         (unless (= (response-code response)
+                                    200)
+                           (error "unknown response code"
+                                  (response-code response)))
+
+                         (dump-port body output-port)))
+                     (close-port output-port))
+                   #:timeout 30))
+                #:unwind? #t)
+
+              filename))
+           ((string-prefix? "/" nar-source)
+            (string-append
+             ;; If it's a filename, then it's the canonical path to
+             ;; the storage directory
+             nar-source
+             (uri-decode (assq-ref source-narinfo-file 'url))))
+           (else
+            (error "unknown nar source")))))
+
+    (let* ((dest-directory
+            (assq-ref cached-compression-details
+                      'directory))
+           (dest-filename
+            (string-append
+             dest-directory
+             "/"
+             (uri-decode
+              (basename
+               (assq-ref source-narinfo-file 'url)))))
+           (tmp-dest-filename
+            (string-append dest-filename ".tmp")))
+
+      ;; Remove anything left over from previous attempts
+      (when (file-exists? tmp-dest-filename)
+        (delete-file tmp-dest-filename))
+      (when (file-exists? dest-filename)
+        (delete-file dest-filename))
+
+      (mkdir-p dest-directory)
+
+      (call-with-input-file
+          source-filename
+        (lambda (source-port)
+          (call-with-decompressed-port
+              (string->symbol
+               (assq-ref source-narinfo-file
+                         'compression))
+              source-port
+            (lambda (decompressed-source-port)
+              (let ((call-with-compressed-output-port*
+                     (match target-compression
+                       ('gzip
+                        (@ (zlib) call-with-gzip-output-port))
+                       ('lzip
+                        (@ (lzlib) call-with-lzip-output-port))
+                       ('zstd
+                        (@ (zstd) call-with-zstd-output-port))
+                       ('none
+                        ;; There's no compression port wrapping the
+                        ;; file port in this case, so it needs
+                        ;; closing here to make sure the data is
+                        ;; flushed before the file is renamed and its
+                        ;; size read.  (Presumably the compression
+                        ;; libraries close the underlying port when
+                        ;; they finish - TODO confirm.)
+                        ;;
+                        ;; #:level is accepted and ignored, so that
+                        ;; passing a level doesn't result in a wrong
+                        ;; number of arguments error.
+                        (lambda* (port proc #:key level)
+                          (let ((result (proc port)))
+                            (close-port port)
+                            result))))))
+                (apply
+                 call-with-compressed-output-port*
+                 (open-output-file tmp-dest-filename)
+                 (lambda (compressed-port)
+                   (dump-port decompressed-source-port
+                              compressed-port))
+                 (if level
+                     `(#:level ,level)
+                     '())))))))
+      (rename-file
+       tmp-dest-filename
+       dest-filename)
+
+      (when (string-prefix? "http" nar-source)
+        (log-msg 'DEBUG "deleting temporary file " source-filename)
+        (delete-file source-filename))
+
+      (let ((bytes
+             (stat:size (stat dest-filename))))
+        (log-msg 'INFO "created " dest-filename)
+
+        bytes))))