aboutsummaryrefslogtreecommitdiff
path: root/guix-build-coordinator/datastore/sqlite.scm
diff options
context:
space:
mode:
Diffstat (limited to 'guix-build-coordinator/datastore/sqlite.scm')
-rw-r--r--guix-build-coordinator/datastore/sqlite.scm550
1 files changed, 194 insertions, 356 deletions
diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm
index e67a940..ac04362 100644
--- a/guix-build-coordinator/datastore/sqlite.scm
+++ b/guix-build-coordinator/datastore/sqlite.scm
@@ -87,7 +87,7 @@
datastore-replace-agent-tags
datastore-list-processed-builds
datastore-list-unprocessed-builds
- datastore-find-first-unallocated-deferred-build
+ datastore-find-deferred-build
datastore-fetch-prioritised-unprocessed-builds
datastore-insert-unprocessed-hook-event
datastore-count-unprocessed-hook-events
@@ -96,21 +96,20 @@
datastore-delete-unprocessed-hook-event
datastore-list-agent-builds
datastore-agent-for-build
- datastore-count-build-allocation-plan-entries
datastore-replace-build-allocation-plan
- datastore-remove-build-from-allocation-plan
datastore-count-allocated-builds
datastore-agent-requested-systems
datastore-update-agent-requested-systems
datastore-fetch-build-to-allocate
- datastore-insert-to-allocated-builds
- datastore-remove-builds-from-plan
- datastore-list-allocation-plan-builds))
+ datastore-check-if-derivation-conflicts?
+ datastore-insert-to-allocated-builds))
(define-class <sqlite-datastore> (<abstract-datastore>)
database-file
worker-reader-thread-channel
+ worker-reader-thread-proc-vector
worker-writer-thread-channel
+ worker-writer-thread-proc-vector
metrics-registry)
(define* (sqlite-datastore database-uri
@@ -140,138 +139,129 @@
(slot-set! datastore 'database-file database-file)
(slot-set! datastore 'metrics-registry metrics-registry)
- (slot-set!
- datastore
- 'worker-writer-thread-channel
- (make-worker-thread-channel
- (lambda ()
- (let ((db
- (db-open database-file)))
- (sqlite-exec db "PRAGMA busy_timeout = 5000;")
- (sqlite-exec db "PRAGMA synchronous = NORMAL;")
- (sqlite-exec db "PRAGMA temp_store = MEMORY;")
- (sqlite-exec db "PRAGMA foreign_keys = ON;")
-
- (sqlite-exec db "ATTACH DATABASE 'file:/mem?vfs=memdb' AS mem;")
-
- (sqlite-exec
- db
- "
-CREATE TABLE IF NOT EXISTS mem.build_allocation_plan (
- build_id INTEGER NOT NULL,
- agent_id TEXT NOT NULL,
- ordering INTEGER NOT NULL,
- PRIMARY KEY (agent_id, build_id)
-);")
-
- (list db)))
- #:name "ds write"
- #:destructor
- (let ((writer-thread-destructor-counter
- (make-gauge-metric metrics-registry
- "datastore_writer_thread_close_total")))
- (lambda (db)
- (db-optimize db
- database-file
- metrics-registry
- #:maybe-truncate-wal? #f)
-
- (metric-increment writer-thread-destructor-counter)
- (sqlite-close db)))
- #:lifetime 500
- #:expire-on-exception? #t
-
- ;; SQLite doesn't support parallel writes
- #:parallelism 1
- #:delay-logger (let ((delay-metric
- (make-histogram-metric
- metrics-registry
- "datastore_write_delay_seconds")))
- (lambda (seconds-delayed)
- (metric-observe delay-metric
- ;; TODO exact->inexact to work around
- ;; a bug in guile-prometheus where
- ;; the metric sum will output in the
- ;; exact form including the /q
- (exact->inexact seconds-delayed))
- (log-delay "datastore write" seconds-delayed)
- (when (> seconds-delayed 1)
- (format
- (current-error-port)
- "warning: database write delayed by ~1,2f seconds~%"
- seconds-delayed))))
- #:duration-logger
- (lambda (duration proc)
- (when (> duration 10)
- (format
- (current-error-port)
- "warning: database write took ~1,2f seconds (~a)~%"
- duration
- proc)))
- #:log-exception? worker-thread-log-exception?))
-
- ;; Make sure the worker thread has initialised, and created the in memory
- ;; tables
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
- (const #t))
-
- (slot-set!
- datastore
- 'worker-reader-thread-channel
- (make-worker-thread-channel
- (lambda ()
- (let ((db
- (db-open database-file #:write? #f)))
- (sqlite-exec db "PRAGMA temp_store = MEMORY;")
- (sqlite-exec db "PRAGMA busy_timeout = 4000;")
- (sqlite-exec db "PRAGMA cache_size = -16000;")
-
- (sqlite-exec db "ATTACH DATABASE 'file:/mem?vfs=memdb' AS mem;")
-
- (list db)))
- #:name "ds read"
- #:destructor
- (let ((reader-thread-destructor-counter
- (make-gauge-metric metrics-registry
- "datastore_reader_thread_close_total")))
- (lambda (db)
- (metric-increment reader-thread-destructor-counter)
- (sqlite-close db)))
- #:lifetime 50000
- #:expire-on-exception? #t
-
- ;; Use a minimum of 8 and a maximum of 16 threads
- #:parallelism
- (min (max (current-processor-count)
- 8)
- 16)
- #:delay-logger (let ((delay-metric
- (make-histogram-metric
+ (let ((channel
+ proc-vector
+ (make-worker-thread-channel
+ (lambda ()
+ (let ((db
+ (db-open database-file)))
+ (sqlite-exec db "PRAGMA busy_timeout = 0;")
+ (sqlite-exec db "PRAGMA synchronous = NORMAL;")
+ (sqlite-exec db "PRAGMA temp_store = MEMORY;")
+ (sqlite-exec db "PRAGMA foreign_keys = ON;")
+
+ (list db)))
+ #:name "ds write"
+ #:destructor
+ (let ((writer-thread-destructor-counter
+ (make-gauge-metric metrics-registry
+ "datastore_writer_thread_close_total")))
+ (lambda (db)
+ (db-optimize db
+ database-file
metrics-registry
- "datastore_read_delay_seconds")))
- (lambda (seconds-delayed)
- (metric-observe delay-metric
- ;; TODO exact->inexact to work around
- ;; a bug in guile-prometheus where
- ;; the metric sum will output in the
- ;; exact form including the /q
- (exact->inexact seconds-delayed))
- (log-delay "datastore read" seconds-delayed)
- (when (> seconds-delayed 1)
- (format
- (current-error-port)
- "warning: database read delayed by ~1,2f seconds~%"
- seconds-delayed))))
- #:duration-logger
- (lambda (duration proc)
- (when (> duration 30)
- (format
- (current-error-port)
- "warning: database read took ~1,2f seconds (~a)~%"
- duration
- proc)))
- #:log-exception? worker-thread-log-exception?))
+ #:maybe-truncate-wal? #f)
+
+ (metric-increment writer-thread-destructor-counter)
+ (sqlite-close db)))
+ #:lifetime 500
+ #:expire-on-exception? #t
+
+ ;; SQLite doesn't support parallel writes
+ #:parallelism 1
+ #:delay-logger (let ((delay-metric
+ (make-histogram-metric
+ metrics-registry
+ "datastore_write_delay_seconds")))
+ (lambda (seconds-delayed)
+ (metric-observe delay-metric
+ ;; TODO exact->inexact to work around
+ ;; a bug in guile-prometheus where
+ ;; the metric sum will output in the
+ ;; exact form including the /q
+ (exact->inexact seconds-delayed))
+ (log-delay "datastore write" seconds-delayed)
+ (when (> seconds-delayed 1)
+ (format
+ (current-error-port)
+ "warning: database write delayed by ~1,2f seconds~%"
+ seconds-delayed))))
+ #:duration-logger
+ (lambda (duration proc)
+ (when (> duration 10)
+ (format
+ (current-error-port)
+ "warning: database write took ~1,2f seconds (~a)~%"
+ duration
+ proc)))
+ #:log-exception? worker-thread-log-exception?)))
+
+ (slot-set! datastore
+ 'worker-writer-thread-channel
+ channel)
+ (slot-set! datastore
+ 'worker-writer-thread-proc-vector
+ proc-vector))
+
+ (let ((channel
+ proc-vector
+ (make-worker-thread-channel
+ (lambda ()
+ (let ((db
+ (db-open database-file #:write? #f)))
+ (sqlite-exec db "PRAGMA temp_store = MEMORY;")
+ (sqlite-exec db "PRAGMA busy_timeout = 0;")
+ (sqlite-exec db "PRAGMA cache_size = -16000;")
+
+ (list db)))
+ #:name "ds read"
+ #:destructor
+ (let ((reader-thread-destructor-counter
+ (make-gauge-metric metrics-registry
+ "datastore_reader_thread_close_total")))
+ (lambda (db)
+ (metric-increment reader-thread-destructor-counter)
+ (sqlite-close db)))
+ #:lifetime 50000
+ #:expire-on-exception? #t
+
+ ;; Use a minimum of 8 and a maximum of 16 threads
+ #:parallelism
+ (min (max (current-processor-count)
+ 8)
+ 16)
+ #:delay-logger (let ((delay-metric
+ (make-histogram-metric
+ metrics-registry
+ "datastore_read_delay_seconds")))
+ (lambda (seconds-delayed)
+ (metric-observe delay-metric
+ ;; TODO exact->inexact to work around
+ ;; a bug in guile-prometheus where
+ ;; the metric sum will output in the
+ ;; exact form including the /q
+ (exact->inexact seconds-delayed))
+ (log-delay "datastore read" seconds-delayed)
+ (when (> seconds-delayed 1)
+ (format
+ (current-error-port)
+ "warning: database read delayed by ~1,2f seconds~%"
+ seconds-delayed))))
+ #:duration-logger
+ (lambda (duration proc)
+ (when (> duration 30)
+ (format
+ (current-error-port)
+ "warning: database read took ~1,2f seconds (~a)~%"
+ duration
+ proc)))
+ #:log-exception? worker-thread-log-exception?)))
+ (slot-set! datastore
+ 'worker-reader-thread-channel
+ channel)
+ (slot-set! datastore
+ 'worker-reader-thread-proc-vector
+ proc-vector))
datastore))
@@ -305,11 +295,6 @@ CREATE TABLE IF NOT EXISTS mem.build_allocation_plan (
metrics-registry
checkpoint-duration-metric-name
(lambda ()
- (if (> (wal-size) extreme-wal-size-threshold)
- ;; Since the WAL is really getting too big, wait for much longer
- (sqlite-exec db "PRAGMA busy_timeout = 300000;")
- (sqlite-exec db "PRAGMA busy_timeout = 20;"))
-
(let* ((statement
(sqlite-prepare
db
@@ -331,8 +316,6 @@ CREATE TABLE IF NOT EXISTS mem.build_allocation_plan (
modified-page-count
pages-moved-to-db)
#t))))))
- (sqlite-exec db "PRAGMA busy_timeout = 5000;")
-
result)))
#t))
@@ -358,6 +341,25 @@ PRAGMA optimize;")
(define-method (datastore-spawn-fibers
(datastore <sqlite-datastore>))
+ ;; Queue messages to the writer thread, so that they're handled in a first
+ ;; come first served manor
+ (slot-set!
+ datastore
+ 'worker-writer-thread-channel
+ (make-queueing-channel
+ (slot-ref datastore 'worker-writer-thread-channel)))
+
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (sleep 20)
+ (vector-for-each
+ (lambda (i proc)
+ (simple-format (current-error-port)
+ "reader thread ~A running: ~A\n"
+ i proc))
+ (slot-ref datastore 'worker-reader-thread-proc-vector)))))
+
(spawn-fiber
(lambda ()
(while #t
@@ -1734,42 +1736,6 @@ WHERE build_id = :build_id"
#t)
rest))
-(define-method (datastore-remove-build-from-allocation-plan
- (datastore <sqlite-datastore>)
- uuid)
- (define (update-build-allocation-plan-metrics)
- (let ((allocation-plan-metric
- (metrics-registry-fetch-metric
- (slot-ref datastore 'metrics-registry)
- "build_allocation_plan_total")))
- (for-each
- (match-lambda
- ((agent-id . count)
- (metric-set allocation-plan-metric
- count
- #:label-values
- `((agent_id . ,agent-id)))))
- (datastore-count-build-allocation-plan-entries datastore))))
-
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-writer-thread-channel)
- (lambda (db)
- (let ((statement (sqlite-prepare
- db
- "
-DELETE FROM build_allocation_plan WHERE build_id = :build_id"
- #:cache? #t)))
-
- (sqlite-bind-arguments
- statement
- #:build_id (db-find-build-id db uuid))
-
- (sqlite-step-and-reset statement)
-
- (unless (= 0 (changes-count db))
- (update-build-allocation-plan-metrics)))))
- #t)
-
(define-method (datastore-count-build-results
(datastore <sqlite-datastore>))
(call-with-worker-thread
@@ -3016,8 +2982,9 @@ ORDER BY priority DESC"
builds)))))
-(define-method (datastore-find-first-unallocated-deferred-build
- (datastore <sqlite-datastore>))
+(define-method (datastore-find-deferred-build
+ (datastore <sqlite-datastore>)
+ select?)
(call-with-worker-thread
(slot-ref datastore 'worker-reader-thread-channel)
(lambda (db)
@@ -3032,25 +2999,29 @@ INNER JOIN derivations
WHERE processed = 0
AND canceled = 0
AND deferred_until IS NOT NULL
- AND builds.id NOT IN (SELECT build_id FROM build_allocation_plan)
-ORDER BY deferred_until ASC
-LIMIT 1"
+ORDER BY deferred_until ASC"
#:cache? #t)))
- (match (sqlite-step-and-reset statement)
- (#(uuid derivation_name priority created_at deferred_until)
- `((uuid . ,uuid)
- (derivation-name . ,derivation_name)
- (priority . ,priority)
- (created-at . ,(if (string? created_at)
- (string->date created_at
- "~Y-~m-~d ~H:~M:~S")
- #f))
- (deferred-until . ,(if (string? deferred_until)
- (string->date deferred_until
- "~Y-~m-~d ~H:~M:~S")
- #f))))
- (#f #f))))))
+ (let loop ((row (sqlite-step statement)))
+ (match row
+ (#(uuid derivation_name priority created_at deferred_until)
+ (let ((res
+ (select?
+ `((uuid . ,uuid)
+ (derivation-name . ,derivation_name)
+ (priority . ,priority)
+ (created-at . ,(if (string? created_at)
+ (string->date created_at
+ "~Y-~m-~d ~H:~M:~S")
+ #f))
+ (deferred-until . ,(if (string? deferred_until)
+ (string->date deferred_until
+ "~Y-~m-~d ~H:~M:~S")
+ #f))))))
+ (if res
+ res
+ (loop (sqlite-step statement)))))
+ (#f #f)))))))
(define-method (datastore-fetch-prioritised-unprocessed-builds
(datastore <sqlite-datastore>))
@@ -3222,103 +3193,6 @@ DELETE FROM unprocessed_hook_events WHERE id = :id"
(sqlite-step-and-reset statement))))
#t)
-(define-method (datastore-count-build-allocation-plan-entries
- (datastore <sqlite-datastore>))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
- (lambda (db)
- (let ((statement
- (sqlite-prepare
- db
- "
-SELECT agent_id, COUNT(*)
-FROM build_allocation_plan
-GROUP BY agent_id"
- #:cache? #t)))
-
- (let ((result
- (sqlite-map
- (match-lambda
- (#(agent_id count)
- (cons agent_id count)))
- statement)))
- (sqlite-reset statement)
-
- result)))))
-
-(define-method (datastore-replace-build-allocation-plan
- (datastore <sqlite-datastore>)
- planned-builds)
- (define (clear-current-plan db)
- (sqlite-exec
- db
- "DELETE FROM build_allocation_plan"))
-
- (define insert-sql
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
- (lambda (db)
- (string-append
- "
-INSERT INTO build_allocation_plan (build_id, agent_id, ordering) VALUES "
- (string-join
- (map (match-lambda
- ((build-uuid agent-id ordering)
- (simple-format
- #f
- "('~A', '~A', ~A)"
- (db-find-build-id db build-uuid)
- agent-id
- ordering)))
- planned-builds)
- ", ")
- ";"))))
-
- (define (insert-new-plan db planned-builds)
- (sqlite-exec
- db
- insert-sql))
-
- (datastore-call-with-transaction
- datastore
- (lambda (db)
- (clear-current-plan db)
- (unless (null? planned-builds)
- (insert-new-plan db planned-builds)))
- #:duration-metric-name "replace_build_allocation_plan")
-
- (let* ((agent-ids
- (map (lambda (agent-details)
- (assq-ref agent-details 'uuid))
- (datastore-list-agents datastore)))
- (counts-by-agent
- (make-vector (length agent-ids) 0)))
- (for-each
- (match-lambda
- ((_ agent-id _)
- (let ((index (list-index (lambda (list-agent-id)
- (string=? agent-id list-agent-id))
- agent-ids)))
- (vector-set! counts-by-agent
- index
- (+ (vector-ref counts-by-agent
- index)
- 1)))))
- planned-builds)
-
- (let ((metric
- (metrics-registry-fetch-metric (slot-ref datastore 'metrics-registry)
- "build_allocation_plan_total")))
- (for-each
- (lambda (index agent-id)
- (metric-set metric
- (vector-ref counts-by-agent index)
- #:label-values
- `((agent_id . ,agent-id))))
- (iota (length agent-ids))
- agent-ids)))
- #t)
-
(define-method (datastore-count-allocated-builds
(datastore <sqlite-datastore>))
(call-with-worker-thread/delay-logging
@@ -3418,34 +3292,40 @@ INSERT INTO build_allocation_agent_requested_systems (agent_id, system_id) VALUE
(define-method (datastore-fetch-build-to-allocate
(datastore <sqlite-datastore>)
- agent-id)
- (datastore-call-with-transaction
- datastore
+ build-id)
+ (call-with-worker-thread
+ (slot-ref datastore 'worker-reader-thread-channel)
(lambda (db)
(let ((statement
(sqlite-prepare
db
- ;; This needs to guard against the plan being out of date
"
SELECT builds.uuid, derivations.id, derivations.name,
unprocessed_builds_with_derived_priorities.derived_priority
FROM builds
-INNER JOIN build_allocation_plan
- ON builds.id = build_allocation_plan.build_id
INNER JOIN derivations
ON builds.derivation_id = derivations.id
-INNER JOIN build_allocation_agent_requested_systems
- ON build_allocation_agent_requested_systems.agent_id = :agent_id
- AND build_allocation_agent_requested_systems.system_id = derivations.system_id
LEFT JOIN unprocessed_builds_with_derived_priorities
ON unprocessed_builds_with_derived_priorities.build_id = builds.id
-WHERE build_allocation_plan.agent_id = :agent_id
+WHERE builds.uuid = :uuid
AND builds.processed = 0
AND builds.canceled = 0
- AND builds.id NOT IN (SELECT build_id FROM allocated_builds)
-ORDER BY build_allocation_plan.ordering ASC"
- #:cache? #t))
- (output-conflicts-statement
+ AND builds.id NOT IN (SELECT build_id FROM allocated_builds)"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:uuid build-id)
+
+ (sqlite-step-and-reset statement)))))
+
+(define-method (datastore-check-if-derivation-conflicts?
+ (datastore <sqlite-datastore>)
+ agent-id
+ derivation-id)
+ (call-with-worker-thread
+ (slot-ref datastore 'worker-reader-thread-channel)
+ (lambda (db)
+ (let ((statement
(sqlite-prepare
db
"
@@ -3463,34 +3343,11 @@ WHERE build_derivation_outputs.derivation_id = :derivation_id
allocated_builds_derivation_outputs.output_id"
#:cache? #t)))
- (define (get-build-to-allocate)
- (let loop ((build-details (sqlite-step statement)))
- (match build-details
- (#f #f)
- (#(uuid derivation-id derivation-name derived_priority)
-
- (sqlite-bind-arguments output-conflicts-statement
- #:agent_id agent-id
- #:derivation_id derivation-id)
-
- (if (eq? (sqlite-step-and-reset output-conflicts-statement)
- #f)
- `((uuid . ,uuid)
- (derivation_name . ,derivation-name)
- (derived_priority . ,derived_priority))
- (loop (sqlite-step statement)))))))
-
- (sqlite-bind-arguments
- statement
- #:agent_id agent-id)
-
- (let ((result (get-build-to-allocate)))
- (sqlite-reset statement)
-
- result)))
+ (sqlite-bind-arguments statement
+ #:agent_id agent-id
+ #:derivation_id derivation-id)
- #:readonly? #t
- #:duration-metric-name "fetch_builds_to_allocate"))
+ (->bool (sqlite-step-and-reset statement))))))
(define-method (datastore-insert-to-allocated-builds
(datastore <sqlite-datastore>)
@@ -3515,25 +3372,6 @@ INSERT INTO allocated_builds (build_id, agent_id) VALUES "
", ")
";")))))
-(define-method (datastore-remove-builds-from-plan
- (datastore <sqlite-datastore>)
- build-uuids)
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
- (lambda (db)
- (sqlite-exec
- db
- (string-append
- "
-DELETE FROM build_allocation_plan
-WHERE build_id IN ("
- (string-join
- (map (lambda (build-uuid)
- (number->string (db-find-build-id db build-uuid)))
- build-uuids)
- ", ")
- ")")))))
-
(define-method (datastore-list-allocation-plan-builds
(datastore <sqlite-datastore>)
.