aboutsummaryrefslogtreecommitdiff
path: root/guix-qa-frontpage/database.scm
diff options
context:
space:
mode:
Diffstat (limited to 'guix-qa-frontpage/database.scm')
-rw-r--r--guix-qa-frontpage/database.scm107
1 files changed, 60 insertions, 47 deletions
diff --git a/guix-qa-frontpage/database.scm b/guix-qa-frontpage/database.scm
index c44d83a..06ce3bd 100644
--- a/guix-qa-frontpage/database.scm
+++ b/guix-qa-frontpage/database.scm
@@ -28,16 +28,15 @@
#:use-module (sqlite3)
#:use-module (fibers)
#:use-module (prometheus)
+ #:use-module (knots queue)
+ #:use-module (knots thread-pool)
#:use-module (guix narinfo)
#:use-module (guix derivations)
#:use-module ((guix-build-coordinator utils)
#:select (log-delay
call-with-delay-logging))
#:use-module ((guix-build-coordinator utils fibers)
- #:select (retry-on-error
- make-worker-thread-channel
- call-with-worker-thread
- make-queueing-channel))
+ #:select (retry-on-error))
#:use-module (guix-qa-frontpage guix-data-service)
#:export (setup-database
@@ -58,14 +57,16 @@
delete-create-branch-for-issue-log))
(define-record-type <database>
- (make-database database-file reader-thread-channel writer-thread-channel
+ (make-database database-file reader-thread-set writer-thread-set
+ writer-thread-set-channel
metrics-registry)
database?
(database-file database-file)
- (reader-thread-channel database-reader-thread-channel)
- (writer-thread-channel database-writer-thread-channel
- set-database-writer-thread-channel!)
- (metrics-registry database-metrics-registry))
+ (reader-thread-set database-reader-thread-set)
+ (writer-thread-set database-writer-thread-set)
+ (writer-thread-set-channel database-writer-thread-set-channel
+ set-database-writer-thread-set-channel!)
+ (metrics-registry database-metrics-registry))
(define* (db-open database
#:key (write? #t))
@@ -145,28 +146,28 @@ CREATE TABLE IF NOT EXISTS create_branch_for_issue_logs (
(sqlite-close db))
- (let ((reader-thread-channel
- (make-worker-thread-channel
+ (let ((reader-thread-pool
+ (make-thread-pool
+ (min (max (current-processor-count)
+ 32)
+ 128)
+ #:thread-initializer
(lambda ()
(let ((db
(db-open database-file #:write? #f)))
(sqlite-exec db "PRAGMA busy_timeout = 5000;")
(list db)))
- #:destructor
+ #:thread-destructor
(lambda (db)
(sqlite-close db))
- #:lifetime 50000
+ #:thread-lifetime 50000
#:name "db read"
- #:parallelism
- (min (max (current-processor-count)
- 32)
- 128)
#:delay-logger (let ((delay-metric
(make-histogram-metric
metrics-registry
"datastore_read_delay_seconds")))
- (lambda (seconds-delayed)
+ (lambda (seconds-delayed proc)
(metric-observe delay-metric
;; TODO exact->inexact to work around
;; a bug in guile-prometheus where
@@ -182,30 +183,31 @@ CREATE TABLE IF NOT EXISTS create_branch_for_issue_logs (
#:log-exception?
(lambda (exn)
(not (guix-data-service-error? exn)))))
- (writer-thread-channel
- (make-worker-thread-channel
+ (writer-thread-pool
+ (make-thread-pool
+ ;; SQLite doesn't support parallel writes
+ 1
+ #:thread-initializer
(lambda ()
(let ((db
(db-open database-file)))
(sqlite-exec db "PRAGMA busy_timeout = 5000;")
(sqlite-exec db "PRAGMA foreign_keys = ON;")
(list db)))
- #:destructor
+ #:thread-destructor
(lambda (db)
(db-optimize db
database-file)
(sqlite-close db))
- #:lifetime 500
+ #:thread-lifetime 500
#:name "db write"
- ;; SQLite doesn't support parallel writes
- #:parallelism 1
#:delay-logger (let ((delay-metric
(make-histogram-metric
metrics-registry
"datastore_write_delay_seconds")))
- (lambda (seconds-delayed)
+ (lambda (seconds-delayed proc)
(metric-observe delay-metric
;; TODO exact->inexact to work around
;; a bug in guile-prometheus where
@@ -220,8 +222,9 @@ CREATE TABLE IF NOT EXISTS create_branch_for_issue_logs (
seconds-delayed)))))))
(make-database database-file
- reader-thread-channel
- writer-thread-channel
+ reader-thread-pool
+ writer-thread-pool
+ (thread-pool-channel writer-thread-pool)
metrics-registry)))
(define (db-optimize db db-filename)
@@ -246,8 +249,8 @@ PRAGMA optimize;")))
(define (database-optimize database)
(retry-on-error
(lambda ()
- (call-with-worker-thread
- (database-writer-thread-channel database)
+ (call-with-thread
+ (database-writer-thread-set database)
(lambda (db)
(db-optimize
db
@@ -258,10 +261,11 @@ PRAGMA optimize;")))
(define (database-spawn-fibers database)
;; Queue messages to the writer thread, so that they're handled in a first
;; come first served manor
- (set-database-writer-thread-channel!
+ (set-database-writer-thread-set-channel!
database
- (make-queueing-channel
- (database-writer-thread-channel database)))
+ (spawn-queueing-fiber
+ (thread-pool-channel
+ (database-writer-thread-set database))))
(spawn-fiber
(lambda ()
@@ -322,10 +326,10 @@ PRAGMA optimize;")))
(apply values vals))))
#:unwind? #t))))
- (match (call-with-worker-thread
+ (match (call-with-thread
((if readonly?
- database-reader-thread-channel
- database-writer-thread-channel)
+ database-reader-thread-set
+ database-writer-thread-set)
database)
(lambda (db)
(let ((start-time (get-internal-real-time)))
@@ -346,7 +350,11 @@ PRAGMA optimize;")))
duration-seconds)
(current-error-port)))
- (cons duration-seconds vals)))))))
+ (cons duration-seconds vals))))))
+ #:channel
+ (if readonly?
+ #f
+ (database-writer-thread-set-channel database)))
((duration vals ...)
(apply values vals))))
@@ -430,8 +438,8 @@ DELETE FROM cache WHERE key = :key"
(error "must specify a ttl"))
(let ((cached-values
- (call-with-worker-thread
- (database-reader-thread-channel database)
+ (call-with-thread
+ (database-reader-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -464,12 +472,17 @@ SELECT data, timestamp FROM cache WHERE key = :key"
(if (eq? cached-values 'noval)
(call-with-values
(lambda ()
- (call-with-worker-thread
- (database-reader-thread-channel database)
+ (call-with-thread
+ (database-reader-thread-set database)
(lambda (db)
- (call-with-delay-logging
- proc
- #:args args))))
+ (with-throw-handler #t
+ (lambda ()
+ (call-with-delay-logging
+ proc
+ #:args args))
+ (lambda args
+ (display (backtrace) (current-error-port))
+ (newline (current-error-port)))))))
(lambda vals
(when (if (procedure? store-computed-value?)
(apply store-computed-value? vals)
@@ -557,8 +570,8 @@ WHERE category_name = :name AND category_value = :value"
#t)
(define (select-from-builds-to-cancel-later database category-name)
- (call-with-worker-thread
- (database-reader-thread-channel database)
+ (call-with-thread
+ (database-reader-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -613,8 +626,8 @@ VALUES (:issue, :log)"
(sqlite-reset insert-statement)))))
(define (select-create-branch-for-issue-log database issue)
- (call-with-worker-thread
- (database-reader-thread-channel database)
+ (call-with-thread
+ (database-reader-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare