;;; Nar Herder
;;;
;;; Copyright © 2021 Christopher Baines
;;;
;;; This program is free software: you can redistribute it and/or
;;; modify it under the terms of the GNU Affero General Public License
;;; as published by the Free Software Foundation, either version 3 of
;;; the License, or (at your option) any later version.
;;;
;;; This program is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;;; Affero General Public License for more details.
;;;
;;; You should have received a copy of the GNU Affero General Public
;;; License along with this program.  If not, see
;;; <http://www.gnu.org/licenses/>.

(define-module (nar-herder database)
  #:use-module (srfi srfi-1)
  #:use-module (srfi srfi-9)
  #:use-module (srfi srfi-19)
  #:use-module (ice-9 match)
  #:use-module (ice-9 format)
  #:use-module (ice-9 threads)
  #:use-module (ice-9 exceptions)
  #:use-module (web uri)
  #:use-module (sqlite3)
  #:use-module (fibers)
  #:use-module (prometheus)
  #:use-module (guix narinfo)
  #:use-module (guix derivations)
  #:use-module (nar-herder utils)
  #:export (setup-database
            database-optimize
            database-spawn-fibers

            database-call-with-transaction

            dump-database

            database-insert-narinfo
            database-remove-narinfo
            database-select-narinfo-contents-by-hash

            database-select-recent-changes
            database-select-latest-recent-change-datetime
            database-get-recent-changes-id-for-deletion
            database-delete-recent-changes-with-id-below

            database-select-narinfo-for-file
            database-select-narinfo-files
            database-map-all-narinfo-files))

;; Handle passed to all the database-* procedures.  It bundles the
;; database file name, the reader and writer worker thread channels
;; and the metrics registry.
(define-record-type <database>
  (make-database database-file
                 reader-thread-channel
                 writer-thread-channel
                 metrics-registry)
  database?
  (database-file          database-file)
  (reader-thread-channel  database-reader-thread-channel)
  (writer-thread-channel  database-writer-thread-channel)
  (metrics-registry       database-metrics-registry))

