diff options
Diffstat (limited to 'guix-qa-frontpage/database.scm')
-rw-r--r-- | guix-qa-frontpage/database.scm | 162 |
1 files changed, 93 insertions, 69 deletions
diff --git a/guix-qa-frontpage/database.scm b/guix-qa-frontpage/database.scm index 5649c36..06ce3bd 100644 --- a/guix-qa-frontpage/database.scm +++ b/guix-qa-frontpage/database.scm @@ -28,15 +28,15 @@ #:use-module (sqlite3) #:use-module (fibers) #:use-module (prometheus) + #:use-module (knots queue) + #:use-module (knots thread-pool) #:use-module (guix narinfo) #:use-module (guix derivations) #:use-module ((guix-build-coordinator utils) #:select (log-delay call-with-delay-logging)) #:use-module ((guix-build-coordinator utils fibers) - #:select (retry-on-error - make-worker-thread-channel - call-with-worker-thread)) + #:select (retry-on-error)) #:use-module (guix-qa-frontpage guix-data-service) #:export (setup-database @@ -57,13 +57,16 @@ delete-create-branch-for-issue-log)) (define-record-type <database> - (make-database database-file reader-thread-channel writer-thread-channel + (make-database database-file reader-thread-set writer-thread-set + writer-thread-set-channel metrics-registry) database? (database-file database-file) - (reader-thread-channel database-reader-thread-channel) - (writer-thread-channel database-writer-thread-channel) - (metrics-registry database-metrics-registry)) + (reader-thread-set database-reader-thread-set) + (writer-thread-set database-writer-thread-set) + (writer-thread-set-channel database-writer-thread-set-channel + set-database-writer-thread-set-channel!) + (metrics-registry database-metrics-registry)) (define* (db-open database #:key (write? #t)) @@ -143,28 +146,28 @@ CREATE TABLE IF NOT EXISTS create_branch_for_issue_logs ( (sqlite-close db)) - (let ((reader-thread-channel - (make-worker-thread-channel + (let ((reader-thread-pool + (make-thread-pool + (min (max (current-processor-count) + 32) + 128) + #:thread-initializer (lambda () (let ((db (db-open database-file #:write? #f))) (sqlite-exec db "PRAGMA busy_timeout = 5000;") (list db))) - #:destructor + #:thread-destructor (lambda (db) (sqlite-close db)) - #:lifetime 50000 + #:thread-lifetime 50000 #:name "db read" - #:parallelism - (min (max (current-processor-count) - 32) - 128) #:delay-logger (let ((delay-metric (make-histogram-metric metrics-registry "datastore_read_delay_seconds"))) - (lambda (seconds-delayed) + (lambda (seconds-delayed proc) (metric-observe delay-metric ;; TODO exact->inexact to work around ;; a bug in guile-prometheus where @@ -180,30 +183,31 @@ CREATE TABLE IF NOT EXISTS create_branch_for_issue_logs ( #:log-exception? (lambda (exn) (not (guix-data-service-error? exn))))) - (writer-thread-channel - (make-worker-thread-channel + (writer-thread-pool + (make-thread-pool + ;; SQLite doesn't support parallel writes + 1 + #:thread-initializer (lambda () (let ((db (db-open database-file))) (sqlite-exec db "PRAGMA busy_timeout = 5000;") (sqlite-exec db "PRAGMA foreign_keys = ON;") (list db))) - #:destructor + #:thread-destructor (lambda (db) (db-optimize db database-file) (sqlite-close db)) - #:lifetime 500 + #:thread-lifetime 500 #:name "db write" - ;; SQLite doesn't support parallel writes - #:parallelism 1 #:delay-logger (let ((delay-metric (make-histogram-metric metrics-registry "datastore_write_delay_seconds"))) - (lambda (seconds-delayed) + (lambda (seconds-delayed proc) (metric-observe delay-metric ;; TODO exact->inexact to work around ;; a bug in guile-prometheus where @@ -218,8 +222,9 @@ CREATE TABLE IF NOT EXISTS create_branch_for_issue_logs ( seconds-delayed))))))) (make-database database-file - reader-thread-channel - writer-thread-channel + reader-thread-pool + writer-thread-pool + (thread-pool-channel writer-thread-pool) metrics-registry))) (define (db-optimize db db-filename) @@ -244,8 +249,8 @@ PRAGMA optimize;"))) (define (database-optimize database) (retry-on-error (lambda () - (call-with-worker-thread - (database-writer-thread-channel database) + (call-with-thread + (database-writer-thread-set database) (lambda (db) (db-optimize db @@ -254,6 +259,14 @@ PRAGMA optimize;"))) #:delay 5)) (define (database-spawn-fibers database) + ;; Queue messages to the writer thread, so that they're handled in a first + ;; come first served manor + (set-database-writer-thread-set-channel! + database + (spawn-queueing-fiber + (thread-pool-channel + (database-writer-thread-set database)))) + (spawn-fiber (lambda () (while #t @@ -313,10 +326,10 @@ PRAGMA optimize;"))) (apply values vals)))) #:unwind? #t)))) - (match (call-with-worker-thread + (match (call-with-thread ((if readonly? - database-reader-thread-channel - database-writer-thread-channel) + database-reader-thread-set + database-writer-thread-set) database) (lambda (db) (let ((start-time (get-internal-real-time))) @@ -337,7 +350,11 @@ PRAGMA optimize;"))) duration-seconds) (current-error-port))) - (cons duration-seconds vals))))))) + (cons duration-seconds vals)))))) + #:channel + (if readonly? + #f + (database-writer-thread-set-channel database))) ((duration vals ...) (apply values vals)))) @@ -421,8 +438,8 @@ DELETE FROM cache WHERE key = :key" (error "must specify a ttl")) (let ((cached-values - (call-with-worker-thread - (database-reader-thread-channel database) + (call-with-thread + (database-reader-thread-set database) (lambda (db) (let ((statement (sqlite-prepare @@ -455,49 +472,56 @@ SELECT data, timestamp FROM cache WHERE key = :key" (if (eq? cached-values 'noval) (call-with-values (lambda () - (call-with-worker-thread - (database-reader-thread-channel database) + (call-with-thread + (database-reader-thread-set database) (lambda (db) - (call-with-delay-logging - proc - #:args args)))) + (with-throw-handler #t + (lambda () + (call-with-delay-logging + proc + #:args args)) + (lambda args + (display (backtrace) (current-error-port)) + (newline (current-error-port))))))) (lambda vals (when (if (procedure? store-computed-value?) (apply store-computed-value? vals) store-computed-value?) - (database-call-with-transaction - database - (lambda (db) - (let ((cleanup-statement - (sqlite-prepare - db - " + (let ((vals-string + (call-with-output-string + (lambda (port) + (write vals port))))) + (database-call-with-transaction + database + (lambda (db) + (let ((cleanup-statement + (sqlite-prepare + db + " DELETE FROM cache WHERE key = :key" - #:cache? #t)) - (insert-statement - (sqlite-prepare - db - " + #:cache? #t)) + (insert-statement + (sqlite-prepare + db + " INSERT INTO cache (key, timestamp, data) VALUES (:key, :timestamp, :data)" - #:cache? #t))) + #:cache? #t))) - (sqlite-bind-arguments - cleanup-statement - #:key string-key) - (sqlite-step cleanup-statement) - (sqlite-reset cleanup-statement) + (sqlite-bind-arguments + cleanup-statement + #:key string-key) + (sqlite-step cleanup-statement) + (sqlite-reset cleanup-statement) - (sqlite-bind-arguments - insert-statement - #:key string-key - #:timestamp (time-second (current-time)) - #:data (call-with-output-string - (lambda (port) - (write vals port)))) + (sqlite-bind-arguments + insert-statement + #:key string-key + #:timestamp (time-second (current-time)) + #:data vals-string) - (sqlite-step insert-statement) - (sqlite-reset insert-statement))))) + (sqlite-step insert-statement) + (sqlite-reset insert-statement)))))) (apply values vals))) (apply values cached-values)))) @@ -546,8 +570,8 @@ WHERE category_name = :name AND category_value = :value" #t) (define (select-from-builds-to-cancel-later database category-name) - (call-with-worker-thread - (database-reader-thread-channel database) + (call-with-thread + (database-reader-thread-set database) (lambda (db) (let ((statement (sqlite-prepare @@ -602,8 +626,8 @@ VALUES (:issue, :log)" (sqlite-reset insert-statement))))) (define (select-create-branch-for-issue-log database issue) - (call-with-worker-thread - (database-reader-thread-channel database) + (call-with-thread + (database-reader-thread-set database) (lambda (db) (let ((statement (sqlite-prepare |