aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2020-05-01 18:22:19 +0100
committerChristopher Baines <mail@cbaines.net>2020-05-01 18:22:19 +0100
commit854302b496e1fe1c791fa82e525a86c37722ea88 (patch)
treedd8f9e9bb34a42bded57555a36be3b2640e0becb
parentc3a308716bbed0206e3aa46b4377dea568219f6a (diff)
downloadbuild-coordinator-854302b496e1fe1c791fa82e525a86c37722ea88.tar
build-coordinator-854302b496e1fe1c791fa82e525a86c37722ea88.tar.gz
Use two separate channels and sets of threads for SQLite operations
Originally, I didn't quite grasp that SQLite can't handle concurrent writes. Now that I'm pretty sure this is the case, use a dedicated channel backed with one thread so that the writes are serialised at the application level. I think this might help avoid some segfaults I've been seeing.
-rw-r--r--guix-build-coordinator/datastore/sqlite.scm90
1 files changed, 54 insertions, 36 deletions
diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm
index 14e656a..2f65f9f 100644
--- a/guix-build-coordinator/datastore/sqlite.scm
+++ b/guix-build-coordinator/datastore/sqlite.scm
@@ -47,7 +47,8 @@
(define-class <sqlite-datastore> (<abstract-datastore>)
database-file
- worker-thread-channel
+ worker-reader-thread-channel
+ worker-writer-thread-channel
metrics-registry)
(define* (sqlite-datastore database-uri
@@ -68,7 +69,7 @@
(slot-set!
datastore
- 'worker-thread-channel
+ 'worker-reader-thread-channel
(make-worker-thread-channel
(lambda ()
(let ((db
@@ -77,8 +78,25 @@
(sqlite-exec db "PRAGMA busy_timeout = 5000;")
(list db)))
+ ;; Use a minimum of 2 and a maximum of 8 threads
#:parallelism
- (max (current-processor-count) 4)))
+ (min (max (current-processor-count)
+ 2)
+ 8)))
+
+ (slot-set!
+ datastore
+ 'worker-writer-thread-channel
+ (make-worker-thread-channel
+ (lambda ()
+ (let ((db
+ (db-open database-file)))
+ (sqlite-exec db "PRAGMA journal_mode=WAL;")
+ (sqlite-exec db "PRAGMA busy_timeout = 5000;")
+ (list db)))
+
+ ;; SQLite doesn't support parallel writes
+ #:parallelism 1))
datastore))
@@ -103,7 +121,7 @@
(datastore <sqlite-datastore>)
uuid)
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-reader-thread-channel)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -132,7 +150,7 @@ SELECT description FROM agents WHERE id = :id")))
uuid
description)
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-writer-thread-channel)
(lambda (db)
(insert-agent db uuid description)))
#t)
@@ -140,7 +158,7 @@ SELECT description FROM agents WHERE id = :id")))
(define-method (datastore-list-agents
(datastore <sqlite-datastore>))
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-reader-thread-channel)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -163,7 +181,7 @@ SELECT id, description FROM agents ORDER BY id")))
agent-uuid
password)
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-writer-thread-channel)
(lambda (db)
(insert-agent-password db agent-uuid password)))
#t)
@@ -173,7 +191,7 @@ SELECT id, description FROM agents ORDER BY id")))
uuid
password)
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-reader-thread-channel)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -199,7 +217,7 @@ WHERE agent_id = :agent_id AND password = :password")))
(datastore <sqlite-datastore>)
derivation)
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-writer-thread-channel)
(lambda (db)
(sqlite-exec db "BEGIN TRANSACTION;")
(with-exception-handler
@@ -219,7 +237,7 @@ WHERE agent_id = :agent_id AND password = :password")))
(datastore <sqlite-datastore>)
derivation)
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-reader-thread-channel)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -258,7 +276,7 @@ WHERE name != :derivation
uuid
priority)
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-writer-thread-channel)
(lambda (db)
(sqlite-exec db "BEGIN TRANSACTION;")
(with-exception-handler
@@ -277,7 +295,7 @@ WHERE name != :derivation
(define-method (datastore-count-build-results
(datastore <sqlite-datastore>))
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-reader-thread-channel)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -385,7 +403,7 @@ VALUES "
#t)
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-writer-thread-channel)
(lambda (db)
(sqlite-exec db "BEGIN TRANSACTION;")
(with-exception-handler
@@ -455,7 +473,7 @@ INSERT INTO setup_failure_missing_inputs (
", "))))
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-writer-thread-channel)
(lambda (db)
(sqlite-exec db "BEGIN TRANSACTION;")
(with-exception-handler
@@ -480,7 +498,7 @@ INSERT INTO setup_failure_missing_inputs (
(datastore <sqlite-datastore>)
setup-failure-id)
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-reader-thread-channel)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -508,7 +526,7 @@ WHERE setup_failure_id = :id")))
agent-id
failure-reason)
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-writer-thread-channel)
(lambda (db)
(insert-setup-failure-and-remove-allocation db
build-id
@@ -518,7 +536,7 @@ WHERE setup_failure_id = :id")))
(define-method (datastore-count-builds
(datastore <sqlite-datastore>))
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-reader-thread-channel)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -537,7 +555,7 @@ SELECT COUNT(*) FROM builds")))
(datastore <sqlite-datastore>)
uuid)
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-reader-thread-channel)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -570,7 +588,7 @@ WHERE uuid = :uuid")))
(datastore <sqlite-datastore>)
build-id)
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-reader-thread-channel)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -602,7 +620,7 @@ WHERE build_id = :build_id")))
"list_builds_for_output_duration_seconds"
(lambda ()
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-reader-thread-channel)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -638,7 +656,7 @@ WHERE derivation_outputs.output = :output")))
(datastore <sqlite-datastore>)
derivation)
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-reader-thread-channel)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -670,7 +688,7 @@ SELECT uuid FROM builds WHERE derivation_name = :derivation")))
(define-method (datastore-count-setup-failures
(datastore <sqlite-datastore>))
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-reader-thread-channel)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -693,7 +711,7 @@ GROUP BY agent_id, failure_reason")))
(define-method (datastore-fetch-setup-failures
(datastore <sqlite-datastore>))
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-reader-thread-channel)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -729,7 +747,7 @@ WHERE builds.processed = 0")))
(define-method (datastore-list-processed-builds
(datastore <sqlite-datastore>))
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-reader-thread-channel)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -751,7 +769,7 @@ SELECT uuid, derivation_name, priority FROM builds WHERE processed = 1")))
(define-method (datastore-list-unprocessed-builds
(datastore <sqlite-datastore>))
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-reader-thread-channel)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -776,7 +794,7 @@ ORDER BY priority DESC")))
(define-method (datastore-count-build-allocation-plan-entries
(datastore <sqlite-datastore>))
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-reader-thread-channel)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -828,7 +846,7 @@ INSERT INTO build_allocation_plan (build_id, agent_id, ordering) VALUES "
"replace_build_allocation_plan_duration_seconds"
(lambda ()
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-writer-thread-channel)
(lambda (db)
(sqlite-exec db "BEGIN TRANSACTION;")
(with-exception-handler
@@ -849,7 +867,7 @@ INSERT INTO build_allocation_plan (build_id, agent_id, ordering) VALUES "
(define-method (datastore-count-allocated-builds
(datastore <sqlite-datastore>))
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-reader-thread-channel)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -903,7 +921,7 @@ WHERE build_id IN ("
")")))
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-writer-thread-channel)
(lambda (db)
(sqlite-exec db "BEGIN TRANSACTION;")
(with-exception-handler
@@ -925,7 +943,7 @@ WHERE build_id IN ("
agent-id
limit)
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-reader-thread-channel)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -961,7 +979,7 @@ LIMIT :limit")))
(datastore <sqlite-datastore>)
agent-id)
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-reader-thread-channel)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -992,7 +1010,7 @@ WHERE allocated_builds.agent_id = :agent_id")))
(datastore <sqlite-datastore>)
build-id)
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-reader-thread-channel)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1094,7 +1112,7 @@ SELECT name, id FROM derivation_outputs WHERE derivation_name = :derivation_name
(datastore <sqlite-datastore>)
derivation-name)
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-reader-thread-channel)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1123,7 +1141,7 @@ WHERE derivation_name = :derivation_name")))
(datastore <sqlite-datastore>)
build-id)
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-reader-thread-channel)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1163,7 +1181,7 @@ WHERE builds.uuid = :build_id")))
(datastore <sqlite-datastore>)
derivation-name)
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-reader-thread-channel)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1188,7 +1206,7 @@ WHERE name = :name")))
(datastore <sqlite-datastore>)
derivation-name)
(call-with-worker-thread
- (slot-ref datastore 'worker-thread-channel)
+ (slot-ref datastore 'worker-reader-thread-channel)
(lambda (db)
(let ((statement
(sqlite-prepare