aboutsummaryrefslogtreecommitdiff
path: root/guix-build-coordinator/datastore/sqlite.scm
diff options
context:
space:
mode:
Diffstat (limited to 'guix-build-coordinator/datastore/sqlite.scm')
-rw-r--r--guix-build-coordinator/datastore/sqlite.scm1636
1 files changed, 900 insertions, 736 deletions
diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm
index 5e39849..bb1d8e8 100644
--- a/guix-build-coordinator/datastore/sqlite.scm
+++ b/guix-build-coordinator/datastore/sqlite.scm
@@ -10,8 +10,11 @@
#:use-module (ice-9 exceptions)
#:use-module (sqlite3)
#:use-module (fibers)
+ #:use-module (knots parallelism)
+ #:use-module (knots thread-pool)
#:use-module (prometheus)
#:use-module (guix base16)
+ #:use-module (guix progress)
#:use-module (guix derivations)
#:use-module (guix-build-coordinator utils)
#:use-module (guix-build-coordinator utils fibers)
@@ -87,7 +90,7 @@
datastore-replace-agent-tags
datastore-list-processed-builds
datastore-list-unprocessed-builds
- datastore-find-first-unallocated-deferred-build
+ datastore-find-deferred-build
datastore-fetch-prioritised-unprocessed-builds
datastore-insert-unprocessed-hook-event
datastore-count-unprocessed-hook-events
@@ -96,39 +99,53 @@
datastore-delete-unprocessed-hook-event
datastore-list-agent-builds
datastore-agent-for-build
- datastore-count-build-allocation-plan-entries
datastore-replace-build-allocation-plan
- datastore-remove-build-from-allocation-plan
datastore-count-allocated-builds
datastore-agent-requested-systems
datastore-update-agent-requested-systems
datastore-fetch-build-to-allocate
+ datastore-check-if-derivation-conflicts?
datastore-insert-to-allocated-builds
- datastore-remove-builds-from-plan
- datastore-list-allocation-plan-builds))
+ datastore-update-allocated-build-submit-outputs
+ datastore-insert-background-job
+ datastore-delete-background-job
+ datastore-select-background-jobs
+ datastore-check-and-correct-unprocessed-builds-all-inputs-built))
+
+(define %transaction-duration-histogram-buckets
+ (list 0.1 0.25 0.5 1 2.5 5 10 15 30 45 60 120 240 (inf)))
(define-class <sqlite-datastore> (<abstract-datastore>)
database-file
- worker-reader-thread-channel
- worker-writer-thread-channel
+ reader-thread-pool
+ writer-thread-pool
+ low-priority-writer-thread-channel
+ default-priority-writer-thread-channel
+ high-priority-writer-thread-channel
+ writer-thread-channel-queue-stats
metrics-registry)
(define* (sqlite-datastore database-uri
#:key
update-database?
metrics-registry
- worker-thread-log-exception?)
+ thread-pool-log-exception?)
(define database-file
(string-drop database-uri
(string-length "sqlite://")))
(when update-database?
- (run-sqitch database-file))
+ (retry-on-error
+ (lambda ()
+ (run-sqitch database-file))
+ #:times 2
+ #:delay 5))
(let ((db (db-open database-file)))
(sqlite-exec db "PRAGMA journal_mode=WAL;")
(sqlite-exec db "PRAGMA optimize;")
- (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);")
+ (with-time-logging "truncating the WAL"
+ (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);"))
(sqlite-close db))
(let ((datastore (make <sqlite-datastore>)))
@@ -136,141 +153,164 @@
(slot-set! datastore 'database-file database-file)
(slot-set! datastore 'metrics-registry metrics-registry)
- (slot-set!
- datastore
- 'worker-writer-thread-channel
- (make-worker-thread-channel
- (lambda ()
- (let ((db
- (db-open database-file)))
- (sqlite-exec db "PRAGMA busy_timeout = 5000;")
- (sqlite-exec db "PRAGMA synchronous = NORMAL;")
- (sqlite-exec db "PRAGMA temp_store = MEMORY;")
- (sqlite-exec db "PRAGMA foreign_keys = ON;")
-
- (sqlite-exec db "ATTACH DATABASE 'file:/mem?vfs=memdb' AS mem;")
-
- (sqlite-exec
- db
- "
-CREATE TABLE IF NOT EXISTS mem.build_allocation_plan (
- build_id INTEGER NOT NULL,
- agent_id TEXT NOT NULL,
- ordering INTEGER NOT NULL,
- PRIMARY KEY (agent_id, build_id)
-);")
-
- (list db)))
- #:name "ds write"
- #:destructor
- (let ((writer-thread-destructor-counter
- (make-gauge-metric metrics-registry
- "datastore_writer_thread_close_total")))
- (lambda (db)
- (db-optimize db
- database-file
- metrics-registry
- #:maybe-truncate-wal? #f)
-
- (metric-increment writer-thread-destructor-counter)
- (sqlite-close db)))
- #:lifetime 500
- #:expire-on-exception? #t
-
- ;; SQLite doesn't support parallel writes
- #:parallelism 1
- #:delay-logger (let ((delay-metric
- (make-histogram-metric
- metrics-registry
- "datastore_write_delay_seconds")))
- (lambda (seconds-delayed)
- (metric-observe delay-metric
- ;; TODO exact->inexact to work around
- ;; a bug in guile-prometheus where
- ;; the metric sum will output in the
- ;; exact form including the /q
- (exact->inexact seconds-delayed))
- (log-delay "datastore write" seconds-delayed)
- (when (> seconds-delayed 1)
- (format
- (current-error-port)
- "warning: database write delayed by ~1,2f seconds~%"
- seconds-delayed))))
- #:duration-logger
- (lambda (duration proc)
- (when (> duration 10)
- (format
- (current-error-port)
- "warning: database write took ~1,2f seconds (~a)~%"
- duration
- proc)))
- #:log-exception? worker-thread-log-exception?))
-
- ;; Make sure the worker thread has initialised, and created the in memory
- ;; tables
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
- (const #t))
-
- (slot-set!
- datastore
- 'worker-reader-thread-channel
- (make-worker-thread-channel
- (lambda ()
- (let ((db
- (db-open database-file #:write? #f)))
- (sqlite-exec db "PRAGMA temp_store = MEMORY;")
- (sqlite-exec db "PRAGMA busy_timeout = 4000;")
- (sqlite-exec db "PRAGMA cache_size = -16000;")
-
- (sqlite-exec db "ATTACH DATABASE 'file:/mem?vfs=memdb' AS mem;")
-
- (list db)))
- #:name "ds read"
- #:destructor
- (let ((reader-thread-destructor-counter
- (make-gauge-metric metrics-registry
- "datastore_reader_thread_close_total")))
- (lambda (db)
- (metric-increment reader-thread-destructor-counter)
- (sqlite-close db)))
- #:lifetime 50000
- #:expire-on-exception? #t
-
- ;; Use a minimum of 8 and a maximum of 16 threads
- #:parallelism
- (min (max (current-processor-count)
- 8)
- 16)
- #:delay-logger (let ((delay-metric
- (make-histogram-metric
+ (let ((writer-thread-pool
+ (make-thread-pool
+ ;; SQLite doesn't support parallel writes
+ 1
+ #:thread-initializer
+ (lambda ()
+ (let ((db
+ (db-open database-file)))
+ (sqlite-exec db "PRAGMA busy_timeout = 0;")
+ (sqlite-exec db "PRAGMA synchronous = NORMAL;")
+ (sqlite-exec db "PRAGMA temp_store = MEMORY;")
+ (sqlite-exec db "PRAGMA foreign_keys = ON;")
+
+ (list db)))
+ #:name "ds write"
+ #:thread-destructor
+ (let ((writer-thread-destructor-counter
+ (make-gauge-metric metrics-registry
+ "datastore_writer_thread_close_total")))
+ (lambda (db)
+ (db-optimize db
+ database-file
metrics-registry
- "datastore_read_delay_seconds")))
- (lambda (seconds-delayed)
- (metric-observe delay-metric
- ;; TODO exact->inexact to work around
- ;; a bug in guile-prometheus where
- ;; the metric sum will output in the
- ;; exact form including the /q
- (exact->inexact seconds-delayed))
- (log-delay "datastore read" seconds-delayed)
- (when (> seconds-delayed 1)
- (format
- (current-error-port)
- "warning: database read delayed by ~1,2f seconds~%"
- seconds-delayed))))
- #:duration-logger
- (lambda (duration proc)
- (when (> duration 30)
- (format
- (current-error-port)
- "warning: database read took ~1,2f seconds (~a)~%"
- duration
- proc)))
- #:log-exception? worker-thread-log-exception?))
+ #:maybe-truncate-wal? #f)
+
+ (metric-increment writer-thread-destructor-counter)
+ (sqlite-close db)))
+ #:thread-lifetime 500
+ #:expire-on-exception? #t
+
+ #:delay-logger (let ((delay-metric
+ (make-histogram-metric
+ metrics-registry
+ "datastore_write_delay_seconds"
+ #:buckets
+ %transaction-duration-histogram-buckets)))
+ (lambda (seconds-delayed proc)
+ (metric-observe delay-metric
+ ;; TODO exact->inexact to work around
+ ;; a bug in guile-prometheus where
+ ;; the metric sum will output in the
+ ;; exact form including the /q
+ (exact->inexact seconds-delayed))
+ (log-delay "datastore write" seconds-delayed)
+ (when (> seconds-delayed 1)
+ (format/safe
+ (current-error-port)
+ "warning: database write delayed by ~1,2f seconds~%"
+ seconds-delayed))))
+ #:duration-logger
+ (lambda (duration proc)
+ (when (> duration 10)
+ (format/safe
+ (current-error-port)
+ "warning: database write took ~1,2f seconds (~a)~%"
+ duration
+ proc)))
+ #:log-exception? thread-pool-log-exception?)))
+
+ (slot-set! datastore
+ 'writer-thread-pool
+ writer-thread-pool)
+ ;; This is changed in datastore-spawn-fibers
+ (slot-set! datastore
+ 'low-priority-writer-thread-channel
+ (thread-pool-channel writer-thread-pool))
+ (slot-set! datastore
+ 'default-priority-writer-thread-channel
+ (thread-pool-channel writer-thread-pool))
+ (slot-set! datastore
+ 'high-priority-writer-thread-channel
+ (thread-pool-channel writer-thread-pool)))
+
+ (let ((reader-thread-pool
+ (make-thread-pool
+ ;; Use a minimum of 8 and a maximum of 12 threads
+ (min (max (current-processor-count)
+ 8)
+ 12)
+ #:thread-initializer
+ (lambda ()
+ (let ((db
+ (db-open database-file #:write? #f)))
+ (sqlite-exec db "PRAGMA temp_store = MEMORY;")
+ (sqlite-exec db "PRAGMA busy_timeout = 0;")
+ (sqlite-exec db "PRAGMA cache_size = -16000;")
+
+ (list db)))
+ #:thread-destructor
+ (let ((reader-thread-destructor-counter
+ (make-gauge-metric metrics-registry
+ "datastore_reader_thread_close_total")))
+ (lambda (db)
+ (metric-increment reader-thread-destructor-counter)
+ (sqlite-close db)))
+ #:name "ds read"
+ #:thread-lifetime 50000
+ #:expire-on-exception? #t
+
+ #:delay-logger (let ((delay-metric
+ (make-histogram-metric
+ metrics-registry
+ "datastore_read_delay_seconds"
+ #:buckets
+ %transaction-duration-histogram-buckets)))
+ (lambda (seconds-delayed proc)
+ (metric-observe delay-metric
+ ;; TODO exact->inexact to work around
+ ;; a bug in guile-prometheus where
+ ;; the metric sum will output in the
+ ;; exact form including the /q
+ (exact->inexact seconds-delayed))
+ (log-delay "datastore read" seconds-delayed)
+ (when (> seconds-delayed 1)
+ (format/safe
+ (current-error-port)
+ "warning: database read delayed by ~1,2f seconds~%"
+ seconds-delayed))))
+ #:duration-logger
+ (lambda (duration proc)
+ (when (> duration 30)
+ (format/safe
+ (current-error-port)
+ "warning: database read took ~1,2f seconds (~a)~%"
+ duration
+ proc)))
+ #:log-exception? thread-pool-log-exception?)))
+
+ (let ((metric (make-gauge-metric
+ metrics-registry
+ "datastore_reader_threads_total")))
+ (metric-set metric
+ (vector-length
+ (thread-pool-proc-vector reader-thread-pool))))
+
+
+ (slot-set! datastore
+ 'reader-thread-pool
+ reader-thread-pool))
datastore))
+(define* (call-with-writer-thread
+ datastore
+ proc
+ #:key priority duration-logger)
+ (call-with-thread
+ (slot-ref datastore 'writer-thread-pool)
+ proc
+ #:duration-logger duration-logger
+ #:channel
+ (slot-ref datastore
+ (assq-ref
+ '((high . high-priority-writer-thread-channel)
+ (default . default-priority-writer-thread-channel)
+ (low . low-priority-writer-thread-channel))
+ (or priority 'default)))))
+
(define (sqlite-step-and-reset statement)
(let ((val (sqlite-step statement)))
(sqlite-reset statement)
@@ -301,11 +341,6 @@ CREATE TABLE IF NOT EXISTS mem.build_allocation_plan (
metrics-registry
checkpoint-duration-metric-name
(lambda ()
- (if (> (wal-size) extreme-wal-size-threshold)
- ;; Since the WAL is really getting too big, wait for much longer
- (sqlite-exec db "PRAGMA busy_timeout = 300000;")
- (sqlite-exec db "PRAGMA busy_timeout = 20;"))
-
(let* ((statement
(sqlite-prepare
db
@@ -316,19 +351,17 @@ CREATE TABLE IF NOT EXISTS mem.build_allocation_plan (
(#(blocked? modified-page-count pages-moved-to-db)
(if (= blocked? 1)
(begin
- (simple-format
+ (simple-format/safe
(current-error-port)
"warning: wal checkpoint blocked\n")
#f)
(begin
- (simple-format
+ (simple-format/safe
(current-error-port)
"wal checkpoint completed (~A, ~A)\n"
modified-page-count
pages-moved-to-db)
#t))))))
- (sqlite-exec db "PRAGMA busy_timeout = 5000;")
-
result)))
#t))
@@ -344,8 +377,8 @@ PRAGMA optimize;")
(define-method (datastore-optimize
(datastore <sqlite-datastore>))
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread
+ datastore
(lambda (db)
(db-optimize
db
@@ -354,15 +387,49 @@ PRAGMA optimize;")
(define-method (datastore-spawn-fibers
(datastore <sqlite-datastore>))
+ ;; Queue messages to the writer thread, so that they're handled in a first
+ ;; come first served manor
+ (let ((queues
+ queue-stats
+ (make-discrete-priority-queueing-channels
+ (thread-pool-channel
+ (slot-ref datastore 'writer-thread-pool))
+ 3)))
+ (match queues
+ ((high default low)
+ (slot-set! datastore 'high-priority-writer-thread-channel high)
+ (slot-set! datastore 'default-priority-writer-thread-channel default)
+ (slot-set! datastore 'low-priority-writer-thread-channel low)))
+ (slot-set! datastore
+ 'writer-thread-channel-queue-stats
+ queue-stats))
+
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (sleep 20)
+ (let ((procs
+ (vector->list
+ (thread-pool-proc-vector
+ (slot-ref datastore 'reader-thread-pool)))))
+ (when (every procedure? procs)
+ (for-each
+ (lambda (i proc)
+ (simple-format/safe (current-error-port)
+ "reader thread ~A running: ~A\n"
+ i proc))
+ (iota (length procs))
+ procs))))))
+
(spawn-fiber
(lambda ()
(while #t
(sleep (* 60 10)) ; 10 minutes
(with-exception-handler
(lambda (exn)
- (simple-format (current-error-port)
- "exception when performing WAL checkpoint: ~A\n"
- exn))
+ (simple-format/safe (current-error-port)
+ "exception when performing WAL checkpoint: ~A\n"
+ exn))
(lambda ()
(with-time-logging
"performing regular database maintenance"
@@ -387,11 +454,18 @@ PRAGMA optimize;")
(let ((setup-failures-total
(make-gauge-metric registry
"setup_failures_total"
- #:labels '(agent_id reason))))
-
- (letpar& ((setup-failure-counts
- (with-time-logging "counting setup failures"
- (datastore-count-setup-failures datastore))))
+ #:labels '(agent_id reason)))
+ (background-jobs-inserted-total
+ (make-counter-metric registry
+ "coordinator_background_job_inserted_total"
+ #:labels '(name))))
+
+ (fibers-let ((setup-failure-counts
+ (with-time-logging "counting setup failures"
+ (datastore-count-setup-failures datastore)))
+ (background-job-counts
+ (with-time-logging "counting background jobs"
+ (datastore-count-background-jobs datastore))))
(for-each (match-lambda
(((agent-id reason) . count)
@@ -400,7 +474,14 @@ PRAGMA optimize;")
#:label-values
`((agent_id . ,agent-id)
(reason . ,reason)))))
- setup-failure-counts)))
+ setup-failure-counts)
+
+ (for-each (match-lambda
+ ((type . count)
+ (metric-increment background-jobs-inserted-total
+ #:by count
+ #:label-values `((name . ,type)))))
+ background-job-counts)))
#t)
(define-method (datastore-update-metrics!
@@ -433,12 +514,38 @@ PRAGMA optimize;")
"datastore_wal_bytes")
(make-gauge-metric
registry "datastore_wal_bytes"
- #:docstring "Size of the SQLite Write Ahead Log file"))))
+ #:docstring "Size of the SQLite Write Ahead Log file")))
+ (reader-threads-used-metric
+ (or (metrics-registry-fetch-metric registry
+ "datastore_reader_threads_used_total")
+ (make-gauge-metric
+ registry "datastore_reader_threads_used_total")))
+ (writer-queue-length-metric
+ (or (metrics-registry-fetch-metric registry
+ "datastore_writer_queue_total")
+ (make-gauge-metric
+ registry "datastore_writer_queue_total"
+ #:labels '(priority)))))
- (letpar& ((build-counts
- (datastore-count-builds datastore))
- (build-result-counts
- (datastore-count-build-results datastore)))
+ (metric-set reader-threads-used-metric
+ (vector-count
+ (lambda (_ proc)
+ (->bool proc))
+ (thread-pool-proc-vector
+ (slot-ref datastore 'reader-thread-pool))))
+
+ (for-each
+ (lambda (priority stats)
+ (metric-set writer-queue-length-metric
+ (assq-ref stats 'length)
+ #:label-values `((priority . ,priority))))
+ '(high default low)
+ ((slot-ref datastore 'writer-thread-channel-queue-stats)))
+
+ (fibers-let ((build-counts
+ (datastore-count-builds datastore))
+ (build-result-counts
+ (datastore-count-build-results datastore)))
(for-each (match-lambda
((system . count)
(metric-set builds-total
@@ -471,9 +578,10 @@ PRAGMA optimize;")
internal-time-units-per-second))
(apply values vals)))))
-(define (metric-observe-duration datastore
- thing
- duration-seconds)
+(define* (metric-observe-duration datastore
+ thing
+ duration-seconds
+ #:key (buckets %default-histogram-buckets))
(define registry (slot-ref datastore 'metrics-registry))
(define metric-name
(string-append "datastore_" thing "_duration_seconds"))
@@ -481,15 +589,25 @@ PRAGMA optimize;")
(let ((metric
(or (metrics-registry-fetch-metric registry metric-name)
(make-histogram-metric registry
- metric-name))))
+ metric-name
+ #:buckets buckets))))
(metric-observe metric duration-seconds)))
-(define (call-with-worker-thread/delay-logging channel proc)
- (call-with-worker-thread channel
+(define (call-with-thread/delay-logging thread-pool proc)
+ (call-with-thread thread-pool
+ proc
+ #:duration-logger
+ (lambda (duration)
+ (log-delay proc duration))))
+
+(define* (call-with-writer-thread/delay-logging datastore proc
+ #:key priority)
+ (call-with-writer-thread datastore
proc
#:duration-logger
(lambda (duration)
- (log-delay proc duration))))
+ (log-delay proc duration))
+ #:priority priority))
(define-exception-type &transaction-rollback-exception &exception
make-transaction-rollback-exception
@@ -503,21 +621,24 @@ PRAGMA optimize;")
#:key
readonly?
(immediate? (not readonly?))
- duration-metric-name)
+ priority
+ duration-metric-name
+ (duration-metric-buckets
+ %transaction-duration-histogram-buckets))
(define (run-proc-within-transaction db)
(define (attempt-begin)
(with-exception-handler
(lambda (exn)
(match (exception-args exn)
(('sqlite-exec 5 msg)
- (simple-format
+ (simple-format/safe
(current-error-port)
"warning: issue starting transaction (code: 5, proc: ~A): ~A\n"
proc msg)
#f)
(_
- (simple-format (current-error-port)
- "exception starting transaction\n")
+ (simple-format/safe (current-error-port)
+ "exception starting transaction\n")
(raise-exception exn))))
(lambda ()
(sqlite-exec db (if immediate?
@@ -531,14 +652,14 @@ PRAGMA optimize;")
(lambda (exn)
(match (exception-args exn)
(('sqlite-exec 5 msg)
- (simple-format
+ (simple-format/safe
(current-error-port)
"warning: attempt commit (code: 5, proc: ~A): ~A\n"
proc msg)
#f)
(_
- (simple-format (current-error-port)
- "exception committing transaction\n")
+ (simple-format/safe (current-error-port)
+ "exception committing transaction\n")
(raise-exception exn))))
(lambda ()
(sqlite-exec db "COMMIT TRANSACTION;")
@@ -546,63 +667,88 @@ PRAGMA optimize;")
#:unwind? #t))
(if (attempt-begin)
- (call-with-values
- (lambda ()
- (with-exception-handler
- (lambda (exn)
- (if (transaction-rollback-exception? exn)
- (begin
- (sqlite-exec db "ROLLBACK TRANSACTION;")
- (transaction-rollback-exception-return-value exn))
- (begin
- (simple-format (current-error-port)
- "error: sqlite rolling back transaction (~A)\n"
- exn)
- (sqlite-exec db "ROLLBACK TRANSACTION;")
- (raise-exception exn))))
+ (with-exception-handler
+ (lambda (exn)
+ (if (transaction-rollback-exception? exn)
+ (begin
+ (sqlite-exec db "ROLLBACK TRANSACTION;")
+ (transaction-rollback-exception-return-value exn))
+ (begin
+ (simple-format/safe
+ (current-error-port)
+ "error: sqlite rolling back transaction (~A)\n"
+ exn)
+ (sqlite-exec db "ROLLBACK TRANSACTION;")
+ (raise-exception exn))))
+ (lambda ()
+ (call-with-values
(lambda ()
- (parameterize ((%current-transaction-proc proc))
- (call-with-delay-logging proc #:args (list db))))
- #:unwind? #t))
- (lambda vals
- (let loop ((success? (attempt-commit)))
- (if success?
- (apply values vals)
- (loop (attempt-commit))))))
+ (with-exception-handler
+ (lambda (exn)
+ (unless (transaction-rollback-exception? exn)
+ (backtrace))
+ (raise-exception exn))
+ (lambda ()
+ (parameterize ((%current-transaction-proc proc)
+ ;; Set the arguments parameter for the
+ ;; reader thread pool so that any nested
+ ;; calls to call-with-thread for the
+ ;; reader thread pool just use the writer
+ ;; db connection and thus this
+ ;; transaction
+ ((thread-pool-arguments-parameter
+ (slot-ref datastore 'reader-thread-pool))
+ (list db)))
+ (call-with-delay-logging proc #:args (list db))))))
+ (lambda vals
+ (let loop ((success? (attempt-commit)))
+ (if success?
+ (apply values vals)
+ (loop (attempt-commit)))))))
+ #:unwind? #t)
;; Database is busy, so retry
(run-proc-within-transaction db)))
- (call-with-worker-thread
+ (call-with-thread
(slot-ref datastore (if readonly?
- 'worker-reader-thread-channel
- 'worker-writer-thread-channel))
+ 'reader-thread-pool
+ 'writer-thread-pool))
(lambda (db)
(if (%current-transaction-proc)
(call-with-delay-logging proc #:args (list db)) ; already in transaction
(run-proc-within-transaction db)))
+ #:channel
+ (if readonly?
+ (thread-pool-channel
+ (slot-ref datastore 'reader-thread-pool))
+ (slot-ref datastore
+ (assq-ref
+ '((high . high-priority-writer-thread-channel)
+ (default . default-priority-writer-thread-channel)
+ (low . low-priority-writer-thread-channel))
+ (or priority 'default))))
#:duration-logger
(lambda (duration-seconds)
(when (and (not readonly?)
(> duration-seconds 2))
- (display
- (format
- #f
- "warning: ~a:\n took ~4f seconds in transaction\n"
- proc
- duration-seconds)
- (current-error-port))
-
- (when duration-metric-name
- (metric-observe-duration datastore
- duration-metric-name
- duration-seconds))))))
+ (format/safe
+ (current-error-port)
+ "warning: ~a:\n took ~4f seconds in transaction\n"
+ proc
+ duration-seconds))
+
+ (when duration-metric-name
+ (metric-observe-duration datastore
+ duration-metric-name
+ duration-seconds
+ #:buckets duration-metric-buckets)))))
(define-method (datastore-find-agent
(datastore <sqlite-datastore>)
uuid)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -630,8 +776,8 @@ SELECT description FROM agents WHERE id = :id"
(define-method (datastore-find-agent-by-name
(datastore <sqlite-datastore>)
name)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -658,8 +804,8 @@ SELECT id FROM agents WHERE name = :name"
(define-method (datastore-insert-dynamic-auth-token
(datastore <sqlite-datastore>)
token)
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread
+ datastore
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -677,8 +823,8 @@ INSERT INTO dynamic_auth_tokens (token) VALUES (:token)"
(define-method (datastore-dynamic-auth-token-exists?
(datastore <sqlite-datastore>)
token)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -705,8 +851,8 @@ SELECT 1 FROM dynamic_auth_tokens WHERE token = :token"
(define-method (datastore-fetch-agent-tags
(datastore <sqlite-datastore>)
agent-id)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -742,16 +888,16 @@ WHERE agent_tags.agent_id = :agent_id"
uuid
name
description)
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread
+ datastore
(lambda (db)
(insert-agent db uuid name description)))
#t)
(define-method (datastore-list-agents
(datastore <sqlite-datastore>))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -779,8 +925,8 @@ SELECT id, name, description, active FROM agents ORDER BY id"
(unless (boolean? active?)
(error "datastore-set-agent-active called with non-boolean"))
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread
+ datastore
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -799,8 +945,8 @@ UPDATE agents SET active = :active WHERE id = :uuid"
(define-method (datastore-find-agent-status
(datastore <sqlite-datastore>)
agent-id)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -834,8 +980,8 @@ WHERE agent_id = :agent_id"
1min-load-average
system-uptime
processor-count)
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread
+ datastore
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -871,8 +1017,8 @@ INSERT INTO agent_status (agent_id, status, load_average_1min, system_uptime, pr
(datastore <sqlite-datastore>)
agent-uuid
password)
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread
+ datastore
(lambda (db)
(insert-agent-password db agent-uuid password)))
#t)
@@ -881,8 +1027,8 @@ INSERT INTO agent_status (agent_id, status, load_average_1min, system_uptime, pr
(datastore <sqlite-datastore>)
uuid
password)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -904,8 +1050,8 @@ WHERE agent_id = :agent_id AND password = :password"
(define-method (datastore-agent-list-passwords
(datastore <sqlite-datastore>)
uuid)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1002,6 +1148,7 @@ INSERT INTO agent_tags (agent_id, tag_id) VALUES (:agent_id, :tag_id)"
(lambda (db)
(insert-derivation-and-return-outputs db derivation)
(hash-clear! %derivation-outputs-cache))
+ #:priority 'low
#:duration-metric-name "store_derivation")
#t)
@@ -1009,8 +1156,8 @@ INSERT INTO agent_tags (agent_id, tag_id) VALUES (:agent_id, :tag_id)"
(define-method (datastore-build-exists-for-derivation-outputs?
(datastore <sqlite-datastore>)
derivation)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1042,8 +1189,8 @@ WHERE derivation_outputs.derivation_id = :derivation_id
(define-method (datastore-build-required-by-another?
(datastore <sqlite-datastore>)
uuid)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1137,8 +1284,8 @@ SELECT name FROM derivations WHERE id = :id"
(match (sqlite-step-and-reset statement)
(#(name) name))))
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let loop ((derivation-ids (list (db-find-derivation-id db derivation)))
(result '()))
@@ -1163,8 +1310,8 @@ SELECT name FROM derivations WHERE id = :id"
args)
(apply
(lambda* (system #:key include-cancelled?)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1252,8 +1399,8 @@ FROM (
(define-method (datastore-list-builds-for-derivation-recursive-inputs
(datastore <sqlite-datastore>)
derivation)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1290,8 +1437,8 @@ INNER JOIN related_derivations
(define-method (datastore-find-unprocessed-build-entry
(datastore <sqlite-datastore>)
uuid)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1320,8 +1467,8 @@ WHERE build_id = :build_id"
(datastore <sqlite-datastore>)
build-uuid
tags)
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread
+ datastore
(lambda (db)
(let ((insert-tag-statement
(sqlite-prepare
@@ -1521,8 +1668,8 @@ WHERE build_id = :build_id"
uuid
explicit-priority-lower-bound)
(define builds-to-consider
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
;; Recursively find builds for all missing outputs that this build
;; takes as inputs. The order is important here, since we want to
@@ -1672,8 +1819,8 @@ WHERE build_id = :build_id"
override-derived-priority)
(let ((build-id
old-priority
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((build-id
(db-find-build-id db uuid)))
@@ -1730,46 +1877,10 @@ WHERE build_id = :build_id"
#t)
rest))
-(define-method (datastore-remove-build-from-allocation-plan
- (datastore <sqlite-datastore>)
- uuid)
- (define (update-build-allocation-plan-metrics)
- (let ((allocation-plan-metric
- (metrics-registry-fetch-metric
- (slot-ref datastore 'metrics-registry)
- "build_allocation_plan_total")))
- (for-each
- (match-lambda
- ((agent-id . count)
- (metric-set allocation-plan-metric
- count
- #:label-values
- `((agent_id . ,agent-id)))))
- (datastore-count-build-allocation-plan-entries datastore))))
-
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-writer-thread-channel)
- (lambda (db)
- (let ((statement (sqlite-prepare
- db
- "
-DELETE FROM build_allocation_plan WHERE build_id = :build_id"
- #:cache? #t)))
-
- (sqlite-bind-arguments
- statement
- #:build_id (db-find-build-id db uuid))
-
- (sqlite-step-and-reset statement)
-
- (unless (= 0 (changes-count db))
- (update-build-allocation-plan-metrics)))))
- #t)
-
(define-method (datastore-count-build-results
(datastore <sqlite-datastore>))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1827,8 +1938,8 @@ VALUES (:agent_id, :result, 1)"
#t))))
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread/delay-logging
+ datastore
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1879,14 +1990,12 @@ LIMIT 1"
(#f #t)
(#(1) #f))))
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-writer-thread-channel)
- (lambda (db)
- (let ((builds-statement
- (sqlite-prepare
- db
- "
-SELECT DISTINCT unprocessed_builds.id
+ (define (all-build-ids db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT input_builds.id
FROM builds
INNER JOIN derivation_outputs
ON builds.derivation_id = derivation_outputs.derivation_id
@@ -1894,45 +2003,62 @@ INNER JOIN derivation_outputs AS all_derivation_outputs
ON all_derivation_outputs.output_id = derivation_outputs.output_id
INNER JOIN derivation_inputs
ON derivation_inputs.derivation_output_id = all_derivation_outputs.id
-INNER JOIN builds AS unprocessed_builds
- ON unprocessed_builds.processed = 0
- AND unprocessed_builds.derivation_id = derivation_inputs.derivation_id
-INNER JOIN unprocessed_builds_with_derived_priorities
- ON unprocessed_builds_with_derived_priorities.build_id = unprocessed_builds.id
- AND unprocessed_builds_with_derived_priorities.all_inputs_built = 0
+INNER JOIN builds AS input_builds
+ ON input_builds.processed = 0
+ AND input_builds.canceled = 0
+ AND input_builds.derivation_id = derivation_inputs.derivation_id
WHERE builds.id = :build_id"
- #:cache? #t))
+ #:cache? #t)))
- (update-statement
- (sqlite-prepare
- db
- "
+ (sqlite-bind-arguments
+ statement
+ #:build_id (db-find-build-id db build-uuid))
+
+ (let ((result
+ (sqlite-fold
+ (lambda (row result)
+ (match row
+ (#(build-id)
+ (if (all-inputs-built? db build-id)
+ (cons build-id result)
+ result))))
+ '()
+ statement)))
+ (sqlite-reset statement)
+
+ result)))
+
+ (let ((build-ids
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
+ (lambda (db)
+ (all-build-ids db)))))
+ (call-with-writer-thread/delay-logging
+ datastore
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
UPDATE unprocessed_builds_with_derived_priorities
SET all_inputs_built = 1
WHERE build_id = :build_id"
- #:cache? #t)))
+ #:cache? #t)))
- (sqlite-bind-arguments builds-statement
- #:build_id (db-find-build-id db build-uuid))
-
- (sqlite-fold
- (lambda (row result)
- (match row
- (#(build-id)
- (when (all-inputs-built? db build-id)
- (sqlite-bind-arguments update-statement
- #:build_id build-id)
- (sqlite-step-and-reset update-statement))))
- #f)
- #f
- builds-statement)
- (sqlite-reset builds-statement)))))
+ (for-each
+ (lambda (build-id)
+ (sqlite-bind-arguments statement
+ #:build_id build-id)
+ (sqlite-step-and-reset statement))
+ build-ids)
+
+ #t)))))
(define-method (datastore-remove-build-allocation
(datastore <sqlite-datastore>)
build-uuid agent-id)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread/delay-logging
+ datastore
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1954,8 +2080,8 @@ DELETE FROM allocated_builds
(define-method (datastore-mark-build-as-processed
(datastore <sqlite-datastore>)
build-uuid end-time)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread/delay-logging
+ datastore
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1991,8 +2117,8 @@ DELETE FROM unprocessed_builds_with_derived_priorities
(define-method (datastore-delete-relevant-outputs-from-unbuilt-outputs
(datastore <sqlite-datastore>)
build-uuid)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread/delay-logging
+ datastore
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2019,8 +2145,8 @@ WHERE output_id IN (
(datastore <sqlite-datastore>)
build-uuid
output-metadata)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread/delay-logging
+ datastore
(lambda (db)
(define (name->output-id name)
(let ((statement
@@ -2097,8 +2223,8 @@ INSERT INTO build_starts (
(define-method (datastore-find-build-starts
(datastore <sqlite-datastore>)
build-uuid)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2207,8 +2333,8 @@ INSERT INTO setup_failure_missing_inputs (
(define-method (datastore-list-setup-failure-missing-inputs
(datastore <sqlite-datastore>)
setup-failure-id)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2236,8 +2362,8 @@ WHERE setup_failure_id = :id"
build-uuid
agent-id
failure-reason)
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread
+ datastore
(lambda (db)
(insert-setup-failure-and-remove-allocation db
(db-find-build-id db build-uuid)
@@ -2254,8 +2380,8 @@ WHERE setup_failure_id = :id"
(define-method (datastore-count-builds
(datastore <sqlite-datastore>))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2278,8 +2404,8 @@ FROM builds_counts"
(define-method (datastore-for-each-build
(datastore <sqlite-datastore>)
proc)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2318,8 +2444,8 @@ SELECT id FROM builds WHERE uuid = :uuid"
(define-method (datastore-find-build
(datastore <sqlite-datastore>)
uuid)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2377,12 +2503,14 @@ WHERE uuid = :uuid"
(canceled 'unset)
(priority-> 'unset)
(priority-< 'unset)
+ (created-at-> 'unset)
+ (created-at-< 'unset)
(after-id #f)
(limit #f)
;; other-builds-dependent or no-dependent-builds
(relationship 'unset))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(define tag->expression
(let ((statement
@@ -2395,7 +2523,7 @@ SELECT id FROM tags WHERE key = :key AND value = :value"
(sqlite-prepare
db
"
-SELECT id FROM tags WHERE key = :key"
+SELECT 1 FROM tags WHERE key = :key LIMIT 1"
#:cache? #t)))
(lambda (tag not?)
(match tag
@@ -2416,28 +2544,27 @@ SELECT id FROM tags WHERE key = :key"
(sqlite-bind-arguments key-statement
#:key key)
- (let* ((tag-ids (sqlite-map
- (match-lambda
- (#(id) id))
- key-statement))
- (result
- (string-append
- "("
- (string-join
- (map
- (lambda (id)
+ (let ((tag-with-key-exists?
+ (->bool (sqlite-step-and-reset key-statement))))
+ (if tag-with-key-exists?
+ (let ((result
(string-append
+ "("
(if not? "NOT " "")
- "EXISTS (SELECT 1 FROM build_tags "
- "WHERE build_id = builds.id AND tag_id = "
- (number->string id)
- ")"))
- tag-ids)
- (if not? " AND " " OR "))
- ")")))
- (sqlite-reset key-statement)
-
- result))))))
+ "
+EXISTS (
+ SELECT 1
+ FROM build_tags
+ INNER JOIN tags ON build_tags.tag_id = tags.id
+ WHERE build_id = builds.id
+ AND tags.key = '"
+ key "'
+)"
+ ")")))
+ result)
+ (if not?
+ "TRUE"
+ "FALSE"))))))))
(let ((tag-expressions
(map (lambda (tag)
@@ -2459,6 +2586,8 @@ SELECT id FROM tags WHERE key = :key"
(not (null? not-systems))
(not (eq? priority-> 'unset))
(not (eq? priority-< 'unset))
+ (not (eq? created-at-> 'unset))
+ (not (eq? created-at-< 'unset))
(not (eq? processed 'unset))
(not (eq? canceled 'unset))
(not (eq? relationship 'unset))
@@ -2509,6 +2638,14 @@ INNER JOIN derivations
(list
(simple-format #f "priority < ~A" priority-<))
'())
+ (if (string? created-at->)
+ (list
+ (simple-format #f "created_at > '~A'" created-at->))
+ '())
+ (if (string? created-at-<)
+ (list
+ (simple-format #f "created_at < '~A'" created-at-<))
+ '())
(cond
((eq? processed #t) '("processed = 1"))
((eq? processed #f) '("processed = 0"))
@@ -2612,8 +2749,8 @@ WHERE derivation_outputs.derivation_id = builds.derivation_id)"))
(define-method (datastore-fetch-build-tags
(datastore <sqlite-datastore>)
build-uuid)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2647,8 +2784,8 @@ WHERE build_tags.build_id = :build_id"
(define-method (datastore-find-build-result
(datastore <sqlite-datastore>)
build-uuid)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2673,8 +2810,8 @@ WHERE build_id = :build_id"
(define-method (datastore-find-build-derivation-system
(datastore <sqlite-datastore>)
build-uuid)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2710,8 +2847,8 @@ WHERE builds.id = :build_id"
datastore
"list_builds_for_output"
(lambda ()
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2759,8 +2896,8 @@ WHERE derivation_outputs.output_id = :output_id"
rest)
(apply
(lambda* (output system #:key include-canceled?)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2804,8 +2941,8 @@ WHERE derivation_outputs.output_id = :output_id
rest)
(apply
(lambda* (derivation #:key (include-canceled? #t))
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2840,8 +2977,8 @@ WHERE derivations.name = :derivation"
(define-method (datastore-count-setup-failures
(datastore <sqlite-datastore>))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2865,8 +3002,8 @@ GROUP BY agent_id, failure_reason"
(define-method (datastore-list-setup-failures-for-build
(datastore <sqlite-datastore>)
build-uuid)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2898,8 +3035,8 @@ WHERE build_id = :build_id"
args)
(apply
(lambda* (#:key agent-id)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2950,8 +3087,8 @@ WHERE builds.processed = 0
(define-method (datastore-list-processed-builds
(datastore <sqlite-datastore>))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2977,8 +3114,8 @@ WHERE processed = 1"
(define-method (datastore-list-unprocessed-builds
(datastore <sqlite-datastore>))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3012,10 +3149,11 @@ ORDER BY priority DESC"
builds)))))
-(define-method (datastore-find-first-unallocated-deferred-build
- (datastore <sqlite-datastore>))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+(define-method (datastore-find-deferred-build
+ (datastore <sqlite-datastore>)
+ select?)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3028,25 +3166,29 @@ INNER JOIN derivations
WHERE processed = 0
AND canceled = 0
AND deferred_until IS NOT NULL
- AND builds.id NOT IN (SELECT build_id FROM build_allocation_plan)
-ORDER BY deferred_until ASC
-LIMIT 1"
+ORDER BY deferred_until ASC"
#:cache? #t)))
- (match (sqlite-step-and-reset statement)
- (#(uuid derivation_name priority created_at deferred_until)
- `((uuid . ,uuid)
- (derivation-name . ,derivation_name)
- (priority . ,priority)
- (created-at . ,(if (string? created_at)
- (string->date created_at
- "~Y-~m-~d ~H:~M:~S")
- #f))
- (deferred-until . ,(if (string? deferred_until)
- (string->date deferred_until
- "~Y-~m-~d ~H:~M:~S")
- #f))))
- (#f #f))))))
+ (let loop ((row (sqlite-step statement)))
+ (match row
+ (#(uuid derivation_name priority created_at deferred_until)
+ (let ((res
+ (select?
+ `((uuid . ,uuid)
+ (derivation-name . ,derivation_name)
+ (priority . ,priority)
+ (created-at . ,(if (string? created_at)
+ (string->date created_at
+ "~Y-~m-~d ~H:~M:~S")
+ #f))
+ (deferred-until . ,(if (string? deferred_until)
+ (string->date deferred_until
+ "~Y-~m-~d ~H:~M:~S")
+ #f))))))
+ (if res
+ res
+ (loop (sqlite-step statement)))))
+ (#f #f)))))))
(define-method (datastore-fetch-prioritised-unprocessed-builds
(datastore <sqlite-datastore>))
@@ -3055,9 +3197,11 @@ LIMIT 1"
(sqlite-prepare
db
"
-SELECT builds.uuid
+SELECT builds.uuid, systems.system
FROM unprocessed_builds_with_derived_priorities
INNER JOIN builds ON build_id = builds.id
+INNER JOIN derivations ON builds.derivation_id = derivations.id
+INNER JOIN systems ON derivations.system_id = systems.id
WHERE all_inputs_built = 1
AND NOT EXISTS (
SELECT 1
@@ -3067,26 +3211,21 @@ WHERE all_inputs_built = 1
) ORDER BY derived_priority ASC"
#:cache? #t)))
- (let ((result (sqlite-fold
- (lambda (row result)
- (cons (vector-ref row 0)
- result))
- '()
- statement)))
+ (let ((result (sqlite-fold cons '() statement)))
(sqlite-reset statement)
result)))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
fetch-prioritised-unprocessed-builds))
(define-method (datastore-insert-unprocessed-hook-event
(datastore <sqlite-datastore>)
event
arguments)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread/delay-logging
+ datastore
(lambda (db)
(insert-unprocessed-hook-event db
event
@@ -3116,8 +3255,8 @@ VALUES (:event, :arguments)"
(define-method (datastore-count-unprocessed-hook-events
(datastore <sqlite-datastore>))
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3140,8 +3279,8 @@ SELECT event, COUNT(*) FROM unprocessed_hook_events GROUP BY event"
(datastore <sqlite-datastore>)
event
limit)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3174,8 +3313,8 @@ LIMIT :limit"
(define-method (datastore-find-unprocessed-hook-event
(datastore <sqlite-datastore>)
id)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3201,8 +3340,8 @@ WHERE id = :id"
(define-method (datastore-delete-unprocessed-hook-event
(datastore <sqlite-datastore>)
id)
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread
+ datastore
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3215,110 +3354,14 @@ DELETE FROM unprocessed_hook_events WHERE id = :id"
statement
#:id id)
- (sqlite-step-and-reset statement))))
- #t)
-
-(define-method (datastore-count-build-allocation-plan-entries
- (datastore <sqlite-datastore>))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
- (lambda (db)
- (let ((statement
- (sqlite-prepare
- db
- "
-SELECT agent_id, COUNT(*)
-FROM build_allocation_plan
-GROUP BY agent_id"
- #:cache? #t)))
-
- (let ((result
- (sqlite-map
- (match-lambda
- (#(agent_id count)
- (cons agent_id count)))
- statement)))
- (sqlite-reset statement)
-
- result)))))
-
-(define-method (datastore-replace-build-allocation-plan
- (datastore <sqlite-datastore>)
- planned-builds)
- (define (clear-current-plan db)
- (sqlite-exec
- db
- "DELETE FROM build_allocation_plan"))
-
- (define insert-sql
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
- (lambda (db)
- (string-append
- "
-INSERT INTO build_allocation_plan (build_id, agent_id, ordering) VALUES "
- (string-join
- (map (match-lambda
- ((build-uuid agent-id ordering)
- (simple-format
- #f
- "('~A', '~A', ~A)"
- (db-find-build-id db build-uuid)
- agent-id
- ordering)))
- planned-builds)
- ", ")
- ";"))))
-
- (define (insert-new-plan db planned-builds)
- (sqlite-exec
- db
- insert-sql))
-
- (datastore-call-with-transaction
- datastore
- (lambda (db)
- (clear-current-plan db)
- (unless (null? planned-builds)
- (insert-new-plan db planned-builds)))
- #:duration-metric-name "replace_build_allocation_plan")
-
- (let* ((agent-ids
- (map (lambda (agent-details)
- (assq-ref agent-details 'uuid))
- (datastore-list-agents datastore)))
- (counts-by-agent
- (make-vector (length agent-ids) 0)))
- (for-each
- (match-lambda
- ((_ agent-id _)
- (let ((index (list-index (lambda (list-agent-id)
- (string=? agent-id list-agent-id))
- agent-ids)))
- (vector-set! counts-by-agent
- index
- (+ (vector-ref counts-by-agent
- index)
- 1)))))
- planned-builds)
-
- (let ((metric
- (metrics-registry-fetch-metric (slot-ref datastore 'metrics-registry)
- "build_allocation_plan_total")))
- (for-each
- (lambda (index agent-id)
- (metric-set metric
- (vector-ref counts-by-agent index)
- #:label-values
- `((agent_id . ,agent-id))))
- (iota (length agent-ids))
- agent-ids)))
+ (sqlite-step-and-reset statement)))
+ #:priority 'high)
#t)
(define-method (datastore-count-allocated-builds
(datastore <sqlite-datastore>))
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3340,8 +3383,8 @@ SELECT agent_id, COUNT(*) FROM allocated_builds GROUP BY agent_id"
(define-method (datastore-agent-requested-systems
(datastore <sqlite-datastore>)
agent-id)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3414,34 +3457,40 @@ INSERT INTO build_allocation_agent_requested_systems (agent_id, system_id) VALUE
(define-method (datastore-fetch-build-to-allocate
(datastore <sqlite-datastore>)
- agent-id)
- (datastore-call-with-transaction
- datastore
+ build-id)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
db
- ;; This needs to guard against the plan being out of date
"
SELECT builds.uuid, derivations.id, derivations.name,
unprocessed_builds_with_derived_priorities.derived_priority
FROM builds
-INNER JOIN build_allocation_plan
- ON builds.id = build_allocation_plan.build_id
INNER JOIN derivations
ON builds.derivation_id = derivations.id
-INNER JOIN build_allocation_agent_requested_systems
- ON build_allocation_agent_requested_systems.agent_id = :agent_id
- AND build_allocation_agent_requested_systems.system_id = derivations.system_id
LEFT JOIN unprocessed_builds_with_derived_priorities
ON unprocessed_builds_with_derived_priorities.build_id = builds.id
-WHERE build_allocation_plan.agent_id = :agent_id
+WHERE builds.uuid = :uuid
AND builds.processed = 0
AND builds.canceled = 0
- AND builds.id NOT IN (SELECT build_id FROM allocated_builds)
-ORDER BY build_allocation_plan.ordering ASC"
- #:cache? #t))
- (output-conflicts-statement
+ AND builds.id NOT IN (SELECT build_id FROM allocated_builds)"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:uuid build-id)
+
+ (sqlite-step-and-reset statement)))))
+
+(define-method (datastore-check-if-derivation-conflicts?
+ (datastore <sqlite-datastore>)
+ agent-id
+ derivation-id)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
+ (lambda (db)
+ (let ((statement
(sqlite-prepare
db
"
@@ -3459,149 +3508,63 @@ WHERE build_derivation_outputs.derivation_id = :derivation_id
allocated_builds_derivation_outputs.output_id"
#:cache? #t)))
- (define (get-build-to-allocate)
- (let loop ((build-details (sqlite-step statement)))
- (match build-details
- (#f #f)
- (#(uuid derivation-id derivation-name derived_priority)
+ (sqlite-bind-arguments statement
+ #:agent_id agent-id
+ #:derivation_id derivation-id)
- (sqlite-bind-arguments output-conflicts-statement
- #:agent_id agent-id
- #:derivation_id derivation-id)
+ (->bool (sqlite-step-and-reset statement))))))
- (if (eq? (sqlite-step-and-reset output-conflicts-statement)
- #f)
- `((uuid . ,uuid)
- (derivation_name . ,derivation-name)
- (derived_priority . ,derived_priority))
- (loop (sqlite-step statement)))))))
+(define-method (datastore-insert-to-allocated-builds
+ (datastore <sqlite-datastore>)
+ agent-id
+ build-uuid)
+ (call-with-writer-thread
+ datastore
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+INSERT INTO allocated_builds (build_id, agent_id)
+ VALUES (:build_id, :agent_id)"
+ #:cache? #t)))
(sqlite-bind-arguments
statement
+ #:build_id (db-find-build-id db build-uuid)
#:agent_id agent-id)
- (let ((result (get-build-to-allocate)))
- (sqlite-reset statement)
-
- result)))
-
- #:readonly? #t
- #:duration-metric-name "fetch_builds_to_allocate"))
-
-(define-method (datastore-insert-to-allocated-builds
- (datastore <sqlite-datastore>)
- agent-id
- build-uuids)
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
- (lambda (db)
- (sqlite-exec
- db
- (string-append
- "
-INSERT INTO allocated_builds (build_id, agent_id) VALUES "
- (string-join
- (map (lambda (build-uuid)
- (simple-format
- #f
- "(~A, '~A')"
- (db-find-build-id db build-uuid)
- agent-id))
- build-uuids)
- ", ")
- ";")))))
+ (sqlite-step-and-reset statement))))
+ #t)
-(define-method (datastore-remove-builds-from-plan
+(define-method (datastore-update-allocated-build-submit-outputs
(datastore <sqlite-datastore>)
- build-uuids)
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ build-uuid
+ submit-outputs?)
+ (call-with-writer-thread
+ datastore
(lambda (db)
- (sqlite-exec
- db
- (string-append
- "
-DELETE FROM build_allocation_plan
-WHERE build_id IN ("
- (string-join
- (map (lambda (build-uuid)
- (number->string (db-find-build-id db build-uuid)))
- build-uuids)
- ", ")
- ")")))))
-
-(define-method (datastore-list-allocation-plan-builds
- (datastore <sqlite-datastore>)
- .
- rest)
- (apply
- (lambda* (agent-id #:key limit)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
- (lambda (db)
- (let ((statement
- (sqlite-prepare
- db
- (string-append
- "
-SELECT builds.uuid, derivations.name, systems.system,
- builds.priority,
- unprocessed_builds_with_derived_priorities.derived_priority
-FROM builds
-INNER JOIN derivations
- ON builds.derivation_id = derivations.id
-INNER JOIN systems
- ON derivations.system_id = systems.id
-INNER JOIN build_allocation_plan
- ON builds.id = build_allocation_plan.build_id
-LEFT JOIN unprocessed_builds_with_derived_priorities
- ON builds.id = unprocessed_builds_with_derived_priorities.build_id
-WHERE build_allocation_plan.agent_id = :agent_id
- AND builds.processed = 0
- AND builds.id NOT IN (SELECT build_id FROM allocated_builds)
-ORDER BY build_allocation_plan.ordering ASC"
- (if limit
- "
-LIMIT :limit"
- ""))
- #:cache? #t)))
-
- (apply sqlite-bind-arguments
- statement
- #:agent_id agent-id
- (if limit
- (list #:limit limit)
- '()))
-
- (let ((builds (sqlite-map
- (match-lambda
- (#(uuid derivation_name system
- priority derived_priority)
- `((uuid . ,uuid)
- (derivation_name . ,derivation_name)
- (system . ,system)
- (priority . ,priority)
- (derived_priority . ,derived_priority)
- (tags . ,(vector-map
- (lambda (_ tag)
- (match tag
- ((key . value)
- `((key . ,key)
- (value . ,value)))))
- (datastore-fetch-build-tags
- datastore
- uuid))))))
- statement)))
- (sqlite-reset statement)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+UPDATE allocated_builds
+SET submit_outputs = :submit_outputs
+WHERE build_id = :build_id"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:build_id (db-find-build-id db build-uuid)
+ #:submit_outputs (if submit-outputs? 1 0))
- builds)))))
- rest))
+ (sqlite-step-and-reset statement))))
+ #t)
(define-method (datastore-list-agent-builds
(datastore <sqlite-datastore>)
agent-id)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3609,7 +3572,7 @@ LIMIT :limit"
"
SELECT builds.uuid, derivations.name,
unprocessed_builds_with_derived_priorities.derived_priority,
- builds.canceled
+ builds.canceled, allocated_builds.submit_outputs
FROM builds
INNER JOIN derivations
ON builds.derivation_id = derivations.id
@@ -3626,11 +3589,17 @@ WHERE allocated_builds.agent_id = :agent_id"
(let ((builds (sqlite-map
(match-lambda
- (#(uuid derivation_name derived_priority canceled)
+ (#(uuid derivation_name derived_priority canceled
+ submit_outputs)
`((uuid . ,uuid)
(derivation_name . ,derivation_name)
(derived_priority . ,derived_priority)
- (canceled . ,(= 1 canceled)))))
+ (canceled . ,(= 1 canceled))
+ (submit_outputs . ,(cond
+ ((not submit_outputs)
+ 'null)
+ (else
+ (= 1 submit_outputs)))))))
statement)))
(sqlite-reset statement)
@@ -3639,8 +3608,8 @@ WHERE allocated_builds.agent_id = :agent_id"
(define-method (datastore-agent-for-build
(datastore <sqlite-datastore>)
build-uuid)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3702,13 +3671,14 @@ WHERE build_results.build_id = :build_id"
"_sqitch_registry.db")
(string-append "db:sqlite:" database-file))))
- (simple-format #t "running command: ~A\n"
- (string-join command))
- (unless (zero? (apply system* command))
- (simple-format
- (current-error-port)
- "error: sqitch command failed\n")
- (exit 1))))
+ (simple-format/safe #t "running command: ~A\n"
+ (string-join command))
+ (let ((pid (spawn (%config 'sqitch) command)))
+ (unless (zero? (cdr (waitpid pid)))
+ (simple-format/safe
+ (current-error-port)
+ "error: sqitch command failed\n")
+ (exit 1)))))
(define (changes-count db)
(let ((statement
@@ -3795,16 +3765,16 @@ WHERE name = :name"
(define-method (datastore-find-derivation
(datastore <sqlite-datastore>)
name)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(db-find-derivation db name))))
(define-method (datastore-find-derivation-outputs
(datastore <sqlite-datastore>)
derivation-name)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3836,8 +3806,8 @@ WHERE derivation_id = :derivation_id"
(define-method (datastore-find-derivation-output-details
(datastore <sqlite-datastore>)
derivation-name)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3879,8 +3849,8 @@ WHERE derivation_id = :derivation_id"
(define-method (datastore-list-unbuilt-derivation-outputs
(datastore <sqlite-datastore>)
derivation-name)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3913,8 +3883,8 @@ WHERE derivation_id = :derivation_id"
(define-method (datastore-list-build-outputs
(datastore <sqlite-datastore>)
build-uuid)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3958,8 +3928,8 @@ WHERE builds.id = :build_id"
(define-method (datastore-find-derivation-system
(datastore <sqlite-datastore>)
derivation-name)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3982,8 +3952,8 @@ WHERE name = :name"
(define-method (datastore-find-derivation-inputs
(datastore <sqlite-datastore>)
derivation-name)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -4023,8 +3993,8 @@ WHERE derivations.id = :derivation_id"
(define-method (datastore-find-recursive-derivation-input-outputs
(datastore <sqlite-datastore>)
derivation-name)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -4065,8 +4035,8 @@ INNER JOIN outputs
(datastore <sqlite-datastore>)
start-derivation-name
output)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -4149,20 +4119,6 @@ SELECT system FROM systems WHERE id = :id"
(define derivation-name
(derivation-file-name derivation))
- (define (maybe-fix-fixed-output-field derivation-details)
- (let ((fixed-output? (fixed-output-derivation? derivation)))
- (unless (equal? (assq-ref derivation-details 'fixed-output?)
- fixed-output?)
- (let ((statement (sqlite-prepare
- db
- "
-UPDATE derivations SET fixed_output = :fixed_output WHERE name = :name"
- #:cache? #t)))
- (sqlite-bind-arguments statement
- #:name derivation-name
- #:fixed_output (if fixed-output? 1 0))
- (sqlite-step-and-reset statement)))))
-
(define (insert-derivation)
(let ((statement
(sqlite-prepare
@@ -4193,8 +4149,6 @@ INSERT INTO derivations (name, system_id, fixed_output)
(db-find-derivation db derivation-name)))
(if derivation-details
(begin
- (maybe-fix-fixed-output-field derivation-details)
-
(let ((derivation-outputs
(select-derivation-outputs db derivation-name)))
@@ -4503,15 +4457,15 @@ VALUES (:build_id, :derived_priority, :all_inputs_built)"
(lambda* (uuid drv-name priority defer-until
#:key skip-updating-other-build-derived-priorities)
(define system-id
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(db-system->system-id
db
(datastore-find-derivation-system datastore drv-name)))))
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread/delay-logging
+ datastore
(lambda (db)
(let* ((build-id (insert-build db drv-name uuid priority
defer-until))
@@ -4537,7 +4491,8 @@ VALUES (:build_id, :derived_priority, :all_inputs_built)"
#:args
(list db
build-id
- derived-priority)))))))
+ derived-priority)))))
+ #:priority 'low))
rest)
#t)
@@ -4573,3 +4528,212 @@ VALUES (:agent_id, :password)"
#:password password)
(sqlite-step-and-reset statement)))
+
+(define-method (datastore-insert-background-job
+ (datastore <sqlite-datastore>)
+ type
+ args)
+ (call-with-writer-thread
+ datastore
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+INSERT INTO background_jobs_queue (type, args)
+VALUES (:type, :args)
+RETURNING id"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:type (symbol->string type)
+ #:args (call-with-output-string
+ (lambda (port)
+ (write args port))))
+
+ (match (sqlite-step-and-reset statement)
+ (#(id)
+
+ (metric-increment
+ (metrics-registry-fetch-metric
+ (slot-ref datastore 'metrics-registry)
+ "coordinator_background_job_inserted_total")
+ #:label-values `((name . ,type)))
+
+ id))))))
+
+(define-method (datastore-delete-background-job
+ (datastore <sqlite-datastore>)
+ id)
+ (call-with-writer-thread
+ datastore
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+DELETE FROM background_jobs_queue WHERE id = :id"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:id id)
+
+ (sqlite-step-and-reset statement))
+ #t)
+ #:priority 'high))
+
+(define-method (datastore-select-background-jobs
+ (datastore <sqlite-datastore>)
+ .
+ args)
+ (apply
+ (lambda* (type #:key (limit 1))
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT id, args
+FROM background_jobs_queue
+WHERE type = :type
+ORDER BY id ASC
+LIMIT :limit"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:type (symbol->string type)
+ #:limit limit)
+
+ (let ((result
+ (sqlite-map
+ (match-lambda
+ (#(id args)
+ `((id . ,id)
+ (args . ,(call-with-input-string args read)))))
+ statement)))
+ (sqlite-reset statement)
+ result)))))
+ args))
+
+(define-method (datastore-count-background-jobs
+ (datastore <sqlite-datastore>))
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT type, COUNT(*)
+FROM background_jobs_queue
+GROUP BY type"
+ #:cache? #t)))
+
+ (let ((result
+ (sqlite-map
+ (match-lambda
+ (#(type count)
+ (cons type count)))
+ statement)))
+ (sqlite-reset statement)
+
+ result)))))
+
+(define-method (datastore-check-and-correct-unprocessed-builds-all-inputs-built
+ (datastore <sqlite-datastore>)
+ .
+ args)
+ (define entries-to-check
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT build_id, builds.derivation_id
+FROM unprocessed_builds_with_derived_priorities
+INNER JOIN builds ON builds.id = build_id
+WHERE all_inputs_built = 0
+ORDER BY build_id DESC"
+ #:cache? #t)))
+ (let ((result (sqlite-map identity statement)))
+ (sqlite-reset statement)
+ result)))))
+
+ (define (all-inputs-built? derivation-id)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT 1
+FROM derivation_inputs
+INNER JOIN derivation_outputs
+ ON derivation_inputs.derivation_output_id = derivation_outputs.id
+INNER JOIN unbuilt_outputs
+ ON unbuilt_outputs.output_id = derivation_outputs.output_id
+WHERE derivation_inputs.derivation_id = :derivation_id"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:derivation_id derivation-id)
+
+ (match (sqlite-step-and-reset statement)
+ (#(1) #f)
+ (#f #t))))))
+
+ (define (update build-id)
+ (call-with-writer-thread
+ datastore
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+UPDATE unprocessed_builds_with_derived_priorities
+SET all_inputs_built = 1
+WHERE build_id = :build_id AND all_inputs_built = 0
+RETURNING 1"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:build_id build-id)
+
+ ;; This is to cope with the check not being transactional, so we
+ ;; might go to update a record which has just been changed to have
+ ;; all_inputs_built = 1
+ (match (sqlite-step-and-reset statement)
+ (#(1) #t)
+ (#f #f))))))
+
+ (apply
+ (lambda* (#:key (progress-reporter (const progress-reporter/silent)))
+ (let ((reporter
+ (progress-reporter (length entries-to-check))))
+ (start-progress-reporter! reporter)
+ (let loop ((entries entries-to-check)
+ (updated-count 0))
+ (if (null? entries)
+ (begin
+ (stop-progress-reporter! reporter)
+ updated-count)
+ (match (car entries)
+ (#(build-id derivation-id)
+ (progress-reporter-report! reporter)
+ (if (all-inputs-built? derivation-id)
+ (loop (cdr entries)
+ (+ updated-count
+ (if (update build-id)
+ 1
+ 0)))
+ (loop (cdr entries)
+ updated-count))))))))
+ args))