diff options
author | Christopher Baines <mail@cbaines.net> | 2021-12-11 10:27:24 +0000 |
---|---|---|
committer | Christopher Baines <mail@cbaines.net> | 2021-12-12 16:35:38 +0000 |
commit | f9ff69e1c79f024ed188ad51642cca443aedfee2 (patch) | |
tree | 609b37ff8d6fc3d557d339a67ba6641522b0a977 /nar-herder/database.scm | |
parent | 7e280ca951e8ffa7c86224843075e65266911617 (diff) | |
download | nar-herder-f9ff69e1c79f024ed188ad51642cca443aedfee2.tar nar-herder-f9ff69e1c79f024ed188ad51642cca443aedfee2.tar.gz |
Get most of the functionality sort of working
At least working enough to start trying this out, and finding the
problems.
Diffstat (limited to 'nar-herder/database.scm')
-rw-r--r-- | nar-herder/database.scm | 532 |
1 files changed, 532 insertions, 0 deletions
diff --git a/nar-herder/database.scm b/nar-herder/database.scm new file mode 100644 index 0000000..9cb04e2 --- /dev/null +++ b/nar-herder/database.scm @@ -0,0 +1,532 @@ +;;; Nar Herder +;;; +;;; Copyright © 2021 Christopher Baines <mail@cbaines.net> +;;; +;;; This program is free software: you can redistribute it and/or +;;; modify it under the terms of the GNU Affero General Public License +;;; as published by the Free Software Foundation, either version 3 of +;;; the License, or (at your option) any later version. +;;; +;;; This program is distributed in the hope that it will be useful, +;;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;;; Affero General Public License for more details. +;;; +;;; You should have received a copy of the GNU Affero General Public +;;; License along with this program. If not, see +;;; <http://www.gnu.org/licenses/>. + +(define-module (nar-herder database) + #:use-module (srfi srfi-1) + #:use-module (srfi srfi-9) + #:use-module (srfi srfi-19) + #:use-module (ice-9 match) + #:use-module (ice-9 format) + #:use-module (ice-9 threads) + #:use-module (ice-9 exceptions) + #:use-module (web uri) + #:use-module (sqlite3) + #:use-module (fibers) + #:use-module (guix narinfo) + #:use-module (guix derivations) + #:use-module (nar-herder utils) + #:export (setup-database + + database-optimize + database-spawn-fibers + + database-call-with-transaction + + dump-database + + database-insert-narinfo + database-select-narinfo-contents-by-hash + + database-select-recent-changes + database-select-latest-recent-change-datetime + + database-select-narinfo-files + + database-map-all-narinfo-files)) + +(define-record-type <database> + (make-database database-file reader-thread-channel writer-thread-channel) + database? + (database-file database-file) + (reader-thread-channel database-reader-thread-channel) + (writer-thread-channel database-writer-thread-channel)) + +(define* (db-open database + #:key (write? #t)) + (define flags + `(,@(if write? + (list SQLITE_OPEN_READWRITE + SQLITE_OPEN_CREATE) + + (list SQLITE_OPEN_READONLY)) + ,SQLITE_OPEN_NOMUTEX)) + + (sqlite-open database (apply logior flags))) + +(define (perform-initial-database-setup db) + (define schema + " +CREATE TABLE narinfos ( + id INTEGER PRIMARY KEY ASC, + store_path TEXT NOT NULL, + nar_hash TEXT NOT NULL, + nar_size INTEGER NOT NULL, + deriver TEXT, + system TEXT, + contents NOT NULL +); + +CREATE UNIQUE INDEX narinfos_store_hash ON narinfos (substr(store_path, 12, 32)); + +CREATE TABLE narinfo_files ( + narinfo_id INTEGER NOT NULL REFERENCES narinfos (id), + size INTEGER NOT NULL, + compression TEXT, + url TEXT NOT NULL +); + +CREATE TABLE narinfo_references ( + narinfo_id INTEGER NOT NULL REFERENCES narinfos (id), + store_path TEXT NOT NULL +); + +CREATE TABLE tags ( + id INTEGER PRIMARY KEY ASC, + key TEXT NOT NULL, + value TEXT NOT NULL +); + +CREATE UNIQUE INDEX tags_index ON tags (key, value); + +CREATE TABLE narinfo_tags ( + narinfo_id INTEGER NOT NULL REFERENCES narinfos (id), + tag_id INTEGER NOT NULL REFERENCES tags (id) +); + +CREATE TABLE recent_changes ( + id INTEGER PRIMARY KEY ASC, + datetime TEXT NOT NULL, + change TEXT NOT NULl, + data TEXT NOT NULL +);") + + (sqlite-exec db schema)) + +(define (update-schema db) + (let ((statement + (sqlite-prepare + db + " +SELECT name FROM sqlite_master WHERE type = 'table' AND name = :name"))) + + (sqlite-bind-arguments + statement + #:name "narinfos") + + (match (sqlite-step statement) + (#f (perform-initial-database-setup db)) + (_ #f)) + + (sqlite-finalize statement))) + +(define (setup-database database-file) + (let ((db (db-open database-file))) + (sqlite-exec db "PRAGMA journal_mode=WAL;") + (sqlite-exec db "PRAGMA optimize;") + (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);") + + (update-schema db) + + (sqlite-close db)) + + (let ((reader-thread-channel + (make-worker-thread-channel + (lambda () + (let ((db + (db-open database-file #:write? #f))) + (sqlite-exec db "PRAGMA busy_timeout = 5000;") + (list db))) + #:destructor + (lambda (db) + (sqlite-close db)) + #:lifetime 50000 + + ;; Use a minimum of 2 and a maximum of 8 threads + #:parallelism + (min (max (current-processor-count) + 2) + 8))) + + (writer-thread-channel + (make-worker-thread-channel + (lambda () + (let ((db + (db-open database-file))) + (sqlite-exec db "PRAGMA busy_timeout = 5000;") + (sqlite-exec db "PRAGMA foreign_keys = ON;") + (list db))) + #:destructor + (lambda (db) + (db-optimize db + database-file) + + (sqlite-close db)) + #:lifetime 500 + + ;; SQLite doesn't support parallel writes + #:parallelism 1))) + + (make-database database-file + reader-thread-channel + writer-thread-channel))) + +(define (db-optimize db db-filename) + (define (wal-size) + (let ((db-wal-filename + (string-append db-filename "-wal"))) + + (stat:size (stat db-wal-filename)))) + + (define MiB (* (expt 2 20) 1.)) + (define wal-size-threshold + (* 5 MiB)) + + (when (> (wal-size) wal-size-threshold) + (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);") + + (sqlite-exec db + " +PRAGMA analysis_limit=1000; +PRAGMA optimize;"))) + +(define (database-optimize database) + (retry-on-error + (lambda () + (call-with-worker-thread + (database-writer-thread-channel database) + (lambda (db) + (db-optimize + db + (database-file database))))) + #:times 5 + #:delay 5)) + +(define (database-spawn-fibers database) + (spawn-fiber + (lambda () + (while #t + (sleep (* 60 5)) ; 5 minutes + (with-exception-handler + (lambda (exn) + (simple-format (current-error-port) + "exception when performing WAL checkpoint: ~A\n" + exn)) + (lambda () + (database-optimize database)) + #:unwind? #t))) + #:parallel? #t)) + +(define %current-transaction-proc + (make-parameter #f)) + +(define* (database-call-with-transaction database proc + #:key + readonly?) + (define (run-proc-within-transaction db) + (if (%current-transaction-proc) + (proc db) ; already in transaction + (begin + (sqlite-exec db "BEGIN TRANSACTION;") + (with-exception-handler + (lambda (exn) + (simple-format (current-error-port) + "error: sqlite rolling back transaction\n") + (sqlite-exec db "ROLLBACK TRANSACTION;") + (raise-exception exn)) + (lambda () + (call-with-values + (lambda () + (parameterize ((%current-transaction-proc proc)) + (proc db))) + (lambda vals + (sqlite-exec db "COMMIT TRANSACTION;") + (apply values vals)))))))) + + (match (call-with-worker-thread + ((if readonly? + database-reader-thread-channel + database-writer-thread-channel) + database) + (lambda (db) + (let ((start-time (get-internal-real-time))) + (call-with-values + (lambda () + (run-proc-within-transaction db)) + (lambda vals + (let ((duration-seconds + (/ (- (get-internal-real-time) start-time) + internal-time-units-per-second))) + (when (and (not readonly?) + (> duration-seconds 2)) + (display + (format + #f + "warning: ~a:\n took ~4f seconds in transaction\n" + proc + duration-seconds) + (current-error-port))) + + (cons duration-seconds vals))))))) + ((duration vals ...) + (apply values vals)))) + +(define (dump-database database name) + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (sqlite-exec + db + (string-append "VACUUM INTO '" name "';"))))) + +(define (last-insert-rowid db) + (let ((statement + (sqlite-prepare + db + "SELECT last_insert_rowid();" + #:cache? #t))) + (let ((id + (vector-ref (sqlite-step statement) + 0))) + + (sqlite-reset statement) + + id))) + +(define (database-insert-narinfo database narinfo) + (define (insert-narinfo-record db) + (let ((statement + (sqlite-prepare + db + " +INSERT INTO narinfos ( + store_path, nar_hash, nar_size, deriver, system, contents +) VALUES ( + :store_path, :nar_hash, :nar_size, :deriver, :system, :contents +)" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:store_path (narinfo-path narinfo) + #:nar_hash (narinfo-hash narinfo) + #:nar_size (narinfo-size narinfo) + #:deriver (narinfo-deriver narinfo) + #:system (narinfo-system narinfo) + #:contents (narinfo-contents narinfo)) + + (sqlite-step statement) + (sqlite-reset statement) + + (last-insert-rowid db))) + + (define (insert-narinfo-file db narinfo-id size compression url) + (let ((statement + (sqlite-prepare + db + " +INSERT INTO narinfo_files ( + narinfo_id, size, compression, url +) VALUES ( + :narinfo_id, :size, :compression, :url +)" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:narinfo_id narinfo-id + #:size size + #:compression compression + #:url url) + + (sqlite-step statement) + (sqlite-reset statement))) + + (define (insert-narinfo-references db narinfo-id store-path) + (let ((statement + (sqlite-prepare + db + " +INSERT INTO narinfo_references ( + narinfo_id, store_path +) VALUES ( + :narinfo_id, :store_path +)" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:narinfo_id narinfo-id + #:store_path store-path) + + (sqlite-step statement) + (sqlite-reset statement))) + + (define (insert-change db contents) + (let ((statement + (sqlite-prepare + db + " +INSERT INTO recent_changes ( + datetime, change, data +) VALUES ( + datetime('now'), 'addition', :contents +)" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:contents contents) + + (sqlite-step statement) + (sqlite-reset statement))) + + (database-call-with-transaction + database + (lambda (db) + (let ((narinfo-id (insert-narinfo-record db))) + (let ((len (length (narinfo-uris narinfo)))) + (for-each insert-narinfo-file + (make-list len db) + (make-list len narinfo-id) + (narinfo-file-sizes narinfo) + (narinfo-compressions narinfo) + (map uri-path (narinfo-uris narinfo)))) + + (let ((references (narinfo-references narinfo))) + (for-each insert-narinfo-references + (make-list (length references) db) + (make-list (length references) narinfo-id) + references)) + + (insert-change db (narinfo-contents narinfo)) + + narinfo-id)))) + +(define (database-select-narinfo-contents-by-hash database hash) + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT contents FROM narinfos WHERE substr(store_path, 12, 32) = :hash" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:hash hash) + + (match (let ((result (sqlite-step statement))) + (sqlite-reset statement) + result) + (#(contents) contents) + (_ #f)))))) + +(define (database-select-recent-changes database after-date) + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT datetime, change, data FROM recent_changes WHERE datetime >= :datetime" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:datetime after-date) + + (let loop ((row (sqlite-step statement)) + (result '())) + (match row + (#(datetime change data) + (loop (sqlite-step statement) + (cons `((datetime . ,datetime) + (change . ,change) + (data . ,data)) + result))) + (#f + (sqlite-reset statement) + + (reverse result)))))))) + +(define (database-select-latest-recent-change-datetime database) + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT datetime FROM recent_changes ORDER BY datetime ASC LIMIT 1" + #:cache? #t))) + + (let ((result (match (sqlite-step statement) + (#(date) date) + (#f #f)))) + (sqlite-reset statement) + + result))))) + +(define (database-select-narinfo-files database hash) + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT narinfo_files.size, narinfo_files.compression, narinfo_files.url +FROM narinfos +INNER JOIN narinfo_files + ON narinfos.id = narinfo_files.narinfo_id +WHERE substr(narinfos.store_path, 12, 32) = :hash" + #:cache? #t))) + (let ((result + (sqlite-map + (match-lambda + (#(size compression url) + `((size . ,size) + (compression . ,compression) + (url . ,url)))) + statement))) + (sqlite-reset statement) + + result))))) + +(define (database-map-all-narinfo-files database proc) + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT size, compression, url +FROM narinfo_files" + #:cache? #t))) + (let ((result-list + (sqlite-fold + (lambda (row result) + (match row + (#(size compression url) + (cons (proc `((size . ,size) + (compression . ,compression) + (url . ,url))) + result)))) + '() + statement))) + (sqlite-reset statement) + + result-list))))) |