diff options
Diffstat (limited to 'guix-build-coordinator/datastore/sqlite.scm')
-rw-r--r-- | guix-build-coordinator/datastore/sqlite.scm | 550 |
1 files changed, 194 insertions, 356 deletions
diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm index e67a940..ac04362 100644 --- a/guix-build-coordinator/datastore/sqlite.scm +++ b/guix-build-coordinator/datastore/sqlite.scm @@ -87,7 +87,7 @@ datastore-replace-agent-tags datastore-list-processed-builds datastore-list-unprocessed-builds - datastore-find-first-unallocated-deferred-build + datastore-find-deferred-build datastore-fetch-prioritised-unprocessed-builds datastore-insert-unprocessed-hook-event datastore-count-unprocessed-hook-events @@ -96,21 +96,20 @@ datastore-delete-unprocessed-hook-event datastore-list-agent-builds datastore-agent-for-build - datastore-count-build-allocation-plan-entries datastore-replace-build-allocation-plan - datastore-remove-build-from-allocation-plan datastore-count-allocated-builds datastore-agent-requested-systems datastore-update-agent-requested-systems datastore-fetch-build-to-allocate - datastore-insert-to-allocated-builds - datastore-remove-builds-from-plan - datastore-list-allocation-plan-builds)) + datastore-check-if-derivation-conflicts? + datastore-insert-to-allocated-builds)) (define-class <sqlite-datastore> (<abstract-datastore>) database-file worker-reader-thread-channel + worker-reader-thread-proc-vector worker-writer-thread-channel + worker-writer-thread-proc-vector metrics-registry) (define* (sqlite-datastore database-uri @@ -140,138 +139,129 @@ (slot-set! datastore 'database-file database-file) (slot-set! datastore 'metrics-registry metrics-registry) - (slot-set! - datastore - 'worker-writer-thread-channel - (make-worker-thread-channel - (lambda () - (let ((db - (db-open database-file))) - (sqlite-exec db "PRAGMA busy_timeout = 5000;") - (sqlite-exec db "PRAGMA synchronous = NORMAL;") - (sqlite-exec db "PRAGMA temp_store = MEMORY;") - (sqlite-exec db "PRAGMA foreign_keys = ON;") - - (sqlite-exec db "ATTACH DATABASE 'file:/mem?vfs=memdb' AS mem;") - - (sqlite-exec - db - " -CREATE TABLE IF NOT EXISTS mem.build_allocation_plan ( - build_id INTEGER NOT NULL, - agent_id TEXT NOT NULL, - ordering INTEGER NOT NULL, - PRIMARY KEY (agent_id, build_id) -);") - - (list db))) - #:name "ds write" - #:destructor - (let ((writer-thread-destructor-counter - (make-gauge-metric metrics-registry - "datastore_writer_thread_close_total"))) - (lambda (db) - (db-optimize db - database-file - metrics-registry - #:maybe-truncate-wal? #f) - - (metric-increment writer-thread-destructor-counter) - (sqlite-close db))) - #:lifetime 500 - #:expire-on-exception? #t - - ;; SQLite doesn't support parallel writes - #:parallelism 1 - #:delay-logger (let ((delay-metric - (make-histogram-metric - metrics-registry - "datastore_write_delay_seconds"))) - (lambda (seconds-delayed) - (metric-observe delay-metric - ;; TODO exact->inexact to work around - ;; a bug in guile-prometheus where - ;; the metric sum will output in the - ;; exact form including the /q - (exact->inexact seconds-delayed)) - (log-delay "datastore write" seconds-delayed) - (when (> seconds-delayed 1) - (format - (current-error-port) - "warning: database write delayed by ~1,2f seconds~%" - seconds-delayed)))) - #:duration-logger - (lambda (duration proc) - (when (> duration 10) - (format - (current-error-port) - "warning: database write took ~1,2f seconds (~a)~%" - duration - proc))) - #:log-exception? worker-thread-log-exception?)) - - ;; Make sure the worker thread has initialised, and created the in memory - ;; tables - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) - (const #t)) - - (slot-set! - datastore - 'worker-reader-thread-channel - (make-worker-thread-channel - (lambda () - (let ((db - (db-open database-file #:write? #f))) - (sqlite-exec db "PRAGMA temp_store = MEMORY;") - (sqlite-exec db "PRAGMA busy_timeout = 4000;") - (sqlite-exec db "PRAGMA cache_size = -16000;") - - (sqlite-exec db "ATTACH DATABASE 'file:/mem?vfs=memdb' AS mem;") - - (list db))) - #:name "ds read" - #:destructor - (let ((reader-thread-destructor-counter - (make-gauge-metric metrics-registry - "datastore_reader_thread_close_total"))) - (lambda (db) - (metric-increment reader-thread-destructor-counter) - (sqlite-close db))) - #:lifetime 50000 - #:expire-on-exception? #t - - ;; Use a minimum of 8 and a maximum of 16 threads - #:parallelism - (min (max (current-processor-count) - 8) - 16) - #:delay-logger (let ((delay-metric - (make-histogram-metric + (let ((channel + proc-vector + (make-worker-thread-channel + (lambda () + (let ((db + (db-open database-file))) + (sqlite-exec db "PRAGMA busy_timeout = 0;") + (sqlite-exec db "PRAGMA synchronous = NORMAL;") + (sqlite-exec db "PRAGMA temp_store = MEMORY;") + (sqlite-exec db "PRAGMA foreign_keys = ON;") + + (list db))) + #:name "ds write" + #:destructor + (let ((writer-thread-destructor-counter + (make-gauge-metric metrics-registry + "datastore_writer_thread_close_total"))) + (lambda (db) + (db-optimize db + database-file metrics-registry - "datastore_read_delay_seconds"))) - (lambda (seconds-delayed) - (metric-observe delay-metric - ;; TODO exact->inexact to work around - ;; a bug in guile-prometheus where - ;; the metric sum will output in the - ;; exact form including the /q - (exact->inexact seconds-delayed)) - (log-delay "datastore read" seconds-delayed) - (when (> seconds-delayed 1) - (format - (current-error-port) - "warning: database read delayed by ~1,2f seconds~%" - seconds-delayed)))) - #:duration-logger - (lambda (duration proc) - (when (> duration 30) - (format - (current-error-port) - "warning: database read took ~1,2f seconds (~a)~%" - duration - proc))) - #:log-exception? worker-thread-log-exception?)) + #:maybe-truncate-wal? #f) + + (metric-increment writer-thread-destructor-counter) + (sqlite-close db))) + #:lifetime 500 + #:expire-on-exception? #t + + ;; SQLite doesn't support parallel writes + #:parallelism 1 + #:delay-logger (let ((delay-metric + (make-histogram-metric + metrics-registry + "datastore_write_delay_seconds"))) + (lambda (seconds-delayed) + (metric-observe delay-metric + ;; TODO exact->inexact to work around + ;; a bug in guile-prometheus where + ;; the metric sum will output in the + ;; exact form including the /q + (exact->inexact seconds-delayed)) + (log-delay "datastore write" seconds-delayed) + (when (> seconds-delayed 1) + (format + (current-error-port) + "warning: database write delayed by ~1,2f seconds~%" + seconds-delayed)))) + #:duration-logger + (lambda (duration proc) + (when (> duration 10) + (format + (current-error-port) + "warning: database write took ~1,2f seconds (~a)~%" + duration + proc))) + #:log-exception? worker-thread-log-exception?))) + + (slot-set! datastore + 'worker-writer-thread-channel + channel) + (slot-set! datastore + 'worker-writer-thread-proc-vector + proc-vector)) + + (let ((channel + proc-vector + (make-worker-thread-channel + (lambda () + (let ((db + (db-open database-file #:write? #f))) + (sqlite-exec db "PRAGMA temp_store = MEMORY;") + (sqlite-exec db "PRAGMA busy_timeout = 0;") + (sqlite-exec db "PRAGMA cache_size = -16000;") + + (list db))) + #:name "ds read" + #:destructor + (let ((reader-thread-destructor-counter + (make-gauge-metric metrics-registry + "datastore_reader_thread_close_total"))) + (lambda (db) + (metric-increment reader-thread-destructor-counter) + (sqlite-close db))) + #:lifetime 50000 + #:expire-on-exception? #t + + ;; Use a minimum of 8 and a maximum of 16 threads + #:parallelism + (min (max (current-processor-count) + 8) + 16) + #:delay-logger (let ((delay-metric + (make-histogram-metric + metrics-registry + "datastore_read_delay_seconds"))) + (lambda (seconds-delayed) + (metric-observe delay-metric + ;; TODO exact->inexact to work around + ;; a bug in guile-prometheus where + ;; the metric sum will output in the + ;; exact form including the /q + (exact->inexact seconds-delayed)) + (log-delay "datastore read" seconds-delayed) + (when (> seconds-delayed 1) + (format + (current-error-port) + "warning: database read delayed by ~1,2f seconds~%" + seconds-delayed)))) + #:duration-logger + (lambda (duration proc) + (when (> duration 30) + (format + (current-error-port) + "warning: database read took ~1,2f seconds (~a)~%" + duration + proc))) + #:log-exception? worker-thread-log-exception?))) + (slot-set! datastore + 'worker-reader-thread-channel + channel) + (slot-set! datastore + 'worker-reader-thread-proc-vector + proc-vector)) datastore)) @@ -305,11 +295,6 @@ CREATE TABLE IF NOT EXISTS mem.build_allocation_plan ( metrics-registry checkpoint-duration-metric-name (lambda () - (if (> (wal-size) extreme-wal-size-threshold) - ;; Since the WAL is really getting too big, wait for much longer - (sqlite-exec db "PRAGMA busy_timeout = 300000;") - (sqlite-exec db "PRAGMA busy_timeout = 20;")) - (let* ((statement (sqlite-prepare db @@ -331,8 +316,6 @@ CREATE TABLE IF NOT EXISTS mem.build_allocation_plan ( modified-page-count pages-moved-to-db) #t)))))) - (sqlite-exec db "PRAGMA busy_timeout = 5000;") - result))) #t)) @@ -358,6 +341,25 @@ PRAGMA optimize;") (define-method (datastore-spawn-fibers (datastore <sqlite-datastore>)) + ;; Queue messages to the writer thread, so that they're handled in a first + ;; come first served manor + (slot-set! + datastore + 'worker-writer-thread-channel + (make-queueing-channel + (slot-ref datastore 'worker-writer-thread-channel))) + + (spawn-fiber + (lambda () + (while #t + (sleep 20) + (vector-for-each + (lambda (i proc) + (simple-format (current-error-port) + "reader thread ~A running: ~A\n" + i proc)) + (slot-ref datastore 'worker-reader-thread-proc-vector))))) + (spawn-fiber (lambda () (while #t @@ -1734,42 +1736,6 @@ WHERE build_id = :build_id" #t) rest)) -(define-method (datastore-remove-build-from-allocation-plan - (datastore <sqlite-datastore>) - uuid) - (define (update-build-allocation-plan-metrics) - (let ((allocation-plan-metric - (metrics-registry-fetch-metric - (slot-ref datastore 'metrics-registry) - "build_allocation_plan_total"))) - (for-each - (match-lambda - ((agent-id . count) - (metric-set allocation-plan-metric - count - #:label-values - `((agent_id . ,agent-id))))) - (datastore-count-build-allocation-plan-entries datastore)))) - - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-writer-thread-channel) - (lambda (db) - (let ((statement (sqlite-prepare - db - " -DELETE FROM build_allocation_plan WHERE build_id = :build_id" - #:cache? #t))) - - (sqlite-bind-arguments - statement - #:build_id (db-find-build-id db uuid)) - - (sqlite-step-and-reset statement) - - (unless (= 0 (changes-count db)) - (update-build-allocation-plan-metrics))))) - #t) - (define-method (datastore-count-build-results (datastore <sqlite-datastore>)) (call-with-worker-thread @@ -3016,8 +2982,9 @@ ORDER BY priority DESC" builds))))) -(define-method (datastore-find-first-unallocated-deferred-build - (datastore <sqlite-datastore>)) +(define-method (datastore-find-deferred-build + (datastore <sqlite-datastore>) + select?) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) @@ -3032,25 +2999,29 @@ INNER JOIN derivations WHERE processed = 0 AND canceled = 0 AND deferred_until IS NOT NULL - AND builds.id NOT IN (SELECT build_id FROM build_allocation_plan) -ORDER BY deferred_until ASC -LIMIT 1" +ORDER BY deferred_until ASC" #:cache? #t))) - (match (sqlite-step-and-reset statement) - (#(uuid derivation_name priority created_at deferred_until) - `((uuid . ,uuid) - (derivation-name . ,derivation_name) - (priority . ,priority) - (created-at . ,(if (string? created_at) - (string->date created_at - "~Y-~m-~d ~H:~M:~S") - #f)) - (deferred-until . ,(if (string? deferred_until) - (string->date deferred_until - "~Y-~m-~d ~H:~M:~S") - #f)))) - (#f #f)))))) + (let loop ((row (sqlite-step statement))) + (match row + (#(uuid derivation_name priority created_at deferred_until) + (let ((res + (select? + `((uuid . ,uuid) + (derivation-name . ,derivation_name) + (priority . ,priority) + (created-at . ,(if (string? created_at) + (string->date created_at + "~Y-~m-~d ~H:~M:~S") + #f)) + (deferred-until . ,(if (string? deferred_until) + (string->date deferred_until + "~Y-~m-~d ~H:~M:~S") + #f)))))) + (if res + res + (loop (sqlite-step statement))))) + (#f #f))))))) (define-method (datastore-fetch-prioritised-unprocessed-builds (datastore <sqlite-datastore>)) @@ -3222,103 +3193,6 @@ DELETE FROM unprocessed_hook_events WHERE id = :id" (sqlite-step-and-reset statement)))) #t) -(define-method (datastore-count-build-allocation-plan-entries - (datastore <sqlite-datastore>)) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) - (lambda (db) - (let ((statement - (sqlite-prepare - db - " -SELECT agent_id, COUNT(*) -FROM build_allocation_plan -GROUP BY agent_id" - #:cache? #t))) - - (let ((result - (sqlite-map - (match-lambda - (#(agent_id count) - (cons agent_id count))) - statement))) - (sqlite-reset statement) - - result))))) - -(define-method (datastore-replace-build-allocation-plan - (datastore <sqlite-datastore>) - planned-builds) - (define (clear-current-plan db) - (sqlite-exec - db - "DELETE FROM build_allocation_plan")) - - (define insert-sql - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) - (lambda (db) - (string-append - " -INSERT INTO build_allocation_plan (build_id, agent_id, ordering) VALUES " - (string-join - (map (match-lambda - ((build-uuid agent-id ordering) - (simple-format - #f - "('~A', '~A', ~A)" - (db-find-build-id db build-uuid) - agent-id - ordering))) - planned-builds) - ", ") - ";")))) - - (define (insert-new-plan db planned-builds) - (sqlite-exec - db - insert-sql)) - - (datastore-call-with-transaction - datastore - (lambda (db) - (clear-current-plan db) - (unless (null? planned-builds) - (insert-new-plan db planned-builds))) - #:duration-metric-name "replace_build_allocation_plan") - - (let* ((agent-ids - (map (lambda (agent-details) - (assq-ref agent-details 'uuid)) - (datastore-list-agents datastore))) - (counts-by-agent - (make-vector (length agent-ids) 0))) - (for-each - (match-lambda - ((_ agent-id _) - (let ((index (list-index (lambda (list-agent-id) - (string=? agent-id list-agent-id)) - agent-ids))) - (vector-set! counts-by-agent - index - (+ (vector-ref counts-by-agent - index) - 1))))) - planned-builds) - - (let ((metric - (metrics-registry-fetch-metric (slot-ref datastore 'metrics-registry) - "build_allocation_plan_total"))) - (for-each - (lambda (index agent-id) - (metric-set metric - (vector-ref counts-by-agent index) - #:label-values - `((agent_id . ,agent-id)))) - (iota (length agent-ids)) - agent-ids))) - #t) - (define-method (datastore-count-allocated-builds (datastore <sqlite-datastore>)) (call-with-worker-thread/delay-logging @@ -3418,34 +3292,40 @@ INSERT INTO build_allocation_agent_requested_systems (agent_id, system_id) VALUE (define-method (datastore-fetch-build-to-allocate (datastore <sqlite-datastore>) - agent-id) - (datastore-call-with-transaction - datastore + build-id) + (call-with-worker-thread + (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db - ;; This needs to guard against the plan being out of date " SELECT builds.uuid, derivations.id, derivations.name, unprocessed_builds_with_derived_priorities.derived_priority FROM builds -INNER JOIN build_allocation_plan - ON builds.id = build_allocation_plan.build_id INNER JOIN derivations ON builds.derivation_id = derivations.id -INNER JOIN build_allocation_agent_requested_systems - ON build_allocation_agent_requested_systems.agent_id = :agent_id - AND build_allocation_agent_requested_systems.system_id = derivations.system_id LEFT JOIN unprocessed_builds_with_derived_priorities ON unprocessed_builds_with_derived_priorities.build_id = builds.id -WHERE build_allocation_plan.agent_id = :agent_id +WHERE builds.uuid = :uuid AND builds.processed = 0 AND builds.canceled = 0 - AND builds.id NOT IN (SELECT build_id FROM allocated_builds) -ORDER BY build_allocation_plan.ordering ASC" - #:cache? #t)) - (output-conflicts-statement + AND builds.id NOT IN (SELECT build_id FROM allocated_builds)" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:uuid build-id) + + (sqlite-step-and-reset statement))))) + +(define-method (datastore-check-if-derivation-conflicts? + (datastore <sqlite-datastore>) + agent-id + derivation-id) + (call-with-worker-thread + (slot-ref datastore 'worker-reader-thread-channel) + (lambda (db) + (let ((statement (sqlite-prepare db " @@ -3463,34 +3343,11 @@ WHERE build_derivation_outputs.derivation_id = :derivation_id allocated_builds_derivation_outputs.output_id" #:cache? #t))) - (define (get-build-to-allocate) - (let loop ((build-details (sqlite-step statement))) - (match build-details - (#f #f) - (#(uuid derivation-id derivation-name derived_priority) - - (sqlite-bind-arguments output-conflicts-statement - #:agent_id agent-id - #:derivation_id derivation-id) - - (if (eq? (sqlite-step-and-reset output-conflicts-statement) - #f) - `((uuid . ,uuid) - (derivation_name . ,derivation-name) - (derived_priority . ,derived_priority)) - (loop (sqlite-step statement))))))) - - (sqlite-bind-arguments - statement - #:agent_id agent-id) - - (let ((result (get-build-to-allocate))) - (sqlite-reset statement) - - result))) + (sqlite-bind-arguments statement + #:agent_id agent-id + #:derivation_id derivation-id) - #:readonly? #t - #:duration-metric-name "fetch_builds_to_allocate")) + (->bool (sqlite-step-and-reset statement)))))) (define-method (datastore-insert-to-allocated-builds (datastore <sqlite-datastore>) @@ -3515,25 +3372,6 @@ INSERT INTO allocated_builds (build_id, agent_id) VALUES " ", ") ";"))))) -(define-method (datastore-remove-builds-from-plan - (datastore <sqlite-datastore>) - build-uuids) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) - (lambda (db) - (sqlite-exec - db - (string-append - " -DELETE FROM build_allocation_plan -WHERE build_id IN (" - (string-join - (map (lambda (build-uuid) - (number->string (db-find-build-id db build-uuid))) - build-uuids) - ", ") - ")"))))) - (define-method (datastore-list-allocation-plan-builds (datastore <sqlite-datastore>) . |