;;; Guix QA Frontpage ;;; ;;; Copyright © 2021, 2022 Christopher Baines ;;; ;;; This program is free software: you can redistribute it and/or ;;; modify it under the terms of the GNU Affero General Public License ;;; as published by the Free Software Foundation, either version 3 of ;;; the License, or (at your option) any later version. ;;; ;;; This program is distributed in the hope that it will be useful, ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ;;; Affero General Public License for more details. ;;; ;;; You should have received a copy of the GNU Affero General Public ;;; License along with this program. If not, see ;;; . (define-module (guix-qa-frontpage database) #:use-module (srfi srfi-1) #:use-module (srfi srfi-9) #:use-module (srfi srfi-19) #:use-module (ice-9 match) #:use-module (ice-9 format) #:use-module (ice-9 threads) #:use-module (ice-9 exceptions) #:use-module (web uri) #:use-module (sqlite3) #:use-module (fibers) #:use-module (prometheus) #:use-module (guix narinfo) #:use-module (guix derivations) #:use-module ((guix-build-coordinator utils) #:select (log-delay call-with-delay-logging)) #:use-module ((guix-build-coordinator utils fibers) #:select (retry-on-error make-worker-thread-channel call-with-worker-thread make-queueing-channel)) #:use-module (guix-qa-frontpage guix-data-service) #:export (setup-database database-optimize database-spawn-fibers database-call-with-transaction clear-sqlite-cache-entry with-sqlite-cache insert-into-builds-to-cancel-later delete-from-builds-to-cancel-later select-from-builds-to-cancel-later insert-create-branch-for-issue-log select-create-branch-for-issue-log delete-create-branch-for-issue-log)) (define-record-type (make-database database-file reader-thread-channel writer-thread-channel metrics-registry) database? (database-file database-file) (reader-thread-channel database-reader-thread-channel) (writer-thread-channel database-writer-thread-channel set-database-writer-thread-channel!) (metrics-registry database-metrics-registry)) (define* (db-open database #:key (write? #t)) (define flags `(,@(if write? (list SQLITE_OPEN_READWRITE SQLITE_OPEN_CREATE) (list SQLITE_OPEN_READONLY)) ,SQLITE_OPEN_NOMUTEX)) (sqlite-open database (apply logior flags))) (define (perform-initial-database-setup db) (define schema " CREATE TABLE cache ( key TEXT NOT NULL, timestamp INTEGER NOT NULL, data TEXT NOT NULL ); CREATE UNIQUE INDEX IF NOT EXISTS cache_idx ON cache (key); CREATE TABLE IF NOT EXISTS builds_to_cancel_later ( category_name TEXT NOT NULL, category_value TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS create_branch_for_issue_logs ( issue TEXT NOT NULL, log TEXT NOT NULL );") (sqlite-exec db schema)) (define (update-schema db) (let ((statement (sqlite-prepare db " SELECT name FROM sqlite_master WHERE type = 'table' AND name = :name"))) (sqlite-bind-arguments statement #:name "cache") (match (sqlite-step statement) (#f (perform-initial-database-setup db)) (_ (sqlite-exec db " CREATE TABLE IF NOT EXISTS builds_to_cancel_later ( category_name TEXT NOT NULL, category_value TEXT NOT NULL ); CREATE UNIQUE INDEX IF NOT EXISTS builds_to_cancel_later_unique ON builds_to_cancel_later (category_name, category_value); CREATE TABLE IF NOT EXISTS create_branch_for_issue_logs ( issue TEXT NOT NULL, log TEXT NOT NULL );"))) (sqlite-finalize statement))) (define (setup-database database-file metrics-registry) (let ((db (db-open database-file))) (sqlite-exec db "PRAGMA journal_mode=WAL;") (sqlite-exec db "PRAGMA optimize;") (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);") (update-schema db) (sqlite-close db)) (let ((reader-thread-channel (make-worker-thread-channel (lambda () (let ((db (db-open database-file #:write? #f))) (sqlite-exec db "PRAGMA busy_timeout = 5000;") (list db))) #:destructor (lambda (db) (sqlite-close db)) #:lifetime 50000 #:name "db read" #:parallelism (min (max (current-processor-count) 32) 128) #:delay-logger (let ((delay-metric (make-histogram-metric metrics-registry "datastore_read_delay_seconds"))) (lambda (seconds-delayed) (metric-observe delay-metric ;; TODO exact->inexact to work around ;; a bug in guile-prometheus where ;; the metric sum will output in the ;; exact form including the /q (exact->inexact seconds-delayed)) (log-delay "datastore read" seconds-delayed) (when (> seconds-delayed 1) (format (current-error-port) "warning: database read delayed by ~1,2f seconds~%" seconds-delayed)))) #:log-exception? (lambda (exn) (not (guix-data-service-error? exn))))) (writer-thread-channel (make-worker-thread-channel (lambda () (let ((db (db-open database-file))) (sqlite-exec db "PRAGMA busy_timeout = 5000;") (sqlite-exec db "PRAGMA foreign_keys = ON;") (list db))) #:destructor (lambda (db) (db-optimize db database-file) (sqlite-close db)) #:lifetime 500 #:name "db write" ;; SQLite doesn't support parallel writes #:parallelism 1 #:delay-logger (let ((delay-metric (make-histogram-metric metrics-registry "datastore_write_delay_seconds"))) (lambda (seconds-delayed) (metric-observe delay-metric ;; TODO exact->inexact to work around ;; a bug in guile-prometheus where ;; the metric sum will output in the ;; exact form including the /q (exact->inexact seconds-delayed)) (log-delay "datastore write" seconds-delayed) (when (> seconds-delayed 1) (format (current-error-port) "warning: database write delayed by ~1,2f seconds~%" seconds-delayed))))))) (make-database database-file reader-thread-channel writer-thread-channel metrics-registry))) (define (db-optimize db db-filename) (define (wal-size) (let ((db-wal-filename (string-append db-filename "-wal"))) (stat:size (stat db-wal-filename)))) (define MiB (* (expt 2 20) 1.)) (define wal-size-threshold (* 5 MiB)) (when (> (wal-size) wal-size-threshold) (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);") (sqlite-exec db " PRAGMA analysis_limit=1000; PRAGMA optimize;"))) (define (database-optimize database) (retry-on-error (lambda () (call-with-worker-thread (database-writer-thread-channel database) (lambda (db) (db-optimize db (database-file database))))) #:times 5 #:delay 5)) (define (database-spawn-fibers database) ;; Queue messages to the writer thread, so that they're handled in a first ;; come first served manor (set-database-writer-thread-channel! database (make-queueing-channel (database-writer-thread-channel database))) (spawn-fiber (lambda () (while #t (sleep (* 60 5)) ; 5 minutes (with-exception-handler (lambda (exn) (simple-format (current-error-port) "exception when performing WAL checkpoint: ~A\n" exn)) (lambda () (database-optimize database)) #:unwind? #t))) #:parallel? #t)) (define (call-with-time-tracking database thing thunk) (define registry (database-metrics-registry database)) (define metric-name (string-append "database_" thing "_duration_seconds")) (if registry (let* ((metric (or (metrics-registry-fetch-metric registry metric-name) (make-histogram-metric registry metric-name))) (start-time (get-internal-real-time))) (let ((result (thunk))) (metric-observe metric (/ (- (get-internal-real-time) start-time) internal-time-units-per-second)) result)) (thunk))) (define %current-transaction-proc (make-parameter #f)) (define* (database-call-with-transaction database proc #:key readonly?) (define (run-proc-within-transaction db) (if (%current-transaction-proc) (proc db) ; already in transaction (begin (sqlite-exec db "BEGIN TRANSACTION;") (with-exception-handler (lambda (exn) (simple-format (current-error-port) "error: sqlite rolling back transaction\n") (sqlite-exec db "ROLLBACK TRANSACTION;") (raise-exception exn)) (lambda () (call-with-values (lambda () (parameterize ((%current-transaction-proc proc)) (proc db))) (lambda vals (sqlite-exec db "COMMIT TRANSACTION;") (apply values vals)))) #:unwind? #t)))) (match (call-with-worker-thread ((if readonly? database-reader-thread-channel database-writer-thread-channel) database) (lambda (db) (let ((start-time (get-internal-real-time))) (call-with-values (lambda () (run-proc-within-transaction db)) (lambda vals (let ((duration-seconds (/ (- (get-internal-real-time) start-time) internal-time-units-per-second))) (when (and (not readonly?) (> duration-seconds 2)) (display (format #f "warning: ~a:\n took ~4f seconds in transaction\n" proc duration-seconds) (current-error-port))) (cons duration-seconds vals))))))) ((duration vals ...) (apply values vals)))) (define (last-insert-rowid db) (let ((statement (sqlite-prepare db "SELECT last_insert_rowid();" #:cache? #t))) (let ((id (vector-ref (sqlite-step statement) 0))) (sqlite-reset statement) id))) (define (changes db) (let ((statement (sqlite-prepare db "SELECT changes()" #:cache? #t))) (let ((id (vector-ref (sqlite-step statement) 0))) (sqlite-reset statement) id))) (define* (clear-sqlite-cache-entry database key #:key (args '()) (version 1)) (define string-key (call-with-output-string (lambda (port) (write key port) (display "|" port) (write version port) (display ": " port) (write args port)))) (database-call-with-transaction database (lambda (db) (let ((cleanup-statement (sqlite-prepare db " DELETE FROM cache WHERE key = :key" #:cache? #t))) (sqlite-bind-arguments cleanup-statement #:key string-key) (sqlite-step cleanup-statement) (sqlite-reset cleanup-statement))))) (define* (with-sqlite-cache database key proc #:key (args '()) (version 1) ttl (store-computed-value? #t)) (define string-key (call-with-output-string (lambda (port) (write key port) (display "|" port) (write version port) (display ": " port) (write args port)))) (unless (number? ttl) (error "must specify a ttl")) (let ((cached-values (call-with-worker-thread (database-reader-thread-channel database) (lambda (db) (let ((statement (sqlite-prepare db " SELECT data, timestamp FROM cache WHERE key = :key" #:cache? #t))) (sqlite-bind-arguments statement #:key string-key) (let ((result (sqlite-step statement))) (sqlite-reset statement) (match result (#f 'noval) (#(data timestamp) (if (<= (+ timestamp ttl) (time-second (current-time))) 'noval (call-with-input-string data read)))))))))) ;; Don't log cache misses for now (when (and #f (eq? cached-values 'noval)) (simple-format (current-error-port) "cache miss: ~A\n" string-key)) (if (eq? cached-values 'noval) (call-with-values (lambda () (call-with-worker-thread (database-reader-thread-channel database) (lambda (db) (call-with-delay-logging proc #:args args)))) (lambda vals (when (if (procedure? store-computed-value?) (apply store-computed-value? vals) store-computed-value?) (let ((vals-string (call-with-output-string (lambda (port) (write vals port))))) (database-call-with-transaction database (lambda (db) (let ((cleanup-statement (sqlite-prepare db " DELETE FROM cache WHERE key = :key" #:cache? #t)) (insert-statement (sqlite-prepare db " INSERT INTO cache (key, timestamp, data) VALUES (:key, :timestamp, :data)" #:cache? #t))) (sqlite-bind-arguments cleanup-statement #:key string-key) (sqlite-step cleanup-statement) (sqlite-reset cleanup-statement) (sqlite-bind-arguments insert-statement #:key string-key #:timestamp (time-second (current-time)) #:data vals-string) (sqlite-step insert-statement) (sqlite-reset insert-statement)))))) (apply values vals))) (apply values cached-values)))) (define (insert-into-builds-to-cancel-later database category-name category-value) (database-call-with-transaction database (lambda (db) (let ((statement (sqlite-prepare db " INSERT INTO builds_to_cancel_later (category_name, category_value) VALUES (:name, :value) ON CONFLICT DO NOTHING" #:cache? #t))) (sqlite-bind-arguments statement #:name category-name #:value category-value) (sqlite-step statement) (sqlite-reset statement)))) #t) (define (delete-from-builds-to-cancel-later database category-name category-value) (database-call-with-transaction database (lambda (db) (let ((statement (sqlite-prepare db " DELETE FROM builds_to_cancel_later WHERE category_name = :name AND category_value = :value" #:cache? #t))) (sqlite-bind-arguments statement #:name category-name #:value category-value) (sqlite-step statement) (sqlite-reset statement)))) #t) (define (select-from-builds-to-cancel-later database category-name) (call-with-worker-thread (database-reader-thread-channel database) (lambda (db) (let ((statement (sqlite-prepare db " SELECT category_value FROM builds_to_cancel_later WHERE category_name = :name" #:cache? #t))) (sqlite-bind-arguments statement #:name category-name) (let ((result (sqlite-map (match-lambda (#(val) val)) statement))) (sqlite-reset statement) result))))) (define (insert-create-branch-for-issue-log database issue log) (database-call-with-transaction database (lambda (db) (let ((delete-statement (sqlite-prepare db " DELETE FROM create_branch_for_issue_logs WHERE issue = :issue" #:cache? #t)) (insert-statement (sqlite-prepare db " INSERT INTO create_branch_for_issue_logs (issue, log) VALUES (:issue, :log)" #:cache? #t))) (sqlite-bind-arguments delete-statement #:issue issue) (sqlite-step delete-statement) (sqlite-reset delete-statement) (sqlite-bind-arguments insert-statement #:issue issue #:log log) (sqlite-step insert-statement) (sqlite-reset insert-statement))))) (define (select-create-branch-for-issue-log database issue) (call-with-worker-thread (database-reader-thread-channel database) (lambda (db) (let ((statement (sqlite-prepare db " SELECT log FROM create_branch_for_issue_logs WHERE issue = :issue" #:cache? #t))) (sqlite-bind-arguments statement #:issue issue) (let ((result (match (sqlite-step statement) (#(log) log) (#f #f)))) (sqlite-reset statement) result))))) (define (delete-create-branch-for-issue-log database issue) (database-call-with-transaction database (lambda (db) (let ((statement (sqlite-prepare db " DELETE FROM create_branch_for_issue_logs WHERE issue = :issue" #:cache? #t))) (sqlite-bind-arguments statement #:issue issue) (sqlite-step statement) (sqlite-reset statement)))) #t)