aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--nar-herder/cached-compression.scm622
-rw-r--r--nar-herder/database.scm287
-rw-r--r--nar-herder/server.scm162
-rw-r--r--scripts/nar-herder.in68
4 files changed, 829 insertions, 310 deletions
diff --git a/nar-herder/cached-compression.scm b/nar-herder/cached-compression.scm
index e4fa52f..40dfcd1 100644
--- a/nar-herder/cached-compression.scm
+++ b/nar-herder/cached-compression.scm
@@ -18,6 +18,7 @@
(define-module (nar-herder cached-compression)
#:use-module (srfi srfi-1)
+ #:use-module (srfi srfi-19)
#:use-module (srfi srfi-26)
#:use-module (srfi srfi-71)
#:use-module (ice-9 ftw)
@@ -26,6 +27,10 @@
#:use-module (ice-9 threads)
#:use-module (logging logger)
#:use-module (prometheus)
+ #:use-module (fibers)
+ #:use-module (fibers timers)
+ #:use-module (fibers channels)
+ #:use-module (fibers operations)
#:use-module (web uri)
#:use-module (web client)
#:use-module (web response)
@@ -36,7 +41,9 @@
#:select (dump-port mkdir-p))
#:use-module (nar-herder utils)
#:use-module (nar-herder database)
- #:export (make-maybe-trigger-creation-of-compressed-nars))
+ #:export (start-cached-compression-management-fiber
+ start-cached-compression-removal-fiber
+ start-cached-compression-schedule-removal-fiber))
;; Nar caching overview
;;
@@ -44,7 +51,6 @@
;; - Compute the size of each cached compression directory
;; - Remove database entries if they're missing from the directory
;; - Remove files if they're missing from the database
-;; - Remove (random TODO) files if the size exceeds the max
;;
;; On nar usage
;; - Bump count
@@ -52,9 +58,17 @@
;; - Trigger generation of the cached nar
;; - Skip if the work queue already includes this job
;; - At the start of the job, check for a database entry and exit
-;; early if one exists
-;; - Update the size of the cached compression directory
-;; - If the size exceeds the max, remove (random TODO) files
+;; early if one exists bumping the atime of the file
+;; - If the file doesn't exist, check if there's free space
+;; - If there's free space, create the file
+;;
+;; Nar removal fiber
+;; - Periodically remove nars that have been scheduled for removal
+;; (when the scheduled time passes)
+;;
+;; Nar schedule removal fiber
+;; - Periodically check for nars that haven't been accessed in some
+;; time and schedule them for removal
(define (perform-cached-compression-startup database
enabled-cached-compressions)
@@ -159,164 +173,38 @@
cached-bytes-by-compression))
-(define (maybe-remove-cached-files-for-compression database
- enabled-cached-compressions
- compression
- cached-bytes-by-compression-box
- nar-cache-bytes-metric)
- (let* ((compression-details
- (assq-ref enabled-cached-compressions
- compression))
- (max-size
- (assq-ref compression-details
- 'directory-max-size)))
- (when max-size
- (let ((current-size
- (assq-ref
- (atomic-box-ref cached-bytes-by-compression-box)
- compression)))
- (when (> current-size max-size)
- (let ((bytes-to-remove
- (- current-size max-size)))
- (log-msg 'DEBUG
- "looking to remove " bytes-to-remove " bytes of "
- compression " compressed nars")
-
- (match
- (database-fold-cached-narinfo-files
- database
- (lambda (details result)
- (match result
- (#(finished? bytes files-to-remove)
- (if finished?
- result
- (if (eq? (assq-ref details 'compression)
- compression)
- (let ((new-bytes
- (+ bytes
- (assq-ref details 'size))))
- ;; finished if enough bytes are going
- ;; to be removed
- (vector (>= new-bytes bytes-to-remove)
- new-bytes
- (cons details files-to-remove)))
- result)))))
- #(#f 0 ()))
- (#(#t bytes-for-removal files-to-remove-details)
- (log-msg 'DEBUG "removing " (length files-to-remove-details)
- " " compression " compressed nars from the cache")
-
- ;; Use an explicit transaction as it handles the
- ;; database being busy,
- (database-call-with-transaction
- database
- (lambda _
- (for-each
- (lambda (details)
- ;; Remove all the database entries first, as
- ;; that'll stop these files appearing in narinfos
- (database-remove-cached-narinfo-file
- database
- (assq-ref details 'narinfo-id)
- (symbol->string compression)))
- files-to-remove-details)))
-
- (let ((directory
- (assq-ref compression-details 'directory)))
- (for-each
- (lambda (details)
- (let ((filename
- (string-append
- directory "/"
- (basename (assq-ref details 'store-path)))))
- (log-msg 'DEBUG "deleting " filename)
- (delete-file filename)))
- files-to-remove-details)
-
- (let* ((cached-bytes-by-compression
- (atomic-box-ref cached-bytes-by-compression-box))
- (new-cached-bytes
- (- (assq-ref cached-bytes-by-compression
- compression)
- bytes-for-removal)))
- (atomic-box-set!
- cached-bytes-by-compression-box
- (alist-cons
- compression
- new-cached-bytes
- (alist-delete
- compression
- cached-bytes-by-compression)))
-
- (metric-set
- nar-cache-bytes-metric
- new-cached-bytes
- #:label-values `((compression . ,compression)))
-
- (log-msg 'DEBUG "finished removing " bytes-for-removal
- " bytes of " compression
- " cached nars from " directory
- " (new size " new-cached-bytes ")")))))))))))
-
-(define* (make-maybe-trigger-creation-of-compressed-nars
+;; This fiber manages metadata around cached compressions, and
+;; delegates tasks to the thread pool to generate newly compressed
+;; nars
+(define* (start-cached-compression-management-fiber
database
metrics-registry
nar-source
enabled-cached-compressions
cached-compression-min-uses
- #:key (cached-compression-workers 2) scheduler)
+ #:key (cached-compression-workers 2)
+ scheduler)
(define nar-cache-bytes-metric
(make-gauge-metric metrics-registry
"nar_cache_size_bytes"
#:labels '(compression)))
- (let ((consider-nar-request-channel
- (make-worker-thread-set
- (lambda ()
- (let ((cached-bytes-by-compression-box
- (make-atomic-box #f)))
- (atomic-box-set!
- cached-bytes-by-compression-box
- (perform-cached-compression-startup database
- enabled-cached-compressions))
-
- (for-each
- (match-lambda
- ((compression . bytes)
- (metric-set
- nar-cache-bytes-metric
- bytes
- #:label-values `((compression . ,compression)))))
- (atomic-box-ref cached-bytes-by-compression-box))
-
- ;; Remove cached files if the max size is exceeded
- (for-each
- (match-lambda
- ((compression . _)
- (maybe-remove-cached-files-for-compression
- database
- enabled-cached-compressions
- compression
- cached-bytes-by-compression-box
- nar-cache-bytes-metric)))
- (atomic-box-ref cached-bytes-by-compression-box))
-
- (list cached-bytes-by-compression-box)))
- #:name "comp nar req"
- ;; Just make one thread, as this thread won't do much work
- ;; and relies on a hash table that shouldn't be accessed by
- ;; multiple threads
- #:parallelism 1)))
-
- (define with-usage-hash-table
- (let ((nar-cached-compression-usage-hash-table
- (make-hash-table 65536)))
- (lambda (proc)
- (monitor
- (proc nar-cached-compression-usage-hash-table)))))
-
- (define (proc narinfo-id cached-bytes-by-compression-box)
+ (define channel
+ (make-channel))
+
+ (let ((process-job
+ count-jobs
+ count-threads
+ list-jobs
+ (create-work-queue cached-compression-workers
+ (lambda (thunk)
+ (thunk))
+ #:name "cached compression")))
+
+ (define (consider-narinfo cached-bytes-by-compression
+ usage-hash-table
+ narinfo-id)
(let* ((existing-cached-files
;; This is important both to avoid trying to create
;; files twice, but also in the case where additional
@@ -333,120 +221,342 @@
(lset-difference eq?
(map car enabled-cached-compressions)
existing-compressions))
- (compress-file?
- (let ((url
- (assq-ref
- (first
- (database-select-narinfo-files-by-narinfo-id
- database
- narinfo-id))
- 'url)))
+ (narinfo-files
+ (database-select-narinfo-files-by-narinfo-id
+ database
+ narinfo-id))
+ (compress-file?
+ (let ((url (assq-ref (first narinfo-files) 'url)))
;; TODO: Maybe this should be configurable?
(not (compressed-file? url)))))
(when (and compress-file?
(not (null? missing-compressions)))
(let ((new-count
- (with-usage-hash-table
- (lambda (usage)
- (let ((val (+ 1
- (or (hash-ref usage narinfo-id)
- 0))))
- (hash-set! usage
- narinfo-id
- val)
- val)))))
-
- (when (> new-count
- cached-compression-min-uses)
- (for-each
- (lambda (missing-compression)
- (let ((new-bytes
- (make-compressed-nar
- database
- nar-source
- enabled-cached-compressions
- narinfo-id
- missing-compression
- #:level (assq-ref
- (assq-ref enabled-cached-compressions
- missing-compression)
- 'level))))
-
- ;; Do this here, after creating the new cached nar,
- ;; just in case there's a lack of space
- (monitor
- (let* ((cached-bytes-by-compression
- (atomic-box-ref
- cached-bytes-by-compression-box))
- (updated-bytes-for-compression
- (+ (assq-ref cached-bytes-by-compression
- missing-compression)
- new-bytes)))
-
- (metric-set
- nar-cache-bytes-metric
- updated-bytes-for-compression
- #:label-values `((compression . ,missing-compression)))
-
- (atomic-box-set!
- cached-bytes-by-compression-box
- (alist-cons
- missing-compression
- updated-bytes-for-compression
- (alist-delete
- missing-compression
- cached-bytes-by-compression)))
-
- (maybe-remove-cached-files-for-compression
- database
- enabled-cached-compressions
- missing-compression
- cached-bytes-by-compression-box
- nar-cache-bytes-metric)))))
- missing-compressions)
-
- (with-usage-hash-table
- (lambda (usage)
- (hash-remove! usage narinfo-id))))))))
-
- (let ((process-job
- count-jobs
- count-threads
- list-jobs
- (create-work-queue cached-compression-workers
- proc
- #:name "cached compression")))
-
- (lambda (narinfo-id)
- (spawn-fiber
+ (let ((val (+ 1
+ (or (hash-ref usage-hash-table narinfo-id)
+ 0))))
+ (hash-set! usage-hash-table
+ narinfo-id
+ val)
+ val)))
+
+ (when (and (> new-count
+ cached-compression-min-uses))
+ (let* ((narinfo-details
+ (database-select-narinfo database narinfo-id))
+ (nar-size
+ (assq-ref narinfo-details 'nar-size))
+ (compressions-with-space
+ (filter
+ (lambda (compression)
+ (let ((directory-max-size
+ (assq-ref
+ (assq-ref enabled-cached-compressions
+ compression)
+ 'directory-max-size))
+ (current-directory-size
+ (assq-ref cached-bytes-by-compression
+ compression)))
+ (if directory-max-size
+ (< (+ current-directory-size
+ ;; Assume the commpressed nar could be
+ ;; as big as the uncompressed nar
+ nar-size)
+ directory-max-size)
+ #t)))
+ missing-compressions)))
+ (for-each
+ (lambda (compression)
+ (spawn-fiber
+ (lambda ()
+ (process-job
+ (lambda ()
+ (let ((new-bytes
+ (make-compressed-nar
+ narinfo-files
+ nar-source
+ enabled-cached-compressions
+ compression
+ #:level (assq-ref
+ (assq-ref enabled-cached-compressions
+ compression)
+ 'level))))
+ (put-message channel
+ (list 'cached-narinfo-added
+ narinfo-id
+ compression
+ new-bytes))))))))
+ compressions-with-space)))))))
+
+ (spawn-fiber
+ (lambda ()
+ (let ((initial-cached-bytes-by-compression
+ (perform-cached-compression-startup
+ database
+ enabled-cached-compressions))
+ (nar-cached-compression-usage-hash-table
+ (make-hash-table 65536)))
+
+ (for-each
+ (match-lambda
+ ((compression . bytes)
+ (metric-set
+ nar-cache-bytes-metric
+ bytes
+ #:label-values `((compression . ,compression)))))
+ initial-cached-bytes-by-compression)
+
+ (let loop ((cached-bytes-by-compression
+ initial-cached-bytes-by-compression))
+ (match (get-message channel)
+ (('narinfo-id . narinfo-id)
+ (consider-narinfo cached-bytes-by-compression
+ nar-cached-compression-usage-hash-table
+ narinfo-id)
+ (loop cached-bytes-by-compression))
+
+ (((and (or 'cached-narinfo-added 'cached-narinfo-removed)
+ action)
+ narinfo-id compression size)
+ (let ((updated-bytes
+ ((if (eq? action 'cached-narinfo-added)
+ +
+ -)
+ (or (assq-ref cached-bytes-by-compression
+ compression)
+ 0)
+ size)))
+
+ (metric-set
+ nar-cache-bytes-metric
+ updated-bytes
+ #:label-values `((compression . ,compression)))
+
+ (when (eq? action 'cached-narinfo-added)
+ (database-insert-cached-narinfo-file
+ database
+ narinfo-id
+ size
+ compression)
+
+ (hash-remove! nar-cached-compression-usage-hash-table
+ narinfo-id))
+
+ (loop (alist-cons
+ cached-bytes-by-compression
+ updated-bytes
+ (alist-delete compression
+ cached-bytes-by-compression)))))))))
+ scheduler)
+
+ channel))
+
+;; Periodically check for nars that haven't been accessed in some time
+;; and schedule them for removal
+(define (start-cached-compression-schedule-removal-fiber
+ database
+ cached-compression-management-channel
+ enabled-cached-compressions
+ cached-compression-removal-fiber-wakeup-channel)
+
+ (define (files-to-schedule-for-removal compression-details)
+ (let* ((directory (assq-ref compression-details 'directory))
+ (unused-removal-duration
+ (assq-ref compression-details 'unused-removal-duration))
+ (atime-threshold
+ (time-second
+ (subtract-duration (current-time)
+ unused-removal-duration))))
+ (scandir
+ directory
+ (lambda (filename)
+ (and
+ (< (stat:atime (stat (string-append directory "/" filename)))
+ atime-threshold)
+ (not (member filename '("." ".."))))))))
+
+ (define (schedule-removal compression compression-details)
+ (let* ((files (files-to-schedule-for-removal compression-details))
+ (all-cached-narinfo-file-details
+ (map
+ (lambda (file)
+ (database-select-cached-narinfo-file-by-hash
+ database
+ (string-take file 32) ; hash part
+ compression))
+ files))
+ (existing-scheduled-removals
+ (map
+ (lambda (cached-narinfo-file-details)
+ (database-select-scheduled-cached-narinfo-removal
+ database
+ (assq-ref cached-narinfo-file-details 'id)))
+ all-cached-narinfo-file-details)))
+
+ (for-each
+ (lambda (file cached-narinfo-file-details existing-scheduled-removal)
+ (unless existing-scheduled-removal
+ (let ((removal-time
+ ;; The earliest this can be removed is the current
+ ;; time, plus the TTL
+ (add-duration
+ (current-time)
+ (make-time time-duration
+ 0
+ (assq-ref compression-details 'ttl)))))
+ (database-insert-scheduled-cached-narinfo-removal
+ database
+ (assq-ref cached-narinfo-file-details 'id)
+ removal-time))))
+ files
+ all-cached-narinfo-file-details
+ existing-scheduled-removals)
+
+ (when (any not existing-scheduled-removals)
+ ;; Wake the cached compression removal fiber in case one of
+ ;; the new scheduled removals is before it's scheduled to wake
+ ;; up
+ (put-message
+ cached-compression-removal-fiber-wakeup-channel
+ #t))))
+
+ (spawn-fiber
+ (lambda ()
+ (let ((sleep-duration
+ (apply min
+ (map (lambda (compression-details)
+ (/ (time-second
+ (assq-ref compression-details
+ 'unused-removal-duration))
+ 4))
+ enabled-cached-compressions))))
+ (while #t
+ (log-msg 'DEBUG "cached-compression-schedule-removal-fiber starting pass")
+
+ (for-each
+ (match-lambda
+ ((compression . details)
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "cached-compression-schedule-removal-fiber: "
+ "exception: " exn))
+ (lambda ()
+ (schedule-removal compression details))
+ #:unwind? #t)))
+ enabled-cached-compressions)
+
+ (log-msg 'DEBUG "cached-compression-schedule-removal-fiber sleeping for "
+ sleep-duration)
+ (sleep sleep-duration))))))
+
+;; Process the scheduled removals for cached nars
+(define (start-cached-compression-removal-fiber
+ database
+ cached-compression-management-channel
+ enabled-cached-compressions)
+
+ (define (remove id narinfo-id compression store-path)
+ ;; Use an explicit transaction as it handles the
+ ;; database being busy,
+ (database-call-with-transaction
+ database
+ (lambda _
+ (database-delete-scheduled-cached-narinfo-removal
+ database
+ id)
+
+ ;; Remove all the database entries first, as
+ ;; that'll stop these files appearing in narinfos
+ (database-remove-cached-narinfo-file
+ database
+ narinfo-id
+ (symbol->string compression))))
+
+ (let ((directory
+ (assq-ref (assq-ref enabled-cached-compressions
+ compression)
+ 'directory)))
+ (let ((filename
+ (string-append
+ directory "/"
+ (basename store-path))))
+ (log-msg 'DEBUG "deleting " filename)
+ (delete-file filename))))
+
+ (define wakeup-channel
+ (make-channel))
+
+ (define (make-pass)
+ (log-msg 'DEBUG "cached-compression-removal-fiber starting pass")
+
+ (let ((scheduled-cached-narinfo-removal
+ (database-select-oldest-scheduled-cached-narinfo-removal
+ database)))
+
+ (log-msg 'DEBUG "scheduled removal: " scheduled-cached-narinfo-removal)
+
+ (if scheduled-cached-narinfo-removal
+ (if (time<=? (assq-ref scheduled-cached-narinfo-removal
+ 'scheduled-removal-time)
+ (current-time))
+ (let ((id (assq-ref scheduled-cached-narinfo-removal 'id))
+ (narinfo-id (assq-ref scheduled-cached-narinfo-removal
+ 'narinfo-id))
+ (compression (assq-ref scheduled-cached-narinfo-removal
+ 'compression))
+ (size (assq-ref scheduled-cached-narinfo-removal
+ 'size))
+ (store-path (assq-ref scheduled-cached-narinfo-removal
+ 'store-path)))
+ (remove id narinfo-id compression store-path)
+
+ (put-message
+ cached-compression-management-channel
+ (list 'cached-narinfo-removed
+ narinfo-id
+ compression
+ size)))
+
+ (let ((duration
+ (time-difference
+ (assq-ref scheduled-cached-narinfo-removal
+ 'scheduled-removal-time)
+ (current-time))))
+ (perform-operation
+ (choice-operation
+ (sleep-operation (max 1 (+ 1 (time-second duration))))
+ (get-operation wakeup-channel)))))
+
+ ;; Sleep until woken
+ (get-message wakeup-channel))))
+
+
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "exception in cached-compression-removal-fiber: "
+ exn))
(lambda ()
- (call-with-worker-thread
- consider-nar-request-channel
- (lambda (cached-bytes-by-compression-box)
- (let ((in-progress-narinfo-ids
- (map car (list-jobs))))
-
- (unless (member narinfo-id in-progress-narinfo-ids)
- (process-job narinfo-id cached-bytes-by-compression-box)))
- #t)))
- scheduler
- #:parallel? #t)))))
-
-(define* (make-compressed-nar database
+ (with-throw-handler #t
+ make-pass
+ (lambda _
+ (backtrace))))
+ #:unwind? #t))))
+
+ wakeup-channel)
+
+(define* (make-compressed-nar narinfo-files
nar-source
enabled-cached-compressions
- narinfo-id
target-compression
#:key level)
(define cached-compression-details
(assq-ref enabled-cached-compressions target-compression))
- (define narinfo-files
- (database-select-narinfo-files-by-narinfo-id database
- narinfo-id))
-
(log-msg 'DEBUG "making " target-compression " for "
(uri-decode
(basename
@@ -565,16 +675,6 @@
(let ((bytes
(stat:size (stat dest-filename))))
-
- (database-call-with-transaction
- database
- (lambda _
- (database-insert-cached-narinfo-file
- database
- narinfo-id
- bytes
- (symbol->string target-compression))))
-
(log-msg 'DEBUG "created " dest-filename)
bytes))))
diff --git a/nar-herder/database.scm b/nar-herder/database.scm
index c8171a3..98c29d5 100644
--- a/nar-herder/database.scm
+++ b/nar-herder/database.scm
@@ -43,6 +43,7 @@
database-insert-narinfo
database-remove-narinfo
+ database-select-narinfo
database-select-narinfo-by-hash
database-select-narinfo-contents-by-hash
@@ -64,7 +65,13 @@
database-select-cached-narinfo-file-by-hash
database-select-cached-narinfo-files-by-narinfo-id
database-fold-cached-narinfo-files
- database-remove-cached-narinfo-file))
+ database-remove-cached-narinfo-file
+
+ database-select-scheduled-narinfo-removal
+ database-select-scheduled-cached-narinfo-removal
+ database-delete-scheduled-cached-narinfo-removal
+ database-select-oldest-scheduled-cached-narinfo-removal
+ database-insert-scheduled-cached-narinfo-removal))
(define-record-type <database>
(make-database database-file reader-thread-channel writer-thread-channel
@@ -145,7 +152,17 @@ CREATE TABLE cached_narinfo_files (
);
CREATE INDEX cached_narinfo_files_narinfo_id
- ON cached_narinfo_files (narinfo_id);")
+ ON cached_narinfo_files (narinfo_id);
+
+CREATE TABLE scheduled_narinfo_removal (
+ narinfo_id INTEGER PRIMARY KEY ASC REFERENCES narinfos (id),
+ removal_datetime TEXT NOT NULL
+);
+
+CREATE TABLE scheduled_cached_narinfo_removal (
+ cached_narinfo_file_id INTEGER PRIMARY KEY ASC REFERENCES cached_narinfo_files (id),
+ removal_datetime TEXT NOT NULL
+);")
(sqlite-exec db schema))
@@ -206,6 +223,24 @@ CREATE INDEX cached_narinfo_files_narinfo_id
db
"ALTER TABLE narinfos ADD COLUMN added_at TEXT;"))
+ (unless (table-exists? db "scheduled_narinfo_removal")
+ (sqlite-exec
+ db
+ "
+CREATE TABLE scheduled_narinfo_removal (
+ narinfo_id INTEGER PRIMARY KEY ASC REFERENCES narinfos (id),
+ removal_datetime TEXT NOT NULL
+);"))
+
+ (unless (table-exists? db "scheduled_cached_narinfo_removal")
+ (sqlite-exec
+ db
+ "
+CREATE TABLE scheduled_cached_narinfo_removal (
+ cached_narinfo_file_id INTEGER PRIMARY KEY ASC REFERENCES cached_narinfo_files (id),
+ removal_datetime TEXT NOT NULL
+);"))
+
(sqlite-exec
db
"CREATE INDEX IF NOT EXISTS narinfo_tags_narinfo_id
@@ -835,6 +870,38 @@ DELETE FROM narinfo_tags WHERE narinfo_id = :narinfo_id"
#t)
#f)))))
+(define (database-select-narinfo database id)
+ (call-with-time-tracking
+ database
+ "select_narinfo"
+ (lambda ()
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT store_path, nar_hash, nar_size, deriver, system
+FROM narinfos
+WHERE id = :id"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:id id)
+
+ (match (let ((result (sqlite-step statement)))
+ (sqlite-reset statement)
+ result)
+ (#(store_path nar_hash nar_size deriver system)
+ `((store-path . ,store_path)
+ (nar-hash . ,nar_hash)
+ (nar-size . ,nar_size)
+ (deriver . ,deriver)
+ (system . ,system)))
+ (_
+ #f))))))))
+
(define (database-select-narinfo-by-hash database hash)
(call-with-time-tracking
database
@@ -1175,7 +1242,7 @@ INSERT INTO cached_narinfo_files (
statement
#:narinfo_id narinfo-id
#:size size
- #:compression compression)
+ #:compression (symbol->string compression))
(sqlite-step statement)
(sqlite-reset statement)
@@ -1196,7 +1263,7 @@ INSERT INTO cached_narinfo_files (
(sqlite-prepare
db
"
-SELECT cached_narinfo_files.size
+SELECT cached_narinfo_files.id, cached_narinfo_files.size
FROM narinfos
INNER JOIN cached_narinfo_files
ON cached_narinfo_files.narinfo_id = narinfos.id
@@ -1207,12 +1274,13 @@ WHERE substr(narinfos.store_path, 12, 32) = :hash
(sqlite-bind-arguments
statement
#:hash hash
- #:compression compression)
+ #:compression (symbol->string compression))
(let ((result
(match (sqlite-step statement)
- (#(size)
- `((size . ,size)))
+ (#(id size)
+ `((id . ,id)
+ (size . ,size)))
(#f #f))))
(sqlite-reset statement)
@@ -1232,7 +1300,10 @@ WHERE substr(narinfos.store_path, 12, 32) = :hash
(sqlite-prepare
db
"
-SELECT store_path, size, compression
+SELECT cached_narinfo_files.id,
+ narinfos.store_path,
+ cached_narinfo_files.size,
+ cached_narinfo_files.compression
FROM cached_narinfo_files
INNER JOIN narinfos
ON cached_narinfo_files.narinfo_id = narinfos.id
@@ -1246,8 +1317,9 @@ WHERE narinfo_id = :narinfo_id"
(let ((result
(sqlite-map
(match-lambda
- (#(store_path size compression)
- `((store-path . ,store_path)
+ (#(id store_path size compression)
+ `((id . ,id)
+ (store-path . ,store_path)
(size . ,size)
(compression . ,(string->symbol compression)))))
statement)))
@@ -1255,6 +1327,48 @@ WHERE narinfo_id = :narinfo_id"
result)))))))
+(define (database-select-cached-narinfo-file-by-narinfo-id-and-compression
+ database
+ narinfo-id
+ compression)
+ (call-with-time-tracking
+ database
+ "select_cached_narinfo_file_by_narinfo_id_and_compression"
+ (lambda ()
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT cached_narinfo_files.id,
+ narinfos.store_path,
+ cached_narinfo_files.size,
+ cached_narinfo_files.compression
+FROM cached_narinfo_files
+INNER JOIN narinfos
+ ON cached_narinfo_files.narinfo_id = narinfos.id
+WHERE narinfo_id = :narinfo_id
+ AND compression = :compression"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:narinfo_id narinfo-id
+ #:compression (symbol->string compression))
+
+ (let ((result
+ (match (sqlite-step statement)
+ (#(id store_path size compression)
+ `((id . ,id)
+ (store-path . ,store_path)
+ (size . ,size)
+ (compression . ,(string->symbol compression)))))))
+ (sqlite-reset statement)
+
+ result)))))))
+
(define (database-fold-cached-narinfo-files database
proc
init)
@@ -1306,3 +1420,156 @@ WHERE narinfo_id = :narinfo_id
(sqlite-step statement)
(sqlite-reset statement)))))
+
+(define (database-select-scheduled-narinfo-removal database narinfo-id)
+ (call-with-time-tracking
+ database
+ "select_scheduled_narinfo_removal"
+ (lambda ()
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT removal_datetime
+FROM scheduled_narinfo_removal
+WHERE narinfo_id = :narinfo_id"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:narinfo_id narinfo-id)
+
+ (let ((result
+ (match (sqlite-step statement)
+ (#(datetime)
+ (date->time-utc
+ (string->date
+ datetime
+ "~Y-~m-~d ~H:~M:~S")))
+ (#f #f))))
+ (sqlite-reset statement)
+ result)))))))
+
+(define (database-select-scheduled-cached-narinfo-removal database
+ cached-narinfo-file-id)
+ (call-with-time-tracking
+ database
+ "select_scheduled_narinfo_removal"
+ (lambda ()
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT removal_datetime
+FROM scheduled_cached_narinfo_removal
+WHERE cached_narinfo_file_id = :cached_narinfo_file_id"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:cached_narinfo_file_id cached-narinfo-file-id)
+
+ (let ((result
+ (match (sqlite-step statement)
+ (#(datetime)
+ (date->time-utc
+ (string->date
+ datetime
+ "~Y-~m-~d ~H:~M:~S")))
+ (#f #f))))
+ (sqlite-reset statement)
+ result)))))))
+
+(define (database-delete-scheduled-cached-narinfo-removal database
+ cached-narinfo-file-id)
+ (call-with-worker-thread
+ (database-writer-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+DELETE FROM scheduled_cached_narinfo_removal
+WHERE cached_narinfo_file_id = :cached_narinfo_file_id"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:cached_narinfo_file_id cached-narinfo-file-id)
+
+ (sqlite-step statement)
+ (sqlite-reset statement)
+
+ #t))))
+
+(define (database-select-oldest-scheduled-cached-narinfo-removal database)
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT cached_narinfo_files.id,
+ cached_narinfo_files.narinfo_id,
+ cached_narinfo_files.size,
+ cached_narinfo_files.compression,
+ narinfos.store_path,
+ scheduled_cached_narinfo_removal.removal_datetime
+FROM scheduled_cached_narinfo_removal
+INNER JOIN cached_narinfo_files
+ ON scheduled_cached_narinfo_removal.cached_narinfo_file_id =
+ cached_narinfo_files.id
+INNER JOIN narinfos
+ ON cached_narinfo_files.narinfo_id = narinfos.id
+ORDER BY scheduled_cached_narinfo_removal.removal_datetime DESC
+LIMIT 1"
+ #:cache? #t)))
+
+ (let ((result
+ (match (sqlite-step statement)
+ (#(id narinfo_id size compression store_path datetime)
+ `((id . ,id)
+ (narinfo-id . ,narinfo_id)
+ (size . ,size)
+ (compression . ,(string->symbol compression))
+ (store-path . ,store_path)
+ (scheduled-removal-time . ,(date->time-utc
+ (string->date
+ datetime
+ "~Y-~m-~d ~H:~M:~S")))))
+ (#f #f))))
+ (sqlite-reset statement)
+ result)))))
+
+(define (database-insert-scheduled-cached-narinfo-removal database
+ cached-narinfo-file-id
+ removal-datetime)
+ (call-with-worker-thread
+ (database-writer-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+INSERT INTO scheduled_cached_narinfo_removal (
+ cached_narinfo_file_id, removal_datetime
+) VALUES (
+ :cached_narinfo_file_id, :removal_datetime
+)"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:cached_narinfo_file_id cached-narinfo-file-id
+ #:removal_datetime (date->string
+ (time-utc->date removal-datetime)
+ "~Y-~m-~d ~H:~M:~S"))
+
+ (sqlite-step statement)
+ (sqlite-reset statement)
+
+ #t))))
diff --git a/nar-herder/server.scm b/nar-herder/server.scm
index ff3f131..583b4a3 100644
--- a/nar-herder/server.scm
+++ b/nar-herder/server.scm
@@ -19,6 +19,7 @@
(define-module (nar-herder server)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-11)
+ #:use-module (srfi srfi-19)
#:use-module (srfi srfi-34)
#:use-module (srfi srfi-71)
#:use-module (ice-9 ftw)
@@ -147,10 +148,8 @@
(proc port size)))))
-(define (add-cached-compressions-to-narinfo
- database
- narinfo-id
- initial-narinfo-contents)
+(define (add-cached-compressions-to-narinfo initial-narinfo-contents
+ cached-narinfo-files)
(let ((cached-nar-strings
(map (lambda (cached-nar-details)
(let ((compression
@@ -163,10 +162,9 @@
"\n"
"Compression: " compression "\n"
"FileSize: " (number->string
- (assq-ref cached-nar-details 'size)))))
- (database-select-cached-narinfo-files-by-narinfo-id
- database
- narinfo-id))))
+ (assq-ref cached-nar-details 'size))
+ "\n")))
+ cached-narinfo-files)))
(string-append
initial-narinfo-contents
(string-join
@@ -177,7 +175,8 @@
#:key base-ttl base-cached-compressions-ttl
negative-ttl logger
metrics-registry
- maybe-trigger-creation-of-compressed-nars)
+ maybe-trigger-creation-of-cached-nars
+ cached-compression-nar-requested-hook)
(define hostname
(gethostname))
@@ -244,11 +243,55 @@
"404"))
(if base-narinfo-contents
- (let ((narinfo-contents
- (add-cached-compressions-to-narinfo
- database
- narinfo-id
- base-narinfo-contents)))
+ (let* ((cached-narinfo-files
+ (database-select-cached-narinfo-files-by-narinfo-id
+ database
+ narinfo-id))
+ (narinfo-contents
+ (if (null? cached-narinfo-files)
+ base-narinfo-contents
+ (add-cached-compressions-to-narinfo
+ base-narinfo-contents
+ cached-narinfo-files)))
+ (potential-ttls
+ (remove
+ not
+ `(,(if (null? cached-narinfo-files)
+ base-ttl
+ base-cached-compressions-ttl)
+
+ ,(and=> (database-select-scheduled-narinfo-removal
+ database
+ narinfo-id)
+ (lambda (scheduled-removal-time)
+ (list
+ (max
+ (- (time-second
+ (time-difference scheduled-removal-time
+ (current-time)))
+ 60)
+ 0))))
+
+ ,@(if (null? cached-narinfo-files)
+ '()
+ (map
+ (lambda (details)
+ (and=>
+ (database-select-scheduled-cached-narinfo-removal
+ database
+ (assq-ref details 'id))
+ (lambda (scheduled-removal-time)
+ (max
+ (- (time-second
+ (time-difference scheduled-removal-time
+ (current-time)))
+ 60)
+ 0))))
+ cached-narinfo-files)))))
+ (ttl
+ (cond
+ ((null? potential-ttls) #f)
+ (else (apply min potential-ttls)))))
(values `((content-type . (text/plain))
,@(if ttl
@@ -320,8 +363,8 @@
(request-via request))))
(when (and (not loop?)
- maybe-trigger-creation-of-compressed-nars)
- (maybe-trigger-creation-of-compressed-nars
+ maybe-trigger-creation-of-cached-nars)
+ (maybe-trigger-creation-of-cached-nars
(assq-ref narinfo 'id)))
(when loop?
@@ -359,10 +402,15 @@
#f)))
(let ((cached-narinfo-file
(and compression-symbol
+ ;; Check that the filename given in the
+ ;; request matches the narinfo store-path
+ (string=? filename
+ (basename
+ (assq-ref narinfo 'store-path)))
(database-select-cached-narinfo-file-by-hash
database
hash
- compression))))
+ compression-symbol))))
(when (or cached-narinfo-file
;; Check for a common compression to avoid lots of
@@ -380,6 +428,10 @@
'()))
'())))
+ (when cached-narinfo-file
+ (cached-compression-nar-requested-hook compression-symbol
+ filename))
+
(if cached-narinfo-file
(values (build-response
#:code 200
@@ -592,7 +644,27 @@
(match-lambda
(('cached-compression-directory-max-size . details) details)
(_ #f))
+ opts))
+ (cached-compression-ttls
+ (filter-map
+ (match-lambda
+ (('cached-compression-ttl . details) details)
+ (_ #f))
+ opts))
+ (cached-compression-new-ttls
+ (filter-map
+ (match-lambda
+ (('cached-compression-new-ttl . details) details)
+ (_ #f))
+ opts))
+ (cached-compression-unused-removal-durations
+ (filter-map
+ (match-lambda
+ (('cached-compression-unused-removal-duration . details)
+ details)
+ (_ #f))
opts)))
+
(filter-map
(match-lambda
(('cached-compression . details)
@@ -607,6 +679,15 @@
compression)))
(directory-max-size
. ,(assq-ref cached-compression-directories-max-sizes
+ compression))
+ (ttl
+ . ,(assq-ref cached-compression-ttls
+ compression))
+ (new-ttl
+ . ,(assq-ref cached-compression-new-ttls
+ compression))
+ (unused-removal-duration
+ . ,(assq-ref cached-compression-unused-removal-durations
compression))))))
(_ #f))
opts)))
@@ -614,10 +695,10 @@
(cached-compression-min-uses
(assq-ref opts 'cached-compression-min-uses))
- (maybe-trigger-creation-of-compressed-nars
+ (cached-compression-management-channel
(if (null? enabled-cached-compressions)
#f
- (make-maybe-trigger-creation-of-compressed-nars
+ (start-cached-compression-management-fiber
database
metrics-registry
(or (assq-ref opts 'cached-compression-nar-source)
@@ -626,7 +707,30 @@
cached-compression-min-uses
#:cached-compression-workers
(assq-ref opts 'cached-compression-workers)
- #:scheduler maintenance-scheduler))))
+ #:scheduler maintenance-scheduler)))
+
+ (maybe-trigger-creation-of-cached-nars
+ (if (null? enabled-cached-compressions)
+ #f
+ (lambda (narinfo-id)
+ (spawn-fiber
+ (lambda ()
+ (put-message cached-compression-management-channel
+ (cons 'narinfo-id narinfo-id)))
+ maintenance-scheduler))))
+
+ (cached-compression-nar-requested-hook
+ (if (null? enabled-cached-compressions)
+ #f
+ (lambda (compression filename)
+ (spawn-fiber
+ (lambda ()
+ (let* ((directory
+ (assq-ref (assq-ref enabled-cached-compressions
+ compression)
+ 'directory)))
+ (utime (peek "UTIME" (string-append directory "/" filename)))))
+ maintenance-scheduler)))))
(if (string=? (assq-ref opts 'database-dump)
"disabled")
@@ -726,6 +830,18 @@
addition-channel
removal-channel))
+ (unless (null? enabled-cached-compressions)
+ (let ((cached-compression-removal-fiber-wakeup-channel
+ (start-cached-compression-removal-fiber
+ database
+ cached-compression-management-channel
+ enabled-cached-compressions)))
+ (start-cached-compression-schedule-removal-fiber
+ database
+ cached-compression-management-channel
+ enabled-cached-compressions
+ cached-compression-removal-fiber-wakeup-channel)))
+
(log-msg 'DEBUG "finished maintenance setup")
(wait finished?))
#:scheduler maintenance-scheduler
@@ -767,8 +883,10 @@
#:negative-ttl (assq-ref opts 'narinfo-negative-ttl)
#:logger lgr
#:metrics-registry metrics-registry
- #:maybe-trigger-creation-of-compressed-nars
- maybe-trigger-creation-of-compressed-nars)
+ #:maybe-trigger-creation-of-cached-nars
+ maybe-trigger-creation-of-cached-nars
+ #:cached-compression-nar-requested-hook
+ cached-compression-nar-requested-hook)
#:host (assq-ref opts 'host)
#:port (assq-ref opts 'port))
diff --git a/scripts/nar-herder.in b/scripts/nar-herder.in
index e81da92..e4ce9c2 100644
--- a/scripts/nar-herder.in
+++ b/scripts/nar-herder.in
@@ -208,18 +208,49 @@
arg
(alist-delete 'cached-compression-nar-source
result))))
-
- (option '("ttl") #t #f
+ (option '("cached-compressions-unused-removal-duration") #t #f
(lambda (opt name arg result)
- (let ((duration (string->duration arg)))
- (unless duration
- (simple-format (current-error-port)
- "~A: invalid duration\n"
- arg)
+ (alist-cons
+ 'cached-compression-unused-removal-duration
+ (match (string-split arg #\=)
+ ((_)
+ (simple-format
+ (current-error-port)
+ "cached-compressions-unused-removal-duration: you must specify compression and value\n")
(exit 1))
- (alist-cons 'narinfo-ttl (time-second duration)
- result))))
- (option '("new-ttl") #t #f
+ ((type duration-string)
+ (cons (string->symbol type)
+ (let ((duration (string->duration duration-string)))
+ (unless duration
+ (simple-format
+ (current-error-port)
+ "~A: cached-compressions-unused-removal-duration: invalid duration\n"
+ arg)
+ (exit 1))
+
+ duration))))
+ result)))
+ (option '("cached-compressions-ttl") #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'cached-compression-ttl
+ (match (string-split arg #\=)
+ ((_)
+ (simple-format
+ (current-error-port)
+ "cached-compressions-ttl: you must specify compression and value\n")
+ (exit 1))
+ ((type ttl-string)
+ (let ((duration (string->duration ttl-string)))
+ (unless duration
+ (simple-format (current-error-port)
+ "~A: invalid duration\n"
+ arg)
+ (exit 1))
+
+ (cons (string->symbol type)
+ (time-second duration)))))
+ result)))
+ (option '("cached-compressions-new-ttl") #t #f
(lambda (opt name arg result)
(let ((duration (string->duration arg)))
(unless duration
@@ -227,9 +258,14 @@
"~A: invalid duration\n"
arg)
(exit 1))
- (alist-cons 'new-narinfo-ttl (time-second duration)
+ (alist-cons 'cached-compression-new-ttl
+ (match (string-split arg #\=)
+ ((type size)
+ (cons (string->symbol type)
+ (time-second duration))))
result))))
- (option '("cached-compressions-ttl") #t #f
+
+ (option '("ttl") #t #f
(lambda (opt name arg result)
(let ((duration (string->duration arg)))
(unless duration
@@ -237,10 +273,9 @@
"~A: invalid duration\n"
arg)
(exit 1))
- (alist-cons 'cached-compressions-narinfo-ttl
- (time-second duration)
+ (alist-cons 'narinfo-ttl (time-second duration)
result))))
- (option '("new-cached-compressions-ttl") #t #f
+ (option '("new-ttl") #t #f
(lambda (opt name arg result)
(let ((duration (string->duration arg)))
(unless duration
@@ -248,8 +283,7 @@
"~A: invalid duration\n"
arg)
(exit 1))
- (alist-cons 'new-cached-compressions-narinfo-ttl
- (time-second duration)
+ (alist-cons 'new-narinfo-ttl (time-second duration)
result))))
(option '("negative-ttl") #t #f
(lambda (opt name arg result)