diff options
author | Christopher Baines <mail@cbaines.net> | 2020-05-01 18:22:19 +0100 |
---|---|---|
committer | Christopher Baines <mail@cbaines.net> | 2020-05-01 18:22:19 +0100 |
commit | 854302b496e1fe1c791fa82e525a86c37722ea88 (patch) | |
tree | dd8f9e9bb34a42bded57555a36be3b2640e0becb | |
parent | c3a308716bbed0206e3aa46b4377dea568219f6a (diff) | |
download | build-coordinator-854302b496e1fe1c791fa82e525a86c37722ea88.tar build-coordinator-854302b496e1fe1c791fa82e525a86c37722ea88.tar.gz |
Use two separate channels and sets of threads for SQLite operations
Originally, I didn't quite grasp that SQLite can't handle concurrent
writes. Now that I'm pretty sure this is the case, use a dedicated channel
backed with one thread so that the writes are serialised at the application
level. I think this might help avoid some segfaults I've been seeing.
-rw-r--r-- | guix-build-coordinator/datastore/sqlite.scm | 90 |
1 files changed, 54 insertions, 36 deletions
diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm index 14e656a..2f65f9f 100644 --- a/guix-build-coordinator/datastore/sqlite.scm +++ b/guix-build-coordinator/datastore/sqlite.scm @@ -47,7 +47,8 @@ (define-class <sqlite-datastore> (<abstract-datastore>) database-file - worker-thread-channel + worker-reader-thread-channel + worker-writer-thread-channel metrics-registry) (define* (sqlite-datastore database-uri @@ -68,7 +69,7 @@ (slot-set! datastore - 'worker-thread-channel + 'worker-reader-thread-channel (make-worker-thread-channel (lambda () (let ((db @@ -77,8 +78,25 @@ (sqlite-exec db "PRAGMA busy_timeout = 5000;") (list db))) + ;; Use a minimum of 2 and a maximum of 8 threads #:parallelism - (max (current-processor-count) 4))) + (min (max (current-processor-count) + 2) + 8))) + + (slot-set! + datastore + 'worker-writer-thread-channel + (make-worker-thread-channel + (lambda () + (let ((db + (db-open database-file))) + (sqlite-exec db "PRAGMA journal_mode=WAL;") + (sqlite-exec db "PRAGMA busy_timeout = 5000;") + (list db))) + + ;; SQLite doesn't support parallel writes + #:parallelism 1)) datastore)) @@ -103,7 +121,7 @@ (datastore <sqlite-datastore>) uuid) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare @@ -132,7 +150,7 @@ SELECT description FROM agents WHERE id = :id"))) uuid description) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-writer-thread-channel) (lambda (db) (insert-agent db uuid description))) #t) @@ -140,7 +158,7 @@ SELECT description FROM agents WHERE id = :id"))) (define-method (datastore-list-agents (datastore <sqlite-datastore>)) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare @@ -163,7 +181,7 @@ SELECT id, description FROM agents ORDER BY id"))) agent-uuid password) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-writer-thread-channel) (lambda (db) (insert-agent-password db agent-uuid password))) #t) @@ -173,7 +191,7 @@ SELECT id, description FROM agents ORDER BY id"))) uuid password) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare @@ -199,7 +217,7 @@ WHERE agent_id = :agent_id AND password = :password"))) (datastore <sqlite-datastore>) derivation) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-writer-thread-channel) (lambda (db) (sqlite-exec db "BEGIN TRANSACTION;") (with-exception-handler @@ -219,7 +237,7 @@ WHERE agent_id = :agent_id AND password = :password"))) (datastore <sqlite-datastore>) derivation) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare @@ -258,7 +276,7 @@ WHERE name != :derivation uuid priority) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-writer-thread-channel) (lambda (db) (sqlite-exec db "BEGIN TRANSACTION;") (with-exception-handler @@ -277,7 +295,7 @@ WHERE name != :derivation (define-method (datastore-count-build-results (datastore <sqlite-datastore>)) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare @@ -385,7 +403,7 @@ VALUES " #t) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-writer-thread-channel) (lambda (db) (sqlite-exec db "BEGIN TRANSACTION;") (with-exception-handler @@ -455,7 +473,7 @@ INSERT INTO setup_failure_missing_inputs ( ", ")))) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-writer-thread-channel) (lambda (db) (sqlite-exec db "BEGIN TRANSACTION;") (with-exception-handler @@ -480,7 +498,7 @@ INSERT INTO setup_failure_missing_inputs ( (datastore <sqlite-datastore>) setup-failure-id) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare @@ -508,7 +526,7 @@ WHERE setup_failure_id = :id"))) agent-id failure-reason) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-writer-thread-channel) (lambda (db) (insert-setup-failure-and-remove-allocation db build-id @@ -518,7 +536,7 @@ WHERE setup_failure_id = :id"))) (define-method (datastore-count-builds (datastore <sqlite-datastore>)) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare @@ -537,7 +555,7 @@ SELECT COUNT(*) FROM builds"))) (datastore <sqlite-datastore>) uuid) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare @@ -570,7 +588,7 @@ WHERE uuid = :uuid"))) (datastore <sqlite-datastore>) build-id) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare @@ -602,7 +620,7 @@ WHERE build_id = :build_id"))) "list_builds_for_output_duration_seconds" (lambda () (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare @@ -638,7 +656,7 @@ WHERE derivation_outputs.output = :output"))) (datastore <sqlite-datastore>) derivation) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare @@ -670,7 +688,7 @@ SELECT uuid FROM builds WHERE derivation_name = :derivation"))) (define-method (datastore-count-setup-failures (datastore <sqlite-datastore>)) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare @@ -693,7 +711,7 @@ GROUP BY agent_id, failure_reason"))) (define-method (datastore-fetch-setup-failures (datastore <sqlite-datastore>)) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare @@ -729,7 +747,7 @@ WHERE builds.processed = 0"))) (define-method (datastore-list-processed-builds (datastore <sqlite-datastore>)) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare @@ -751,7 +769,7 @@ SELECT uuid, derivation_name, priority FROM builds WHERE processed = 1"))) (define-method (datastore-list-unprocessed-builds (datastore <sqlite-datastore>)) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare @@ -776,7 +794,7 @@ ORDER BY priority DESC"))) (define-method (datastore-count-build-allocation-plan-entries (datastore <sqlite-datastore>)) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare @@ -828,7 +846,7 @@ INSERT INTO build_allocation_plan (build_id, agent_id, ordering) VALUES " "replace_build_allocation_plan_duration_seconds" (lambda () (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-writer-thread-channel) (lambda (db) (sqlite-exec db "BEGIN TRANSACTION;") (with-exception-handler @@ -849,7 +867,7 @@ INSERT INTO build_allocation_plan (build_id, agent_id, ordering) VALUES " (define-method (datastore-count-allocated-builds (datastore <sqlite-datastore>)) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare @@ -903,7 +921,7 @@ WHERE build_id IN (" ")"))) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-writer-thread-channel) (lambda (db) (sqlite-exec db "BEGIN TRANSACTION;") (with-exception-handler @@ -925,7 +943,7 @@ WHERE build_id IN (" agent-id limit) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare @@ -961,7 +979,7 @@ LIMIT :limit"))) (datastore <sqlite-datastore>) agent-id) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare @@ -992,7 +1010,7 @@ WHERE allocated_builds.agent_id = :agent_id"))) (datastore <sqlite-datastore>) build-id) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare @@ -1094,7 +1112,7 @@ SELECT name, id FROM derivation_outputs WHERE derivation_name = :derivation_name (datastore <sqlite-datastore>) derivation-name) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare @@ -1123,7 +1141,7 @@ WHERE derivation_name = :derivation_name"))) (datastore <sqlite-datastore>) build-id) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare @@ -1163,7 +1181,7 @@ WHERE builds.uuid = :build_id"))) (datastore <sqlite-datastore>) derivation-name) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare @@ -1188,7 +1206,7 @@ WHERE name = :name"))) (datastore <sqlite-datastore>) derivation-name) (call-with-worker-thread - (slot-ref datastore 'worker-thread-channel) + (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare |