;;; Nar Herder
;;;
;;; Copyright © 2021 Christopher Baines <mail@cbaines.net>
;;;
;;; This program is free software: you can redistribute it and/or
;;; modify it under the terms of the GNU Affero General Public License
;;; as published by the Free Software Foundation, either version 3 of
;;; the License, or (at your option) any later version.
;;;
;;; This program is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;;; Affero General Public License for more details.
;;;
;;; You should have received a copy of the GNU Affero General Public
;;; License along with this program.  If not, see
;;; <http://www.gnu.org/licenses/>.

(define-module (nar-herder database)
  #:use-module (srfi srfi-1)
  #:use-module (srfi srfi-9)
  #:use-module (srfi srfi-19)
  #:use-module (ice-9 match)
  #:use-module (ice-9 format)
  #:use-module (ice-9 threads)
  #:use-module (ice-9 exceptions)
  #:use-module (web uri)
  #:use-module (sqlite3)
  #:use-module (fibers)
  #:use-module (prometheus)
  #:use-module (guix store)
  #:use-module (guix narinfo)
  #:use-module (guix derivations)
  #:use-module (nar-herder utils)
  #:export (setup-database
            database-optimize
            database-spawn-fibers

            database-call-with-transaction

            dump-database

            database-insert-narinfo
            database-remove-narinfo
            database-select-narinfo-by-hash
            database-select-narinfo-contents-by-hash

            database-count-recent-changes
            database-select-recent-changes
            database-select-latest-recent-change-datetime
            database-get-recent-changes-id-for-deletion
            database-delete-recent-changes-with-id-below

            database-select-narinfo-for-file
            database-select-narinfo-files
            database-select-narinfo-files-by-narinfo-id

            database-fold-all-narinfo-files
            database-map-all-narinfo-files
            database-count-narinfo-files

            database-insert-cached-narinfo-file
            database-select-cached-narinfo-file-by-hash
            database-select-cached-narinfo-files-by-narinfo-id
            database-fold-cached-narinfo-files
            database-remove-cached-narinfo-file))

;; Handle passed to every exported database-* procedure.  It bundles the
;; database file name, the reader and writer worker thread channels created
;; by setup-database, and the metrics registry (which may be #f, see
;; call-with-time-tracking).
(define-record-type <database>
  (make-database database-file reader-thread-channel writer-thread-channel
                 metrics-registry)
  database?
  (database-file database-file)
  (reader-thread-channel database-reader-thread-channel)
  (writer-thread-channel database-writer-thread-channel)
  (metrics-registry database-metrics-registry))

