aboutsummaryrefslogtreecommitdiff
path: root/nar-herder/database.scm
diff options
context:
space:
mode:
author Christopher Baines <mail@cbaines.net> 2021-12-11 10:27:24 +0000
committer Christopher Baines <mail@cbaines.net> 2021-12-12 16:35:38 +0000
commit f9ff69e1c79f024ed188ad51642cca443aedfee2 (patch)
tree 609b37ff8d6fc3d557d339a67ba6641522b0a977 /nar-herder/database.scm
parent 7e280ca951e8ffa7c86224843075e65266911617 (diff)
download nar-herder-f9ff69e1c79f024ed188ad51642cca443aedfee2.tar
nar-herder-f9ff69e1c79f024ed188ad51642cca443aedfee2.tar.gz
Get most of the functionality sort of working
At least working enough to start trying this out, and finding the problems.
Diffstat (limited to 'nar-herder/database.scm')
-rw-r--r-- nar-herder/database.scm 532
1 files changed, 532 insertions, 0 deletions
diff --git a/nar-herder/database.scm b/nar-herder/database.scm
new file mode 100644
index 0000000..9cb04e2
--- /dev/null
+++ b/nar-herder/database.scm
@@ -0,0 +1,532 @@
+;;; Nar Herder
+;;;
+;;; Copyright © 2021 Christopher Baines <mail@cbaines.net>
+;;;
+;;; This program is free software: you can redistribute it and/or
+;;; modify it under the terms of the GNU Affero General Public License
+;;; as published by the Free Software Foundation, either version 3 of
+;;; the License, or (at your option) any later version.
+;;;
+;;; This program is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;; Affero General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU Affero General Public
+;;; License along with this program. If not, see
+;;; <http://www.gnu.org/licenses/>.
+
+(define-module (nar-herder database)
+ #:use-module (srfi srfi-1)
+ #:use-module (srfi srfi-9)
+ #:use-module (srfi srfi-19)
+ #:use-module (ice-9 match)
+ #:use-module (ice-9 format)
+ #:use-module (ice-9 threads)
+ #:use-module (ice-9 exceptions)
+ #:use-module (web uri)
+ #:use-module (sqlite3)
+ #:use-module (fibers)
+ #:use-module (guix narinfo)
+ #:use-module (guix derivations)
+ #:use-module (nar-herder utils)
+ #:export (setup-database
+
+ database-optimize
+ database-spawn-fibers
+
+ database-call-with-transaction
+
+ dump-database
+
+ database-insert-narinfo
+ database-select-narinfo-contents-by-hash
+
+ database-select-recent-changes
+ database-select-latest-recent-change-datetime
+
+ database-select-narinfo-files
+
+ database-map-all-narinfo-files))
+
+;; Record bundling the database filename together with the two worker
+;; thread channels that setup-database creates for it: one used for
+;; read-only connections and one used for the writer connection.
+(define-record-type <database>
+ (make-database database-file reader-thread-channel writer-thread-channel)
+ database?
+ (database-file database-file)
+ (reader-thread-channel database-reader-thread-channel)
+ (writer-thread-channel database-writer-thread-channel))
+
+;; Open the SQLite database in the file DATABASE and return the
+;; connection.  When WRITE? is true (the default) the read-write and
+;; create flags are used, otherwise the read-only flag.
+;; SQLITE_OPEN_NOMUTEX is always included.
+(define* (db-open database #:key (write? #t))
+  (let ((access-mode
+         (if write?
+             (logior SQLITE_OPEN_READWRITE
+                     SQLITE_OPEN_CREATE)
+             SQLITE_OPEN_READONLY)))
+    (sqlite-open database
+                 (logior access-mode SQLITE_OPEN_NOMUTEX))))
+
+;; Create the initial schema through the DB connection: the narinfos,
+;; narinfo_files, narinfo_references, tags, narinfo_tags and
+;; recent_changes tables, plus the unique indexes on the narinfos store
+;; path hash and on the tags key/value pairs.
+(define (perform-initial-database-setup db)
+  (let ((initial-schema-sql
+         "
+CREATE TABLE narinfos (
+ id INTEGER PRIMARY KEY ASC,
+ store_path TEXT NOT NULL,
+ nar_hash TEXT NOT NULL,
+ nar_size INTEGER NOT NULL,
+ deriver TEXT,
+ system TEXT,
+ contents NOT NULL
+);
+
+CREATE UNIQUE INDEX narinfos_store_hash ON narinfos (substr(store_path, 12, 32));
+
+CREATE TABLE narinfo_files (
+ narinfo_id INTEGER NOT NULL REFERENCES narinfos (id),
+ size INTEGER NOT NULL,
+ compression TEXT,
+ url TEXT NOT NULL
+);
+
+CREATE TABLE narinfo_references (
+ narinfo_id INTEGER NOT NULL REFERENCES narinfos (id),
+ store_path TEXT NOT NULL
+);
+
+CREATE TABLE tags (
+ id INTEGER PRIMARY KEY ASC,
+ key TEXT NOT NULL,
+ value TEXT NOT NULL
+);
+
+CREATE UNIQUE INDEX tags_index ON tags (key, value);
+
+CREATE TABLE narinfo_tags (
+ narinfo_id INTEGER NOT NULL REFERENCES narinfos (id),
+ tag_id INTEGER NOT NULL REFERENCES tags (id)
+);
+
+CREATE TABLE recent_changes (
+ id INTEGER PRIMARY KEY ASC,
+ datetime TEXT NOT NULL,
+ change TEXT NOT NULl,
+ data TEXT NOT NULL
+);"))
+    (sqlite-exec db initial-schema-sql)))
+
+;; Make sure DB contains the schema: look for the narinfos table in
+;; sqlite_master and, when no row comes back, run the initial database
+;; setup.  The lookup statement is finalized afterwards.
+(define (update-schema db)
+  (let ((table-lookup
+         (sqlite-prepare
+          db
+          "
+SELECT name FROM sqlite_master WHERE type = 'table' AND name = :name")))
+    (sqlite-bind-arguments table-lookup #:name "narinfos")
+
+    ;; sqlite-step gives #f when there's no matching row, meaning the
+    ;; narinfos table doesn't exist yet
+    (unless (sqlite-step table-lookup)
+      (perform-initial-database-setup db))
+
+    (sqlite-finalize table-lookup)))
+
+;; Prepare the database stored in DATABASE-FILE and return a <database>
+;; record for it.
+;;
+;; A temporary read-write connection is used first to switch the journal
+;; mode to WAL, run the optimize pragma, truncate the WAL and create the
+;; schema if it's missing.  Then the reader and writer worker thread
+;; channels are created, with initializers that open a connection to
+;; DATABASE-FILE and destructors that close it.
+(define (setup-database database-file)
+  ;; Open a connection, run each of PRAGMAS against it, and return it
+  ;; wrapped in a list as the worker thread initializers do.
+  (define (open-worker-connection write? pragmas)
+    (let ((db (db-open database-file #:write? write?)))
+      (for-each (lambda (pragma)
+                  (sqlite-exec db pragma))
+                pragmas)
+      (list db)))
+
+  (let ((setup-db (db-open database-file)))
+    (for-each (lambda (pragma)
+                (sqlite-exec setup-db pragma))
+              '("PRAGMA journal_mode=WAL;"
+                "PRAGMA optimize;"
+                "PRAGMA wal_checkpoint(TRUNCATE);"))
+
+    (update-schema setup-db)
+
+    (sqlite-close setup-db))
+
+  (let* ((reader-thread-channel
+          (make-worker-thread-channel
+           (lambda ()
+             (open-worker-connection
+              #f
+              '("PRAGMA busy_timeout = 5000;")))
+           #:destructor sqlite-close
+           #:lifetime 50000
+
+           ;; Use a minimum of 2 and a maximum of 8 threads
+           #:parallelism
+           (min (max (current-processor-count) 2)
+                8)))
+
+         (writer-thread-channel
+          (make-worker-thread-channel
+           (lambda ()
+             (open-worker-connection
+              #t
+              '("PRAGMA busy_timeout = 5000;"
+                "PRAGMA foreign_keys = ON;")))
+           #:destructor
+           (lambda (db)
+             ;; Give the writer connection a chance to checkpoint the
+             ;; WAL before it goes away
+             (db-optimize db database-file)
+             (sqlite-close db))
+           #:lifetime 500
+
+           ;; SQLite doesn't support parallel writes
+           #:parallelism 1)))
+
+    (make-database database-file
+                   reader-thread-channel
+                   writer-thread-channel)))
+
+;; When the write-ahead log file for DB-FILENAME (the filename with
+;; "-wal" appended) is larger than 5 MiB, checkpoint and truncate the WAL
+;; through DB, then run the optimize pragma with an analysis limit of
+;; 1000.  Otherwise nothing is executed.
+(define (db-optimize db db-filename)
+  (let* ((mebibyte (* (expt 2 20) 1.))
+         (threshold-bytes (* 5 mebibyte))
+         (wal-filename (string-append db-filename "-wal"))
+         (wal-bytes (stat:size (stat wal-filename))))
+    (when (> wal-bytes threshold-bytes)
+      (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);")
+
+      (sqlite-exec db
+                   "
+PRAGMA analysis_limit=1000;
+PRAGMA optimize;"))))
+
+;; Run db-optimize for DATABASE on its writer thread channel, going
+;; through retry-on-error with #:times 5 and #:delay 5.
+(define (database-optimize database)
+  (let ((writer-channel (database-writer-thread-channel database))
+        (filename (database-file database)))
+    (retry-on-error
+     (lambda ()
+       (call-with-worker-thread
+        writer-channel
+        (lambda (db)
+          (db-optimize db filename))))
+     #:times 5
+     #:delay 5)))
+
+;; Spawn a fiber that loops forever, sleeping for 5 minutes and then
+;; calling database-optimize on DATABASE.  An exception from the optimize
+;; call is reported on the current error port and the loop carries on.
+(define (database-spawn-fibers database)
+  (define (report-exception exn)
+    (simple-format (current-error-port)
+                   "exception when performing WAL checkpoint: ~A\n"
+                   exn))
+
+  (define (optimize-periodically)
+    (let loop ()
+      (sleep (* 60 5))                  ; 5 minutes
+      (with-exception-handler
+          report-exception
+        (lambda ()
+          (database-optimize database))
+        ;; unwind so that the loop continues after reporting
+        #:unwind? #t)
+      (loop)))
+
+  (spawn-fiber optimize-periodically
+               #:parallel? #t))
+
+;; Parameter that's #f by default.  database-call-with-transaction binds
+;; it to the procedure being run while that procedure executes inside a
+;; transaction, and checks it to avoid beginning a second transaction.
+(define %current-transaction-proc
+ (make-parameter #f))
+
+;; Call PROC with a database connection, inside a transaction, on one of
+;; DATABASE's worker threads, and return the values PROC returns.
+;;
+;; When READONLY? is true the reader thread channel is used, otherwise
+;; the writer thread channel.  If %current-transaction-proc is already
+;; set, PROC is called directly rather than beginning another
+;; transaction.  If an exception is raised the transaction is rolled back
+;; and the exception is raised again.
+(define* (database-call-with-transaction database proc
+ #:key
+ readonly?)
+ (define (run-proc-within-transaction db)
+ (if (%current-transaction-proc)
+ (proc db) ; already in transaction
+ (begin
+ (sqlite-exec db "BEGIN TRANSACTION;")
+ (with-exception-handler
+ ;; On an exception: log, roll back, then re-raise it
+ (lambda (exn)
+ (simple-format (current-error-port)
+ "error: sqlite rolling back transaction\n")
+ (sqlite-exec db "ROLLBACK TRANSACTION;")
+ (raise-exception exn))
+ (lambda ()
+ (call-with-values
+ (lambda ()
+ ;; Record that a transaction is in progress while PROC runs
+ (parameterize ((%current-transaction-proc proc))
+ (proc db)))
+ ;; PROC returned normally, so commit and pass its values on
+ (lambda vals
+ (sqlite-exec db "COMMIT TRANSACTION;")
+ (apply values vals))))))))
+
+ (match (call-with-worker-thread
+ ((if readonly?
+ database-reader-thread-channel
+ database-writer-thread-channel)
+ database)
+ (lambda (db)
+ ;; Time the whole transaction, to warn about slow writes
+ (let ((start-time (get-internal-real-time)))
+ (call-with-values
+ (lambda ()
+ (run-proc-within-transaction db))
+ (lambda vals
+ (let ((duration-seconds
+ (/ (- (get-internal-real-time) start-time)
+ internal-time-units-per-second)))
+ ;; Only transactions that aren't readonly and took more than 2
+ ;; seconds are reported
+ (when (and (not readonly?)
+ (> duration-seconds 2))
+ (display
+ (format
+ #f
+ "warning: ~a:\n took ~4f seconds in transaction\n"
+ proc
+ duration-seconds)
+ (current-error-port)))
+
+ ;; Return the duration followed by PROC's values as one list
+ (cons duration-seconds vals)))))))
+ ;; Drop the duration and return just the values from PROC
+ ((duration vals ...)
+ (apply values vals))))
+
+;; Write a copy of DATABASE to the file NAME using "VACUUM INTO", run
+;; through the reader thread channel.
+;;
+;; NAME is embedded in the SQL as a string literal, so any single quotes
+;; in it are doubled.  Without this, a filename containing ' would
+;; terminate the literal early and produce invalid (or unintended) SQL.
+(define (dump-database database name)
+  (define quoted-name
+    (string-append "'"
+                   ;; SQL escapes ' inside a string literal as ''
+                   (string-join (string-split name #\') "''")
+                   "'"))
+
+  (call-with-worker-thread
+   (database-reader-thread-channel database)
+   (lambda (db)
+     (sqlite-exec
+      db
+      (string-append "VACUUM INTO " quoted-name ";")))))
+
+;; Return the first column of the row produced by running
+;; "SELECT last_insert_rowid();" on the DB connection.
+(define (last-insert-rowid db)
+  (let* ((statement (sqlite-prepare
+                     db
+                     "SELECT last_insert_rowid();"
+                     #:cache? #t))
+         (row (sqlite-step statement))
+         (rowid (vector-ref row 0)))
+    ;; The statement is cached, so reset it rather than finalizing it
+    (sqlite-reset statement)
+
+    rowid))
+
+;; Store NARINFO in DATABASE within a single transaction and return the
+;; id of the new narinfos row.
+;;
+;; This inserts one row into narinfos, one row into narinfo_files for
+;; each URI (paired with the file sizes and compressions), one row into
+;; narinfo_references for each reference, and an 'addition' entry
+;; holding the narinfo contents into recent_changes.
+(define (database-insert-narinfo database narinfo)
+  ;; Insert the narinfos row and return its rowid
+  (define (add-narinfo-row db)
+    (let ((insert
+           (sqlite-prepare
+            db
+            "
+INSERT INTO narinfos (
+ store_path, nar_hash, nar_size, deriver, system, contents
+) VALUES (
+ :store_path, :nar_hash, :nar_size, :deriver, :system, :contents
+)"
+            #:cache? #t)))
+      (sqlite-bind-arguments
+       insert
+       #:store_path (narinfo-path narinfo)
+       #:nar_hash (narinfo-hash narinfo)
+       #:nar_size (narinfo-size narinfo)
+       #:deriver (narinfo-deriver narinfo)
+       #:system (narinfo-system narinfo)
+       #:contents (narinfo-contents narinfo))
+
+      (sqlite-step insert)
+      (sqlite-reset insert)
+
+      (last-insert-rowid db)))
+
+  ;; Insert one narinfo_files row for the narinfo with id NARINFO-ID
+  (define (add-file-row db narinfo-id size compression url)
+    (let ((insert
+           (sqlite-prepare
+            db
+            "
+INSERT INTO narinfo_files (
+ narinfo_id, size, compression, url
+) VALUES (
+ :narinfo_id, :size, :compression, :url
+)"
+            #:cache? #t)))
+      (sqlite-bind-arguments
+       insert
+       #:narinfo_id narinfo-id
+       #:size size
+       #:compression compression
+       #:url url)
+
+      (sqlite-step insert)
+      (sqlite-reset insert)))
+
+  ;; Insert one narinfo_references row for the narinfo with id NARINFO-ID
+  (define (add-reference-row db narinfo-id store-path)
+    (let ((insert
+           (sqlite-prepare
+            db
+            "
+INSERT INTO narinfo_references (
+ narinfo_id, store_path
+) VALUES (
+ :narinfo_id, :store_path
+)"
+            #:cache? #t)))
+      (sqlite-bind-arguments
+       insert
+       #:narinfo_id narinfo-id
+       #:store_path store-path)
+
+      (sqlite-step insert)
+      (sqlite-reset insert)))
+
+  ;; Record the addition of CONTENTS in recent_changes
+  (define (add-change-row db contents)
+    (let ((insert
+           (sqlite-prepare
+            db
+            "
+INSERT INTO recent_changes (
+ datetime, change, data
+) VALUES (
+ datetime('now'), 'addition', :contents
+)"
+            #:cache? #t)))
+      (sqlite-bind-arguments
+       insert
+       #:contents contents)
+
+      (sqlite-step insert)
+      (sqlite-reset insert)))
+
+  (database-call-with-transaction
+   database
+   (lambda (db)
+     (let ((narinfo-id (add-narinfo-row db)))
+       ;; Only the path part of each URI is stored
+       (for-each (lambda (size compression url)
+                   (add-file-row db narinfo-id size compression url))
+                 (narinfo-file-sizes narinfo)
+                 (narinfo-compressions narinfo)
+                 (map uri-path (narinfo-uris narinfo)))
+
+       (for-each (lambda (store-path)
+                   (add-reference-row db narinfo-id store-path))
+                 (narinfo-references narinfo))
+
+       (add-change-row db (narinfo-contents narinfo))
+
+       narinfo-id))))
+
+;; Return the contents of the narinfo for which the 32 characters of
+;; store_path starting at position 12 equal HASH, or #f when the query
+;; doesn't produce a single-column row.  Runs on the reader thread
+;; channel of DATABASE.
+(define (database-select-narinfo-contents-by-hash database hash)
+  (call-with-worker-thread
+   (database-reader-thread-channel database)
+   (lambda (db)
+     (let ((lookup
+            (sqlite-prepare
+             db
+             "
+SELECT contents FROM narinfos WHERE substr(store_path, 12, 32) = :hash"
+             #:cache? #t)))
+       (sqlite-bind-arguments lookup #:hash hash)
+
+       (let ((row (sqlite-step lookup)))
+         ;; Reset the cached statement before looking at the row
+         (sqlite-reset lookup)
+
+         (match row
+           (#(contents) contents)
+           (_ #f)))))))
+
+;; Return a list with one alist (keys datetime, change and data) for
+;; each recent_changes row whose datetime is greater than or equal to
+;; AFTER-DATE, in the order the rows are stepped through.  Runs on the
+;; reader thread channel of DATABASE.
+(define (database-select-recent-changes database after-date)
+  (call-with-worker-thread
+   (database-reader-thread-channel database)
+   (lambda (db)
+     (let ((query
+            (sqlite-prepare
+             db
+             "
+SELECT datetime, change, data FROM recent_changes WHERE datetime >= :datetime"
+             #:cache? #t)))
+       (sqlite-bind-arguments query #:datetime after-date)
+
+       ;; Accumulate in reverse while stepping, then flip at the end
+       (let collect ((next-row (sqlite-step query))
+                     (changes '()))
+         (match next-row
+           (#f
+            (sqlite-reset query)
+
+            (reverse changes))
+           (#(datetime change data)
+            (collect (sqlite-step query)
+                     (cons `((datetime . ,datetime)
+                             (change . ,change)
+                             (data . ,data))
+                           changes)))))))))
+
+;; Return the datetime of the most recent entry in recent_changes, or #f
+;; when the query returns no row.  Runs on the reader thread channel of
+;; DATABASE.
+;;
+;; The rows are sorted newest first (DESC) so that LIMIT 1 picks the
+;; latest datetime; sorting ascending would return the oldest one.
+(define (database-select-latest-recent-change-datetime database)
+  (call-with-worker-thread
+   (database-reader-thread-channel database)
+   (lambda (db)
+     (let ((statement
+            (sqlite-prepare
+             db
+             "
+SELECT datetime FROM recent_changes ORDER BY datetime DESC LIMIT 1"
+             #:cache? #t)))
+
+       (let ((result (match (sqlite-step statement)
+                       (#(date) date)
+                       (#f #f))))
+         ;; The statement is cached, so reset it for the next caller
+         (sqlite-reset statement)
+
+         result)))))
+
+;; Return a list with one alist (keys size, compression and url) for
+;; each narinfo_files row belonging to the narinfo for which the 32
+;; characters of store_path starting at position 12 equal HASH.  Runs on
+;; the reader thread channel of DATABASE.
+(define (database-select-narinfo-files database hash)
+  (call-with-worker-thread
+   (database-reader-thread-channel database)
+   (lambda (db)
+     (let ((statement
+            (sqlite-prepare
+             db
+             "
+SELECT narinfo_files.size, narinfo_files.compression, narinfo_files.url
+FROM narinfos
+INNER JOIN narinfo_files
+ ON narinfos.id = narinfo_files.narinfo_id
+WHERE substr(narinfos.store_path, 12, 32) = :hash"
+             #:cache? #t)))
+       ;; Bind the :hash parameter; left unbound it would be NULL and
+       ;; the WHERE clause would never match a row
+       (sqlite-bind-arguments
+        statement
+        #:hash hash)
+
+       (let ((result
+              (sqlite-map
+               (match-lambda
+                 (#(size compression url)
+                  `((size . ,size)
+                    (compression . ,compression)
+                    (url . ,url))))
+               statement)))
+         (sqlite-reset statement)
+
+         result)))))
+
+;; Call PROC with an alist (keys size, compression and url) for every
+;; row in narinfo_files and return the list of results.  Runs on the
+;; reader thread channel of DATABASE.
+;;
+;; Each result is consed on to the front of the accumulator, so
+;; (assuming sqlite-fold visits rows in step order) the result for the
+;; last row comes first in the returned list.
+(define (database-map-all-narinfo-files database proc)
+  (call-with-worker-thread
+   (database-reader-thread-channel database)
+   (lambda (db)
+     (let* ((query
+             (sqlite-prepare
+              db
+              "
+SELECT size, compression, url
+FROM narinfo_files"
+              #:cache? #t))
+            (mapped
+             (sqlite-fold
+              (match-lambda*
+                ((#(size compression url) so-far)
+                 (cons (proc `((size . ,size)
+                               (compression . ,compression)
+                               (url . ,url)))
+                       so-far)))
+              '()
+              query)))
+       (sqlite-reset query)
+
+       mapped))))