;; Open the SQLite database in the file DATABASE.  When WRITE? is true
;; (the default) the file is opened read-write and created if needed,
;; otherwise it is opened read-only.
(define* (db-open database #:key (write? #t))
  (define flags
    `(,@(if write?
            (list SQLITE_OPEN_READWRITE
                  SQLITE_OPEN_CREATE)
            (list SQLITE_OPEN_READONLY))
      ,SQLITE_OPEN_NOMUTEX))

  (sqlite-open database (apply logior flags)))

;; Create all the tables and indexes in the database DB.
(define (perform-initial-database-setup db)
  (define schema
    "
CREATE TABLE narinfos (
  id INTEGER PRIMARY KEY ASC,
  store_path TEXT NOT NULL,
  nar_hash TEXT NOT NULL,
  nar_size INTEGER NOT NULL,
  deriver TEXT,
  system TEXT,
  contents NOT NULL
);

CREATE UNIQUE INDEX narinfos_store_hash
  ON narinfos (substr(store_path, 12, 32));

CREATE TABLE narinfo_files (
  narinfo_id INTEGER NOT NULL REFERENCES narinfos (id),
  size INTEGER NOT NULL,
  compression TEXT,
  url TEXT NOT NULL
);

CREATE INDEX narinfo_files_narinfo_id
  ON narinfo_files (narinfo_id);

CREATE TABLE narinfo_references (
  narinfo_id INTEGER NOT NULL REFERENCES narinfos (id),
  store_path TEXT NOT NULL
);

CREATE TABLE tags (
  id INTEGER PRIMARY KEY ASC,
  key TEXT NOT NULL,
  value TEXT NOT NULL
);

CREATE UNIQUE INDEX tags_index ON tags (key, value);

CREATE TABLE narinfo_tags (
  narinfo_id INTEGER NOT NULL REFERENCES narinfos (id),
  tag_id INTEGER NOT NULL REFERENCES tags (id)
);

CREATE TABLE recent_changes (
  id INTEGER PRIMARY KEY ASC,
  datetime TEXT NOT NULL,
  change TEXT NOT NULl,
  data TEXT NOT NULL
);")

  (sqlite-exec db schema))

;; Create the schema in DB if the narinfos table doesn't exist yet,
;; then make sure the narinfo_files_narinfo_id index exists.
(define (update-schema db)
  (let ((statement
         (sqlite-prepare
          db
          "
SELECT name FROM sqlite_master WHERE type = 'table' AND name = :name")))

    (sqlite-bind-arguments statement #:name "narinfos")

    ;; Finalize the statement before creating the schema, so that it's
    ;; released even if the schema creation fails.
    (let ((narinfos-table-exists? (->bool (sqlite-step statement))))
      (sqlite-finalize statement)
      (unless narinfos-table-exists?
        (perform-initial-database-setup db))))

  (sqlite-exec
   db
   "CREATE INDEX IF NOT EXISTS narinfo_files_narinfo_id ON narinfo_files (narinfo_id);"))

;; Prepare the database in DATABASE-FILE (WAL journal mode, schema) and
;; return a <database> record holding a channel of read-only worker
;; threads and a channel with a single read-write worker thread.
;; Delays in getting hold of a worker thread are recorded as histogram
;; metrics in METRICS-REGISTRY.
(define (setup-database database-file metrics-registry)
  ;; Return a procedure which records the delay for OPERATION ("read"
  ;; or "write"), and prints a warning when it exceeds one second.
  (define (make-delay-logger operation)
    (let ((delay-metric
           (make-histogram-metric
            metrics-registry
            (string-append "database_" operation "_delay_seconds"))))
      (lambda (seconds-delayed)
        (metric-observe delay-metric seconds-delayed)
        (when (> seconds-delayed 1)
          (format
           (current-error-port)
           "warning: database ~a delayed by ~1,2f seconds~%"
           operation
           seconds-delayed)))))

  (let ((db (db-open database-file)))
    (sqlite-exec db "PRAGMA journal_mode=WAL;")
    (sqlite-exec db "PRAGMA optimize;")
    (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);")

    (update-schema db)

    (sqlite-close db))

  (let ((reader-thread-channel
         (make-worker-thread-channel
          (lambda ()
            (let ((db (db-open database-file #:write? #f)))
              (sqlite-exec db "PRAGMA busy_timeout = 5000;")
              (list db)))
          #:destructor
          (lambda (db)
            (sqlite-close db))
          #:lifetime 50000

          ;; Use a minimum of 2 and a maximum of 64 threads
          #:parallelism
          (min (max (current-processor-count) 2)
               64)
          #:delay-logger (make-delay-logger "read")))

        (writer-thread-channel
         (make-worker-thread-channel
          (lambda ()
            (let ((db (db-open database-file)))
              (sqlite-exec db "PRAGMA busy_timeout = 5000;")
              (sqlite-exec db "PRAGMA foreign_keys = ON;")
              (list db)))
          #:destructor
          (lambda (db)
            (db-optimize db database-file)

            (sqlite-close db))
          #:lifetime 500

          ;; SQLite doesn't support parallel writes
          #:parallelism 1
          #:delay-logger (make-delay-logger "write"))))

    (make-database database-file
                   reader-thread-channel
                   writer-thread-channel
                   metrics-registry)))

;; Checkpoint the WAL and run PRAGMA optimize on DB, but only when the
;; WAL file belonging to DB-FILENAME is larger than 5 MiB.
(define (db-optimize db db-filename)
  (define (wal-size)
    ;; Passing #f makes stat return #f rather than raise an exception
    ;; when the -wal file can't be examined (e.g. it doesn't exist), in
    ;; which case there's nothing to checkpoint.
    (let ((wal-stat (stat (string-append db-filename "-wal") #f)))
      (if wal-stat
          (stat:size wal-stat)
          0)))

  (define MiB (* (expt 2 20) 1.))
  (define wal-size-threshold
    (* 5 MiB))

  (when (> (wal-size) wal-size-threshold)
    (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);")

    (sqlite-exec db
                 "
PRAGMA analysis_limit=1000;
PRAGMA optimize;")))

;; Run db-optimize for DATABASE on the writer thread, retrying on
;; error.
(define (database-optimize database)
  (retry-on-error
   (lambda ()
     (call-with-worker-thread
      (database-writer-thread-channel database)
      (lambda (db)
        (db-optimize
         db
         (database-file database)))))
   #:times 5
   #:delay 5))

;; Spawn a fiber which calls database-optimize on DATABASE every 5
;; minutes.  Exceptions are logged and don't stop the loop.
(define (database-spawn-fibers database)
  (spawn-fiber
   (lambda ()
     (while #t
       (sleep (* 60 5))                 ; 5 minutes
       (with-exception-handler
           (lambda (exn)
             (simple-format
              (current-error-port)
              "exception when performing WAL checkpoint: ~A\n"
              exn))
         (lambda ()
           (database-optimize database))
         #:unwind? #t)))
   #:parallel? #t))

;; Call THUNK and return its result.  When DATABASE has a metrics
;; registry, the time taken is observed in the histogram named
;; database_THING_duration_seconds, which is created on first use.
(define (call-with-time-tracking database thing thunk)
  (define registry (database-metrics-registry database))
  (define metric-name
    (string-append "database_" thing "_duration_seconds"))

  (if registry
      (let* ((metric
              (or (metrics-registry-fetch-metric registry metric-name)
                  (make-histogram-metric registry metric-name)))
             (start-time (get-internal-real-time)))
        (let ((result (thunk)))
          (metric-observe metric
                          (/ (- (get-internal-real-time) start-time)
                             internal-time-units-per-second))
          result))
      (thunk)))

;; Set to the procedure being run while inside a transaction, so that
;; nested calls don't try to begin another transaction.
(define %current-transaction-proc
  (make-parameter #f))

;; Call PROC with a database connection, inside a transaction, and
;; return the values PROC returns.  The transaction is committed when
;; PROC returns, and rolled back if an exception is raised, in which
;; case the exception is re-raised.  The writer thread is used unless
;; READONLY? is true.
(define* (database-call-with-transaction database proc
                                         #:key
                                         readonly?)
  (define (run-proc-within-transaction db)
    (if (%current-transaction-proc)
        (proc db)                       ; already in transaction
        (begin
          (sqlite-exec db "BEGIN TRANSACTION;")
          (with-exception-handler
              (lambda (exn)
                (simple-format (current-error-port)
                               "error: sqlite rolling back transaction\n")
                (sqlite-exec db "ROLLBACK TRANSACTION;")
                (raise-exception exn))
            (lambda ()
              (call-with-values
                  (lambda ()
                    (parameterize ((%current-transaction-proc proc))
                      (proc db)))
                (lambda vals
                  (sqlite-exec db "COMMIT TRANSACTION;")
                  (apply values vals))))))))

  ;; The worker thread hands back a single list, the duration followed
  ;; by the values from PROC, which is unpacked again here.
  (match (call-with-worker-thread
          ((if readonly?
               database-reader-thread-channel
               database-writer-thread-channel)
           database)
          (lambda (db)
            (let ((start-time (get-internal-real-time)))
              (call-with-values
                  (lambda ()
                    (run-proc-within-transaction db))
                (lambda vals
                  (let ((duration-seconds
                         (/ (- (get-internal-real-time) start-time)
                            internal-time-units-per-second)))
                    (when (and (not readonly?)
                               (> duration-seconds 2))
                      (format
                       (current-error-port)
                       "warning: ~a:\n took ~4f seconds in transaction\n"
                       proc
                       duration-seconds))

                    (cons duration-seconds vals)))))))
    ((duration vals ...)
     (apply values vals))))

;; Run VACUUM INTO with the file NAME as the target, using a reader
;; thread of DATABASE.
(define (dump-database database name)
  ;; NAME is spliced in to a single quoted SQL string literal, so any
  ;; single quotes in it have to be doubled.
  (define escaped-name
    (string-join (string-split name #\') "''"))

  (call-with-worker-thread
   (database-reader-thread-channel database)
   (lambda (db)
     (sqlite-exec
      db
      (string-append "VACUUM INTO '" escaped-name "';")))))

;; Return the result of last_insert_rowid() for the connection DB.
(define (last-insert-rowid db)
  (let ((statement
         (sqlite-prepare
          db
          "SELECT last_insert_rowid();"
          #:cache? #t)))
    (let ((id (vector-ref (sqlite-step statement) 0)))
      (sqlite-reset statement)
      id)))

;; Return the result of changes() for the connection DB.
(define (changes db)
  (let ((statement
         (sqlite-prepare
          db
          "SELECT changes()"
          #:cache? #t)))
    (let ((count (vector-ref (sqlite-step statement) 0)))
      (sqlite-reset statement)
      count)))

;; Return the id of the tag with KEY and VALUE, inserting it in to the
;; tags table if it doesn't exist yet.
(define (tag->tag-id db key value)
  (define (insert-tag)
    (let ((statement
           (sqlite-prepare
            db
            "
INSERT INTO tags (key, value) VALUES (:key, :value)"
            #:cache? #t)))

      (sqlite-bind-arguments statement
                             #:key key
                             #:value value)

      (sqlite-step statement)
      (sqlite-reset statement)

      (last-insert-rowid db)))

  (let ((statement
         (sqlite-prepare
          db
          "
SELECT id FROM tags WHERE key = :key AND value = :value"
          #:cache? #t)))

    (sqlite-bind-arguments statement
                           #:key key
                           #:value value)

    (match (let ((result (sqlite-step statement)))
             (sqlite-reset statement)
             result)
      (#f (insert-tag))
      (#(id) id))))

;; Store NARINFO in DATABASE, along with its files, references and
;; TAGS (a list of (key . value) pairs), and record an 'addition'
;; entry in recent_changes.  The change is dated CHANGE-DATETIME if
;; given, otherwise the current time is used.  Everything happens in a
;; single transaction.  Return the id of the new narinfos row.
(define* (database-insert-narinfo database narinfo
                                  #:key change-datetime
                                  (tags '()))
  (define (insert-narinfo-record db)
    (let ((statement
           (sqlite-prepare
            db
            "
INSERT INTO narinfos (
  store_path, nar_hash, nar_size, deriver, system, contents
) VALUES (
  :store_path, :nar_hash, :nar_size, :deriver, :system, :contents
)"
            #:cache? #t)))

      (sqlite-bind-arguments
       statement
       #:store_path (narinfo-path narinfo)
       #:nar_hash (narinfo-hash narinfo)
       #:nar_size (narinfo-size narinfo)
       #:deriver (narinfo-deriver narinfo)
       #:system (narinfo-system narinfo)
       #:contents (narinfo-contents narinfo))

      (sqlite-step statement)
      (sqlite-reset statement)

      (last-insert-rowid db)))

  (define (insert-narinfo-file db narinfo-id size compression url)
    (let ((statement
           (sqlite-prepare
            db
            "
INSERT INTO narinfo_files (
  narinfo_id, size, compression, url
) VALUES (
  :narinfo_id, :size, :compression, :url
)"
            #:cache? #t)))

      (sqlite-bind-arguments
       statement
       #:narinfo_id narinfo-id
       #:size size
       #:compression compression
       #:url url)

      (sqlite-step statement)
      (sqlite-reset statement)))

  (define (insert-narinfo-references db narinfo-id store-path)
    (let ((statement
           (sqlite-prepare
            db
            "
INSERT INTO narinfo_references (
  narinfo_id, store_path
) VALUES (
  :narinfo_id, :store_path
)"
            #:cache? #t)))

      (sqlite-bind-arguments
       statement
       #:narinfo_id narinfo-id
       #:store_path store-path)

      (sqlite-step statement)
      (sqlite-reset statement)))

  (define (insert-change db contents)
    (let ((statement
           (sqlite-prepare
            db
            "
INSERT INTO recent_changes (
  datetime, change, data
) VALUES (
  datetime('now'), 'addition', :contents
)"
            #:cache? #t)))

      (sqlite-bind-arguments
       statement
       #:contents contents)

      (sqlite-step statement)
      (sqlite-reset statement)))

  (define (insert-change-with-datetime db contents datetime)
    (let ((statement
           (sqlite-prepare
            db
            "
INSERT INTO recent_changes (
  datetime, change, data
) VALUES (
  :datetime, 'addition', :contents
)"
            #:cache? #t)))

      (sqlite-bind-arguments
       statement
       #:contents contents
       #:datetime datetime)

      (sqlite-step statement)
      (sqlite-reset statement)))

  (define (insert-tags db narinfo-id tags)
    (let ((statement
           (sqlite-prepare
            db
            "
INSERT INTO narinfo_tags (narinfo_id, tag_id) VALUES (:narinfo_id, :tag_id)"
            #:cache? #t)))

      ;; This is only for the side effects, hence for-each
      (for-each
       (match-lambda
         ((key . value)
          (let ((tag-id (tag->tag-id db key value)))
            (sqlite-bind-arguments statement
                                   #:narinfo_id narinfo-id
                                   #:tag_id tag-id)
            (sqlite-step statement)
            (sqlite-reset statement))))
       tags)))

  (database-call-with-transaction
   database
   (lambda (db)
     (let ((narinfo-id (insert-narinfo-record db)))
       ;; The file sizes, compressions and URIs are parallel lists,
       ;; with one entry for each nar file.
       (for-each (lambda (size compression uri)
                   (insert-narinfo-file db
                                        narinfo-id
                                        size
                                        compression
                                        (uri-path uri)))
                 (narinfo-file-sizes narinfo)
                 (narinfo-compressions narinfo)
                 (narinfo-uris narinfo))

       (for-each (lambda (reference)
                   (insert-narinfo-references db narinfo-id reference))
                 (narinfo-references narinfo))

       (if change-datetime
           (insert-change-with-datetime db
                                        (narinfo-contents narinfo)
                                        change-datetime)
           (insert-change db (narinfo-contents narinfo)))

       (unless (null? tags)
         (insert-tags db narinfo-id tags))

       narinfo-id))))

;; Remove the narinfo for STORE-PATH from DATABASE, along with its
;; files, references and tags, and record a 'removal' entry in
;; recent_changes.  The change is dated CHANGE-DATETIME if given,
;; otherwise the current time is used.  Everything happens in a single
;; transaction.  Return #t if the narinfo was removed, or #f if there's
;; no narinfo for STORE-PATH, in which case nothing is changed.
(define* (database-remove-narinfo database store-path
                                  #:key change-datetime)
  (define (store-path->narinfo-id db)
    (let ((statement
           (sqlite-prepare
            db
            "
SELECT id FROM narinfos WHERE store_path = :store_path"
            #:cache? #t)))

      (sqlite-bind-arguments
       statement
       #:store_path store-path)

      ;; sqlite-step returns #f if there's no matching row
      (let ((result (match (sqlite-step statement)
                      (#(id) id)
                      (#f #f))))
        (sqlite-reset statement)

        result)))

  (define (remove-narinfo-record db id)
    (let ((statement
           (sqlite-prepare
            db
            "
DELETE FROM narinfos WHERE id = :id"
            #:cache? #t)))

      (sqlite-bind-arguments
       statement
       #:id id)

      (sqlite-step statement)
      (sqlite-reset statement)))

  (define (remove-narinfo-files db narinfo-id)
    (let ((statement
           (sqlite-prepare
            db
            "
DELETE FROM narinfo_files WHERE narinfo_id = :narinfo_id"
            #:cache? #t)))

      (sqlite-bind-arguments
       statement
       #:narinfo_id narinfo-id)

      (sqlite-map (const #t) statement)
      (sqlite-reset statement)))

  (define (remove-narinfo-references db narinfo-id)
    (let ((statement
           (sqlite-prepare
            db
            "
DELETE FROM narinfo_references WHERE narinfo_id = :narinfo_id"
            #:cache? #t)))

      (sqlite-bind-arguments
       statement
       #:narinfo_id narinfo-id)

      (sqlite-map (const #t) statement)
      (sqlite-reset statement)))

  (define (insert-change db store-path)
    (let ((statement
           (sqlite-prepare
            db
            "
INSERT INTO recent_changes (
  datetime, change, data
) VALUES (
  datetime('now'), 'removal', :store_path
)"
            #:cache? #t)))

      (sqlite-bind-arguments
       statement
       #:store_path store-path)

      (sqlite-step statement)
      (sqlite-reset statement)))

  (define (insert-change-with-datetime db store-path datetime)
    (let ((statement
           (sqlite-prepare
            db
            "
INSERT INTO recent_changes (
  datetime, change, data
) VALUES (
  :datetime, 'removal', :store_path
)"
            #:cache? #t)))

      (sqlite-bind-arguments
       statement
       #:store_path store-path
       #:datetime datetime)

      (sqlite-step statement)
      (sqlite-reset statement)))

  (define (remove-tags db narinfo-id)
    (let ((statement
           (sqlite-prepare
            db
            "
DELETE FROM narinfo_tags WHERE narinfo_id = :narinfo_id"
            #:cache? #t)))

      (sqlite-bind-arguments
       statement
       #:narinfo_id narinfo-id)

      (sqlite-map (const #t) statement)
      (sqlite-reset statement)))

  (database-call-with-transaction
   database
   (lambda (db)
     (let ((narinfo-id (store-path->narinfo-id db)))
       (if narinfo-id
           (begin
             (if change-datetime
                 (insert-change-with-datetime db store-path
                                              change-datetime)
                 (insert-change db store-path))

             ;; Remove the rows that reference the narinfos row before
             ;; removing the narinfos row itself.
             (remove-narinfo-files db narinfo-id)
             (remove-narinfo-references db narinfo-id)
             (remove-tags db narinfo-id)
             (remove-narinfo-record db narinfo-id)
             #t)
           #f)))))

;; Return the contents of the narinfo whose store path has HASH at
;; substr(store_path, 12, 32), or #f if there isn't one.
(define (database-select-narinfo-contents-by-hash database hash)
  (call-with-time-tracking
   database
   "select_narinfo_contents_by_hash"
   (lambda ()
     (call-with-worker-thread
      (database-reader-thread-channel database)
      (lambda (db)
        (let ((statement
               (sqlite-prepare
                db
                "
SELECT contents FROM narinfos WHERE substr(store_path, 12, 32) = :hash"
                #:cache? #t)))
          (sqlite-bind-arguments
           statement
           #:hash hash)

          (match (let ((result (sqlite-step statement)))
                   (sqlite-reset statement)
                   result)
            (#(contents) contents)
            (_ #f))))))))

;; Return up to LIMIT entries from recent_changes with a datetime at
;; or after AFTER-DATE, oldest first, as a list of alists with the
;; keys datetime, change and data.
(define* (database-select-recent-changes database after-date
                                         #:key (limit 8192))
  (call-with-worker-thread
   (database-reader-thread-channel database)
   (lambda (db)
     ;; Without the ORDER BY, which rows are returned when there are
     ;; more than LIMIT matches is unspecified.
     (let ((statement
            (sqlite-prepare
             db
             "
SELECT datetime, change, data
FROM recent_changes
WHERE datetime >= :datetime
ORDER BY datetime ASC, id ASC
LIMIT :limit"
             #:cache? #t)))
       (sqlite-bind-arguments
        statement
        #:datetime after-date
        #:limit limit)

       (let loop ((row (sqlite-step statement))
                  (result '()))
         (match row
           (#(datetime change data)
            (loop (sqlite-step statement)
                  (cons `((datetime . ,datetime)
                          (change . ,change)
                          (data . ,data))
                        result)))
           (#f
            (sqlite-reset statement)

            (reverse result))))))))