;; Open the SQLite file DATABASE.  With WRITE? true (the default) the file is
;; opened read-write and created if missing, otherwise it is opened
;; read-only.  SQLITE_OPEN_NOMUTEX is always passed.
(define* (db-open database #:key (write? #t))
  (define flags
    `(,@(if write?
            (list SQLITE_OPEN_READWRITE
                  SQLITE_OPEN_CREATE)
            (list SQLITE_OPEN_READONLY))
      ,SQLITE_OPEN_NOMUTEX))

  (sqlite-open database (apply logior flags)))

;; Create every table and index from scratch on the connection DB.
(define (perform-initial-database-setup db)
  (define schema
    "
CREATE TABLE narinfos (
  id INTEGER PRIMARY KEY ASC,
  store_path TEXT NOT NULL,
  nar_hash TEXT NOT NULL,
  nar_size INTEGER NOT NULL,
  deriver TEXT,
  system TEXT,
  contents NOT NULL,
  added_at TEXT
);

CREATE UNIQUE INDEX narinfos_store_hash ON narinfos (substr(store_path, 12, 32));

CREATE TABLE narinfo_files (
  narinfo_id INTEGER NOT NULL REFERENCES narinfos (id),
  size INTEGER NOT NULL,
  compression TEXT,
  url TEXT NOT NULL
);

CREATE INDEX narinfo_files_narinfo_id ON narinfo_files (narinfo_id);

CREATE TABLE narinfo_references (
  narinfo_id INTEGER NOT NULL REFERENCES narinfos (id),
  store_path TEXT NOT NULL
);

CREATE TABLE tags (
  id INTEGER PRIMARY KEY ASC,
  key TEXT NOT NULL,
  value TEXT NOT NULL
);

CREATE UNIQUE INDEX tags_index ON tags (key, value);

CREATE TABLE narinfo_tags (
  narinfo_id INTEGER NOT NULL REFERENCES narinfos (id),
  tag_id INTEGER NOT NULL REFERENCES tags (id)
);

CREATE TABLE recent_changes (
  id INTEGER PRIMARY KEY ASC,
  datetime TEXT NOT NULL,
  change TEXT NOT NULl,
  data TEXT NOT NULL
);

CREATE TABLE cached_narinfo_files (
  id INTEGER PRIMARY KEY ASC,
  narinfo_id INTEGER NOT NULL REFERENCES narinfos (id),
  size INTEGER NOT NULL,
  compression TEXT NOT NULL
);

CREATE INDEX cached_narinfo_files_narinfo_id
  ON cached_narinfo_files (narinfo_id);")

  ;; The cached_narinfo_files definition above matches the one used by the
  ;; migration in update-schema (compression is NOT NULL), so fresh and
  ;; migrated databases end up with the same constraints.
  (sqlite-exec db schema))

;; Return #t when a table called NAME is listed in sqlite_master, #f
;; otherwise.
(define (table-exists? db name)
  (let ((statement
         (sqlite-prepare
          db
          "
SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = :name")))

    (sqlite-bind-arguments
     statement
     #:name name)

    (let ((result
           (match (sqlite-step statement)
             (#f #f)
             (#(1) #t))))
      (sqlite-finalize statement)

      result)))

;; Return a true value (the tail of the column-name list, as returned by
;; member) when TABLE-NAME has a column called COLUMN-NAME, #f otherwise.
;; TABLE-NAME is spliced into the PRAGMA text, so it must be a trusted
;; identifier.
(define (column-exists? db table-name column-name)
  (let ((statement
         (sqlite-prepare
          db
          (simple-format #f "PRAGMA table_info(~A);" table-name))))

    (let ((columns
           (sqlite-map
            (lambda (row)
              ;; Element 1 of each table_info row is read as the column name.
              (vector-ref row 1))
            statement)))
      (sqlite-finalize statement)

      (member column-name columns))))

;; Bring the schema on DB up to date: create everything when the narinfos
;; table is missing, then apply the incremental additions (the
;; cached_narinfo_files table, the narinfos.added_at column and the
;; narinfo_id indexes).
(define (update-schema db)
  (unless (table-exists? db "narinfos")
    (perform-initial-database-setup db))

  (unless (table-exists? db "cached_narinfo_files")
    (sqlite-exec
     db
     "
CREATE TABLE cached_narinfo_files (
  id INTEGER PRIMARY KEY ASC,
  narinfo_id INTEGER NOT NULL REFERENCES narinfos (id),
  size INTEGER NOT NULL,
  compression TEXT NOT NULL
);

CREATE INDEX cached_narinfo_files_narinfo_id
  ON cached_narinfo_files (narinfo_id);"))

  (unless (column-exists? db "narinfos" "added_at")
    (sqlite-exec
     db
     "ALTER TABLE narinfos ADD COLUMN added_at TEXT;"))

  (sqlite-exec
   db
   "CREATE INDEX IF NOT EXISTS narinfo_tags_narinfo_id
      ON narinfo_tags (narinfo_id);")

  (sqlite-exec
   db
   "CREATE INDEX IF NOT EXISTS narinfo_references_narinfo_id
      ON narinfo_references (narinfo_id);")

  (sqlite-exec
   db
   "CREATE INDEX IF NOT EXISTS narinfo_files_narinfo_id
      ON narinfo_files (narinfo_id);"))

;; Prepare DATABASE-FILE (WAL journal mode, optimize, checkpoint, schema
;; update) on a temporary connection, then create the reader and writer
;; worker thread sets and return a <database> record wrapping them.
;; METRICS-REGISTRY is used for the delay histograms and stored in the
;; record.
(define (setup-database database-file metrics-registry)
  ;; Stays #f while the mmap setup below is commented out, which disables
  ;; the "PRAGMA mmap_size" statements in the thread initialisers.
  (define mmap-size #f)

  (let ((db (db-open database-file)))
    (sqlite-exec db "PRAGMA journal_mode=WAL;")
    (sqlite-exec db "PRAGMA optimize;")
    (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);")

    (update-schema db)

    ;; (let ((requested-mmap-bytes 2147418112)
    ;;       (statement
    ;;        (sqlite-prepare
    ;;         db
    ;;         (simple-format #f "PRAGMA mmap_size=~A;"
    ;;                        2147418112))))
    ;;   (match (sqlite-step statement)
    ;;     (#(result-mmap-size)
    ;;      (sqlite-finalize statement)
    ;;      (set! mmap-size
    ;;            result-mmap-size))))

    (sqlite-close db))

  (let ((reader-thread-channel
         (make-worker-thread-set
          (lambda ()
            (let ((db (db-open database-file #:write? #f)))
              (sqlite-exec db "PRAGMA busy_timeout = 5000;")
              (when mmap-size
                (sqlite-exec
                 db
                 (simple-format #f "PRAGMA mmap_size=~A;"
                                (number->string mmap-size))))
              (list db)))
          #:destructor
          (lambda (db)
            (sqlite-close db))
          #:lifetime 50000
          #:name "db r"

          ;; Use at least 2 and at most 64 threads
          #:parallelism
          (min (max (current-processor-count) 2) 64)
          #:delay-logger
          (let ((delay-metric
                 (make-histogram-metric
                  metrics-registry
                  "database_read_delay_seconds")))
            (lambda (seconds-delayed proc)
              (metric-observe delay-metric seconds-delayed)
              (when (> seconds-delayed 1)
                (display
                 (format
                  #f
                  "warning: database read (~a) delayed by ~1,2f seconds~%"
                  proc
                  seconds-delayed)
                 (current-error-port)))))
          #:duration-logger
          (lambda (duration proc)
            (when (> duration 5)
              (display
               (format
                #f
                "warning: database read took ~1,2f seconds (~a)~%"
                duration
                proc)
               (current-error-port))))))

        (writer-thread-channel
         (make-worker-thread-set
          (lambda ()
            (let ((db (db-open database-file)))
              (sqlite-exec db "PRAGMA busy_timeout = 5000;")
              (sqlite-exec db "PRAGMA foreign_keys = ON;")
              (when mmap-size
                (sqlite-exec
                 db
                 (simple-format #f "PRAGMA mmap_size=~A;"
                                (number->string mmap-size))))
              (list db)))
          #:destructor
          (lambda (db)
            ;; Give the connection a chance to checkpoint the WAL before it
            ;; is closed.
            (db-optimize db database-file)

            (sqlite-close db))
          #:lifetime 500
          #:name "db w"

          ;; SQLite doesn't support parallel writes
          #:parallelism 1
          #:delay-logger
          (let ((delay-metric
                 (make-histogram-metric
                  metrics-registry
                  "database_write_delay_seconds")))
            (lambda (seconds-delayed proc)
              (metric-observe delay-metric seconds-delayed)
              (when (> seconds-delayed 1)
                (display
                 (format
                  #f
                  "warning: database write (~a) delayed by ~1,2f seconds~%"
                  proc
                  seconds-delayed)
                 (current-error-port)))))
          #:duration-logger
          (lambda (duration proc)
            (when (> duration 5)
              (display
               (format
                #f
                "warning: database write took ~1,2f seconds (~a)~%"
                duration
                proc)
               (current-error-port)))))))

    (make-database database-file
                   reader-thread-channel
                   writer-thread-channel
                   metrics-registry)))

;; When the "-wal" file next to DB-FILENAME is larger than 5 MiB, truncate
;; it with a WAL checkpoint and run "PRAGMA optimize" on the connection DB.
;; Does nothing for smaller WAL files.
(define (db-optimize db db-filename)
  (define (wal-size)
    (let ((db-wal-filename
           (string-append db-filename "-wal")))

      (stat:size (stat db-wal-filename))))

  (define MiB (* (expt 2 20) 1.))
  (define wal-size-threshold
    (* 5 MiB))

  (when (> (wal-size) wal-size-threshold)
    (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);")

    (sqlite-exec db
                 "
PRAGMA analysis_limit=1000;
PRAGMA optimize;")))

;; Run db-optimize for DATABASE through the writer thread channel, wrapped
;; in retry-on-error with #:times 5 and #:delay 5.
(define (database-optimize database)
  (retry-on-error
   (lambda ()
     (call-with-worker-thread
      (database-writer-thread-channel database)
      (lambda (db)
        (db-optimize
         db
         (database-file database)))))
   #:times 5
   #:delay 5))

;; Spawn a fiber that loops forever: sleep for five minutes, then call
;; database-optimize.  Exceptions from the optimize call are printed to the
;; current error port and the loop carries on.
(define (database-spawn-fibers database)
  (spawn-fiber
   (lambda ()
     (while #t
       (sleep (* 60 5))                 ; 5 minutes
       (with-exception-handler
           (lambda (exn)
             (simple-format (current-error-port)
                            "exception when performing WAL checkpoint: ~A\n"
                            exn))
         (lambda ()
           (database-optimize database))
         #:unwind? #t)))
   #:parallel? #t))

;; Call THUNK, recording its duration under the metric
;; "database_<THING>_duration_seconds" when DATABASE has a metrics registry.
;; Without a registry THUNK is simply called.
(define (call-with-time-tracking database thing thunk)
  (define registry (database-metrics-registry database))
  (define metric-name
    (string-append "database_" thing "_duration_seconds"))

  (if registry
      (call-with-duration-metric registry metric-name thunk)
      (thunk)))

;; Holds the procedure of the transaction currently running, or #f when no
;; transaction is in progress.  Used to detect nested calls to
;; database-call-with-transaction.
(define %current-transaction-proc
  (make-parameter #f))

;; Call PROC with a database connection inside a transaction and return
;; PROC's values.  With READONLY? the reader thread channel is used,
;; otherwise the writer one.  IMMEDIATE? (default: true unless READONLY?)
;; selects "BEGIN IMMEDIATE TRANSACTION".  The transaction is committed when
;; PROC returns and rolled back when it raises.  When a transaction is
;; already in progress, PROC is just called with the connection.
(define* (database-call-with-transaction database proc
                                         #:key
                                         readonly?
                                         (immediate? (not readonly?)))
  (define (run-proc-within-transaction db)
    (with-exception-handler
        (lambda (exn)
          (match (exception-args exn)
            ;; 5 is SQLite's SQLITE_BUSY result code: log it and try again.
            (('sqlite-exec 5 msg)
             (simple-format
              (current-error-port)
              "warning: sqlite error: ~A\n"
              msg)
             (run-proc-within-transaction db))
            ;; Anything else is re-raised.  Exceptions re-raised by the
            ;; inner (rollback) handler also arrive here.
            (_
             (simple-format (current-error-port)
                            "exception starting transaction\n")
             (raise-exception exn))))
      (lambda ()
        (sqlite-exec db (if immediate?
                            "BEGIN IMMEDIATE TRANSACTION;"
                            "BEGIN TRANSACTION;"))
        (with-exception-handler
            (lambda (exn)
              (simple-format (current-error-port)
                             "error: sqlite rolling back transaction\n")
              (sqlite-exec db "ROLLBACK TRANSACTION;")
              (raise-exception exn))
          (lambda ()
            (call-with-values
                (lambda ()
                  (parameterize ((%current-transaction-proc proc))
                    (proc-with-duration-timing db)))
              (lambda vals
                (sqlite-exec db "COMMIT TRANSACTION;")
                (apply values vals))))
          #:unwind? #t))
      #:unwind? #t))

  ;; Call PROC and return a list whose first element is the elapsed time in
  ;; seconds and whose remaining elements are the values PROC returned.
  ;; Slow (> 2 seconds) non-readonly transactions are reported on the
  ;; current error port.
  (define (proc-with-duration-timing db)
    (let ((start-time (get-internal-real-time)))
      (call-with-values
          (lambda ()
            (proc db))
        (lambda vals
          (let ((duration-seconds
                 (/ (- (get-internal-real-time) start-time)
                    internal-time-units-per-second)))
            (when (and (not readonly?)
                       (> duration-seconds 2))
              (display
               (format
                #f
                "warning: ~a:\n took ~4f seconds in transaction\n"
                proc
                duration-seconds)
               (current-error-port)))

            (cons duration-seconds vals))))))

  (match (call-with-worker-thread
          ((if readonly?
               database-reader-thread-channel
               database-writer-thread-channel)
           database)
          (lambda (db)
            (if (%current-transaction-proc)
                (proc-with-duration-timing db) ; already in transaction
                (run-proc-within-transaction db))))
    ;; Drop the duration, hand PROC's values back to the caller.
    ((duration vals ...)
     (apply values vals))))

;; Write a copy of the database to the file NAME using "VACUUM INTO".  NAME
;; is spliced into the SQL text between single quotes, so it must not
;; contain a single quote.
(define (dump-database database name)
  (call-with-worker-thread
   (database-reader-thread-channel database)
   (lambda (db)
     (sqlite-exec
      db
      (string-append "VACUUM INTO '" name "';")))))

;; Return the value of SQLite's last_insert_rowid() for the connection DB.
(define (last-insert-rowid db)
  (let ((statement
         (sqlite-prepare
          db
          "SELECT last_insert_rowid();"
          #:cache? #t)))
    (let ((id
           (vector-ref (sqlite-step statement)
                       0)))

      (sqlite-reset statement)

      id)))

;; Return the value of SQLite's changes() for the connection DB.
(define (changes db)
  (let ((statement
         (sqlite-prepare
          db
          "SELECT changes()"
          #:cache? #t)))
    (let ((id
           (vector-ref (sqlite-step statement)
                       0)))

      (sqlite-reset statement)

      id)))

;; Return the id of the tags row matching KEY and VALUE, inserting the row
;; first when it does not exist yet.
(define (tag->tag-id db key value)
  (define (insert-tag)
    (let ((statement
           (sqlite-prepare
            db
            "
INSERT INTO tags (key, value) VALUES (:key, :value)"
            #:cache? #t)))

      (sqlite-bind-arguments statement
                             #:key key
                             #:value value)

      (sqlite-step statement)
      (sqlite-reset statement)

      (last-insert-rowid db)))

  (let ((statement
         (sqlite-prepare
          db
          "
SELECT id FROM tags WHERE key = :key AND value = :value"
          #:cache? #t)))

    (sqlite-bind-arguments statement
                           #:key key
                           #:value value)

    (match (let ((result (sqlite-step statement)))
             (sqlite-reset statement)
             result)
      (#f (insert-tag))
      (#(id) id))))

;; Store NARINFO in a single transaction: the narinfos row, one
;; narinfo_files row per URI, one narinfo_references row per reference, an
;; 'addition' entry in recent_changes (dated CHANGE-DATETIME when given,
;; otherwise datetime('now')) and the TAGS, a list of (key . value) pairs.
;; Returns the id of the new narinfos row.
(define* (database-insert-narinfo database narinfo
                                  #:key change-datetime
                                  (tags '()))
  (define (insert-narinfo-record db)
    (let ((statement
           (sqlite-prepare
            db
            "
INSERT INTO narinfos (
  store_path, nar_hash, nar_size, deriver, system, contents, added_at
) VALUES (
  :store_path, :nar_hash, :nar_size, :deriver, :system, :contents, :added_at
)"
            #:cache? #t)))

      (sqlite-bind-arguments
       statement
       #:store_path (narinfo-path narinfo)
       #:nar_hash (narinfo-hash narinfo)
       #:nar_size (narinfo-size narinfo)
       #:deriver (narinfo-deriver narinfo)
       #:system (narinfo-system narinfo)
       #:contents (narinfo-contents narinfo)
       #:added_at (date->string (current-date) "~1 ~3"))

      (sqlite-step statement)
      (sqlite-reset statement)

      (last-insert-rowid db)))

  (define (insert-narinfo-file db narinfo-id size compression url)
    (let ((statement
           (sqlite-prepare
            db
            "
INSERT INTO narinfo_files (
  narinfo_id, size, compression, url
) VALUES (
  :narinfo_id, :size, :compression, :url
)"
            #:cache? #t)))

      (sqlite-bind-arguments
       statement
       #:narinfo_id narinfo-id
       #:size size
       #:compression compression
       #:url url)

      (sqlite-step statement)
      (sqlite-reset statement)))

  (define (insert-narinfo-references db narinfo-id store-path)
    (let ((statement
           (sqlite-prepare
            db
            "
INSERT INTO narinfo_references (
  narinfo_id, store_path
) VALUES (
  :narinfo_id, :store_path
)"
            #:cache? #t)))

      (sqlite-bind-arguments
       statement
       #:narinfo_id narinfo-id
       #:store_path store-path)

      (sqlite-step statement)
      (sqlite-reset statement)))

  (define (insert-change db contents)
    (let ((statement
           (sqlite-prepare
            db
            "
INSERT INTO recent_changes (
  datetime, change, data
) VALUES (
  datetime('now'), 'addition', :contents
)"
            #:cache? #t)))

      (sqlite-bind-arguments
       statement
       #:contents contents)

      (sqlite-step statement)
      (sqlite-reset statement)))

  (define (insert-change-with-datetime db contents datetime)
    (let ((statement
           (sqlite-prepare
            db
            "
INSERT INTO recent_changes (
  datetime, change, data
) VALUES (
  :datetime, 'addition', :contents
)"
            #:cache? #t)))

      (sqlite-bind-arguments
       statement
       #:contents contents
       #:datetime datetime)

      (sqlite-step statement)
      (sqlite-reset statement)))

  (define (insert-tags db narinfo-id tags)
    (let ((statement
           (sqlite-prepare
            db
            "
INSERT INTO narinfo_tags (narinfo_id, tag_id) VALUES (:narinfo_id, :tag_id)"
            #:cache? #t)))

      ;; for-each rather than map: the inserts are run for their side
      ;; effects only, and the single prepared statement must be stepped
      ;; and reset for one tag before it is rebound for the next.
      (for-each
       (match-lambda
         ((key . value)
          (let ((tag-id (tag->tag-id db key value)))
            (sqlite-bind-arguments
             statement
             #:narinfo_id narinfo-id
             #:tag_id tag-id)
            (sqlite-step statement)
            (sqlite-reset statement))))
       tags)))

  (database-call-with-transaction
   database
   (lambda (db)
     (let ((narinfo-id (insert-narinfo-record db)))
       (let ((len (length (narinfo-uris narinfo))))
         (for-each insert-narinfo-file
                   (make-list len db)
                   (make-list len narinfo-id)
                   (narinfo-file-sizes narinfo)
                   (narinfo-compressions narinfo)
                   (map uri-path (narinfo-uris narinfo))))

       (let ((references (narinfo-references narinfo)))
         (for-each insert-narinfo-references
                   (make-list (length references) db)
                   (make-list (length references) narinfo-id)
                   references))

       (if change-datetime
           (insert-change-with-datetime db (narinfo-contents narinfo)
                                        change-datetime)
           (insert-change db (narinfo-contents narinfo)))

       (unless (null? tags)
         (insert-tags db narinfo-id tags))

       narinfo-id))))

;; Remove the narinfo for STORE-PATH in a single transaction, together with
;; its files, references, tags and cached narinfo files, and record a
;; 'removal' entry in recent_changes (dated CHANGE-DATETIME when given,
;; otherwise datetime('now')).  Returns #t when a narinfo was found and
;; removed, #f when there was none for STORE-PATH.
(define* (database-remove-narinfo database store-path
                                  #:key change-datetime)
  (define (remove-narinfo-record db id)
    (let ((statement
           (sqlite-prepare
            db
            "
DELETE FROM narinfos WHERE id = :id"
            #:cache? #t)))

      (sqlite-bind-arguments
       statement
       #:id id)

      (sqlite-step statement)
      (sqlite-reset statement)))

  (define (remove-narinfo-files db narinfo-id)
    (let ((statement
           (sqlite-prepare
            db
            "
DELETE FROM narinfo_files WHERE narinfo_id = :narinfo_id"
            #:cache? #t)))

      (sqlite-bind-arguments
       statement
       #:narinfo_id narinfo-id)

      (sqlite-map (const #t) statement)
      (sqlite-reset statement)))

  (define (remove-narinfo-references db narinfo-id)
    (let ((statement
           (sqlite-prepare
            db
            "
DELETE FROM narinfo_references WHERE narinfo_id = :narinfo_id"
            #:cache? #t)))

      (sqlite-bind-arguments
       statement
       #:narinfo_id narinfo-id)

      (sqlite-map (const #t) statement)
      (sqlite-reset statement)))

  (define (insert-change db store-path)
    (let ((statement
           (sqlite-prepare
            db
            "
INSERT INTO recent_changes (
  datetime, change, data
) VALUES (
  datetime('now'), 'removal', :store_path
)"
            #:cache? #t)))

      (sqlite-bind-arguments
       statement
       #:store_path store-path)

      (sqlite-step statement)
      (sqlite-reset statement)))

  (define (insert-change-with-datetime db store-path datetime)
    (let ((statement
           (sqlite-prepare
            db
            "
INSERT INTO recent_changes (
  datetime, change, data
) VALUES (
  :datetime, 'removal', :store_path
)"
            #:cache? #t)))

      (sqlite-bind-arguments
       statement
       #:store_path store-path
       #:datetime datetime)

      (sqlite-step statement)
      (sqlite-reset statement)))

  (define (remove-tags db narinfo-id)
    (let ((statement
           (sqlite-prepare
            db
            "
DELETE FROM narinfo_tags WHERE narinfo_id = :narinfo_id"
            #:cache? #t)))

      (sqlite-bind-arguments
       statement
       #:narinfo_id narinfo-id)

      (sqlite-map (const #t) statement)
      (sqlite-reset statement)))

  (database-call-with-transaction
   database
   (lambda (db)
     (let ((narinfo-details
            (database-select-narinfo-by-hash
             database
             (store-path-hash-part store-path))))
       (if narinfo-details
           (let ((narinfo-id (assq-ref narinfo-details 'id)))
             (if change-datetime
                 (insert-change-with-datetime db store-path
                                              change-datetime)
                 (insert-change db store-path))

             ;; Rows referencing the narinfo go first, the narinfos row
             ;; itself goes last.
             (remove-narinfo-files db narinfo-id)
             (remove-narinfo-references db narinfo-id)
             (remove-tags db narinfo-id)

             (for-each
              (lambda (cached-details)
                (database-remove-cached-narinfo-file
                 database
                 narinfo-id
                 (symbol->string (assq-ref cached-details 'compression))))
              (database-select-cached-narinfo-files-by-narinfo-id
               database
               narinfo-id))

             (remove-narinfo-record db narinfo-id)
             #t)
           #f)))))

;; Look up the narinfo whose store path has the hash part HASH.  Returns an
;; alist with the keys id, store-path, nar-hash, nar-size, deriver and
;; system, or #f when there is no such narinfo.
(define (database-select-narinfo-by-hash database hash)
  (call-with-time-tracking
   database
   "select_narinfo_by_hash"
   (lambda ()
     (call-with-worker-thread
      (database-reader-thread-channel database)
      (lambda (db)
        (let ((statement
               (sqlite-prepare
                db
                "
SELECT id, store_path, nar_hash, nar_size, deriver, system
FROM narinfos
WHERE substr(store_path, 12, 32) = :hash"
                #:cache? #t)))
          (sqlite-bind-arguments
           statement
           #:hash hash)

          (match (let ((result (sqlite-step statement)))
                   (sqlite-reset statement)
                   result)
            (#(id store_path nar_hash nar_size deriver system)
             `((id         . ,id)
               (store-path . ,store_path)
               (nar-hash   . ,nar_hash)
               (nar-size   . ,nar_size)
               (deriver    . ,deriver)
               (system     . ,system)))
            (_
             #f))))))))

;; Look up the narinfo whose store path has the hash part HASH.  The worker
;; procedure yields two values, the narinfo contents and the narinfo id, or
;; #f and #f when there is no such narinfo.
(define (database-select-narinfo-contents-by-hash database hash)
  (call-with-time-tracking
   database
   "select_narinfo_contents_by_hash"
   (lambda ()
     (call-with-worker-thread
      (database-reader-thread-channel database)
      (lambda (db)
        (let ((statement
               (sqlite-prepare
                db
                "
SELECT id, contents FROM narinfos WHERE substr(store_path, 12, 32) = :hash"
                #:cache? #t)))
          (sqlite-bind-arguments
           statement
           #:hash hash)

          (match (let ((result (sqlite-step statement)))
                   (sqlite-reset statement)
                   result)
            (#(id contents)
             (values contents id))
            (_
             (values #f #f)))))))))

;; Return the number of rows in recent_changes.
(define (database-count-recent-changes database)
  (call-with-worker-thread
   (database-reader-thread-channel database)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             "
SELECT COUNT(*) FROM recent_changes"
             #:cache? #t)))

       (let ((result
              (match (sqlite-step statement)
                (#(count) count))))
         (sqlite-reset statement)

         result)))))

;; Return up to LIMIT recent_changes rows whose datetime is greater than or
;; equal to AFTER-DATE, oldest first.  Each row is an alist with the keys
;; datetime, change and data.
(define* (database-select-recent-changes database after-date
                                         #:key (limit 8192))
  (call-with-worker-thread
   (database-reader-thread-channel database)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             "
SELECT datetime, change, data
FROM recent_changes
WHERE datetime >= :datetime
ORDER BY datetime ASC
LIMIT :limit"
             #:cache? #t)))

       (sqlite-bind-arguments
        statement
        #:datetime after-date
        #:limit limit)

       (let ((result
              (sqlite-map
               (match-lambda
                 (#(datetime change data)
                  `((datetime . ,datetime)
                    (change   . ,change)
                    (data     . ,data))))
               statement)))
         (sqlite-reset statement)

         result)))))

;; Return the largest datetime stored in recent_changes, or #f when the
;; table is empty.
(define (database-select-latest-recent-change-datetime database)
  (call-with-worker-thread
   (database-reader-thread-channel database)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             "
SELECT datetime FROM recent_changes ORDER BY datetime DESC LIMIT 1"
             #:cache? #t)))

       (let ((result
              (match (sqlite-step statement)
                (#(date) date)
                (#f #f))))
         (sqlite-reset statement)

         result)))))

;; Return the id of the recent_changes row found after skipping the LIMIT
;; most recent rows (ordered by datetime, newest first), or #f when the
;; table has no more than LIMIT rows.
(define (database-get-recent-changes-id-for-deletion database limit)
  (call-with-worker-thread
   (database-reader-thread-channel database)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             "
SELECT id FROM recent_changes ORDER BY datetime DESC LIMIT 1 OFFSET :offset"
             #:cache? #t)))

       (sqlite-bind-arguments
        statement
        #:offset limit)

       (let ((result
              (match (sqlite-step statement)
                (#(id) id)
                (#f #f))))
         (sqlite-reset statement)

         result)))))

;; Delete every recent_changes row with an id lower than ID and return the
;; value of SQLite's changes() afterwards.
(define (database-delete-recent-changes-with-id-below database id)
  (call-with-worker-thread
   (database-writer-thread-channel database)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             "
DELETE FROM recent_changes WHERE id < :id"
             #:cache? #t)))

       (sqlite-bind-arguments
        statement
        #:id id)

       (sqlite-step statement)
       (sqlite-reset statement)

       (changes db)))))

;; Look up the narinfo that owns the narinfo file with the URL
;; NARINFO-FILE-URL.  Returns an alist with the keys store-path, nar-hash,
;; nar-size, deriver, system and contents, or #f when no narinfo file has
;; that URL.
(define (database-select-narinfo-for-file database narinfo-file-url)
  (call-with-worker-thread
   (database-reader-thread-channel database)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             "
SELECT narinfos.store_path, narinfos.nar_hash, narinfos.nar_size,
       narinfos.deriver, narinfos.system, narinfos.contents
FROM narinfos
INNER JOIN narinfo_files
  ON narinfos.id = narinfo_files.narinfo_id
WHERE narinfo_files.url = :url"
             #:cache? #t)))

       (sqlite-bind-arguments
        statement
        #:url narinfo-file-url)

       ;; Reset before inspecting the result so that the cached statement is
       ;; reusable whether or not a row was found.
       (match (let ((result (sqlite-step statement)))
                (sqlite-reset statement)
                result)
         (#(store-path nar-hash nar-size deriver system contents)
          `((store-path . ,store-path)
            (nar-hash   . ,nar-hash)
            (nar-size   . ,nar-size)
            (deriver    . ,deriver)
            (system     . ,system)
            (contents   . ,contents)))
         (_
          #f))))))

;; Return the narinfo files of the narinfo whose store path has the hash
;; part HASH, as a list of alists with the keys size, compression and url.
(define (database-select-narinfo-files database hash)
  (call-with-time-tracking
   database
   "select_narinfo_files"
   (lambda ()
     (call-with-worker-thread
      (database-reader-thread-channel database)
      (lambda (db)
        (let ((statement
               (sqlite-prepare
                db
                "
SELECT narinfo_files.size, narinfo_files.compression, narinfo_files.url
FROM narinfos
INNER JOIN narinfo_files
  ON narinfos.id = narinfo_files.narinfo_id
WHERE substr(narinfos.store_path, 12, 32) = :hash"
                #:cache? #t)))

          (sqlite-bind-arguments
           statement
           #:hash hash)

          (let ((result
                 (sqlite-map
                  (match-lambda
                    (#(size compression url)
                     `((size        . ,size)
                       (compression . ,compression)
                       (url         . ,url))))
                  statement)))
            (sqlite-reset statement)

            result)))))))

;; Return the narinfo files of the narinfo with the id NARINFO-ID, as a list
;; of alists with the keys size, compression and url.
(define (database-select-narinfo-files-by-narinfo-id database narinfo-id)
  (call-with-time-tracking
   database
   "select_narinfo_files_by_narinfo_id"
   (lambda ()
     (call-with-worker-thread
      (database-reader-thread-channel database)
      (lambda (db)
        (let ((statement
               (sqlite-prepare
                db
                "
SELECT narinfo_files.size, narinfo_files.compression, narinfo_files.url
FROM narinfos
INNER JOIN narinfo_files
  ON narinfos.id = narinfo_files.narinfo_id
WHERE narinfos.id = :narinfo_id"
                #:cache? #t)))

          (sqlite-bind-arguments
           statement
           #:narinfo_id narinfo-id)

          (let ((result
                 (sqlite-map
                  (match-lambda
                    (#(size compression url)
                     `((size        . ,size)
                       (compression . ,compression)
                       (url         . ,url))))
                  statement)))
            (sqlite-reset statement)

            result)))))))

;; Fold PROC over every narinfo_files row, starting from INIT.  PROC is
;; called with an alist (keys size, compression and url) and the
;; accumulated result.
(define (database-fold-all-narinfo-files database proc init)
  (call-with-worker-thread
   (database-reader-thread-channel database)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             "
SELECT size, compression, url
FROM narinfo_files"
             #:cache? #t)))
       (let ((result-list
              (sqlite-fold
               (lambda (row result)
                 (match row
                   (#(size compression url)
                    (proc `((size        . ,size)
                            (compression . ,compression)
                            (url         . ,url))
                          result))))
               init
               statement)))
         (sqlite-reset statement)

         result-list)))))

;; Call PROC on every narinfo file alist and return the list of results.
;; Results are consed onto the accumulator, so the list is in the reverse
;; of the order in which the rows were folded.
(define (database-map-all-narinfo-files database proc)
  (database-fold-all-narinfo-files
   database
   (lambda (nar-file result)
     (cons (proc nar-file)
           result))
   '()))

;; Return the number of rows in narinfo_files.
(define (database-count-narinfo-files database)
  (call-with-worker-thread
   (database-reader-thread-channel database)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             "
SELECT COUNT(*) FROM narinfo_files"
             #:cache? #t)))

       (let ((result
              (vector-ref (sqlite-step statement)
                          0)))
         (sqlite-reset statement)

         result)))))

;; Record a cached narinfo file of SIZE and COMPRESSION for the narinfo with
;; the id NARINFO-ID.  Returns the id of the inserted row.
(define (database-insert-cached-narinfo-file database
                                             narinfo-id
                                             size
                                             compression)
  (call-with-worker-thread
   (database-writer-thread-channel database)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             "
INSERT INTO cached_narinfo_files (
  narinfo_id, size, compression
) VALUES (
  :narinfo_id, :size, :compression
)"
             #:cache? #t)))
       (sqlite-bind-arguments
        statement
        #:narinfo_id narinfo-id
        #:size size
        #:compression compression)

       (sqlite-step statement)
       (sqlite-reset statement)

       (last-insert-rowid db)))))

;; Look up the cached narinfo file with COMPRESSION belonging to the narinfo
;; whose store path has the hash part HASH.  Returns an alist with the key
;; size, or #f when there is no such cached file.
(define (database-select-cached-narinfo-file-by-hash database
                                                     hash
                                                     compression)
  (call-with-time-tracking
   database
   "select_cached_narinfo_file_by_hash"
   (lambda ()
     (call-with-worker-thread
      (database-reader-thread-channel database)
      (lambda (db)
        (let ((statement
               (sqlite-prepare
                db
                "
SELECT cached_narinfo_files.size
FROM narinfos
INNER JOIN cached_narinfo_files
  ON cached_narinfo_files.narinfo_id = narinfos.id
WHERE substr(narinfos.store_path, 12, 32) = :hash
  AND cached_narinfo_files.compression = :compression"
                #:cache? #t)))

          (sqlite-bind-arguments
           statement
           #:hash hash
           #:compression compression)

          (let ((result
                 (match (sqlite-step statement)
                   (#(size)
                    `((size . ,size)))
                   (#f #f))))
            (sqlite-reset statement)

            result)))))))

;; Return the cached narinfo files of the narinfo with the id NARINFO-ID, as
;; a list of alists with the keys store-path, size and compression.  The
;; compression value is converted to a symbol.
(define (database-select-cached-narinfo-files-by-narinfo-id
         database
         narinfo-id)
  (call-with-time-tracking
   database
   "select_cached_narinfo_file_by_narinfo_id"
   (lambda ()
     (call-with-worker-thread
      (database-reader-thread-channel database)
      (lambda (db)
        (let ((statement
               (sqlite-prepare
                db
                "
SELECT store_path, size, compression
FROM cached_narinfo_files
INNER JOIN narinfos
  ON cached_narinfo_files.narinfo_id = narinfos.id
WHERE narinfo_id = :narinfo_id"
                #:cache? #t)))

          (sqlite-bind-arguments
           statement
           #:narinfo_id narinfo-id)

          (let ((result
                 (sqlite-map
                  (match-lambda
                    (#(store_path size compression)
                     `((store-path  . ,store_path)
                       (size        . ,size)
                       (compression . ,(string->symbol compression)))))
                  statement)))
            (sqlite-reset statement)

            result)))))))

;; Fold PROC over every cached narinfo file, starting from INIT.  PROC is
;; called with an alist (keys size, compression as a symbol, store-path and
;; narinfo-id) and the accumulated result.
(define (database-fold-cached-narinfo-files database
                                            proc
                                            init)
  (call-with-worker-thread
   (database-reader-thread-channel database)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             "
SELECT store_path, size, compression, narinfo_id
FROM cached_narinfo_files
INNER JOIN narinfos
  ON cached_narinfo_files.narinfo_id = narinfos.id"
             #:cache? #t)))
       (let ((result-list
              (sqlite-fold
               (lambda (row result)
                 (match row
                   (#(store_path size compression narinfo_id)
                    (proc `((size        . ,size)
                            (compression . ,(string->symbol compression))
                            (store-path  . ,store_path)
                            (narinfo-id  . ,narinfo_id))
                          result))))
               init
               statement)))
         (sqlite-reset statement)

         result-list)))))

;; Delete the cached_narinfo_files rows matching NARINFO-ID and COMPRESSION
;; (a string).
(define (database-remove-cached-narinfo-file database narinfo-id compression)
  (call-with-worker-thread
   (database-writer-thread-channel database)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             "
DELETE FROM cached_narinfo_files
WHERE narinfo_id = :narinfo_id
  AND compression = :compression"
             #:cache? #t)))

       (sqlite-bind-arguments
        statement
        #:narinfo_id narinfo-id
        #:compression compression)

       (sqlite-step statement)
       (sqlite-reset statement)))))