diff options
-rw-r--r-- | nar-herder/cached-compression.scm | 622 | ||||
-rw-r--r-- | nar-herder/database.scm | 287 | ||||
-rw-r--r-- | nar-herder/server.scm | 162 | ||||
-rw-r--r-- | scripts/nar-herder.in | 68 |
4 files changed, 829 insertions, 310 deletions
diff --git a/nar-herder/cached-compression.scm b/nar-herder/cached-compression.scm index e4fa52f..40dfcd1 100644 --- a/nar-herder/cached-compression.scm +++ b/nar-herder/cached-compression.scm @@ -18,6 +18,7 @@ (define-module (nar-herder cached-compression) #:use-module (srfi srfi-1) + #:use-module (srfi srfi-19) #:use-module (srfi srfi-26) #:use-module (srfi srfi-71) #:use-module (ice-9 ftw) @@ -26,6 +27,10 @@ #:use-module (ice-9 threads) #:use-module (logging logger) #:use-module (prometheus) + #:use-module (fibers) + #:use-module (fibers timers) + #:use-module (fibers channels) + #:use-module (fibers operations) #:use-module (web uri) #:use-module (web client) #:use-module (web response) @@ -36,7 +41,9 @@ #:select (dump-port mkdir-p)) #:use-module (nar-herder utils) #:use-module (nar-herder database) - #:export (make-maybe-trigger-creation-of-compressed-nars)) + #:export (start-cached-compression-management-fiber + start-cached-compression-removal-fiber + start-cached-compression-schedule-removal-fiber)) ;; Nar caching overview ;; @@ -44,7 +51,6 @@ ;; - Compute the size of each cached compression directory ;; - Remove database entries if they're missing from the directory ;; - Remove files if they're missing from the database -;; - Remove (random TODO) files if the size exceeds the max ;; ;; On nar usage ;; - Bump count @@ -52,9 +58,17 @@ ;; - Trigger generation of the cached nar ;; - Skip if the work queue already includes this job ;; - At the start of the job, check for a database entry and exit -;; early if one exists -;; - Update the size of the cached compression directory -;; - If the size exceeds the max, remove (random TODO) files +;; early if one exists bumping the atime of the file +;; - If the file doesn't exist, check if there's free space +;; - If there's free space, create the file +;; +;; Nar removal fiber +;; - Periodically remove nars that have been scheduled for removal +;; (when the scheduled time passes) +;; +;; Nar schedule removal fiber +;; 
- Periodically check for nars that haven't been accessed in some +;; time and schedule them for removal (define (perform-cached-compression-startup database enabled-cached-compressions) @@ -159,164 +173,38 @@ cached-bytes-by-compression)) -(define (maybe-remove-cached-files-for-compression database - enabled-cached-compressions - compression - cached-bytes-by-compression-box - nar-cache-bytes-metric) - (let* ((compression-details - (assq-ref enabled-cached-compressions - compression)) - (max-size - (assq-ref compression-details - 'directory-max-size))) - (when max-size - (let ((current-size - (assq-ref - (atomic-box-ref cached-bytes-by-compression-box) - compression))) - (when (> current-size max-size) - (let ((bytes-to-remove - (- current-size max-size))) - (log-msg 'DEBUG - "looking to remove " bytes-to-remove " bytes of " - compression " compressed nars") - - (match - (database-fold-cached-narinfo-files - database - (lambda (details result) - (match result - (#(finished? bytes files-to-remove) - (if finished? - result - (if (eq? 
(assq-ref details 'compression) - compression) - (let ((new-bytes - (+ bytes - (assq-ref details 'size)))) - ;; finished if enough bytes are going - ;; to be removed - (vector (>= new-bytes bytes-to-remove) - new-bytes - (cons details files-to-remove))) - result))))) - #(#f 0 ())) - (#(#t bytes-for-removal files-to-remove-details) - (log-msg 'DEBUG "removing " (length files-to-remove-details) - " " compression " compressed nars from the cache") - - ;; Use an explicit transaction as it handles the - ;; database being busy, - (database-call-with-transaction - database - (lambda _ - (for-each - (lambda (details) - ;; Remove all the database entries first, as - ;; that'll stop these files appearing in narinfos - (database-remove-cached-narinfo-file - database - (assq-ref details 'narinfo-id) - (symbol->string compression))) - files-to-remove-details))) - - (let ((directory - (assq-ref compression-details 'directory))) - (for-each - (lambda (details) - (let ((filename - (string-append - directory "/" - (basename (assq-ref details 'store-path))))) - (log-msg 'DEBUG "deleting " filename) - (delete-file filename))) - files-to-remove-details) - - (let* ((cached-bytes-by-compression - (atomic-box-ref cached-bytes-by-compression-box)) - (new-cached-bytes - (- (assq-ref cached-bytes-by-compression - compression) - bytes-for-removal))) - (atomic-box-set! - cached-bytes-by-compression-box - (alist-cons - compression - new-cached-bytes - (alist-delete - compression - cached-bytes-by-compression))) - - (metric-set - nar-cache-bytes-metric - new-cached-bytes - #:label-values `((compression . 
,compression))) - - (log-msg 'DEBUG "finished removing " bytes-for-removal - " bytes of " compression - " cached nars from " directory - " (new size " new-cached-bytes ")"))))))))))) - -(define* (make-maybe-trigger-creation-of-compressed-nars +;; This fiber manages metadata around cached compressions, and +;; delegates tasks to the thread pool to generate newly compressed +;; nars +(define* (start-cached-compression-management-fiber database metrics-registry nar-source enabled-cached-compressions cached-compression-min-uses - #:key (cached-compression-workers 2) scheduler) + #:key (cached-compression-workers 2) + scheduler) (define nar-cache-bytes-metric (make-gauge-metric metrics-registry "nar_cache_size_bytes" #:labels '(compression))) - (let ((consider-nar-request-channel - (make-worker-thread-set - (lambda () - (let ((cached-bytes-by-compression-box - (make-atomic-box #f))) - (atomic-box-set! - cached-bytes-by-compression-box - (perform-cached-compression-startup database - enabled-cached-compressions)) - - (for-each - (match-lambda - ((compression . bytes) - (metric-set - nar-cache-bytes-metric - bytes - #:label-values `((compression . ,compression))))) - (atomic-box-ref cached-bytes-by-compression-box)) - - ;; Remove cached files if the max size is exceeded - (for-each - (match-lambda - ((compression . 
_) - (maybe-remove-cached-files-for-compression - database - enabled-cached-compressions - compression - cached-bytes-by-compression-box - nar-cache-bytes-metric))) - (atomic-box-ref cached-bytes-by-compression-box)) - - (list cached-bytes-by-compression-box))) - #:name "comp nar req" - ;; Just make one thread, as this thread won't do much work - ;; and relies on a hash table that shouldn't be accessed by - ;; multiple threads - #:parallelism 1))) - - (define with-usage-hash-table - (let ((nar-cached-compression-usage-hash-table - (make-hash-table 65536))) - (lambda (proc) - (monitor - (proc nar-cached-compression-usage-hash-table))))) - - (define (proc narinfo-id cached-bytes-by-compression-box) + (define channel + (make-channel)) + + (let ((process-job + count-jobs + count-threads + list-jobs + (create-work-queue cached-compression-workers + (lambda (thunk) + (thunk)) + #:name "cached compression"))) + + (define (consider-narinfo cached-bytes-by-compression + usage-hash-table + narinfo-id) (let* ((existing-cached-files ;; This is important both to avoid trying to create ;; files twice, but also in the case where additional @@ -333,120 +221,342 @@ (lset-difference eq? (map car enabled-cached-compressions) existing-compressions)) - (compress-file? - (let ((url - (assq-ref - (first - (database-select-narinfo-files-by-narinfo-id - database - narinfo-id)) - 'url))) + (narinfo-files + (database-select-narinfo-files-by-narinfo-id + database + narinfo-id)) + (compress-file? + (let ((url (assq-ref (first narinfo-files) 'url))) ;; TODO: Maybe this should be configurable? (not (compressed-file? url))))) (when (and compress-file? (not (null? missing-compressions))) (let ((new-count - (with-usage-hash-table - (lambda (usage) - (let ((val (+ 1 - (or (hash-ref usage narinfo-id) - 0)))) - (hash-set! 
usage - narinfo-id - val) - val))))) - - (when (> new-count - cached-compression-min-uses) - (for-each - (lambda (missing-compression) - (let ((new-bytes - (make-compressed-nar - database - nar-source - enabled-cached-compressions - narinfo-id - missing-compression - #:level (assq-ref - (assq-ref enabled-cached-compressions - missing-compression) - 'level)))) - - ;; Do this here, after creating the new cached nar, - ;; just in case there's a lack of space - (monitor - (let* ((cached-bytes-by-compression - (atomic-box-ref - cached-bytes-by-compression-box)) - (updated-bytes-for-compression - (+ (assq-ref cached-bytes-by-compression - missing-compression) - new-bytes))) - - (metric-set - nar-cache-bytes-metric - updated-bytes-for-compression - #:label-values `((compression . ,missing-compression))) - - (atomic-box-set! - cached-bytes-by-compression-box - (alist-cons - missing-compression - updated-bytes-for-compression - (alist-delete - missing-compression - cached-bytes-by-compression))) - - (maybe-remove-cached-files-for-compression - database - enabled-cached-compressions - missing-compression - cached-bytes-by-compression-box - nar-cache-bytes-metric))))) - missing-compressions) - - (with-usage-hash-table - (lambda (usage) - (hash-remove! usage narinfo-id)))))))) - - (let ((process-job - count-jobs - count-threads - list-jobs - (create-work-queue cached-compression-workers - proc - #:name "cached compression"))) - - (lambda (narinfo-id) - (spawn-fiber + (let ((val (+ 1 + (or (hash-ref usage-hash-table narinfo-id) + 0)))) + (hash-set! 
usage-hash-table + narinfo-id + val) + val))) + + (when (and (> new-count + cached-compression-min-uses)) + (let* ((narinfo-details + (database-select-narinfo database narinfo-id)) + (nar-size + (assq-ref narinfo-details 'nar-size)) + (compressions-with-space + (filter + (lambda (compression) + (let ((directory-max-size + (assq-ref + (assq-ref enabled-cached-compressions + compression) + 'directory-max-size)) + (current-directory-size + (assq-ref cached-bytes-by-compression + compression))) + (if directory-max-size + (< (+ current-directory-size + ;; Assume the compressed nar could be + ;; as big as the uncompressed nar + nar-size) + directory-max-size) + #t))) + missing-compressions))) + (for-each + (lambda (compression) + (spawn-fiber + (lambda () + (process-job + (lambda () + (let ((new-bytes + (make-compressed-nar + narinfo-files + nar-source + enabled-cached-compressions + compression + #:level (assq-ref + (assq-ref enabled-cached-compressions + compression) + 'level)))) + (put-message channel + (list 'cached-narinfo-added + narinfo-id + compression + new-bytes)))))))) + compressions-with-space))))))) + + (spawn-fiber + (lambda () + (let ((initial-cached-bytes-by-compression + (perform-cached-compression-startup + database + enabled-cached-compressions)) + (nar-cached-compression-usage-hash-table + (make-hash-table 65536))) + + (for-each + (match-lambda + ((compression . bytes) + (metric-set + nar-cache-bytes-metric + bytes + #:label-values `((compression . ,compression))))) + initial-cached-bytes-by-compression) + + (let loop ((cached-bytes-by-compression + initial-cached-bytes-by-compression)) + (match (get-message channel) + (('narinfo-id . narinfo-id) + (consider-narinfo cached-bytes-by-compression + nar-cached-compression-usage-hash-table + narinfo-id) + (loop cached-bytes-by-compression)) + + (((and (or 'cached-narinfo-added 'cached-narinfo-removed) + action) + narinfo-id compression size) + (let ((updated-bytes + ((if (eq? 
action 'cached-narinfo-added) + + + -) + (or (assq-ref cached-bytes-by-compression + compression) + 0) + size))) + + (metric-set + nar-cache-bytes-metric + updated-bytes + #:label-values `((compression . ,compression))) + + (when (eq? action 'cached-narinfo-added) + (database-insert-cached-narinfo-file + database + narinfo-id + size + compression) + + (hash-remove! nar-cached-compression-usage-hash-table + narinfo-id)) + + ;; Key the updated total by the compression symbol, + ;; replacing the previous entry for that compression + (loop (alist-cons + compression + updated-bytes + (alist-delete compression + cached-bytes-by-compression))))))))) + scheduler) + + channel)) + +;; Periodically check for nars that haven't been accessed in some time +;; and schedule them for removal +(define (start-cached-compression-schedule-removal-fiber + database + cached-compression-management-channel + enabled-cached-compressions + cached-compression-removal-fiber-wakeup-channel) + + (define (files-to-schedule-for-removal compression-details) + (let* ((directory (assq-ref compression-details 'directory)) + (unused-removal-duration + (assq-ref compression-details 'unused-removal-duration)) + (atime-threshold + (time-second + (subtract-duration (current-time) + unused-removal-duration)))) + (scandir + directory + (lambda (filename) + (and + (< (stat:atime (stat (string-append directory "/" filename))) + atime-threshold) + (not (member filename '("." 
"..")))))))) + + (define (schedule-removal compression compression-details) + (let* ((files (files-to-schedule-for-removal compression-details)) + (all-cached-narinfo-file-details + (map + (lambda (file) + (database-select-cached-narinfo-file-by-hash + database + (string-take file 32) ; hash part + compression)) + files)) + (existing-scheduled-removals + (map + (lambda (cached-narinfo-file-details) + (database-select-scheduled-cached-narinfo-removal + database + (assq-ref cached-narinfo-file-details 'id))) + all-cached-narinfo-file-details))) + + (for-each + (lambda (file cached-narinfo-file-details existing-scheduled-removal) + (unless existing-scheduled-removal + (let ((removal-time + ;; The earliest this can be removed is the current + ;; time, plus the TTL + (add-duration + (current-time) + (make-time time-duration + 0 + (assq-ref compression-details 'ttl))))) + (database-insert-scheduled-cached-narinfo-removal + database + (assq-ref cached-narinfo-file-details 'id) + removal-time)))) + files + all-cached-narinfo-file-details + existing-scheduled-removals) + + (when (any not existing-scheduled-removals) + ;; Wake the cached compression removal fiber in case one of + ;; the new scheduled removals is before it's scheduled to wake + ;; up + (put-message + cached-compression-removal-fiber-wakeup-channel + #t)))) + + (spawn-fiber + (lambda () + (let ((sleep-duration + (apply min + (map (lambda (compression-details) + (/ (time-second + (assq-ref compression-details + 'unused-removal-duration)) + 4)) + enabled-cached-compressions)))) + (while #t + (log-msg 'DEBUG "cached-compression-schedule-removal-fiber starting pass") + + (for-each + (match-lambda + ((compression . details) + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "cached-compression-schedule-removal-fiber: " + "exception: " exn)) + (lambda () + (schedule-removal compression details)) + #:unwind? 
#t))) + enabled-cached-compressions) + + (log-msg 'DEBUG "cached-compression-schedule-removal-fiber sleeping for " + sleep-duration) + (sleep sleep-duration)))))) + +;; Process the scheduled removals for cached nars +(define (start-cached-compression-removal-fiber + database + cached-compression-management-channel + enabled-cached-compressions) + + (define (remove id narinfo-id compression store-path) + ;; Use an explicit transaction as it handles the + ;; database being busy, + (database-call-with-transaction + database + (lambda _ + (database-delete-scheduled-cached-narinfo-removal + database + id) + + ;; Remove all the database entries first, as + ;; that'll stop these files appearing in narinfos + (database-remove-cached-narinfo-file + database + narinfo-id + (symbol->string compression)))) + + (let ((directory + (assq-ref (assq-ref enabled-cached-compressions + compression) + 'directory))) + (let ((filename + (string-append + directory "/" + (basename store-path)))) + (log-msg 'DEBUG "deleting " filename) + (delete-file filename)))) + + (define wakeup-channel + (make-channel)) + + (define (make-pass) + (log-msg 'DEBUG "cached-compression-removal-fiber starting pass") + + (let ((scheduled-cached-narinfo-removal + (database-select-oldest-scheduled-cached-narinfo-removal + database))) + + (log-msg 'DEBUG "scheduled removal: " scheduled-cached-narinfo-removal) + + (if scheduled-cached-narinfo-removal + (if (time<=? 
(assq-ref scheduled-cached-narinfo-removal + 'scheduled-removal-time) + (current-time)) + (let ((id (assq-ref scheduled-cached-narinfo-removal 'id)) + (narinfo-id (assq-ref scheduled-cached-narinfo-removal + 'narinfo-id)) + (compression (assq-ref scheduled-cached-narinfo-removal + 'compression)) + (size (assq-ref scheduled-cached-narinfo-removal + 'size)) + (store-path (assq-ref scheduled-cached-narinfo-removal + 'store-path))) + (remove id narinfo-id compression store-path) + + (put-message + cached-compression-management-channel + (list 'cached-narinfo-removed + narinfo-id + compression + size))) + + (let ((duration + (time-difference + (assq-ref scheduled-cached-narinfo-removal + 'scheduled-removal-time) + (current-time)))) + (perform-operation + (choice-operation + (sleep-operation (max 1 (+ 1 (time-second duration)))) + (get-operation wakeup-channel))))) + + ;; Sleep until woken + (get-message wakeup-channel)))) + + + (spawn-fiber + (lambda () + (while #t + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "exception in cached-compression-removal-fiber: " + exn)) (lambda () - (call-with-worker-thread - consider-nar-request-channel - (lambda (cached-bytes-by-compression-box) - (let ((in-progress-narinfo-ids - (map car (list-jobs)))) - - (unless (member narinfo-id in-progress-narinfo-ids) - (process-job narinfo-id cached-bytes-by-compression-box))) - #t))) - scheduler - #:parallel? #t))))) - -(define* (make-compressed-nar database + (with-throw-handler #t + make-pass + (lambda _ + (backtrace)))) + #:unwind? 
#t)))) + + wakeup-channel) + +(define* (make-compressed-nar narinfo-files nar-source enabled-cached-compressions - narinfo-id target-compression #:key level) (define cached-compression-details (assq-ref enabled-cached-compressions target-compression)) - (define narinfo-files - (database-select-narinfo-files-by-narinfo-id database - narinfo-id)) - (log-msg 'DEBUG "making " target-compression " for " (uri-decode (basename @@ -565,16 +675,6 @@ (let ((bytes (stat:size (stat dest-filename)))) - - (database-call-with-transaction - database - (lambda _ - (database-insert-cached-narinfo-file - database - narinfo-id - bytes - (symbol->string target-compression)))) - (log-msg 'DEBUG "created " dest-filename) bytes)))) diff --git a/nar-herder/database.scm b/nar-herder/database.scm index c8171a3..98c29d5 100644 --- a/nar-herder/database.scm +++ b/nar-herder/database.scm @@ -43,6 +43,7 @@ database-insert-narinfo database-remove-narinfo + database-select-narinfo database-select-narinfo-by-hash database-select-narinfo-contents-by-hash @@ -64,7 +65,13 @@ database-select-cached-narinfo-file-by-hash database-select-cached-narinfo-files-by-narinfo-id database-fold-cached-narinfo-files - database-remove-cached-narinfo-file)) + database-remove-cached-narinfo-file + + database-select-scheduled-narinfo-removal + database-select-scheduled-cached-narinfo-removal + database-delete-scheduled-cached-narinfo-removal + database-select-oldest-scheduled-cached-narinfo-removal + database-insert-scheduled-cached-narinfo-removal)) (define-record-type <database> (make-database database-file reader-thread-channel writer-thread-channel @@ -145,7 +152,17 @@ CREATE TABLE cached_narinfo_files ( ); CREATE INDEX cached_narinfo_files_narinfo_id - ON cached_narinfo_files (narinfo_id);") + ON cached_narinfo_files (narinfo_id); + +CREATE TABLE scheduled_narinfo_removal ( + narinfo_id INTEGER PRIMARY KEY ASC REFERENCES narinfos (id), + removal_datetime TEXT NOT NULL +); + +CREATE TABLE 
scheduled_cached_narinfo_removal ( + cached_narinfo_file_id INTEGER PRIMARY KEY ASC REFERENCES cached_narinfo_files (id), + removal_datetime TEXT NOT NULL +);") (sqlite-exec db schema)) @@ -206,6 +223,24 @@ CREATE INDEX cached_narinfo_files_narinfo_id db "ALTER TABLE narinfos ADD COLUMN added_at TEXT;")) + (unless (table-exists? db "scheduled_narinfo_removal") + (sqlite-exec + db + " +CREATE TABLE scheduled_narinfo_removal ( + narinfo_id INTEGER PRIMARY KEY ASC REFERENCES narinfos (id), + removal_datetime TEXT NOT NULL +);")) + + (unless (table-exists? db "scheduled_cached_narinfo_removal") + (sqlite-exec + db + " +CREATE TABLE scheduled_cached_narinfo_removal ( + cached_narinfo_file_id INTEGER PRIMARY KEY ASC REFERENCES cached_narinfo_files (id), + removal_datetime TEXT NOT NULL +);")) + (sqlite-exec db "CREATE INDEX IF NOT EXISTS narinfo_tags_narinfo_id @@ -835,6 +870,38 @@ DELETE FROM narinfo_tags WHERE narinfo_id = :narinfo_id" #t) #f))))) +(define (database-select-narinfo database id) + (call-with-time-tracking + database + "select_narinfo" + (lambda () + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT store_path, nar_hash, nar_size, deriver, system +FROM narinfos +WHERE id = :id" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:id id) + + (match (let ((result (sqlite-step statement))) + (sqlite-reset statement) + result) + (#(store_path nar_hash nar_size deriver system) + `((store-path . ,store_path) + (nar-hash . ,nar_hash) + (nar-size . ,nar_size) + (deriver . ,deriver) + (system . 
,system))) + (_ + #f)))))))) + (define (database-select-narinfo-by-hash database hash) (call-with-time-tracking database @@ -1175,7 +1242,7 @@ INSERT INTO cached_narinfo_files ( statement #:narinfo_id narinfo-id #:size size - #:compression compression) + #:compression (symbol->string compression)) (sqlite-step statement) (sqlite-reset statement) @@ -1196,7 +1263,7 @@ INSERT INTO cached_narinfo_files ( (sqlite-prepare db " -SELECT cached_narinfo_files.size +SELECT cached_narinfo_files.id, cached_narinfo_files.size FROM narinfos INNER JOIN cached_narinfo_files ON cached_narinfo_files.narinfo_id = narinfos.id @@ -1207,12 +1274,13 @@ WHERE substr(narinfos.store_path, 12, 32) = :hash (sqlite-bind-arguments statement #:hash hash - #:compression compression) + #:compression (symbol->string compression)) (let ((result (match (sqlite-step statement) - (#(size) - `((size . ,size))) + (#(id size) + `((id . ,id) + (size . ,size))) (#f #f)))) (sqlite-reset statement) @@ -1232,7 +1300,10 @@ WHERE substr(narinfos.store_path, 12, 32) = :hash (sqlite-prepare db " -SELECT store_path, size, compression +SELECT cached_narinfo_files.id, + narinfos.store_path, + cached_narinfo_files.size, + cached_narinfo_files.compression FROM cached_narinfo_files INNER JOIN narinfos ON cached_narinfo_files.narinfo_id = narinfos.id @@ -1246,8 +1317,9 @@ WHERE narinfo_id = :narinfo_id" (let ((result (sqlite-map (match-lambda - (#(store_path size compression) - `((store-path . ,store_path) + (#(id store_path size compression) + `((id . ,id) + (store-path . ,store_path) (size . ,size) (compression . 
,(string->symbol compression))))) statement))) @@ -1255,6 +1327,48 @@ WHERE narinfo_id = :narinfo_id" result))))))) +;; Select the cached narinfo file for the given narinfo id and +;; compression, returning #f when there is no matching row +(define (database-select-cached-narinfo-file-by-narinfo-id-and-compression + database + narinfo-id + compression) + (call-with-time-tracking + database + "select_cached_narinfo_file_by_narinfo_id_and_compression" + (lambda () + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT cached_narinfo_files.id, + narinfos.store_path, + cached_narinfo_files.size, + cached_narinfo_files.compression +FROM cached_narinfo_files +INNER JOIN narinfos + ON cached_narinfo_files.narinfo_id = narinfos.id +WHERE narinfo_id = :narinfo_id + AND compression = :compression" + #:cache? #t))) + + (sqlite-bind-arguments + statement + #:narinfo_id narinfo-id + #:compression (symbol->string compression)) + + (let ((result + (match (sqlite-step statement) + (#(id store_path size compression) + `((id . ,id) + (store-path . ,store_path) + (size . ,size) + (compression . ,(string->symbol compression)))) + ;; No row matched; return #f like the sibling + ;; select procedures rather than failing the match + (#f #f)))) + (sqlite-reset statement) + + result))))))) + (define (database-fold-cached-narinfo-files database proc init) @@ -1306,3 +1420,156 @@ WHERE narinfo_id = :narinfo_id (sqlite-step statement) (sqlite-reset statement))))) + +(define (database-select-scheduled-narinfo-removal database narinfo-id) + (call-with-time-tracking + database + "select_scheduled_narinfo_removal" + (lambda () + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT removal_datetime +FROM scheduled_narinfo_removal +WHERE narinfo_id = :narinfo_id" + #:cache? 
#t))) + (sqlite-bind-arguments + statement + #:narinfo_id narinfo-id) + + (let ((result + (match (sqlite-step statement) + (#(datetime) + (date->time-utc + (string->date + datetime + "~Y-~m-~d ~H:~M:~S"))) + (#f #f)))) + (sqlite-reset statement) + result))))))) + +(define (database-select-scheduled-cached-narinfo-removal database + cached-narinfo-file-id) + (call-with-time-tracking + database + "select_scheduled_narinfo_removal" + (lambda () + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT removal_datetime +FROM scheduled_cached_narinfo_removal +WHERE cached_narinfo_file_id = :cached_narinfo_file_id" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:cached_narinfo_file_id cached-narinfo-file-id) + + (let ((result + (match (sqlite-step statement) + (#(datetime) + (date->time-utc + (string->date + datetime + "~Y-~m-~d ~H:~M:~S"))) + (#f #f)))) + (sqlite-reset statement) + result))))))) + +(define (database-delete-scheduled-cached-narinfo-removal database + cached-narinfo-file-id) + (call-with-worker-thread + (database-writer-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +DELETE FROM scheduled_cached_narinfo_removal +WHERE cached_narinfo_file_id = :cached_narinfo_file_id" + #:cache? 
#t))) + (sqlite-bind-arguments + statement + #:cached_narinfo_file_id cached-narinfo-file-id) + + (sqlite-step statement) + (sqlite-reset statement) + + #t)))) + +;; Select the scheduled cached narinfo removal with the earliest +;; removal_datetime, or #f if nothing is scheduled. The datetimes are +;; stored as ~Y-~m-~d ~H:~M:~S strings, so ascending text order is +;; chronological order. +(define (database-select-oldest-scheduled-cached-narinfo-removal database) + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT cached_narinfo_files.id, + cached_narinfo_files.narinfo_id, + cached_narinfo_files.size, + cached_narinfo_files.compression, + narinfos.store_path, + scheduled_cached_narinfo_removal.removal_datetime +FROM scheduled_cached_narinfo_removal +INNER JOIN cached_narinfo_files + ON scheduled_cached_narinfo_removal.cached_narinfo_file_id = + cached_narinfo_files.id +INNER JOIN narinfos + ON cached_narinfo_files.narinfo_id = narinfos.id +ORDER BY scheduled_cached_narinfo_removal.removal_datetime ASC +LIMIT 1" + #:cache? #t))) + + (let ((result + (match (sqlite-step statement) + (#(id narinfo_id size compression store_path datetime) + `((id . ,id) + (narinfo-id . ,narinfo_id) + (size . ,size) + (compression . ,(string->symbol compression)) + (store-path . ,store_path) + (scheduled-removal-time . ,(date->time-utc + (string->date + datetime + "~Y-~m-~d ~H:~M:~S"))))) + (#f #f)))) + (sqlite-reset statement) + result))))) + +(define (database-insert-scheduled-cached-narinfo-removal database + cached-narinfo-file-id + removal-datetime) + (call-with-worker-thread + (database-writer-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +INSERT INTO scheduled_cached_narinfo_removal ( + cached_narinfo_file_id, removal_datetime +) VALUES ( + :cached_narinfo_file_id, :removal_datetime +)" + #:cache? 
#t))) + + (sqlite-bind-arguments + statement + #:cached_narinfo_file_id cached-narinfo-file-id + #:removal_datetime (date->string + (time-utc->date removal-datetime) + "~Y-~m-~d ~H:~M:~S")) + + (sqlite-step statement) + (sqlite-reset statement) + + #t)))) diff --git a/nar-herder/server.scm b/nar-herder/server.scm index ff3f131..583b4a3 100644 --- a/nar-herder/server.scm +++ b/nar-herder/server.scm @@ -19,6 +19,7 @@ (define-module (nar-herder server) #:use-module (srfi srfi-1) #:use-module (srfi srfi-11) + #:use-module (srfi srfi-19) #:use-module (srfi srfi-34) #:use-module (srfi srfi-71) #:use-module (ice-9 ftw) @@ -147,10 +148,8 @@ (proc port size))))) -(define (add-cached-compressions-to-narinfo - database - narinfo-id - initial-narinfo-contents) +(define (add-cached-compressions-to-narinfo initial-narinfo-contents + cached-narinfo-files) (let ((cached-nar-strings (map (lambda (cached-nar-details) (let ((compression @@ -163,10 +162,9 @@ "\n" "Compression: " compression "\n" "FileSize: " (number->string - (assq-ref cached-nar-details 'size))))) - (database-select-cached-narinfo-files-by-narinfo-id - database - narinfo-id)))) + (assq-ref cached-nar-details 'size)) + "\n"))) + cached-narinfo-files))) (string-append initial-narinfo-contents (string-join @@ -177,7 +175,8 @@ #:key base-ttl base-cached-compressions-ttl negative-ttl logger metrics-registry - maybe-trigger-creation-of-compressed-nars) + maybe-trigger-creation-of-cached-nars + cached-compression-nar-requested-hook) (define hostname (gethostname)) @@ -244,11 +243,55 @@ "404")) (if base-narinfo-contents - (let ((narinfo-contents - (add-cached-compressions-to-narinfo - database - narinfo-id - base-narinfo-contents))) + (let* ((cached-narinfo-files + (database-select-cached-narinfo-files-by-narinfo-id + database + narinfo-id)) + (narinfo-contents + (if (null? 
cached-narinfo-files) + base-narinfo-contents + (add-cached-compressions-to-narinfo + base-narinfo-contents + cached-narinfo-files))) + (potential-ttls + (remove + not + `(,(if (null? cached-narinfo-files) + base-ttl + base-cached-compressions-ttl) + + ,(and=> (database-select-scheduled-narinfo-removal + database + narinfo-id) + (lambda (scheduled-removal-time) + (list + (max + (- (time-second + (time-difference scheduled-removal-time + (current-time))) + 60) + 0)))) + + ,@(if (null? cached-narinfo-files) + '() + (map + (lambda (details) + (and=> + (database-select-scheduled-cached-narinfo-removal + database + (assq-ref details 'id)) + (lambda (scheduled-removal-time) + (max + (- (time-second + (time-difference scheduled-removal-time + (current-time))) + 60) + 0)))) + cached-narinfo-files))))) + (ttl + (cond + ((null? potential-ttls) #f) + (else (apply min potential-ttls))))) (values `((content-type . (text/plain)) ,@(if ttl @@ -320,8 +363,8 @@ (request-via request)))) (when (and (not loop?) - maybe-trigger-creation-of-compressed-nars) - (maybe-trigger-creation-of-compressed-nars + maybe-trigger-creation-of-cached-nars) + (maybe-trigger-creation-of-cached-nars (assq-ref narinfo 'id))) (when loop? @@ -359,10 +402,15 @@ #f))) (let ((cached-narinfo-file (and compression-symbol + ;; Check that the filename given in the + ;; request matches the narinfo store-path + (string=? filename + (basename + (assq-ref narinfo 'store-path))) (database-select-cached-narinfo-file-by-hash database hash - compression)))) + compression-symbol)))) (when (or cached-narinfo-file ;; Check for a common compression to avoid lots of @@ -380,6 +428,10 @@ '())) '()))) + (when cached-narinfo-file + (cached-compression-nar-requested-hook compression-symbol + filename)) + (if cached-narinfo-file (values (build-response #:code 200 @@ -592,7 +644,27 @@ (match-lambda (('cached-compression-directory-max-size . 
details) details) (_ #f)) + opts)) + (cached-compression-ttls + (filter-map + (match-lambda + (('cached-compression-ttl . details) details) + (_ #f)) + opts)) + (cached-compression-new-ttls + (filter-map + (match-lambda + (('cached-compression-new-ttl . details) details) + (_ #f)) + opts)) + (cached-compression-unused-removal-durations + (filter-map + (match-lambda + (('cached-compression-unused-removal-duration . details) + details) + (_ #f)) opts))) + (filter-map (match-lambda (('cached-compression . details) @@ -607,6 +679,15 @@ compression))) (directory-max-size . ,(assq-ref cached-compression-directories-max-sizes + compression)) + (ttl + . ,(assq-ref cached-compression-ttls + compression)) + (new-ttl + . ,(assq-ref cached-compression-new-ttls + compression)) + (unused-removal-duration + . ,(assq-ref cached-compression-unused-removal-durations compression)))))) (_ #f)) opts))) @@ -614,10 +695,10 @@ (cached-compression-min-uses (assq-ref opts 'cached-compression-min-uses)) - (maybe-trigger-creation-of-compressed-nars + (cached-compression-management-channel (if (null? enabled-cached-compressions) #f - (make-maybe-trigger-creation-of-compressed-nars + (start-cached-compression-management-fiber database metrics-registry (or (assq-ref opts 'cached-compression-nar-source) @@ -626,7 +707,30 @@ cached-compression-min-uses #:cached-compression-workers (assq-ref opts 'cached-compression-workers) - #:scheduler maintenance-scheduler)))) + #:scheduler maintenance-scheduler))) + + (maybe-trigger-creation-of-cached-nars + (if (null? enabled-cached-compressions) + #f + (lambda (narinfo-id) + (spawn-fiber + (lambda () + (put-message cached-compression-management-channel + (cons 'narinfo-id narinfo-id))) + maintenance-scheduler)))) + + (cached-compression-nar-requested-hook + (if (null? 
enabled-cached-compressions) + #f + (lambda (compression filename) + (spawn-fiber + (lambda () + (let* ((directory + (assq-ref (assq-ref enabled-cached-compressions + compression) + 'directory))) + (utime (peek "UTIME" (string-append directory "/" filename))))) + maintenance-scheduler))))) (if (string=? (assq-ref opts 'database-dump) "disabled") @@ -726,6 +830,18 @@ addition-channel removal-channel)) + (unless (null? enabled-cached-compressions) + (let ((cached-compression-removal-fiber-wakeup-channel + (start-cached-compression-removal-fiber + database + cached-compression-management-channel + enabled-cached-compressions))) + (start-cached-compression-schedule-removal-fiber + database + cached-compression-management-channel + enabled-cached-compressions + cached-compression-removal-fiber-wakeup-channel))) + (log-msg 'DEBUG "finished maintenance setup") (wait finished?)) #:scheduler maintenance-scheduler @@ -767,8 +883,10 @@ #:negative-ttl (assq-ref opts 'narinfo-negative-ttl) #:logger lgr #:metrics-registry metrics-registry - #:maybe-trigger-creation-of-compressed-nars - maybe-trigger-creation-of-compressed-nars) + #:maybe-trigger-creation-of-cached-nars + maybe-trigger-creation-of-cached-nars + #:cached-compression-nar-requested-hook + cached-compression-nar-requested-hook) #:host (assq-ref opts 'host) #:port (assq-ref opts 'port)) diff --git a/scripts/nar-herder.in b/scripts/nar-herder.in index e81da92..e4ce9c2 100644 --- a/scripts/nar-herder.in +++ b/scripts/nar-herder.in @@ -208,18 +208,49 @@ arg (alist-delete 'cached-compression-nar-source result)))) - - (option '("ttl") #t #f + (option '("cached-compressions-unused-removal-duration") #t #f (lambda (opt name arg result) - (let ((duration (string->duration arg))) - (unless duration - (simple-format (current-error-port) - "~A: invalid duration\n" - arg) + (alist-cons + 'cached-compression-unused-removal-duration + (match (string-split arg #\=) + ((_) + (simple-format + (current-error-port) + 
"cached-compressions-unused-removal-duration: you must specify compression and value\n") (exit 1)) - (alist-cons 'narinfo-ttl (time-second duration) - result)))) - (option '("new-ttl") #t #f + ((type duration-string) + (cons (string->symbol type) + (let ((duration (string->duration duration-string))) + (unless duration + (simple-format + (current-error-port) + "~A: cached-compressions-unused-removal-duration: invalid duration\n" + arg) + (exit 1)) + + duration)))) + result))) + (option '("cached-compressions-ttl") #t #f + (lambda (opt name arg result) + (alist-cons 'cached-compression-ttl + (match (string-split arg #\=) + ((_) + (simple-format + (current-error-port) + "cached-compressions-ttl: you must specify compression and value\n") + (exit 1)) + ((type ttl-string) + (let ((duration (string->duration ttl-string))) + (unless duration + (simple-format (current-error-port) + "~A: invalid duration\n" + arg) + (exit 1)) + + (cons (string->symbol type) + (time-second duration))))) + result))) + (option '("cached-compressions-new-ttl") #t #f (lambda (opt name arg result) (let ((duration (string->duration arg))) (unless duration @@ -227,9 +258,14 @@ "~A: invalid duration\n" arg) (exit 1)) - (alist-cons 'new-narinfo-ttl (time-second duration) + (alist-cons 'cached-compression-new-ttl + (match (string-split arg #\=) + ((type size) + (cons (string->symbol type) + (time-second duration)))) result)))) - (option '("cached-compressions-ttl") #t #f + + (option '("ttl") #t #f (lambda (opt name arg result) (let ((duration (string->duration arg))) (unless duration @@ -237,10 +273,9 @@ "~A: invalid duration\n" arg) (exit 1)) - (alist-cons 'cached-compressions-narinfo-ttl - (time-second duration) + (alist-cons 'narinfo-ttl (time-second duration) result)))) - (option '("new-cached-compressions-ttl") #t #f + (option '("new-ttl") #t #f (lambda (opt name arg result) (let ((duration (string->duration arg))) (unless duration @@ -248,8 +283,7 @@ "~A: invalid duration\n" arg) (exit 1)) - 
(alist-cons 'new-cached-compressions-narinfo-ttl - (time-second duration) + (alist-cons 'new-narinfo-ttl (time-second duration) result)))) (option '("negative-ttl") #t #f (lambda (opt name arg result) |