;; Return the greatest datetime in recent_changes, or #f if the table
;; is empty.
(define (database-select-latest-recent-change-datetime database)
  (call-with-worker-thread
   (database-reader-thread-channel database)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             "
SELECT datetime FROM recent_changes ORDER BY datetime DESC LIMIT 1"
             #:cache? #t)))

       (let ((result (match (sqlite-step statement)
                       (#(date) date)
                       (#f #f))))
         (sqlite-reset statement)

         result)))))

;; Return the id of the recent_changes entry that comes after skipping
;; the LIMIT newest entries (by datetime), or #f if there are no more
;; than LIMIT entries.
(define (database-get-recent-changes-id-for-deletion database limit)
  (call-with-worker-thread
   (database-reader-thread-channel database)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             "
SELECT id FROM recent_changes ORDER BY datetime DESC LIMIT 1 OFFSET :offset"
             #:cache? #t)))

       (sqlite-bind-arguments
        statement
        #:offset limit)

       (let ((result (match (sqlite-step statement)
                       (#(id) id)
                       (#f #f))))
         (sqlite-reset statement)

         result)))))

;; Delete the recent_changes entries with an id below ID, and return
;; the result of changes() afterwards.
(define (database-delete-recent-changes-with-id-below database id)
  (call-with-worker-thread
   (database-writer-thread-channel database)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             "
DELETE FROM recent_changes WHERE id < :id"
             #:cache? #t)))

       (sqlite-bind-arguments
        statement
        #:id id)

       (sqlite-step statement)
       (sqlite-reset statement)

       (changes db)))))

;; Return an alist describing the narinfo that the nar file with the
;; URL NARINFO-FILE-URL belongs to, with the keys store-path, nar-hash,
;; nar-size, deriver, system and contents.  Return #f if there's no
;; such nar file.
(define (database-select-narinfo-for-file database narinfo-file-url)
  (call-with-worker-thread
   (database-reader-thread-channel database)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             "
SELECT narinfos.store_path, narinfos.nar_hash, narinfos.nar_size,
       narinfos.deriver, narinfos.system, narinfos.contents
FROM narinfos
INNER JOIN narinfo_files
  ON narinfos.id = narinfo_files.narinfo_id
WHERE narinfo_files.url = :url"
             #:cache? #t)))

       (sqlite-bind-arguments
        statement
        #:url narinfo-file-url)

       ;; Reset the cached statement whether or not a row was found
       (match (let ((result (sqlite-step statement)))
                (sqlite-reset statement)
                result)
         (#(store-path nar-hash nar-size deriver system contents)
          `((store-path . ,store-path)
            (nar-hash . ,nar-hash)
            (nar-size . ,nar-size)
            (deriver . ,deriver)
            (system . ,system)
            (contents . ,contents)))
         (#f #f))))))

;; Return the nar files for the narinfo whose store path has HASH at
;; substr(store_path, 12, 32), as a list of alists with the keys size,
;; compression and url.
(define (database-select-narinfo-files database hash)
  (call-with-time-tracking
   database
   "select_narinfo_files"
   (lambda ()
     (call-with-worker-thread
      (database-reader-thread-channel database)
      (lambda (db)
        (let ((statement
               (sqlite-prepare
                db
                "
SELECT narinfo_files.size, narinfo_files.compression, narinfo_files.url
FROM narinfos
INNER JOIN narinfo_files
  ON narinfos.id = narinfo_files.narinfo_id
WHERE substr(narinfos.store_path, 12, 32) = :hash"
                #:cache? #t)))

          (sqlite-bind-arguments
           statement
           #:hash hash)

          (let ((result
                 (sqlite-map
                  (match-lambda
                    (#(size compression url)
                     `((size . ,size)
                       (compression . ,compression)
                       (url . ,url))))
                  statement)))
            (sqlite-reset statement)

            result)))))))

;; Call PROC on an alist (with the keys size, compression and url) for
;; each row in narinfo_files, and return the list of results.  PROC is
;; called on a reader thread, and since the results are accumulated
;; with cons, they're in the reverse of the order the rows were
;; visited in.
(define (database-map-all-narinfo-files database proc)
  (call-with-worker-thread
   (database-reader-thread-channel database)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             "
SELECT size, compression, url
FROM narinfo_files"
             #:cache? #t)))
       (let ((result-list
              (sqlite-fold
               (lambda (row result)
                 (match row
                   (#(size compression url)
                    (cons (proc `((size . ,size)
                                  (compression . ,compression)
                                  (url . ,url)))
                          result))))
               '()
               statement)))
         (sqlite-reset statement)

         result-list)))))