diff options
Diffstat (limited to 'guix-build-coordinator/datastore/sqlite.scm')
-rw-r--r-- | guix-build-coordinator/datastore/sqlite.scm | 1636 |
1 files changed, 900 insertions, 736 deletions
diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm index 5e39849..bb1d8e8 100644 --- a/guix-build-coordinator/datastore/sqlite.scm +++ b/guix-build-coordinator/datastore/sqlite.scm @@ -10,8 +10,11 @@ #:use-module (ice-9 exceptions) #:use-module (sqlite3) #:use-module (fibers) + #:use-module (knots parallelism) + #:use-module (knots thread-pool) #:use-module (prometheus) #:use-module (guix base16) + #:use-module (guix progress) #:use-module (guix derivations) #:use-module (guix-build-coordinator utils) #:use-module (guix-build-coordinator utils fibers) @@ -87,7 +90,7 @@ datastore-replace-agent-tags datastore-list-processed-builds datastore-list-unprocessed-builds - datastore-find-first-unallocated-deferred-build + datastore-find-deferred-build datastore-fetch-prioritised-unprocessed-builds datastore-insert-unprocessed-hook-event datastore-count-unprocessed-hook-events @@ -96,39 +99,53 @@ datastore-delete-unprocessed-hook-event datastore-list-agent-builds datastore-agent-for-build - datastore-count-build-allocation-plan-entries datastore-replace-build-allocation-plan - datastore-remove-build-from-allocation-plan datastore-count-allocated-builds datastore-agent-requested-systems datastore-update-agent-requested-systems datastore-fetch-build-to-allocate + datastore-check-if-derivation-conflicts? datastore-insert-to-allocated-builds - datastore-remove-builds-from-plan - datastore-list-allocation-plan-builds)) + datastore-update-allocated-build-submit-outputs + datastore-insert-background-job + datastore-delete-background-job + datastore-select-background-jobs + datastore-check-and-correct-unprocessed-builds-all-inputs-built)) + +(define %transaction-duration-histogram-buckets + (list 0.1 0.25 0.5 1 2.5 5 10 15 30 45 60 120 240 (inf))) (define-class <sqlite-datastore> (<abstract-datastore>) database-file - worker-reader-thread-channel - worker-writer-thread-channel + reader-thread-pool + writer-thread-pool + low-priority-writer-thread-channel + default-priority-writer-thread-channel + high-priority-writer-thread-channel + writer-thread-channel-queue-stats metrics-registry) (define* (sqlite-datastore database-uri #:key update-database? metrics-registry - worker-thread-log-exception?) + thread-pool-log-exception?) (define database-file (string-drop database-uri (string-length "sqlite://"))) (when update-database? - (run-sqitch database-file)) + (retry-on-error + (lambda () + (run-sqitch database-file)) + #:times 2 + #:delay 5)) (let ((db (db-open database-file))) (sqlite-exec db "PRAGMA journal_mode=WAL;") (sqlite-exec db "PRAGMA optimize;") - (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);") + (with-time-logging "truncating the WAL" + (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);")) (sqlite-close db)) (let ((datastore (make <sqlite-datastore>))) @@ -136,141 +153,164 @@ (slot-set! datastore 'database-file database-file) (slot-set! datastore 'metrics-registry metrics-registry) - (slot-set! - datastore - 'worker-writer-thread-channel - (make-worker-thread-channel - (lambda () - (let ((db - (db-open database-file))) - (sqlite-exec db "PRAGMA busy_timeout = 5000;") - (sqlite-exec db "PRAGMA synchronous = NORMAL;") - (sqlite-exec db "PRAGMA temp_store = MEMORY;") - (sqlite-exec db "PRAGMA foreign_keys = ON;") - - (sqlite-exec db "ATTACH DATABASE 'file:/mem?vfs=memdb' AS mem;") - - (sqlite-exec - db - " -CREATE TABLE IF NOT EXISTS mem.build_allocation_plan ( - build_id INTEGER NOT NULL, - agent_id TEXT NOT NULL, - ordering INTEGER NOT NULL, - PRIMARY KEY (agent_id, build_id) -);") - - (list db))) - #:name "ds write" - #:destructor - (let ((writer-thread-destructor-counter - (make-gauge-metric metrics-registry - "datastore_writer_thread_close_total"))) - (lambda (db) - (db-optimize db - database-file - metrics-registry - #:maybe-truncate-wal? #f) - - (metric-increment writer-thread-destructor-counter) - (sqlite-close db))) - #:lifetime 500 - #:expire-on-exception? #t - - ;; SQLite doesn't support parallel writes - #:parallelism 1 - #:delay-logger (let ((delay-metric - (make-histogram-metric - metrics-registry - "datastore_write_delay_seconds"))) - (lambda (seconds-delayed) - (metric-observe delay-metric - ;; TODO exact->inexact to work around - ;; a bug in guile-prometheus where - ;; the metric sum will output in the - ;; exact form including the /q - (exact->inexact seconds-delayed)) - (log-delay "datastore write" seconds-delayed) - (when (> seconds-delayed 1) - (format - (current-error-port) - "warning: database write delayed by ~1,2f seconds~%" - seconds-delayed)))) - #:duration-logger - (lambda (duration proc) - (when (> duration 10) - (format - (current-error-port) - "warning: database write took ~1,2f seconds (~a)~%" - duration - proc))) - #:log-exception? worker-thread-log-exception?)) - - ;; Make sure the worker thread has initialised, and created the in memory - ;; tables - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) - (const #t)) - - (slot-set! - datastore - 'worker-reader-thread-channel - (make-worker-thread-channel - (lambda () - (let ((db - (db-open database-file #:write? #f))) - (sqlite-exec db "PRAGMA temp_store = MEMORY;") - (sqlite-exec db "PRAGMA busy_timeout = 4000;") - (sqlite-exec db "PRAGMA cache_size = -16000;") - - (sqlite-exec db "ATTACH DATABASE 'file:/mem?vfs=memdb' AS mem;") - - (list db))) - #:name "ds read" - #:destructor - (let ((reader-thread-destructor-counter - (make-gauge-metric metrics-registry - "datastore_reader_thread_close_total"))) - (lambda (db) - (metric-increment reader-thread-destructor-counter) - (sqlite-close db))) - #:lifetime 50000 - #:expire-on-exception? #t - - ;; Use a minimum of 8 and a maximum of 16 threads - #:parallelism - (min (max (current-processor-count) - 8) - 16) - #:delay-logger (let ((delay-metric - (make-histogram-metric + (let ((writer-thread-pool + (make-thread-pool + ;; SQLite doesn't support parallel writes + 1 + #:thread-initializer + (lambda () + (let ((db + (db-open database-file))) + (sqlite-exec db "PRAGMA busy_timeout = 0;") + (sqlite-exec db "PRAGMA synchronous = NORMAL;") + (sqlite-exec db "PRAGMA temp_store = MEMORY;") + (sqlite-exec db "PRAGMA foreign_keys = ON;") + + (list db))) + #:name "ds write" + #:thread-destructor + (let ((writer-thread-destructor-counter + (make-gauge-metric metrics-registry + "datastore_writer_thread_close_total"))) + (lambda (db) + (db-optimize db + database-file metrics-registry - "datastore_read_delay_seconds"))) - (lambda (seconds-delayed) - (metric-observe delay-metric - ;; TODO exact->inexact to work around - ;; a bug in guile-prometheus where - ;; the metric sum will output in the - ;; exact form including the /q - (exact->inexact seconds-delayed)) - (log-delay "datastore read" seconds-delayed) - (when (> seconds-delayed 1) - (format - (current-error-port) - "warning: database read delayed by ~1,2f seconds~%" - seconds-delayed)))) - #:duration-logger - (lambda (duration proc) - (when (> duration 30) - (format - (current-error-port) - "warning: database read took ~1,2f seconds (~a)~%" - duration - proc))) - #:log-exception? worker-thread-log-exception?)) + #:maybe-truncate-wal? #f) + + (metric-increment writer-thread-destructor-counter) + (sqlite-close db))) + #:thread-lifetime 500 + #:expire-on-exception? #t + + #:delay-logger (let ((delay-metric + (make-histogram-metric + metrics-registry + "datastore_write_delay_seconds" + #:buckets + %transaction-duration-histogram-buckets))) + (lambda (seconds-delayed proc) + (metric-observe delay-metric + ;; TODO exact->inexact to work around + ;; a bug in guile-prometheus where + ;; the metric sum will output in the + ;; exact form including the /q + (exact->inexact seconds-delayed)) + (log-delay "datastore write" seconds-delayed) + (when (> seconds-delayed 1) + (format/safe + (current-error-port) + "warning: database write delayed by ~1,2f seconds~%" + seconds-delayed)))) + #:duration-logger + (lambda (duration proc) + (when (> duration 10) + (format/safe + (current-error-port) + "warning: database write took ~1,2f seconds (~a)~%" + duration + proc))) + #:log-exception? thread-pool-log-exception?))) + + (slot-set! datastore + 'writer-thread-pool + writer-thread-pool) + ;; This is changed in datastore-spawn-fibers + (slot-set! datastore + 'low-priority-writer-thread-channel + (thread-pool-channel writer-thread-pool)) + (slot-set! datastore + 'default-priority-writer-thread-channel + (thread-pool-channel writer-thread-pool)) + (slot-set! datastore + 'high-priority-writer-thread-channel + (thread-pool-channel writer-thread-pool))) + + (let ((reader-thread-pool + (make-thread-pool + ;; Use a minimum of 8 and a maximum of 12 threads + (min (max (current-processor-count) + 8) + 12) + #:thread-initializer + (lambda () + (let ((db + (db-open database-file #:write? #f))) + (sqlite-exec db "PRAGMA temp_store = MEMORY;") + (sqlite-exec db "PRAGMA busy_timeout = 0;") + (sqlite-exec db "PRAGMA cache_size = -16000;") + + (list db))) + #:thread-destructor + (let ((reader-thread-destructor-counter + (make-gauge-metric metrics-registry + "datastore_reader_thread_close_total"))) + (lambda (db) + (metric-increment reader-thread-destructor-counter) + (sqlite-close db))) + #:name "ds read" + #:thread-lifetime 50000 + #:expire-on-exception? #t + + #:delay-logger (let ((delay-metric + (make-histogram-metric + metrics-registry + "datastore_read_delay_seconds" + #:buckets + %transaction-duration-histogram-buckets))) + (lambda (seconds-delayed proc) + (metric-observe delay-metric + ;; TODO exact->inexact to work around + ;; a bug in guile-prometheus where + ;; the metric sum will output in the + ;; exact form including the /q + (exact->inexact seconds-delayed)) + (log-delay "datastore read" seconds-delayed) + (when (> seconds-delayed 1) + (format/safe + (current-error-port) + "warning: database read delayed by ~1,2f seconds~%" + seconds-delayed)))) + #:duration-logger + (lambda (duration proc) + (when (> duration 30) + (format/safe + (current-error-port) + "warning: database read took ~1,2f seconds (~a)~%" + duration + proc))) + #:log-exception? thread-pool-log-exception?))) + + (let ((metric (make-gauge-metric + metrics-registry + "datastore_reader_threads_total"))) + (metric-set metric + (vector-length + (thread-pool-proc-vector reader-thread-pool)))) + + + (slot-set! datastore + 'reader-thread-pool + reader-thread-pool)) datastore)) +(define* (call-with-writer-thread + datastore + proc + #:key priority duration-logger) + (call-with-thread + (slot-ref datastore 'writer-thread-pool) + proc + #:duration-logger duration-logger + #:channel + (slot-ref datastore + (assq-ref + '((high . high-priority-writer-thread-channel) + (default . default-priority-writer-thread-channel) + (low . low-priority-writer-thread-channel)) + (or priority 'default))))) + (define (sqlite-step-and-reset statement) (let ((val (sqlite-step statement))) (sqlite-reset statement) @@ -301,11 +341,6 @@ CREATE TABLE IF NOT EXISTS mem.build_allocation_plan ( metrics-registry checkpoint-duration-metric-name (lambda () - (if (> (wal-size) extreme-wal-size-threshold) - ;; Since the WAL is really getting too big, wait for much longer - (sqlite-exec db "PRAGMA busy_timeout = 300000;") - (sqlite-exec db "PRAGMA busy_timeout = 20;")) - (let* ((statement (sqlite-prepare db @@ -316,19 +351,17 @@ CREATE TABLE IF NOT EXISTS mem.build_allocation_plan ( (#(blocked? modified-page-count pages-moved-to-db) (if (= blocked? 1) (begin - (simple-format + (simple-format/safe (current-error-port) "warning: wal checkpoint blocked\n") #f) (begin - (simple-format + (simple-format/safe (current-error-port) "wal checkpoint completed (~A, ~A)\n" modified-page-count pages-moved-to-db) #t)))))) - (sqlite-exec db "PRAGMA busy_timeout = 5000;") - result))) #t)) @@ -344,8 +377,8 @@ PRAGMA optimize;") (define-method (datastore-optimize (datastore <sqlite-datastore>)) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread + datastore (lambda (db) (db-optimize db @@ -354,15 +387,49 @@ PRAGMA optimize;") (define-method (datastore-spawn-fibers (datastore <sqlite-datastore>)) + ;; Queue messages to the writer thread, so that they're handled in a first + ;; come first served manor + (let ((queues + queue-stats + (make-discrete-priority-queueing-channels + (thread-pool-channel + (slot-ref datastore 'writer-thread-pool)) + 3))) + (match queues + ((high default low) + (slot-set! datastore 'high-priority-writer-thread-channel high) + (slot-set! datastore 'default-priority-writer-thread-channel default) + (slot-set! datastore 'low-priority-writer-thread-channel low))) + (slot-set! datastore + 'writer-thread-channel-queue-stats + queue-stats)) + + (spawn-fiber + (lambda () + (while #t + (sleep 20) + (let ((procs + (vector->list + (thread-pool-proc-vector + (slot-ref datastore 'reader-thread-pool))))) + (when (every procedure? procs) + (for-each + (lambda (i proc) + (simple-format/safe (current-error-port) + "reader thread ~A running: ~A\n" + i proc)) + (iota (length procs)) + procs)))))) + (spawn-fiber (lambda () (while #t (sleep (* 60 10)) ; 10 minutes (with-exception-handler (lambda (exn) - (simple-format (current-error-port) - "exception when performing WAL checkpoint: ~A\n" - exn)) + (simple-format/safe (current-error-port) + "exception when performing WAL checkpoint: ~A\n" + exn)) (lambda () (with-time-logging "performing regular database maintenance" @@ -387,11 +454,18 @@ PRAGMA optimize;") (let ((setup-failures-total (make-gauge-metric registry "setup_failures_total" - #:labels '(agent_id reason)))) - - (letpar& ((setup-failure-counts - (with-time-logging "counting setup failures" - (datastore-count-setup-failures datastore)))) + #:labels '(agent_id reason))) + (background-jobs-inserted-total + (make-counter-metric registry + "coordinator_background_job_inserted_total" + #:labels '(name)))) + + (fibers-let ((setup-failure-counts + (with-time-logging "counting setup failures" + (datastore-count-setup-failures datastore))) + (background-job-counts + (with-time-logging "counting background jobs" + (datastore-count-background-jobs datastore)))) (for-each (match-lambda (((agent-id reason) . count) @@ -400,7 +474,14 @@ PRAGMA optimize;") #:label-values `((agent_id . ,agent-id) (reason . ,reason))))) - setup-failure-counts))) + setup-failure-counts) + + (for-each (match-lambda + ((type . count) + (metric-increment background-jobs-inserted-total + #:by count + #:label-values `((name . ,type))))) + background-job-counts))) #t) (define-method (datastore-update-metrics! @@ -433,12 +514,38 @@ PRAGMA optimize;") "datastore_wal_bytes") (make-gauge-metric registry "datastore_wal_bytes" - #:docstring "Size of the SQLite Write Ahead Log file")))) + #:docstring "Size of the SQLite Write Ahead Log file"))) + (reader-threads-used-metric + (or (metrics-registry-fetch-metric registry + "datastore_reader_threads_used_total") + (make-gauge-metric + registry "datastore_reader_threads_used_total"))) + (writer-queue-length-metric + (or (metrics-registry-fetch-metric registry + "datastore_writer_queue_total") + (make-gauge-metric + registry "datastore_writer_queue_total" + #:labels '(priority))))) - (letpar& ((build-counts - (datastore-count-builds datastore)) - (build-result-counts - (datastore-count-build-results datastore))) + (metric-set reader-threads-used-metric + (vector-count + (lambda (_ proc) + (->bool proc)) + (thread-pool-proc-vector + (slot-ref datastore 'reader-thread-pool)))) + + (for-each + (lambda (priority stats) + (metric-set writer-queue-length-metric + (assq-ref stats 'length) + #:label-values `((priority . ,priority)))) + '(high default low) + ((slot-ref datastore 'writer-thread-channel-queue-stats))) + + (fibers-let ((build-counts + (datastore-count-builds datastore)) + (build-result-counts + (datastore-count-build-results datastore))) (for-each (match-lambda ((system . count) (metric-set builds-total @@ -471,9 +578,10 @@ PRAGMA optimize;") internal-time-units-per-second)) (apply values vals))))) -(define (metric-observe-duration datastore - thing - duration-seconds) +(define* (metric-observe-duration datastore + thing + duration-seconds + #:key (buckets %default-histogram-buckets)) (define registry (slot-ref datastore 'metrics-registry)) (define metric-name (string-append "datastore_" thing "_duration_seconds")) @@ -481,15 +589,25 @@ PRAGMA optimize;") (let ((metric (or (metrics-registry-fetch-metric registry metric-name) (make-histogram-metric registry - metric-name)))) + metric-name + #:buckets buckets)))) (metric-observe metric duration-seconds))) -(define (call-with-worker-thread/delay-logging channel proc) - (call-with-worker-thread channel +(define (call-with-thread/delay-logging thread-pool proc) + (call-with-thread thread-pool + proc + #:duration-logger + (lambda (duration) + (log-delay proc duration)))) + +(define* (call-with-writer-thread/delay-logging datastore proc + #:key priority) + (call-with-writer-thread datastore proc #:duration-logger (lambda (duration) - (log-delay proc duration)))) + (log-delay proc duration)) + #:priority priority)) (define-exception-type &transaction-rollback-exception &exception make-transaction-rollback-exception @@ -503,21 +621,24 @@ PRAGMA optimize;") #:key readonly? (immediate? (not readonly?)) - duration-metric-name) + priority + duration-metric-name + (duration-metric-buckets + %transaction-duration-histogram-buckets)) (define (run-proc-within-transaction db) (define (attempt-begin) (with-exception-handler (lambda (exn) (match (exception-args exn) (('sqlite-exec 5 msg) - (simple-format + (simple-format/safe (current-error-port) "warning: issue starting transaction (code: 5, proc: ~A): ~A\n" proc msg) #f) (_ - (simple-format (current-error-port) - "exception starting transaction\n") + (simple-format/safe (current-error-port) + "exception starting transaction\n") (raise-exception exn)))) (lambda () (sqlite-exec db (if immediate? @@ -531,14 +652,14 @@ PRAGMA optimize;") (lambda (exn) (match (exception-args exn) (('sqlite-exec 5 msg) - (simple-format + (simple-format/safe (current-error-port) "warning: attempt commit (code: 5, proc: ~A): ~A\n" proc msg) #f) (_ - (simple-format (current-error-port) - "exception committing transaction\n") + (simple-format/safe (current-error-port) + "exception committing transaction\n") (raise-exception exn)))) (lambda () (sqlite-exec db "COMMIT TRANSACTION;") @@ -546,63 +667,88 @@ PRAGMA optimize;") #:unwind? #t)) (if (attempt-begin) - (call-with-values - (lambda () - (with-exception-handler - (lambda (exn) - (if (transaction-rollback-exception? exn) - (begin - (sqlite-exec db "ROLLBACK TRANSACTION;") - (transaction-rollback-exception-return-value exn)) - (begin - (simple-format (current-error-port) - "error: sqlite rolling back transaction (~A)\n" - exn) - (sqlite-exec db "ROLLBACK TRANSACTION;") - (raise-exception exn)))) + (with-exception-handler + (lambda (exn) + (if (transaction-rollback-exception? exn) + (begin + (sqlite-exec db "ROLLBACK TRANSACTION;") + (transaction-rollback-exception-return-value exn)) + (begin + (simple-format/safe + (current-error-port) + "error: sqlite rolling back transaction (~A)\n" + exn) + (sqlite-exec db "ROLLBACK TRANSACTION;") + (raise-exception exn)))) + (lambda () + (call-with-values (lambda () - (parameterize ((%current-transaction-proc proc)) - (call-with-delay-logging proc #:args (list db)))) - #:unwind? #t)) - (lambda vals - (let loop ((success? (attempt-commit))) - (if success? - (apply values vals) - (loop (attempt-commit)))))) + (with-exception-handler + (lambda (exn) + (unless (transaction-rollback-exception? exn) + (backtrace)) + (raise-exception exn)) + (lambda () + (parameterize ((%current-transaction-proc proc) + ;; Set the arguments parameter for the + ;; reader thread pool so that any nested + ;; calls to call-with-thread for the + ;; reader thread pool just use the writer + ;; db connection and thus this + ;; transaction + ((thread-pool-arguments-parameter + (slot-ref datastore 'reader-thread-pool)) + (list db))) + (call-with-delay-logging proc #:args (list db)))))) + (lambda vals + (let loop ((success? (attempt-commit))) + (if success? + (apply values vals) + (loop (attempt-commit))))))) + #:unwind? #t) ;; Database is busy, so retry (run-proc-within-transaction db))) - (call-with-worker-thread + (call-with-thread (slot-ref datastore (if readonly? - 'worker-reader-thread-channel - 'worker-writer-thread-channel)) + 'reader-thread-pool + 'writer-thread-pool)) (lambda (db) (if (%current-transaction-proc) (call-with-delay-logging proc #:args (list db)) ; already in transaction (run-proc-within-transaction db))) + #:channel + (if readonly? + (thread-pool-channel + (slot-ref datastore 'reader-thread-pool)) + (slot-ref datastore + (assq-ref + '((high . high-priority-writer-thread-channel) + (default . default-priority-writer-thread-channel) + (low . low-priority-writer-thread-channel)) + (or priority 'default)))) #:duration-logger (lambda (duration-seconds) (when (and (not readonly?) (> duration-seconds 2)) - (display - (format - #f - "warning: ~a:\n took ~4f seconds in transaction\n" - proc - duration-seconds) - (current-error-port)) - - (when duration-metric-name - (metric-observe-duration datastore - duration-metric-name - duration-seconds)))))) + (format/safe + (current-error-port) + "warning: ~a:\n took ~4f seconds in transaction\n" + proc + duration-seconds)) + + (when duration-metric-name + (metric-observe-duration datastore + duration-metric-name + duration-seconds + #:buckets duration-metric-buckets))))) (define-method (datastore-find-agent (datastore <sqlite-datastore>) uuid) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -630,8 +776,8 @@ SELECT description FROM agents WHERE id = :id" (define-method (datastore-find-agent-by-name (datastore <sqlite-datastore>) name) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -658,8 +804,8 @@ SELECT id FROM agents WHERE name = :name" (define-method (datastore-insert-dynamic-auth-token (datastore <sqlite-datastore>) token) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread + datastore (lambda (db) (let ((statement (sqlite-prepare @@ -677,8 +823,8 @@ INSERT INTO dynamic_auth_tokens (token) VALUES (:token)" (define-method (datastore-dynamic-auth-token-exists? (datastore <sqlite-datastore>) token) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -705,8 +851,8 @@ SELECT 1 FROM dynamic_auth_tokens WHERE token = :token" (define-method (datastore-fetch-agent-tags (datastore <sqlite-datastore>) agent-id) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -742,16 +888,16 @@ WHERE agent_tags.agent_id = :agent_id" uuid name description) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread + datastore (lambda (db) (insert-agent db uuid name description))) #t) (define-method (datastore-list-agents (datastore <sqlite-datastore>)) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -779,8 +925,8 @@ SELECT id, name, description, active FROM agents ORDER BY id" (unless (boolean? active?) (error "datastore-set-agent-active called with non-boolean")) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread + datastore (lambda (db) (let ((statement (sqlite-prepare @@ -799,8 +945,8 @@ UPDATE agents SET active = :active WHERE id = :uuid" (define-method (datastore-find-agent-status (datastore <sqlite-datastore>) agent-id) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -834,8 +980,8 @@ WHERE agent_id = :agent_id" 1min-load-average system-uptime processor-count) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread + datastore (lambda (db) (let ((statement (sqlite-prepare @@ -871,8 +1017,8 @@ INSERT INTO agent_status (agent_id, status, load_average_1min, system_uptime, pr (datastore <sqlite-datastore>) agent-uuid password) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread + datastore (lambda (db) (insert-agent-password db agent-uuid password))) #t) @@ -881,8 +1027,8 @@ INSERT INTO agent_status (agent_id, status, load_average_1min, system_uptime, pr (datastore <sqlite-datastore>) uuid password) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -904,8 +1050,8 @@ WHERE agent_id = :agent_id AND password = :password" (define-method (datastore-agent-list-passwords (datastore <sqlite-datastore>) uuid) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -1002,6 +1148,7 @@ INSERT INTO agent_tags (agent_id, tag_id) VALUES (:agent_id, :tag_id)" (lambda (db) (insert-derivation-and-return-outputs db derivation) (hash-clear! %derivation-outputs-cache)) + #:priority 'low #:duration-metric-name "store_derivation") #t) @@ -1009,8 +1156,8 @@ INSERT INTO agent_tags (agent_id, tag_id) VALUES (:agent_id, :tag_id)" (define-method (datastore-build-exists-for-derivation-outputs? (datastore <sqlite-datastore>) derivation) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -1042,8 +1189,8 @@ WHERE derivation_outputs.derivation_id = :derivation_id (define-method (datastore-build-required-by-another? (datastore <sqlite-datastore>) uuid) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -1137,8 +1284,8 @@ SELECT name FROM derivations WHERE id = :id" (match (sqlite-step-and-reset statement) (#(name) name)))) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let loop ((derivation-ids (list (db-find-derivation-id db derivation))) (result '())) @@ -1163,8 +1310,8 @@ SELECT name FROM derivations WHERE id = :id" args) (apply (lambda* (system #:key include-cancelled?) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -1252,8 +1399,8 @@ FROM ( (define-method (datastore-list-builds-for-derivation-recursive-inputs (datastore <sqlite-datastore>) derivation) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -1290,8 +1437,8 @@ INNER JOIN related_derivations (define-method (datastore-find-unprocessed-build-entry (datastore <sqlite-datastore>) uuid) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -1320,8 +1467,8 @@ WHERE build_id = :build_id" (datastore <sqlite-datastore>) build-uuid tags) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread + datastore (lambda (db) (let ((insert-tag-statement (sqlite-prepare @@ -1521,8 +1668,8 @@ WHERE build_id = :build_id" uuid explicit-priority-lower-bound) (define builds-to-consider - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) ;; Recursively find builds for all missing outputs that this build ;; takes as inputs. The order is important here, since we want to @@ -1672,8 +1819,8 @@ WHERE build_id = :build_id" override-derived-priority) (let ((build-id old-priority - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((build-id (db-find-build-id db uuid))) @@ -1730,46 +1877,10 @@ WHERE build_id = :build_id" #t) rest)) -(define-method (datastore-remove-build-from-allocation-plan - (datastore <sqlite-datastore>) - uuid) - (define (update-build-allocation-plan-metrics) - (let ((allocation-plan-metric - (metrics-registry-fetch-metric - (slot-ref datastore 'metrics-registry) - "build_allocation_plan_total"))) - (for-each - (match-lambda - ((agent-id . count) - (metric-set allocation-plan-metric - count - #:label-values - `((agent_id . ,agent-id))))) - (datastore-count-build-allocation-plan-entries datastore)))) - - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-writer-thread-channel) - (lambda (db) - (let ((statement (sqlite-prepare - db - " -DELETE FROM build_allocation_plan WHERE build_id = :build_id" - #:cache? #t))) - - (sqlite-bind-arguments - statement - #:build_id (db-find-build-id db uuid)) - - (sqlite-step-and-reset statement) - - (unless (= 0 (changes-count db)) - (update-build-allocation-plan-metrics))))) - #t) - (define-method (datastore-count-build-results (datastore <sqlite-datastore>)) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -1827,8 +1938,8 @@ VALUES (:agent_id, :result, 1)" #t)))) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread/delay-logging + datastore (lambda (db) (let ((statement (sqlite-prepare @@ -1879,14 +1990,12 @@ LIMIT 1" (#f #t) (#(1) #f)))) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-writer-thread-channel) - (lambda (db) - (let ((builds-statement - (sqlite-prepare - db - " -SELECT DISTINCT unprocessed_builds.id + (define (all-build-ids db) + (let ((statement + (sqlite-prepare + db + " +SELECT input_builds.id FROM builds INNER JOIN derivation_outputs ON builds.derivation_id = derivation_outputs.derivation_id @@ -1894,45 +2003,62 @@ INNER JOIN derivation_outputs AS all_derivation_outputs ON all_derivation_outputs.output_id = derivation_outputs.output_id INNER JOIN derivation_inputs ON derivation_inputs.derivation_output_id = all_derivation_outputs.id -INNER JOIN builds AS unprocessed_builds - ON unprocessed_builds.processed = 0 - AND unprocessed_builds.derivation_id = derivation_inputs.derivation_id -INNER JOIN unprocessed_builds_with_derived_priorities - ON unprocessed_builds_with_derived_priorities.build_id = unprocessed_builds.id - AND unprocessed_builds_with_derived_priorities.all_inputs_built = 0 +INNER JOIN builds AS input_builds + ON input_builds.processed = 0 + AND input_builds.canceled = 0 + AND input_builds.derivation_id = derivation_inputs.derivation_id WHERE builds.id = :build_id" - #:cache? #t)) + #:cache? #t))) - (update-statement - (sqlite-prepare - db - " + (sqlite-bind-arguments + statement + #:build_id (db-find-build-id db build-uuid)) + + (let ((result + (sqlite-fold + (lambda (row result) + (match row + (#(build-id) + (if (all-inputs-built? db build-id) + (cons build-id result) + result)))) + '() + statement))) + (sqlite-reset statement) + + result))) + + (let ((build-ids + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) + (lambda (db) + (all-build-ids db))))) + (call-with-writer-thread/delay-logging + datastore + (lambda (db) + (let ((statement + (sqlite-prepare + db + " UPDATE unprocessed_builds_with_derived_priorities SET all_inputs_built = 1 WHERE build_id = :build_id" - #:cache? #t))) + #:cache? #t))) - (sqlite-bind-arguments builds-statement - #:build_id (db-find-build-id db build-uuid)) - - (sqlite-fold - (lambda (row result) - (match row - (#(build-id) - (when (all-inputs-built? db build-id) - (sqlite-bind-arguments update-statement - #:build_id build-id) - (sqlite-step-and-reset update-statement)))) - #f) - #f - builds-statement) - (sqlite-reset builds-statement))))) + (for-each + (lambda (build-id) + (sqlite-bind-arguments statement + #:build_id build-id) + (sqlite-step-and-reset statement)) + build-ids) + + #t))))) (define-method (datastore-remove-build-allocation (datastore <sqlite-datastore>) build-uuid agent-id) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread/delay-logging + datastore (lambda (db) (let ((statement (sqlite-prepare @@ -1954,8 +2080,8 @@ DELETE FROM allocated_builds (define-method (datastore-mark-build-as-processed (datastore <sqlite-datastore>) build-uuid end-time) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread/delay-logging + datastore (lambda (db) (let ((statement (sqlite-prepare @@ -1991,8 +2117,8 @@ DELETE FROM unprocessed_builds_with_derived_priorities (define-method (datastore-delete-relevant-outputs-from-unbuilt-outputs (datastore <sqlite-datastore>) build-uuid) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread/delay-logging + datastore (lambda (db) (let ((statement (sqlite-prepare @@ -2019,8 +2145,8 @@ WHERE output_id IN ( (datastore <sqlite-datastore>) build-uuid output-metadata) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread/delay-logging + datastore (lambda (db) (define (name->output-id name) (let ((statement @@ -2097,8 +2223,8 @@ INSERT INTO build_starts ( (define-method (datastore-find-build-starts (datastore <sqlite-datastore>) build-uuid) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2207,8 +2333,8 @@ INSERT INTO setup_failure_missing_inputs ( (define-method (datastore-list-setup-failure-missing-inputs (datastore <sqlite-datastore>) setup-failure-id) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2236,8 +2362,8 @@ WHERE setup_failure_id = :id" build-uuid agent-id failure-reason) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread + datastore (lambda (db) (insert-setup-failure-and-remove-allocation db (db-find-build-id db build-uuid) @@ -2254,8 +2380,8 @@ WHERE setup_failure_id = :id" (define-method (datastore-count-builds (datastore <sqlite-datastore>)) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2278,8 +2404,8 @@ FROM builds_counts" (define-method (datastore-for-each-build (datastore <sqlite-datastore>) proc) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2318,8 +2444,8 @@ SELECT id FROM builds WHERE uuid = :uuid" (define-method (datastore-find-build (datastore <sqlite-datastore>) uuid) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2377,12 +2503,14 @@ WHERE uuid = :uuid" (canceled 'unset) (priority-> 'unset) (priority-< 'unset) + (created-at-> 'unset) + (created-at-< 'unset) (after-id #f) (limit #f) ;; other-builds-dependent or no-dependent-builds (relationship 'unset)) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (define tag->expression (let ((statement @@ -2395,7 +2523,7 @@ SELECT id FROM tags WHERE key = :key AND value = :value" (sqlite-prepare db " -SELECT id FROM tags WHERE key = :key" +SELECT 1 FROM tags WHERE key = :key LIMIT 1" #:cache? #t))) (lambda (tag not?) (match tag @@ -2416,28 +2544,27 @@ SELECT id FROM tags WHERE key = :key" (sqlite-bind-arguments key-statement #:key key) - (let* ((tag-ids (sqlite-map - (match-lambda - (#(id) id)) - key-statement)) - (result - (string-append - "(" - (string-join - (map - (lambda (id) + (let ((tag-with-key-exists? + (->bool (sqlite-step-and-reset key-statement)))) + (if tag-with-key-exists? + (let ((result (string-append + "(" (if not? "NOT " "") - "EXISTS (SELECT 1 FROM build_tags " - "WHERE build_id = builds.id AND tag_id = " - (number->string id) - ")")) - tag-ids) - (if not? " AND " " OR ")) - ")"))) - (sqlite-reset key-statement) - - result)))))) + " +EXISTS ( + SELECT 1 + FROM build_tags + INNER JOIN tags ON build_tags.tag_id = tags.id + WHERE build_id = builds.id + AND tags.key = '" + key "' +)" + ")"))) + result) + (if not? + "TRUE" + "FALSE")))))))) (let ((tag-expressions (map (lambda (tag) @@ -2459,6 +2586,8 @@ SELECT id FROM tags WHERE key = :key" (not (null? not-systems)) (not (eq? priority-> 'unset)) (not (eq? priority-< 'unset)) + (not (eq? created-at-> 'unset)) + (not (eq? created-at-< 'unset)) (not (eq? processed 'unset)) (not (eq? canceled 'unset)) (not (eq? relationship 'unset)) @@ -2509,6 +2638,14 @@ INNER JOIN derivations (list (simple-format #f "priority < ~A" priority-<)) '()) + (if (string? created-at->) + (list + (simple-format #f "created_at > '~A'" created-at->)) + '()) + (if (string? created-at-<) + (list + (simple-format #f "created_at < '~A'" created-at-<)) + '()) (cond ((eq? processed #t) '("processed = 1")) ((eq? processed #f) '("processed = 0")) @@ -2612,8 +2749,8 @@ WHERE derivation_outputs.derivation_id = builds.derivation_id)")) (define-method (datastore-fetch-build-tags (datastore <sqlite-datastore>) build-uuid) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2647,8 +2784,8 @@ WHERE build_tags.build_id = :build_id" (define-method (datastore-find-build-result (datastore <sqlite-datastore>) build-uuid) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2673,8 +2810,8 @@ WHERE build_id = :build_id" (define-method (datastore-find-build-derivation-system (datastore <sqlite-datastore>) build-uuid) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2710,8 +2847,8 @@ WHERE builds.id = :build_id" datastore "list_builds_for_output" (lambda () - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2759,8 +2896,8 @@ WHERE derivation_outputs.output_id = :output_id" rest) (apply (lambda* (output system #:key include-canceled?) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2804,8 +2941,8 @@ WHERE derivation_outputs.output_id = :output_id rest) (apply (lambda* (derivation #:key (include-canceled? #t)) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2840,8 +2977,8 @@ WHERE derivations.name = :derivation" (define-method (datastore-count-setup-failures (datastore <sqlite-datastore>)) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2865,8 +3002,8 @@ GROUP BY agent_id, failure_reason" (define-method (datastore-list-setup-failures-for-build (datastore <sqlite-datastore>) build-uuid) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2898,8 +3035,8 @@ WHERE build_id = :build_id" args) (apply (lambda* (#:key agent-id) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2950,8 +3087,8 @@ WHERE builds.processed = 0 (define-method (datastore-list-processed-builds (datastore <sqlite-datastore>)) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2977,8 +3114,8 @@ WHERE processed = 1" (define-method (datastore-list-unprocessed-builds (datastore <sqlite-datastore>)) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3012,10 +3149,11 @@ ORDER BY priority DESC" builds))))) -(define-method (datastore-find-first-unallocated-deferred-build - (datastore <sqlite-datastore>)) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) +(define-method (datastore-find-deferred-build + (datastore <sqlite-datastore>) + select?) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3028,25 +3166,29 @@ INNER JOIN derivations WHERE processed = 0 AND canceled = 0 AND deferred_until IS NOT NULL - AND builds.id NOT IN (SELECT build_id FROM build_allocation_plan) -ORDER BY deferred_until ASC -LIMIT 1" +ORDER BY deferred_until ASC" #:cache? #t))) - (match (sqlite-step-and-reset statement) - (#(uuid derivation_name priority created_at deferred_until) - `((uuid . ,uuid) - (derivation-name . ,derivation_name) - (priority . ,priority) - (created-at . ,(if (string? created_at) - (string->date created_at - "~Y-~m-~d ~H:~M:~S") - #f)) - (deferred-until . ,(if (string? deferred_until) - (string->date deferred_until - "~Y-~m-~d ~H:~M:~S") - #f)))) - (#f #f)))))) + (let loop ((row (sqlite-step statement))) + (match row + (#(uuid derivation_name priority created_at deferred_until) + (let ((res + (select? + `((uuid . ,uuid) + (derivation-name . ,derivation_name) + (priority . ,priority) + (created-at . ,(if (string? created_at) + (string->date created_at + "~Y-~m-~d ~H:~M:~S") + #f)) + (deferred-until . ,(if (string? deferred_until) + (string->date deferred_until + "~Y-~m-~d ~H:~M:~S") + #f)))))) + (if res + res + (loop (sqlite-step statement))))) + (#f #f))))))) (define-method (datastore-fetch-prioritised-unprocessed-builds (datastore <sqlite-datastore>)) @@ -3055,9 +3197,11 @@ LIMIT 1" (sqlite-prepare db " -SELECT builds.uuid +SELECT builds.uuid, systems.system FROM unprocessed_builds_with_derived_priorities INNER JOIN builds ON build_id = builds.id +INNER JOIN derivations ON builds.derivation_id = derivations.id +INNER JOIN systems ON derivations.system_id = systems.id WHERE all_inputs_built = 1 AND NOT EXISTS ( SELECT 1 @@ -3067,26 +3211,21 @@ WHERE all_inputs_built = 1 ) ORDER BY derived_priority ASC" #:cache? #t))) - (let ((result (sqlite-fold - (lambda (row result) - (cons (vector-ref row 0) - result)) - '() - statement))) + (let ((result (sqlite-fold cons '() statement))) (sqlite-reset statement) result))) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) fetch-prioritised-unprocessed-builds)) (define-method (datastore-insert-unprocessed-hook-event (datastore <sqlite-datastore>) event arguments) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread/delay-logging + datastore (lambda (db) (insert-unprocessed-hook-event db event @@ -3116,8 +3255,8 @@ VALUES (:event, :arguments)" (define-method (datastore-count-unprocessed-hook-events (datastore <sqlite-datastore>)) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3140,8 +3279,8 @@ SELECT event, COUNT(*) FROM unprocessed_hook_events GROUP BY event" (datastore <sqlite-datastore>) event limit) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3174,8 +3313,8 @@ LIMIT :limit" (define-method (datastore-find-unprocessed-hook-event (datastore <sqlite-datastore>) id) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3201,8 +3340,8 @@ WHERE id = :id" (define-method (datastore-delete-unprocessed-hook-event (datastore <sqlite-datastore>) id) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread + datastore (lambda (db) (let ((statement (sqlite-prepare @@ -3215,110 +3354,14 @@ DELETE FROM unprocessed_hook_events WHERE id = :id" statement #:id id) - (sqlite-step-and-reset statement)))) - #t) - -(define-method (datastore-count-build-allocation-plan-entries - (datastore <sqlite-datastore>)) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) - (lambda (db) - (let ((statement - (sqlite-prepare - db - " -SELECT agent_id, COUNT(*) -FROM build_allocation_plan -GROUP BY agent_id" - #:cache? #t))) - - (let ((result - (sqlite-map - (match-lambda - (#(agent_id count) - (cons agent_id count))) - statement))) - (sqlite-reset statement) - - result))))) - -(define-method (datastore-replace-build-allocation-plan - (datastore <sqlite-datastore>) - planned-builds) - (define (clear-current-plan db) - (sqlite-exec - db - "DELETE FROM build_allocation_plan")) - - (define insert-sql - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) - (lambda (db) - (string-append - " -INSERT INTO build_allocation_plan (build_id, agent_id, ordering) VALUES " - (string-join - (map (match-lambda - ((build-uuid agent-id ordering) - (simple-format - #f - "('~A', '~A', ~A)" - (db-find-build-id db build-uuid) - agent-id - ordering))) - planned-builds) - ", ") - ";")))) - - (define (insert-new-plan db planned-builds) - (sqlite-exec - db - insert-sql)) - - (datastore-call-with-transaction - datastore - (lambda (db) - (clear-current-plan db) - (unless (null? planned-builds) - (insert-new-plan db planned-builds))) - #:duration-metric-name "replace_build_allocation_plan") - - (let* ((agent-ids - (map (lambda (agent-details) - (assq-ref agent-details 'uuid)) - (datastore-list-agents datastore))) - (counts-by-agent - (make-vector (length agent-ids) 0))) - (for-each - (match-lambda - ((_ agent-id _) - (let ((index (list-index (lambda (list-agent-id) - (string=? agent-id list-agent-id)) - agent-ids))) - (vector-set! counts-by-agent - index - (+ (vector-ref counts-by-agent - index) - 1))))) - planned-builds) - - (let ((metric - (metrics-registry-fetch-metric (slot-ref datastore 'metrics-registry) - "build_allocation_plan_total"))) - (for-each - (lambda (index agent-id) - (metric-set metric - (vector-ref counts-by-agent index) - #:label-values - `((agent_id . ,agent-id)))) - (iota (length agent-ids)) - agent-ids))) + (sqlite-step-and-reset statement))) + #:priority 'high) #t) (define-method (datastore-count-allocated-builds (datastore <sqlite-datastore>)) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3340,8 +3383,8 @@ SELECT agent_id, COUNT(*) FROM allocated_builds GROUP BY agent_id" (define-method (datastore-agent-requested-systems (datastore <sqlite-datastore>) agent-id) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3414,34 +3457,40 @@ INSERT INTO build_allocation_agent_requested_systems (agent_id, system_id) VALUE (define-method (datastore-fetch-build-to-allocate (datastore <sqlite-datastore>) - agent-id) - (datastore-call-with-transaction - datastore + build-id) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare db - ;; This needs to guard against the plan being out of date " SELECT builds.uuid, derivations.id, derivations.name, unprocessed_builds_with_derived_priorities.derived_priority FROM builds -INNER JOIN build_allocation_plan - ON builds.id = build_allocation_plan.build_id INNER JOIN derivations ON builds.derivation_id = derivations.id -INNER JOIN build_allocation_agent_requested_systems - ON build_allocation_agent_requested_systems.agent_id = :agent_id - AND build_allocation_agent_requested_systems.system_id = derivations.system_id LEFT JOIN unprocessed_builds_with_derived_priorities ON unprocessed_builds_with_derived_priorities.build_id = builds.id -WHERE build_allocation_plan.agent_id = :agent_id +WHERE builds.uuid = :uuid AND builds.processed = 0 AND builds.canceled = 0 - AND builds.id NOT IN (SELECT build_id FROM allocated_builds) -ORDER BY build_allocation_plan.ordering ASC" - #:cache? #t)) - (output-conflicts-statement + AND builds.id NOT IN (SELECT build_id FROM allocated_builds)" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:uuid build-id) + + (sqlite-step-and-reset statement))))) + +(define-method (datastore-check-if-derivation-conflicts? + (datastore <sqlite-datastore>) + agent-id + derivation-id) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) + (lambda (db) + (let ((statement (sqlite-prepare db " @@ -3459,149 +3508,63 @@ WHERE build_derivation_outputs.derivation_id = :derivation_id allocated_builds_derivation_outputs.output_id" #:cache? #t))) - (define (get-build-to-allocate) - (let loop ((build-details (sqlite-step statement))) - (match build-details - (#f #f) - (#(uuid derivation-id derivation-name derived_priority) + (sqlite-bind-arguments statement + #:agent_id agent-id + #:derivation_id derivation-id) - (sqlite-bind-arguments output-conflicts-statement - #:agent_id agent-id - #:derivation_id derivation-id) + (->bool (sqlite-step-and-reset statement)))))) - (if (eq? (sqlite-step-and-reset output-conflicts-statement) - #f) - `((uuid . ,uuid) - (derivation_name . ,derivation-name) - (derived_priority . ,derived_priority)) - (loop (sqlite-step statement))))))) +(define-method (datastore-insert-to-allocated-builds + (datastore <sqlite-datastore>) + agent-id + build-uuid) + (call-with-writer-thread + datastore + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +INSERT INTO allocated_builds (build_id, agent_id) + VALUES (:build_id, :agent_id)" + #:cache? #t))) (sqlite-bind-arguments statement + #:build_id (db-find-build-id db build-uuid) #:agent_id agent-id) - (let ((result (get-build-to-allocate))) - (sqlite-reset statement) - - result))) - - #:readonly? #t - #:duration-metric-name "fetch_builds_to_allocate")) - -(define-method (datastore-insert-to-allocated-builds - (datastore <sqlite-datastore>) - agent-id - build-uuids) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) - (lambda (db) - (sqlite-exec - db - (string-append - " -INSERT INTO allocated_builds (build_id, agent_id) VALUES " - (string-join - (map (lambda (build-uuid) - (simple-format - #f - "(~A, '~A')" - (db-find-build-id db build-uuid) - agent-id)) - build-uuids) - ", ") - ";"))))) + (sqlite-step-and-reset statement)))) + #t) -(define-method (datastore-remove-builds-from-plan +(define-method (datastore-update-allocated-build-submit-outputs (datastore <sqlite-datastore>) - build-uuids) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + build-uuid + submit-outputs?) + (call-with-writer-thread + datastore (lambda (db) - (sqlite-exec - db - (string-append - " -DELETE FROM build_allocation_plan -WHERE build_id IN (" - (string-join - (map (lambda (build-uuid) - (number->string (db-find-build-id db build-uuid))) - build-uuids) - ", ") - ")"))))) - -(define-method (datastore-list-allocation-plan-builds - (datastore <sqlite-datastore>) - . - rest) - (apply - (lambda* (agent-id #:key limit) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) - (lambda (db) - (let ((statement - (sqlite-prepare - db - (string-append - " -SELECT builds.uuid, derivations.name, systems.system, - builds.priority, - unprocessed_builds_with_derived_priorities.derived_priority -FROM builds -INNER JOIN derivations - ON builds.derivation_id = derivations.id -INNER JOIN systems - ON derivations.system_id = systems.id -INNER JOIN build_allocation_plan - ON builds.id = build_allocation_plan.build_id -LEFT JOIN unprocessed_builds_with_derived_priorities - ON builds.id = unprocessed_builds_with_derived_priorities.build_id -WHERE build_allocation_plan.agent_id = :agent_id - AND builds.processed = 0 - AND builds.id NOT IN (SELECT build_id FROM allocated_builds) -ORDER BY build_allocation_plan.ordering ASC" - (if limit - " -LIMIT :limit" - "")) - #:cache? #t))) - - (apply sqlite-bind-arguments - statement - #:agent_id agent-id - (if limit - (list #:limit limit) - '())) - - (let ((builds (sqlite-map - (match-lambda - (#(uuid derivation_name system - priority derived_priority) - `((uuid . ,uuid) - (derivation_name . ,derivation_name) - (system . ,system) - (priority . ,priority) - (derived_priority . ,derived_priority) - (tags . ,(vector-map - (lambda (_ tag) - (match tag - ((key . value) - `((key . ,key) - (value . ,value))))) - (datastore-fetch-build-tags - datastore - uuid)))))) - statement))) - (sqlite-reset statement) + (let ((statement + (sqlite-prepare + db + " +UPDATE allocated_builds +SET submit_outputs = :submit_outputs +WHERE build_id = :build_id" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:build_id (db-find-build-id db build-uuid) + #:submit_outputs (if submit-outputs? 1 0)) - builds))))) - rest)) + (sqlite-step-and-reset statement)))) + #t) (define-method (datastore-list-agent-builds (datastore <sqlite-datastore>) agent-id) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3609,7 +3572,7 @@ LIMIT :limit" " SELECT builds.uuid, derivations.name, unprocessed_builds_with_derived_priorities.derived_priority, - builds.canceled + builds.canceled, allocated_builds.submit_outputs FROM builds INNER JOIN derivations ON builds.derivation_id = derivations.id @@ -3626,11 +3589,17 @@ WHERE allocated_builds.agent_id = :agent_id" (let ((builds (sqlite-map (match-lambda - (#(uuid derivation_name derived_priority canceled) + (#(uuid derivation_name derived_priority canceled + submit_outputs) `((uuid . ,uuid) (derivation_name . ,derivation_name) (derived_priority . ,derived_priority) - (canceled . ,(= 1 canceled))))) + (canceled . ,(= 1 canceled)) + (submit_outputs . ,(cond + ((not submit_outputs) + 'null) + (else + (= 1 submit_outputs))))))) statement))) (sqlite-reset statement) @@ -3639,8 +3608,8 @@ WHERE allocated_builds.agent_id = :agent_id" (define-method (datastore-agent-for-build (datastore <sqlite-datastore>) build-uuid) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3702,13 +3671,14 @@ WHERE build_results.build_id = :build_id" "_sqitch_registry.db") (string-append "db:sqlite:" database-file)))) - (simple-format #t "running command: ~A\n" - (string-join command)) - (unless (zero? (apply system* command)) - (simple-format - (current-error-port) - "error: sqitch command failed\n") - (exit 1)))) + (simple-format/safe #t "running command: ~A\n" + (string-join command)) + (let ((pid (spawn (%config 'sqitch) command))) + (unless (zero? (cdr (waitpid pid))) + (simple-format/safe + (current-error-port) + "error: sqitch command failed\n") + (exit 1))))) (define (changes-count db) (let ((statement @@ -3795,16 +3765,16 @@ WHERE name = :name" (define-method (datastore-find-derivation (datastore <sqlite-datastore>) name) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (db-find-derivation db name)))) (define-method (datastore-find-derivation-outputs (datastore <sqlite-datastore>) derivation-name) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3836,8 +3806,8 @@ WHERE derivation_id = :derivation_id" (define-method (datastore-find-derivation-output-details (datastore <sqlite-datastore>) derivation-name) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3879,8 +3849,8 @@ WHERE derivation_id = :derivation_id" (define-method (datastore-list-unbuilt-derivation-outputs (datastore <sqlite-datastore>) derivation-name) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3913,8 +3883,8 @@ WHERE derivation_id = :derivation_id" (define-method (datastore-list-build-outputs (datastore <sqlite-datastore>) build-uuid) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3958,8 +3928,8 @@ WHERE builds.id = :build_id" (define-method (datastore-find-derivation-system (datastore <sqlite-datastore>) derivation-name) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3982,8 +3952,8 @@ WHERE name = :name" (define-method (datastore-find-derivation-inputs (datastore <sqlite-datastore>) derivation-name) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -4023,8 +3993,8 @@ WHERE derivations.id = :derivation_id" (define-method (datastore-find-recursive-derivation-input-outputs (datastore <sqlite-datastore>) derivation-name) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -4065,8 +4035,8 @@ INNER JOIN outputs (datastore <sqlite-datastore>) start-derivation-name output) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -4149,20 +4119,6 @@ SELECT system FROM systems WHERE id = :id" (define derivation-name (derivation-file-name derivation)) - (define (maybe-fix-fixed-output-field derivation-details) - (let ((fixed-output? (fixed-output-derivation? derivation))) - (unless (equal? (assq-ref derivation-details 'fixed-output?) - fixed-output?) - (let ((statement (sqlite-prepare - db - " -UPDATE derivations SET fixed_output = :fixed_output WHERE name = :name" - #:cache? #t))) - (sqlite-bind-arguments statement - #:name derivation-name - #:fixed_output (if fixed-output? 1 0)) - (sqlite-step-and-reset statement))))) - (define (insert-derivation) (let ((statement (sqlite-prepare @@ -4193,8 +4149,6 @@ INSERT INTO derivations (name, system_id, fixed_output) (db-find-derivation db derivation-name))) (if derivation-details (begin - (maybe-fix-fixed-output-field derivation-details) - (let ((derivation-outputs (select-derivation-outputs db derivation-name))) @@ -4503,15 +4457,15 @@ VALUES (:build_id, :derived_priority, :all_inputs_built)" (lambda* (uuid drv-name priority defer-until #:key skip-updating-other-build-derived-priorities) (define system-id - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (db-system->system-id db (datastore-find-derivation-system datastore drv-name))))) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread/delay-logging + datastore (lambda (db) (let* ((build-id (insert-build db drv-name uuid priority defer-until)) @@ -4537,7 +4491,8 @@ VALUES (:build_id, :derived_priority, :all_inputs_built)" #:args (list db build-id - derived-priority))))))) + derived-priority))))) + #:priority 'low)) rest) #t) @@ -4573,3 +4528,212 @@ VALUES (:agent_id, :password)" #:password password) (sqlite-step-and-reset statement))) + +(define-method (datastore-insert-background-job + (datastore <sqlite-datastore>) + type + args) + (call-with-writer-thread + datastore + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +INSERT INTO background_jobs_queue (type, args) +VALUES (:type, :args) +RETURNING id" + #:cache? #t))) + + (sqlite-bind-arguments + statement + #:type (symbol->string type) + #:args (call-with-output-string + (lambda (port) + (write args port)))) + + (match (sqlite-step-and-reset statement) + (#(id) + + (metric-increment + (metrics-registry-fetch-metric + (slot-ref datastore 'metrics-registry) + "coordinator_background_job_inserted_total") + #:label-values `((name . ,type))) + + id)))))) + +(define-method (datastore-delete-background-job + (datastore <sqlite-datastore>) + id) + (call-with-writer-thread + datastore + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +DELETE FROM background_jobs_queue WHERE id = :id" + #:cache? #t))) + + (sqlite-bind-arguments + statement + #:id id) + + (sqlite-step-and-reset statement)) + #t) + #:priority 'high)) + +(define-method (datastore-select-background-jobs + (datastore <sqlite-datastore>) + . + args) + (apply + (lambda* (type #:key (limit 1)) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT id, args +FROM background_jobs_queue +WHERE type = :type +ORDER BY id ASC +LIMIT :limit" + #:cache? #t))) + + (sqlite-bind-arguments + statement + #:type (symbol->string type) + #:limit limit) + + (let ((result + (sqlite-map + (match-lambda + (#(id args) + `((id . ,id) + (args . ,(call-with-input-string args read))))) + statement))) + (sqlite-reset statement) + result))))) + args)) + +(define-method (datastore-count-background-jobs + (datastore <sqlite-datastore>)) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT type, COUNT(*) +FROM background_jobs_queue +GROUP BY type" + #:cache? #t))) + + (let ((result + (sqlite-map + (match-lambda + (#(type count) + (cons type count))) + statement))) + (sqlite-reset statement) + + result))))) + +(define-method (datastore-check-and-correct-unprocessed-builds-all-inputs-built + (datastore <sqlite-datastore>) + . + args) + (define entries-to-check + (call-with-thread + (slot-ref datastore 'reader-thread-pool) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT build_id, builds.derivation_id +FROM unprocessed_builds_with_derived_priorities +INNER JOIN builds ON builds.id = build_id +WHERE all_inputs_built = 0 +ORDER BY build_id DESC" + #:cache? #t))) + (let ((result (sqlite-map identity statement))) + (sqlite-reset statement) + result))))) + + (define (all-inputs-built? derivation-id) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT 1 +FROM derivation_inputs +INNER JOIN derivation_outputs + ON derivation_inputs.derivation_output_id = derivation_outputs.id +INNER JOIN unbuilt_outputs + ON unbuilt_outputs.output_id = derivation_outputs.output_id +WHERE derivation_inputs.derivation_id = :derivation_id" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:derivation_id derivation-id) + + (match (sqlite-step-and-reset statement) + (#(1) #f) + (#f #t)))))) + + (define (update build-id) + (call-with-writer-thread + datastore + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +UPDATE unprocessed_builds_with_derived_priorities +SET all_inputs_built = 1 +WHERE build_id = :build_id AND all_inputs_built = 0 +RETURNING 1" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:build_id build-id) + + ;; This is to cope with the check not being transactional, so we + ;; might go to update a record which has just been changed to have + ;; all_inputs_built = 1 + (match (sqlite-step-and-reset statement) + (#(1) #t) + (#f #f)))))) + + (apply + (lambda* (#:key (progress-reporter (const progress-reporter/silent))) + (let ((reporter + (progress-reporter (length entries-to-check)))) + (start-progress-reporter! reporter) + (let loop ((entries entries-to-check) + (updated-count 0)) + (if (null? entries) + (begin + (stop-progress-reporter! reporter) + updated-count) + (match (car entries) + (#(build-id derivation-id) + (progress-reporter-report! reporter) + (if (all-inputs-built? derivation-id) + (loop (cdr entries) + (+ updated-count + (if (update build-id) + 1 + 0))) + (loop (cdr entries) + updated-count)))))))) + args)) |