aboutsummaryrefslogtreecommitdiff
path: root/guix-qa-frontpage/database.scm
diff options
context:
space:
mode:
Diffstat (limited to 'guix-qa-frontpage/database.scm')
-rw-r--r--guix-qa-frontpage/database.scm162
1 files changed, 93 insertions, 69 deletions
diff --git a/guix-qa-frontpage/database.scm b/guix-qa-frontpage/database.scm
index 5649c36..06ce3bd 100644
--- a/guix-qa-frontpage/database.scm
+++ b/guix-qa-frontpage/database.scm
@@ -28,15 +28,15 @@
#:use-module (sqlite3)
#:use-module (fibers)
#:use-module (prometheus)
+ #:use-module (knots queue)
+ #:use-module (knots thread-pool)
#:use-module (guix narinfo)
#:use-module (guix derivations)
#:use-module ((guix-build-coordinator utils)
#:select (log-delay
call-with-delay-logging))
#:use-module ((guix-build-coordinator utils fibers)
- #:select (retry-on-error
- make-worker-thread-channel
- call-with-worker-thread))
+ #:select (retry-on-error))
#:use-module (guix-qa-frontpage guix-data-service)
#:export (setup-database
@@ -57,13 +57,16 @@
delete-create-branch-for-issue-log))
(define-record-type <database>
- (make-database database-file reader-thread-channel writer-thread-channel
+ (make-database database-file reader-thread-set writer-thread-set
+ writer-thread-set-channel
metrics-registry)
database?
(database-file database-file)
- (reader-thread-channel database-reader-thread-channel)
- (writer-thread-channel database-writer-thread-channel)
- (metrics-registry database-metrics-registry))
+ (reader-thread-set database-reader-thread-set)
+ (writer-thread-set database-writer-thread-set)
+ (writer-thread-set-channel database-writer-thread-set-channel
+ set-database-writer-thread-set-channel!)
+ (metrics-registry database-metrics-registry))
(define* (db-open database
#:key (write? #t))
@@ -143,28 +146,28 @@ CREATE TABLE IF NOT EXISTS create_branch_for_issue_logs (
(sqlite-close db))
- (let ((reader-thread-channel
- (make-worker-thread-channel
+ (let ((reader-thread-pool
+ (make-thread-pool
+ (min (max (current-processor-count)
+ 32)
+ 128)
+ #:thread-initializer
(lambda ()
(let ((db
(db-open database-file #:write? #f)))
(sqlite-exec db "PRAGMA busy_timeout = 5000;")
(list db)))
- #:destructor
+ #:thread-destructor
(lambda (db)
(sqlite-close db))
- #:lifetime 50000
+ #:thread-lifetime 50000
#:name "db read"
- #:parallelism
- (min (max (current-processor-count)
- 32)
- 128)
#:delay-logger (let ((delay-metric
(make-histogram-metric
metrics-registry
"datastore_read_delay_seconds")))
- (lambda (seconds-delayed)
+ (lambda (seconds-delayed proc)
(metric-observe delay-metric
;; TODO exact->inexact to work around
;; a bug in guile-prometheus where
@@ -180,30 +183,31 @@ CREATE TABLE IF NOT EXISTS create_branch_for_issue_logs (
#:log-exception?
(lambda (exn)
(not (guix-data-service-error? exn)))))
- (writer-thread-channel
- (make-worker-thread-channel
+ (writer-thread-pool
+ (make-thread-pool
+ ;; SQLite doesn't support parallel writes
+ 1
+ #:thread-initializer
(lambda ()
(let ((db
(db-open database-file)))
(sqlite-exec db "PRAGMA busy_timeout = 5000;")
(sqlite-exec db "PRAGMA foreign_keys = ON;")
(list db)))
- #:destructor
+ #:thread-destructor
(lambda (db)
(db-optimize db
database-file)
(sqlite-close db))
- #:lifetime 500
+ #:thread-lifetime 500
#:name "db write"
- ;; SQLite doesn't support parallel writes
- #:parallelism 1
#:delay-logger (let ((delay-metric
(make-histogram-metric
metrics-registry
"datastore_write_delay_seconds")))
- (lambda (seconds-delayed)
+ (lambda (seconds-delayed proc)
(metric-observe delay-metric
;; TODO exact->inexact to work around
;; a bug in guile-prometheus where
@@ -218,8 +222,9 @@ CREATE TABLE IF NOT EXISTS create_branch_for_issue_logs (
seconds-delayed)))))))
(make-database database-file
- reader-thread-channel
- writer-thread-channel
+ reader-thread-pool
+ writer-thread-pool
+ (thread-pool-channel writer-thread-pool)
metrics-registry)))
(define (db-optimize db db-filename)
@@ -244,8 +249,8 @@ PRAGMA optimize;")))
(define (database-optimize database)
(retry-on-error
(lambda ()
- (call-with-worker-thread
- (database-writer-thread-channel database)
+ (call-with-thread
+ (database-writer-thread-set database)
(lambda (db)
(db-optimize
db
@@ -254,6 +259,14 @@ PRAGMA optimize;")))
#:delay 5))
(define (database-spawn-fibers database)
+ ;; Queue messages to the writer thread, so that they're handled in a first
+ ;; come first served manor
+ (set-database-writer-thread-set-channel!
+ database
+ (spawn-queueing-fiber
+ (thread-pool-channel
+ (database-writer-thread-set database))))
+
(spawn-fiber
(lambda ()
(while #t
@@ -313,10 +326,10 @@ PRAGMA optimize;")))
(apply values vals))))
#:unwind? #t))))
- (match (call-with-worker-thread
+ (match (call-with-thread
((if readonly?
- database-reader-thread-channel
- database-writer-thread-channel)
+ database-reader-thread-set
+ database-writer-thread-set)
database)
(lambda (db)
(let ((start-time (get-internal-real-time)))
@@ -337,7 +350,11 @@ PRAGMA optimize;")))
duration-seconds)
(current-error-port)))
- (cons duration-seconds vals)))))))
+ (cons duration-seconds vals))))))
+ #:channel
+ (if readonly?
+ #f
+ (database-writer-thread-set-channel database)))
((duration vals ...)
(apply values vals))))
@@ -421,8 +438,8 @@ DELETE FROM cache WHERE key = :key"
(error "must specify a ttl"))
(let ((cached-values
- (call-with-worker-thread
- (database-reader-thread-channel database)
+ (call-with-thread
+ (database-reader-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -455,49 +472,56 @@ SELECT data, timestamp FROM cache WHERE key = :key"
(if (eq? cached-values 'noval)
(call-with-values
(lambda ()
- (call-with-worker-thread
- (database-reader-thread-channel database)
+ (call-with-thread
+ (database-reader-thread-set database)
(lambda (db)
- (call-with-delay-logging
- proc
- #:args args))))
+ (with-throw-handler #t
+ (lambda ()
+ (call-with-delay-logging
+ proc
+ #:args args))
+ (lambda args
+ (display (backtrace) (current-error-port))
+ (newline (current-error-port)))))))
(lambda vals
(when (if (procedure? store-computed-value?)
(apply store-computed-value? vals)
store-computed-value?)
- (database-call-with-transaction
- database
- (lambda (db)
- (let ((cleanup-statement
- (sqlite-prepare
- db
- "
+ (let ((vals-string
+ (call-with-output-string
+ (lambda (port)
+ (write vals port)))))
+ (database-call-with-transaction
+ database
+ (lambda (db)
+ (let ((cleanup-statement
+ (sqlite-prepare
+ db
+ "
DELETE FROM cache WHERE key = :key"
- #:cache? #t))
- (insert-statement
- (sqlite-prepare
- db
- "
+ #:cache? #t))
+ (insert-statement
+ (sqlite-prepare
+ db
+ "
INSERT INTO cache (key, timestamp, data)
VALUES (:key, :timestamp, :data)"
- #:cache? #t)))
+ #:cache? #t)))
- (sqlite-bind-arguments
- cleanup-statement
- #:key string-key)
- (sqlite-step cleanup-statement)
- (sqlite-reset cleanup-statement)
+ (sqlite-bind-arguments
+ cleanup-statement
+ #:key string-key)
+ (sqlite-step cleanup-statement)
+ (sqlite-reset cleanup-statement)
- (sqlite-bind-arguments
- insert-statement
- #:key string-key
- #:timestamp (time-second (current-time))
- #:data (call-with-output-string
- (lambda (port)
- (write vals port))))
+ (sqlite-bind-arguments
+ insert-statement
+ #:key string-key
+ #:timestamp (time-second (current-time))
+ #:data vals-string)
- (sqlite-step insert-statement)
- (sqlite-reset insert-statement)))))
+ (sqlite-step insert-statement)
+ (sqlite-reset insert-statement))))))
(apply values vals)))
(apply values cached-values))))
@@ -546,8 +570,8 @@ WHERE category_name = :name AND category_value = :value"
#t)
(define (select-from-builds-to-cancel-later database category-name)
- (call-with-worker-thread
- (database-reader-thread-channel database)
+ (call-with-thread
+ (database-reader-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -602,8 +626,8 @@ VALUES (:issue, :log)"
(sqlite-reset insert-statement)))))
(define (select-create-branch-for-issue-log database issue)
- (call-with-worker-thread
- (database-reader-thread-channel database)
+ (call-with-thread
+ (database-reader-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare