aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2024-03-24 09:30:06 +0000
committerChristopher Baines <mail@cbaines.net>2024-03-25 14:00:41 +0000
commita1b49b3b45e5484cb93419be3711b6ab85495bee (patch)
tree973269e542b4e92705ca19fae2360b2eeec5cf11
parenta865c013ddc5ab7a20dfef75cb3a776ea9ccfe16 (diff)
downloadnar-herder-a1b49b3b45e5484cb93419be3711b6ab85495bee.tar
nar-herder-a1b49b3b45e5484cb93419be3711b6ab85495bee.tar.gz
Rework the cached compressions system
The initial implementation was flawed since guix assumes that any compression mentioned in the narinfo will be available for the lifetime of the narinfo, and the nar-herder was deleting cached compressions without taking this in to account. This commit adds support for scheduling the removal of a cached compression and this schedule is used to inform the TTLs for narinfos. I'm unsure of the value in caching narinfos so maybe some of this complexity can be removed in the future.
-rw-r--r--nar-herder/cached-compression.scm622
-rw-r--r--nar-herder/database.scm287
-rw-r--r--nar-herder/server.scm162
-rw-r--r--scripts/nar-herder.in68
4 files changed, 829 insertions, 310 deletions
diff --git a/nar-herder/cached-compression.scm b/nar-herder/cached-compression.scm
index e4fa52f..40dfcd1 100644
--- a/nar-herder/cached-compression.scm
+++ b/nar-herder/cached-compression.scm
@@ -18,6 +18,7 @@
(define-module (nar-herder cached-compression)
#:use-module (srfi srfi-1)
+ #:use-module (srfi srfi-19)
#:use-module (srfi srfi-26)
#:use-module (srfi srfi-71)
#:use-module (ice-9 ftw)
@@ -26,6 +27,10 @@
#:use-module (ice-9 threads)
#:use-module (logging logger)
#:use-module (prometheus)
+ #:use-module (fibers)
+ #:use-module (fibers timers)
+ #:use-module (fibers channels)
+ #:use-module (fibers operations)
#:use-module (web uri)
#:use-module (web client)
#:use-module (web response)
@@ -36,7 +41,9 @@
#:select (dump-port mkdir-p))
#:use-module (nar-herder utils)
#:use-module (nar-herder database)
- #:export (make-maybe-trigger-creation-of-compressed-nars))
+ #:export (start-cached-compression-management-fiber
+ start-cached-compression-removal-fiber
+ start-cached-compression-schedule-removal-fiber))
;; Nar caching overview
;;
@@ -44,7 +51,6 @@
;; - Compute the size of each cached compression directory
;; - Remove database entries if they're missing from the directory
;; - Remove files if they're missing from the database
-;; - Remove (random TODO) files if the size exceeds the max
;;
;; On nar usage
;; - Bump count
@@ -52,9 +58,17 @@
;; - Trigger generation of the cached nar
;; - Skip if the work queue already includes this job
;; - At the start of the job, check for a database entry and exit
-;; early if one exists
-;; - Update the size of the cached compression directory
-;; - If the size exceeds the max, remove (random TODO) files
+;; early if one exists bumping the atime of the file
+;; - If the file doesn't exist, check if there's free space
+;; - If there's free space, create the file
+;;
+;; Nar removal fiber
+;; - Periodically remove nars that have been scheduled for removal
+;; (when the scheduled time passes)
+;;
+;; Nar schedule removal fiber
+;; - Periodically check for nars that haven't been accessed in some
+;; time and schedule them for removal
(define (perform-cached-compression-startup database
enabled-cached-compressions)
@@ -159,164 +173,38 @@
cached-bytes-by-compression))
-(define (maybe-remove-cached-files-for-compression database
- enabled-cached-compressions
- compression
- cached-bytes-by-compression-box
- nar-cache-bytes-metric)
- (let* ((compression-details
- (assq-ref enabled-cached-compressions
- compression))
- (max-size
- (assq-ref compression-details
- 'directory-max-size)))
- (when max-size
- (let ((current-size
- (assq-ref
- (atomic-box-ref cached-bytes-by-compression-box)
- compression)))
- (when (> current-size max-size)
- (let ((bytes-to-remove
- (- current-size max-size)))
- (log-msg 'DEBUG
- "looking to remove " bytes-to-remove " bytes of "
- compression " compressed nars")
-
- (match
- (database-fold-cached-narinfo-files
- database
- (lambda (details result)
- (match result
- (#(finished? bytes files-to-remove)
- (if finished?
- result
- (if (eq? (assq-ref details 'compression)
- compression)
- (let ((new-bytes
- (+ bytes
- (assq-ref details 'size))))
- ;; finished if enough bytes are going
- ;; to be removed
- (vector (>= new-bytes bytes-to-remove)
- new-bytes
- (cons details files-to-remove)))
- result)))))
- #(#f 0 ()))
- (#(#t bytes-for-removal files-to-remove-details)
- (log-msg 'DEBUG "removing " (length files-to-remove-details)
- " " compression " compressed nars from the cache")
-
- ;; Use an explicit transaction as it handles the
- ;; database being busy,
- (database-call-with-transaction
- database
- (lambda _
- (for-each
- (lambda (details)
- ;; Remove all the database entries first, as
- ;; that'll stop these files appearing in narinfos
- (database-remove-cached-narinfo-file
- database
- (assq-ref details 'narinfo-id)
- (symbol->string compression)))
- files-to-remove-details)))
-
- (let ((directory
- (assq-ref compression-details 'directory)))
- (for-each
- (lambda (details)
- (let ((filename
- (string-append
- directory "/"
- (basename (assq-ref details 'store-path)))))
- (log-msg 'DEBUG "deleting " filename)
- (delete-file filename)))
- files-to-remove-details)
-
- (let* ((cached-bytes-by-compression
- (atomic-box-ref cached-bytes-by-compression-box))
- (new-cached-bytes
- (- (assq-ref cached-bytes-by-compression
- compression)
- bytes-for-removal)))
- (atomic-box-set!
- cached-bytes-by-compression-box
- (alist-cons
- compression
- new-cached-bytes
- (alist-delete
- compression
- cached-bytes-by-compression)))
-
- (metric-set
- nar-cache-bytes-metric
- new-cached-bytes
- #:label-values `((compression . ,compression)))
-
- (log-msg 'DEBUG "finished removing " bytes-for-removal
- " bytes of " compression
- " cached nars from " directory
- " (new size " new-cached-bytes ")")))))))))))
-
-(define* (make-maybe-trigger-creation-of-compressed-nars
+;; This fiber manages metadata around cached compressions, and
+;; delegates tasks to the thread pool to generate newly compressed
+;; nars
+(define* (start-cached-compression-management-fiber
database
metrics-registry
nar-source
enabled-cached-compressions
cached-compression-min-uses
- #:key (cached-compression-workers 2) scheduler)
+ #:key (cached-compression-workers 2)
+ scheduler)
(define nar-cache-bytes-metric
(make-gauge-metric metrics-registry
"nar_cache_size_bytes"
#:labels '(compression)))
- (let ((consider-nar-request-channel
- (make-worker-thread-set
- (lambda ()
- (let ((cached-bytes-by-compression-box
- (make-atomic-box #f)))
- (atomic-box-set!
- cached-bytes-by-compression-box
- (perform-cached-compression-startup database
- enabled-cached-compressions))
-
- (for-each
- (match-lambda
- ((compression . bytes)
- (metric-set
- nar-cache-bytes-metric
- bytes
- #:label-values `((compression . ,compression)))))
- (atomic-box-ref cached-bytes-by-compression-box))
-
- ;; Remove cached files if the max size is exceeded
- (for-each
- (match-lambda
- ((compression . _)
- (maybe-remove-cached-files-for-compression
- database
- enabled-cached-compressions
- compression
- cached-bytes-by-compression-box
- nar-cache-bytes-metric)))
- (atomic-box-ref cached-bytes-by-compression-box))
-
- (list cached-bytes-by-compression-box)))
- #:name "comp nar req"
- ;; Just make one thread, as this thread won't do much work
- ;; and relies on a hash table that shouldn't be accessed by
- ;; multiple threads
- #:parallelism 1)))
-
- (define with-usage-hash-table
- (let ((nar-cached-compression-usage-hash-table
- (make-hash-table 65536)))
- (lambda (proc)
- (monitor
- (proc nar-cached-compression-usage-hash-table)))))
-
- (define (proc narinfo-id cached-bytes-by-compression-box)
+ (define channel
+ (make-channel))
+
+ (let ((process-job
+ count-jobs
+ count-threads
+ list-jobs
+ (create-work-queue cached-compression-workers
+ (lambda (thunk)
+ (thunk))
+ #:name "cached compression")))
+
+ (define (consider-narinfo cached-bytes-by-compression
+ usage-hash-table
+ narinfo-id)
(let* ((existing-cached-files
;; This is important both to avoid trying to create
;; files twice, but also in the case where additional
@@ -333,120 +221,342 @@
(lset-difference eq?
(map car enabled-cached-compressions)
existing-compressions))
- (compress-file?
- (let ((url
- (assq-ref
- (first
- (database-select-narinfo-files-by-narinfo-id
- database
- narinfo-id))
- 'url)))
+ (narinfo-files
+ (database-select-narinfo-files-by-narinfo-id
+ database
+ narinfo-id))
+ (compress-file?
+ (let ((url (assq-ref (first narinfo-files) 'url)))
;; TODO: Maybe this should be configurable?
(not (compressed-file? url)))))
(when (and compress-file?
(not (null? missing-compressions)))
(let ((new-count
- (with-usage-hash-table
- (lambda (usage)
- (let ((val (+ 1
- (or (hash-ref usage narinfo-id)
- 0))))
- (hash-set! usage
- narinfo-id
- val)
- val)))))
-
- (when (> new-count
- cached-compression-min-uses)
- (for-each
- (lambda (missing-compression)
- (let ((new-bytes
- (make-compressed-nar
- database
- nar-source
- enabled-cached-compressions
- narinfo-id
- missing-compression
- #:level (assq-ref
- (assq-ref enabled-cached-compressions
- missing-compression)
- 'level))))
-
- ;; Do this here, after creating the new cached nar,
- ;; just in case there's a lack of space
- (monitor
- (let* ((cached-bytes-by-compression
- (atomic-box-ref
- cached-bytes-by-compression-box))
- (updated-bytes-for-compression
- (+ (assq-ref cached-bytes-by-compression
- missing-compression)
- new-bytes)))
-
- (metric-set
- nar-cache-bytes-metric
- updated-bytes-for-compression
- #:label-values `((compression . ,missing-compression)))
-
- (atomic-box-set!
- cached-bytes-by-compression-box
- (alist-cons
- missing-compression
- updated-bytes-for-compression
- (alist-delete
- missing-compression
- cached-bytes-by-compression)))
-
- (maybe-remove-cached-files-for-compression
- database
- enabled-cached-compressions
- missing-compression
- cached-bytes-by-compression-box
- nar-cache-bytes-metric)))))
- missing-compressions)
-
- (with-usage-hash-table
- (lambda (usage)
- (hash-remove! usage narinfo-id))))))))
-
- (let ((process-job
- count-jobs
- count-threads
- list-jobs
- (create-work-queue cached-compression-workers
- proc
- #:name "cached compression")))
-
- (lambda (narinfo-id)
- (spawn-fiber
+ (let ((val (+ 1
+ (or (hash-ref usage-hash-table narinfo-id)
+ 0))))
+ (hash-set! usage-hash-table
+ narinfo-id
+ val)
+ val)))
+
+ (when (and (> new-count
+ cached-compression-min-uses))
+ (let* ((narinfo-details
+ (database-select-narinfo database narinfo-id))
+ (nar-size
+ (assq-ref narinfo-details 'nar-size))
+ (compressions-with-space
+ (filter
+ (lambda (compression)
+ (let ((directory-max-size
+ (assq-ref
+ (assq-ref enabled-cached-compressions
+ compression)
+ 'directory-max-size))
+ (current-directory-size
+ (assq-ref cached-bytes-by-compression
+ compression)))
+ (if directory-max-size
+ (< (+ current-directory-size
+ ;; Assume the commpressed nar could be
+ ;; as big as the uncompressed nar
+ nar-size)
+ directory-max-size)
+ #t)))
+ missing-compressions)))
+ (for-each
+ (lambda (compression)
+ (spawn-fiber
+ (lambda ()
+ (process-job
+ (lambda ()
+ (let ((new-bytes
+ (make-compressed-nar
+ narinfo-files
+ nar-source
+ enabled-cached-compressions
+ compression
+ #:level (assq-ref
+ (assq-ref enabled-cached-compressions
+ compression)
+ 'level))))
+ (put-message channel
+ (list 'cached-narinfo-added
+ narinfo-id
+ compression
+ new-bytes))))))))
+ compressions-with-space)))))))
+
+ (spawn-fiber
+ (lambda ()
+ (let ((initial-cached-bytes-by-compression
+ (perform-cached-compression-startup
+ database
+ enabled-cached-compressions))
+ (nar-cached-compression-usage-hash-table
+ (make-hash-table 65536)))
+
+ (for-each
+ (match-lambda
+ ((compression . bytes)
+ (metric-set
+ nar-cache-bytes-metric
+ bytes
+ #:label-values `((compression . ,compression)))))
+ initial-cached-bytes-by-compression)
+
+ (let loop ((cached-bytes-by-compression
+ initial-cached-bytes-by-compression))
+ (match (get-message channel)
+ (('narinfo-id . narinfo-id)
+ (consider-narinfo cached-bytes-by-compression
+ nar-cached-compression-usage-hash-table
+ narinfo-id)
+ (loop cached-bytes-by-compression))
+
+ (((and (or 'cached-narinfo-added 'cached-narinfo-removed)
+ action)
+ narinfo-id compression size)
+ (let ((updated-bytes
+ ((if (eq? action 'cached-narinfo-added)
+ +
+ -)
+ (or (assq-ref cached-bytes-by-compression
+ compression)
+ 0)
+ size)))
+
+ (metric-set
+ nar-cache-bytes-metric
+ updated-bytes
+ #:label-values `((compression . ,compression)))
+
+ (when (eq? action 'cached-narinfo-added)
+ (database-insert-cached-narinfo-file
+ database
+ narinfo-id
+ size
+ compression)
+
+ (hash-remove! nar-cached-compression-usage-hash-table
+ narinfo-id))
+
+ (loop (alist-cons
+ cached-bytes-by-compression
+ updated-bytes
+ (alist-delete compression
+ cached-bytes-by-compression)))))))))
+ scheduler)
+
+ channel))
+
+;; Periodically check for nars that haven't been accessed in some time
+;; and schedule them for removal
+(define (start-cached-compression-schedule-removal-fiber
+ database
+ cached-compression-management-channel
+ enabled-cached-compressions
+ cached-compression-removal-fiber-wakeup-channel)
+
+ (define (files-to-schedule-for-removal compression-details)
+ (let* ((directory (assq-ref compression-details 'directory))
+ (unused-removal-duration
+ (assq-ref compression-details 'unused-removal-duration))
+ (atime-threshold
+ (time-second
+ (subtract-duration (current-time)
+ unused-removal-duration))))
+ (scandir
+ directory
+ (lambda (filename)
+ (and
+ (< (stat:atime (stat (string-append directory "/" filename)))
+ atime-threshold)
+ (not (member filename '("." ".."))))))))
+
+ (define (schedule-removal compression compression-details)
+ (let* ((files (files-to-schedule-for-removal compression-details))
+ (all-cached-narinfo-file-details
+ (map
+ (lambda (file)
+ (database-select-cached-narinfo-file-by-hash
+ database
+ (string-take file 32) ; hash part
+ compression))
+ files))
+ (existing-scheduled-removals
+ (map
+ (lambda (cached-narinfo-file-details)
+ (database-select-scheduled-cached-narinfo-removal
+ database
+ (assq-ref cached-narinfo-file-details 'id)))
+ all-cached-narinfo-file-details)))
+
+ (for-each
+ (lambda (file cached-narinfo-file-details existing-scheduled-removal)
+ (unless existing-scheduled-removal
+ (let ((removal-time
+ ;; The earliest this can be removed is the current
+ ;; time, plus the TTL
+ (add-duration
+ (current-time)
+ (make-time time-duration
+ 0
+ (assq-ref compression-details 'ttl)))))
+ (database-insert-scheduled-cached-narinfo-removal
+ database
+ (assq-ref cached-narinfo-file-details 'id)
+ removal-time))))
+ files
+ all-cached-narinfo-file-details
+ existing-scheduled-removals)
+
+ (when (any not existing-scheduled-removals)
+ ;; Wake the cached compression removal fiber in case one of
+ ;; the new scheduled removals is before it's scheduled to wake
+ ;; up
+ (put-message
+ cached-compression-removal-fiber-wakeup-channel
+ #t))))
+
+ (spawn-fiber
+ (lambda ()
+ (let ((sleep-duration
+ (apply min
+ (map (lambda (compression-details)
+ (/ (time-second
+ (assq-ref compression-details
+ 'unused-removal-duration))
+ 4))
+ enabled-cached-compressions))))
+ (while #t
+ (log-msg 'DEBUG "cached-compression-schedule-removal-fiber starting pass")
+
+ (for-each
+ (match-lambda
+ ((compression . details)
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "cached-compression-schedule-removal-fiber: "
+ "exception: " exn))
+ (lambda ()
+ (schedule-removal compression details))
+ #:unwind? #t)))
+ enabled-cached-compressions)
+
+ (log-msg 'DEBUG "cached-compression-schedule-removal-fiber sleeping for "
+ sleep-duration)
+ (sleep sleep-duration))))))
+
+;; Process the scheduled removals for cached nars
+(define (start-cached-compression-removal-fiber
+ database
+ cached-compression-management-channel
+ enabled-cached-compressions)
+
+ (define (remove id narinfo-id compression store-path)
+ ;; Use an explicit transaction as it handles the
+ ;; database being busy,
+ (database-call-with-transaction
+ database
+ (lambda _
+ (database-delete-scheduled-cached-narinfo-removal
+ database
+ id)
+
+ ;; Remove all the database entries first, as
+ ;; that'll stop these files appearing in narinfos
+ (database-remove-cached-narinfo-file
+ database
+ narinfo-id
+ (symbol->string compression))))
+
+ (let ((directory
+ (assq-ref (assq-ref enabled-cached-compressions
+ compression)
+ 'directory)))
+ (let ((filename
+ (string-append
+ directory "/"
+ (basename store-path))))
+ (log-msg 'DEBUG "deleting " filename)
+ (delete-file filename))))
+
+ (define wakeup-channel
+ (make-channel))
+
+ (define (make-pass)
+ (log-msg 'DEBUG "cached-compression-removal-fiber starting pass")
+
+ (let ((scheduled-cached-narinfo-removal
+ (database-select-oldest-scheduled-cached-narinfo-removal
+ database)))
+
+ (log-msg 'DEBUG "scheduled removal: " scheduled-cached-narinfo-removal)
+
+ (if scheduled-cached-narinfo-removal
+ (if (time<=? (assq-ref scheduled-cached-narinfo-removal
+ 'scheduled-removal-time)
+ (current-time))
+ (let ((id (assq-ref scheduled-cached-narinfo-removal 'id))
+ (narinfo-id (assq-ref scheduled-cached-narinfo-removal
+ 'narinfo-id))
+ (compression (assq-ref scheduled-cached-narinfo-removal
+ 'compression))
+ (size (assq-ref scheduled-cached-narinfo-removal
+ 'size))
+ (store-path (assq-ref scheduled-cached-narinfo-removal
+ 'store-path)))
+ (remove id narinfo-id compression store-path)
+
+ (put-message
+ cached-compression-management-channel
+ (list 'cached-narinfo-removed
+ narinfo-id
+ compression
+ size)))
+
+ (let ((duration
+ (time-difference
+ (assq-ref scheduled-cached-narinfo-removal
+ 'scheduled-removal-time)
+ (current-time))))
+ (perform-operation
+ (choice-operation
+ (sleep-operation (max 1 (+ 1 (time-second duration))))
+ (get-operation wakeup-channel)))))
+
+ ;; Sleep until woken
+ (get-message wakeup-channel))))
+
+
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "exception in cached-compression-removal-fiber: "
+ exn))
(lambda ()
- (call-with-worker-thread
- consider-nar-request-channel
- (lambda (cached-bytes-by-compression-box)
- (let ((in-progress-narinfo-ids
- (map car (list-jobs))))
-
- (unless (member narinfo-id in-progress-narinfo-ids)
- (process-job narinfo-id cached-bytes-by-compression-box)))
- #t)))
- scheduler
- #:parallel? #t)))))
-
-(define* (make-compressed-nar database
+ (with-throw-handler #t
+ make-pass
+ (lambda _
+ (backtrace))))
+ #:unwind? #t))))
+
+ wakeup-channel)
+
+(define* (make-compressed-nar narinfo-files
nar-source
enabled-cached-compressions
- narinfo-id
target-compression
#:key level)
(define cached-compression-details
(assq-ref enabled-cached-compressions target-compression))
- (define narinfo-files
- (database-select-narinfo-files-by-narinfo-id database
- narinfo-id))
-
(log-msg 'DEBUG "making " target-compression " for "
(uri-decode
(basename
@@ -565,16 +675,6 @@
(let ((bytes
(stat:size (stat dest-filename))))
-
- (database-call-with-transaction
- database
- (lambda _
- (database-insert-cached-narinfo-file
- database
- narinfo-id
- bytes
- (symbol->string target-compression))))
-
(log-msg 'DEBUG "created " dest-filename)
bytes))))
diff --git a/nar-herder/database.scm b/nar-herder/database.scm
index c8171a3..98c29d5 100644
--- a/nar-herder/database.scm
+++ b/nar-herder/database.scm
@@ -43,6 +43,7 @@
database-insert-narinfo
database-remove-narinfo
+ database-select-narinfo
database-select-narinfo-by-hash
database-select-narinfo-contents-by-hash
@@ -64,7 +65,13 @@
database-select-cached-narinfo-file-by-hash
database-select-cached-narinfo-files-by-narinfo-id
database-fold-cached-narinfo-files
- database-remove-cached-narinfo-file))
+ database-remove-cached-narinfo-file
+
+ database-select-scheduled-narinfo-removal
+ database-select-scheduled-cached-narinfo-removal
+ database-delete-scheduled-cached-narinfo-removal
+ database-select-oldest-scheduled-cached-narinfo-removal
+ database-insert-scheduled-cached-narinfo-removal))
(define-record-type <database>
(make-database database-file reader-thread-channel writer-thread-channel
@@ -145,7 +152,17 @@ CREATE TABLE cached_narinfo_files (
);
CREATE INDEX cached_narinfo_files_narinfo_id
- ON cached_narinfo_files (narinfo_id);")
+ ON cached_narinfo_files (narinfo_id);
+
+CREATE TABLE scheduled_narinfo_removal (
+ narinfo_id INTEGER PRIMARY KEY ASC REFERENCES narinfos (id),
+ removal_datetime TEXT NOT NULL
+);
+
+CREATE TABLE scheduled_cached_narinfo_removal (
+ cached_narinfo_file_id INTEGER PRIMARY KEY ASC REFERENCES cached_narinfo_files (id),
+ removal_datetime TEXT NOT NULL
+);")
(sqlite-exec db schema))
@@ -206,6 +223,24 @@ CREATE INDEX cached_narinfo_files_narinfo_id
db
"ALTER TABLE narinfos ADD COLUMN added_at TEXT;"))
+ (unless (table-exists? db "scheduled_narinfo_removal")
+ (sqlite-exec
+ db
+ "
+CREATE TABLE scheduled_narinfo_removal (
+ narinfo_id INTEGER PRIMARY KEY ASC REFERENCES narinfos (id),
+ removal_datetime TEXT NOT NULL
+);"))
+
+ (unless (table-exists? db "scheduled_cached_narinfo_removal")
+ (sqlite-exec
+ db
+ "
+CREATE TABLE scheduled_cached_narinfo_removal (
+ cached_narinfo_file_id INTEGER PRIMARY KEY ASC REFERENCES cached_narinfo_files (id),
+ removal_datetime TEXT NOT NULL
+);"))
+
(sqlite-exec
db
"CREATE INDEX IF NOT EXISTS narinfo_tags_narinfo_id
@@ -835,6 +870,38 @@ DELETE FROM narinfo_tags WHERE narinfo_id = :narinfo_id"
#t)
#f)))))
+(define (database-select-narinfo database id)
+ (call-with-time-tracking
+ database
+ "select_narinfo"
+ (lambda ()
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT store_path, nar_hash, nar_size, deriver, system
+FROM narinfos
+WHERE id = :id"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:id id)
+
+ (match (let ((result (sqlite-step statement)))
+ (sqlite-reset statement)
+ result)
+ (#(store_path nar_hash nar_size deriver system)
+ `((store-path . ,store_path)
+ (nar-hash . ,nar_hash)
+ (nar-size . ,nar_size)
+ (deriver . ,deriver)
+ (system . ,system)))
+ (_
+ #f))))))))
+
(define (database-select-narinfo-by-hash database hash)
(call-with-time-tracking
database
@@ -1175,7 +1242,7 @@ INSERT INTO cached_narinfo_files (
statement
#:narinfo_id narinfo-id
#:size size
- #:compression compression)
+ #:compression (symbol->string compression))
(sqlite-step statement)
(sqlite-reset statement)
@@ -1196,7 +1263,7 @@ INSERT INTO cached_narinfo_files (
(sqlite-prepare
db
"
-SELECT cached_narinfo_files.size
+SELECT cached_narinfo_files.id, cached_narinfo_files.size
FROM narinfos
INNER JOIN cached_narinfo_files
ON cached_narinfo_files.narinfo_id = narinfos.id
@@ -1207,12 +1274,13 @@ WHERE substr(narinfos.store_path, 12, 32) = :hash
(sqlite-bind-arguments
statement
#:hash hash
- #:compression compression)
+ #:compression (symbol->string compression))
(let ((result
(match (sqlite-step statement)
- (#(size)
- `((size . ,size)))
+ (#(id size)
+ `((id . ,id)
+ (size . ,size)))
(#f #f))))
(sqlite-reset statement)
@@ -1232,7 +1300,10 @@ WHERE substr(narinfos.store_path, 12, 32) = :hash
(sqlite-prepare
db
"
-SELECT store_path, size, compression
+SELECT cached_narinfo_files.id,
+ narinfos.store_path,
+ cached_narinfo_files.size,
+ cached_narinfo_files.compression
FROM cached_narinfo_files
INNER JOIN narinfos
ON cached_narinfo_files.narinfo_id = narinfos.id
@@ -1246,8 +1317,9 @@ WHERE narinfo_id = :narinfo_id"
(let ((result
(sqlite-map
(match-lambda
- (#(store_path size compression)
- `((store-path . ,store_path)
+ (#(id store_path size compression)
+ `((id . ,id)
+ (store-path . ,store_path)
(size . ,size)
(compression . ,(string->symbol compression)))))
statement)))
@@ -1255,6 +1327,48 @@ WHERE narinfo_id = :narinfo_id"
result)))))))
+(define (database-select-cached-narinfo-file-by-narinfo-id-and-compression
+ database
+ narinfo-id
+ compression)
+ (call-with-time-tracking
+ database
+ "select_cached_narinfo_file_by_narinfo_id_and_compression"
+ (lambda ()
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT cached_narinfo_files.id,
+ narinfos.store_path,
+ cached_narinfo_files.size,
+ cached_narinfo_files.compression
+FROM cached_narinfo_files
+INNER JOIN narinfos
+ ON cached_narinfo_files.narinfo_id = narinfos.id
+WHERE narinfo_id = :narinfo_id
+ AND compression = :compression"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:narinfo_id narinfo-id
+ #:compression (symbol->string compression))
+
+ (let ((result
+ (match (sqlite-step statement)
+ (#(id store_path size compression)
+ `((id . ,id)
+ (store-path . ,store_path)
+ (size . ,size)
+ (compression . ,(string->symbol compression)))))))
+ (sqlite-reset statement)
+
+ result)))))))
+
(define (database-fold-cached-narinfo-files database
proc
init)
@@ -1306,3 +1420,156 @@ WHERE narinfo_id = :narinfo_id
(sqlite-step statement)
(sqlite-reset statement)))))
+
+(define (database-select-scheduled-narinfo-removal database narinfo-id)
+ (call-with-time-tracking
+ database
+ "select_scheduled_narinfo_removal"
+ (lambda ()
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT removal_datetime
+FROM scheduled_narinfo_removal
+WHERE narinfo_id = :narinfo_id"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:narinfo_id narinfo-id)
+
+ (let ((result
+ (match (sqlite-step statement)
+ (#(datetime)
+ (date->time-utc
+ (string->date
+ datetime
+ "~Y-~m-~d ~H:~M:~S")))
+ (#f #f))))
+ (sqlite-reset statement)
+ result)))))))
+
+(define (database-select-scheduled-cached-narinfo-removal database
+ cached-narinfo-file-id)
+ (call-with-time-tracking
+ database
+ "select_scheduled_narinfo_removal"
+ (lambda ()
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT removal_datetime
+FROM scheduled_cached_narinfo_removal
+WHERE cached_narinfo_file_id = :cached_narinfo_file_id"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:cached_narinfo_file_id cached-narinfo-file-id)
+
+ (let ((result
+ (match (sqlite-step statement)
+ (#(datetime)
+ (date->time-utc
+ (string->date
+ datetime
+ "~Y-~m-~d ~H:~M:~S")))
+ (#f #f))))
+ (sqlite-reset statement)
+ result)))))))
+
+(define (database-delete-scheduled-cached-narinfo-removal database
+ cached-narinfo-file-id)
+ (call-with-worker-thread
+ (database-writer-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+DELETE FROM scheduled_cached_narinfo_removal
+WHERE cached_narinfo_file_id = :cached_narinfo_file_id"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:cached_narinfo_file_id cached-narinfo-file-id)
+
+ (sqlite-step statement)
+ (sqlite-reset statement)
+
+ #t))))
+
+(define (database-select-oldest-scheduled-cached-narinfo-removal database)
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT cached_narinfo_files.id,
+ cached_narinfo_files.narinfo_id,
+ cached_narinfo_files.size,
+ cached_narinfo_files.compression,
+ narinfos.store_path,
+ scheduled_cached_narinfo_removal.removal_datetime
+FROM scheduled_cached_narinfo_removal
+INNER JOIN cached_narinfo_files
+ ON scheduled_cached_narinfo_removal.cached_narinfo_file_id =
+ cached_narinfo_files.id
+INNER JOIN narinfos
+ ON cached_narinfo_files.narinfo_id = narinfos.id
+ORDER BY scheduled_cached_narinfo_removal.removal_datetime DESC
+LIMIT 1"
+ #:cache? #t)))
+
+ (let ((result
+ (match (sqlite-step statement)
+ (#(id narinfo_id size compression store_path datetime)
+ `((id . ,id)
+ (narinfo-id . ,narinfo_id)
+ (size . ,size)
+ (compression . ,(string->symbol compression))
+ (store-path . ,store_path)
+ (scheduled-removal-time . ,(date->time-utc
+ (string->date
+ datetime
+ "~Y-~m-~d ~H:~M:~S")))))
+ (#f #f))))
+ (sqlite-reset statement)
+ result)))))
+
+(define (database-insert-scheduled-cached-narinfo-removal database
+ cached-narinfo-file-id
+ removal-datetime)
+ (call-with-worker-thread
+ (database-writer-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+INSERT INTO scheduled_cached_narinfo_removal (
+ cached_narinfo_file_id, removal_datetime
+) VALUES (
+ :cached_narinfo_file_id, :removal_datetime
+)"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:cached_narinfo_file_id cached-narinfo-file-id
+ #:removal_datetime (date->string
+ (time-utc->date removal-datetime)
+ "~Y-~m-~d ~H:~M:~S"))
+
+ (sqlite-step statement)
+ (sqlite-reset statement)
+
+ #t))))
diff --git a/nar-herder/server.scm b/nar-herder/server.scm
index ff3f131..583b4a3 100644
--- a/nar-herder/server.scm
+++ b/nar-herder/server.scm
@@ -19,6 +19,7 @@
(define-module (nar-herder server)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-11)
+ #:use-module (srfi srfi-19)
#:use-module (srfi srfi-34)
#:use-module (srfi srfi-71)
#:use-module (ice-9 ftw)
@@ -147,10 +148,8 @@
(proc port size)))))
-(define (add-cached-compressions-to-narinfo
- database
- narinfo-id
- initial-narinfo-contents)
+(define (add-cached-compressions-to-narinfo initial-narinfo-contents
+ cached-narinfo-files)
(let ((cached-nar-strings
(map (lambda (cached-nar-details)
(let ((compression
@@ -163,10 +162,9 @@
"\n"
"Compression: " compression "\n"
"FileSize: " (number->string
- (assq-ref cached-nar-details 'size)))))
- (database-select-cached-narinfo-files-by-narinfo-id
- database
- narinfo-id))))
+ (assq-ref cached-nar-details 'size))
+ "\n")))
+ cached-narinfo-files)))
(string-append
initial-narinfo-contents
(string-join
@@ -177,7 +175,8 @@
#:key base-ttl base-cached-compressions-ttl
negative-ttl logger
metrics-registry
- maybe-trigger-creation-of-compressed-nars)
+ maybe-trigger-creation-of-cached-nars
+ cached-compression-nar-requested-hook)
(define hostname
(gethostname))
@@ -244,11 +243,55 @@
"404"))
(if base-narinfo-contents
- (let ((narinfo-contents
- (add-cached-compressions-to-narinfo
- database
- narinfo-id
- base-narinfo-contents)))
+ (let* ((cached-narinfo-files
+ (database-select-cached-narinfo-files-by-narinfo-id
+ database
+ narinfo-id))
+ (narinfo-contents
+ (if (null? cached-narinfo-files)
+ base-narinfo-contents
+ (add-cached-compressions-to-narinfo
+ base-narinfo-contents
+ cached-narinfo-files)))
+ (potential-ttls
+ (remove
+ not
+ `(,(if (null? cached-narinfo-files)
+ base-ttl
+ base-cached-compressions-ttl)
+
+ ,(and=> (database-select-scheduled-narinfo-removal
+ database
+ narinfo-id)
+ (lambda (scheduled-removal-time)
+ (list
+ (max
+ (- (time-second
+ (time-difference scheduled-removal-time
+ (current-time)))
+ 60)
+ 0))))
+
+ ,@(if (null? cached-narinfo-files)
+ '()
+ (map
+ (lambda (details)
+ (and=>
+ (database-select-scheduled-cached-narinfo-removal
+ database
+ (assq-ref details 'id))
+ (lambda (scheduled-removal-time)
+ (max
+ (- (time-second
+ (time-difference scheduled-removal-time
+ (current-time)))
+ 60)
+ 0))))
+ cached-narinfo-files)))))
+ (ttl
+ (cond
+ ((null? potential-ttls) #f)
+ (else (apply min potential-ttls)))))
(values `((content-type . (text/plain))
,@(if ttl
@@ -320,8 +363,8 @@
(request-via request))))
(when (and (not loop?)
- maybe-trigger-creation-of-compressed-nars)
- (maybe-trigger-creation-of-compressed-nars
+ maybe-trigger-creation-of-cached-nars)
+ (maybe-trigger-creation-of-cached-nars
(assq-ref narinfo 'id)))
(when loop?
@@ -359,10 +402,15 @@
#f)))
(let ((cached-narinfo-file
(and compression-symbol
+ ;; Check that the filename given in the
+ ;; request matches the narinfo store-path
+ (string=? filename
+ (basename
+ (assq-ref narinfo 'store-path)))
(database-select-cached-narinfo-file-by-hash
database
hash
- compression))))
+ compression-symbol))))
(when (or cached-narinfo-file
;; Check for a common compression to avoid lots of
@@ -380,6 +428,10 @@
'()))
'())))
+ (when cached-narinfo-file
+ (cached-compression-nar-requested-hook compression-symbol
+ filename))
+
(if cached-narinfo-file
(values (build-response
#:code 200
@@ -592,7 +644,27 @@
(match-lambda
(('cached-compression-directory-max-size . details) details)
(_ #f))
+ opts))
+ (cached-compression-ttls
+ (filter-map
+ (match-lambda
+ (('cached-compression-ttl . details) details)
+ (_ #f))
+ opts))
+ (cached-compression-new-ttls
+ (filter-map
+ (match-lambda
+ (('cached-compression-new-ttl . details) details)
+ (_ #f))
+ opts))
+ (cached-compression-unused-removal-durations
+ (filter-map
+ (match-lambda
+ (('cached-compression-unused-removal-duration . details)
+ details)
+ (_ #f))
opts)))
+
(filter-map
(match-lambda
(('cached-compression . details)
@@ -607,6 +679,15 @@
compression)))
(directory-max-size
. ,(assq-ref cached-compression-directories-max-sizes
+ compression))
+ (ttl
+ . ,(assq-ref cached-compression-ttls
+ compression))
+ (new-ttl
+ . ,(assq-ref cached-compression-new-ttls
+ compression))
+ (unused-removal-duration
+ . ,(assq-ref cached-compression-unused-removal-durations
compression))))))
(_ #f))
opts)))
@@ -614,10 +695,10 @@
(cached-compression-min-uses
(assq-ref opts 'cached-compression-min-uses))
- (maybe-trigger-creation-of-compressed-nars
+ (cached-compression-management-channel
(if (null? enabled-cached-compressions)
#f
- (make-maybe-trigger-creation-of-compressed-nars
+ (start-cached-compression-management-fiber
database
metrics-registry
(or (assq-ref opts 'cached-compression-nar-source)
@@ -626,7 +707,30 @@
cached-compression-min-uses
#:cached-compression-workers
(assq-ref opts 'cached-compression-workers)
- #:scheduler maintenance-scheduler))))
+ #:scheduler maintenance-scheduler)))
+
+ (maybe-trigger-creation-of-cached-nars
+ (if (null? enabled-cached-compressions)
+ #f
+ (lambda (narinfo-id)
+ (spawn-fiber
+ (lambda ()
+ (put-message cached-compression-management-channel
+ (cons 'narinfo-id narinfo-id)))
+ maintenance-scheduler))))
+
+ (cached-compression-nar-requested-hook
+ (if (null? enabled-cached-compressions)
+ #f
+ (lambda (compression filename)
+ (spawn-fiber
+ (lambda ()
+ (let* ((directory
+ (assq-ref (assq-ref enabled-cached-compressions
+ compression)
+ 'directory)))
+ (utime (peek "UTIME" (string-append directory "/" filename)))))
+ maintenance-scheduler)))))
(if (string=? (assq-ref opts 'database-dump)
"disabled")
@@ -726,6 +830,18 @@
addition-channel
removal-channel))
+ (unless (null? enabled-cached-compressions)
+ (let ((cached-compression-removal-fiber-wakeup-channel
+ (start-cached-compression-removal-fiber
+ database
+ cached-compression-management-channel
+ enabled-cached-compressions)))
+ (start-cached-compression-schedule-removal-fiber
+ database
+ cached-compression-management-channel
+ enabled-cached-compressions
+ cached-compression-removal-fiber-wakeup-channel)))
+
(log-msg 'DEBUG "finished maintenance setup")
(wait finished?))
#:scheduler maintenance-scheduler
@@ -767,8 +883,10 @@
#:negative-ttl (assq-ref opts 'narinfo-negative-ttl)
#:logger lgr
#:metrics-registry metrics-registry
- #:maybe-trigger-creation-of-compressed-nars
- maybe-trigger-creation-of-compressed-nars)
+ #:maybe-trigger-creation-of-cached-nars
+ maybe-trigger-creation-of-cached-nars
+ #:cached-compression-nar-requested-hook
+ cached-compression-nar-requested-hook)
#:host (assq-ref opts 'host)
#:port (assq-ref opts 'port))
diff --git a/scripts/nar-herder.in b/scripts/nar-herder.in
index e81da92..e4ce9c2 100644
--- a/scripts/nar-herder.in
+++ b/scripts/nar-herder.in
@@ -208,18 +208,49 @@
arg
(alist-delete 'cached-compression-nar-source
result))))
-
- (option '("ttl") #t #f
+ (option '("cached-compressions-unused-removal-duration") #t #f
(lambda (opt name arg result)
- (let ((duration (string->duration arg)))
- (unless duration
- (simple-format (current-error-port)
- "~A: invalid duration\n"
- arg)
+ (alist-cons
+ 'cached-compression-unused-removal-duration
+ (match (string-split arg #\=)
+ ((_)
+ (simple-format
+ (current-error-port)
+ "cached-compressions-unused-removal-duration: you must specify compression and value\n")
(exit 1))
- (alist-cons 'narinfo-ttl (time-second duration)
- result))))
- (option '("new-ttl") #t #f
+ ((type duration-string)
+ (cons (string->symbol type)
+ (let ((duration (string->duration duration-string)))
+ (unless duration
+ (simple-format
+ (current-error-port)
+ "~A: cached-compressions-unused-removal-duration: invalid duration\n"
+ arg)
+ (exit 1))
+
+ duration))))
+ result)))
+ (option '("cached-compressions-ttl") #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'cached-compression-ttl
+ (match (string-split arg #\=)
+ ((_)
+ (simple-format
+ (current-error-port)
+ "cached-compressions-ttl: you must specify compression and value\n")
+ (exit 1))
+ ((type ttl-string)
+ (let ((duration (string->duration ttl-string)))
+ (unless duration
+ (simple-format (current-error-port)
+ "~A: invalid duration\n"
+ arg)
+ (exit 1))
+
+ (cons (string->symbol type)
+ (time-second duration)))))
+ result)))
+ (option '("cached-compressions-new-ttl") #t #f
(lambda (opt name arg result)
(let ((duration (string->duration arg)))
(unless duration
@@ -227,9 +258,14 @@
"~A: invalid duration\n"
arg)
(exit 1))
- (alist-cons 'new-narinfo-ttl (time-second duration)
+ (alist-cons 'cached-compression-new-ttl
+ (match (string-split arg #\=)
+ ((type size)
+ (cons (string->symbol type)
+ (time-second duration))))
result))))
- (option '("cached-compressions-ttl") #t #f
+
+ (option '("ttl") #t #f
(lambda (opt name arg result)
(let ((duration (string->duration arg)))
(unless duration
@@ -237,10 +273,9 @@
"~A: invalid duration\n"
arg)
(exit 1))
- (alist-cons 'cached-compressions-narinfo-ttl
- (time-second duration)
+ (alist-cons 'narinfo-ttl (time-second duration)
result))))
- (option '("new-cached-compressions-ttl") #t #f
+ (option '("new-ttl") #t #f
(lambda (opt name arg result)
(let ((duration (string->duration arg)))
(unless duration
@@ -248,8 +283,7 @@
"~A: invalid duration\n"
arg)
(exit 1))
- (alist-cons 'new-cached-compressions-narinfo-ttl
- (time-second duration)
+ (alist-cons 'new-narinfo-ttl (time-second duration)
result))))
(option '("negative-ttl") #t #f
(lambda (opt name arg result)