(define-module (guix-build-coordinator datastore sqlite) #:use-module (oop goops) #:use-module (srfi srfi-1) #:use-module (srfi srfi-19) #:use-module (ice-9 match) #:use-module (ice-9 format) #:use-module (ice-9 threads) #:use-module (ice-9 exceptions) #:use-module (sqlite3) #:use-module (fibers) #:use-module (prometheus) #:use-module (guix derivations) #:use-module (guix-build-coordinator utils) #:use-module (guix-build-coordinator utils fibers) #:use-module (guix-build-coordinator config) #:use-module (guix-build-coordinator datastore abstract) #:export (sqlite-datastore datastore-optimize datastore-spawn-fibers datastore-initialise-metrics! datastore-update-metrics! datastore-update datastore-call-with-transaction datastore-store-derivation datastore-list-related-derivations-with-no-build-for-outputs datastore-list-failed-builds-with-blocking-count datastore-list-builds-for-derivation-recursive-inputs datastore-insert-build datastore-cancel-build datastore-count-builds datastore-for-each-build datastore-find-build datastore-list-builds datastore-insert-build-tags datastore-fetch-build-tags datastore-find-build-result datastore-find-build-derivation-system datastore-count-builds-for-derivation datastore-count-build-results datastore-insert-build-result datastore-remove-build-allocation datastore-mark-build-as-processed datastore-delete-relevant-outputs-from-unbuilt-outputs datastore-store-output-metadata datastore-list-unbuilt-derivation-outputs datastore-list-build-outputs datastore-new-agent datastore-list-agents datastore-find-agent datastore-find-agent-by-name datastore-insert-dynamic-auth-token datastore-dynamic-auth-token-exists? 
datastore-fetch-agent-tags
datastore-store-build-start
datastore-find-build-starts
datastore-count-setup-failures
datastore-list-setup-failures-for-build
datastore-fetch-setup-failures
datastore-store-setup-failure
datastore-store-setup-failure/missing-inputs
datastore-list-setup-failure-missing-inputs
datastore-find-derivation
datastore-find-derivation-system
datastore-find-derivation-inputs
datastore-find-derivation-for-output
datastore-find-derivation-outputs
datastore-list-builds-for-output
datastore-list-builds-for-output-and-system
datastore-new-agent-password
datastore-agent-password-exists?
datastore-agent-list-passwords
datastore-replace-agent-tags
datastore-list-processed-builds
datastore-list-unprocessed-builds
datastore-find-first-unallocated-deferred-build
datastore-fetch-build-ids-and-propagated-priorities-for-unprocessed-builds
datastore-insert-unprocessed-hook-event
datastore-count-unprocessed-hook-events
datastore-list-unprocessed-hook-events
datastore-delete-unprocessed-hook-event
datastore-list-agent-builds
datastore-agent-for-build
datastore-count-build-allocation-plan-entries
datastore-replace-build-allocation-plan
datastore-remove-build-from-allocation-plan
datastore-count-allocated-builds
datastore-agent-requested-systems
datastore-update-agent-requested-systems
datastore-fetch-build-to-allocate
datastore-insert-to-allocated-builds
datastore-remove-builds-from-plan
datastore-select-allocated-builds
datastore-list-allocation-plan-builds))

;; Datastore object: the database file name, the reader and writer
;; worker thread channels, and the metrics registry.
;; NOTE(review): the class name and superclass list look like they were
;; lost from this form (and from every `(make )' / `(datastore )' below)
;; -- confirm against the repository.
(define-class ()
  database-file
  worker-reader-thread-channel
  worker-writer-thread-channel
  metrics-registry)

;; Build a datastore for DATABASE-URI (a "sqlite://"-prefixed file name).
;; When UPDATE-DATABASE? is true, run-sqitch is invoked on the file first.
;; The database is then switched to WAL mode, optimized and checkpointed
;; through a short-lived connection, after which the reader and writer
;; worker thread channels are created and stored on the new object.
(define* (sqlite-datastore database-uri
                           #:key
                           update-database?
                           metrics-registry)
  (define database-file
    (string-drop database-uri
                 (string-length "sqlite://")))

  ;; Return a #:delay-logger procedure that records each delay in the
  ;; histogram METRIC-NAME and prints WARNING-FORMAT for delays of more
  ;; than one second.
  (define (make-delay-logger metric-name warning-format)
    (let ((delay-metric
           (make-histogram-metric metrics-registry metric-name)))
      (lambda (seconds-delayed)
        (metric-observe delay-metric
                        ;; TODO exact->inexact to work around
                        ;; a bug in guile-prometheus where
                        ;; the metric sum will output in the
                        ;; exact form including the /q
                        (exact->inexact seconds-delayed))
        (when (> seconds-delayed 1)
          (format (current-error-port)
                  warning-format
                  seconds-delayed)))))

  (when update-database?
    (run-sqitch database-file))

  (let ((setup-db (db-open database-file)))
    (sqlite-exec setup-db "PRAGMA journal_mode=WAL;")
    (sqlite-exec setup-db "PRAGMA optimize;")
    (sqlite-exec setup-db "PRAGMA wal_checkpoint(TRUNCATE);")
    (sqlite-close setup-db))

  (let ((datastore (make )))
    (slot-set! datastore 'database-file database-file)
    (slot-set! datastore 'metrics-registry metrics-registry)

    (slot-set!
     datastore
     'worker-reader-thread-channel
     (make-worker-thread-channel
      (lambda ()
        (let ((db (db-open database-file #:write? #f)))
          (sqlite-exec db "PRAGMA busy_timeout = 5000;")
          (list db)))
      #:destructor
      (let ((reader-thread-destructor-counter
             (make-gauge-metric metrics-registry
                                "datastore_reader_thread_close_total")))
        (lambda (db)
          (metric-increment reader-thread-destructor-counter)
          (sqlite-close db)))
      #:lifetime 50000
      ;; Use a minimum of 2 and a maximum of 8 threads
      #:parallelism (min (max (current-processor-count) 2) 8)
      #:delay-logger
      (make-delay-logger
       "datastore_read_delay_seconds"
       "warning: database read delayed by ~1,2f seconds~%")))

    (slot-set!
     datastore
     'worker-writer-thread-channel
     (make-worker-thread-channel
      (lambda ()
        (let ((db (db-open database-file)))
          (sqlite-exec db "PRAGMA busy_timeout = 5000;")
          (sqlite-exec db "PRAGMA foreign_keys = ON;")
          (list db)))
      #:destructor
      (lambda (db)
        (db-optimize db database-file metrics-registry)
        (sqlite-close db))
      #:lifetime 500
      ;; SQLite doesn't support parallel writes
      #:parallelism 1
      #:delay-logger
      (make-delay-logger
       "datastore_write_delay_seconds"
       "warning: database write delayed by ~1,2f seconds~%")))

    datastore))

;; Checkpoint the WAL of DB (only when the "-wal" file next to
;; DB-FILENAME is larger than 5 MiB) and then run PRAGMA optimize.
;; Both steps are timed through call-with-duration-metric.
(define (db-optimize db db-filename metrics-registry)
  (define wal-filename
    (string-append db-filename "-wal"))

  ;; 5 MiB, as an inexact number
  (define wal-size-threshold
    (* 5 (expt 2 20) 1.))

  (define (timed-exec metric-name sql)
    (call-with-duration-metric
     metrics-registry
     metric-name
     (lambda ()
       (sqlite-exec db sql))))

  (when (> (stat:size (stat wal-filename))
           wal-size-threshold)
    (timed-exec "datastore_wal_checkpoint_duration_seconds"
                "PRAGMA wal_checkpoint(TRUNCATE);"))

  (timed-exec "datastore_optimize_duration_seconds"
              " PRAGMA analysis_limit=1000; PRAGMA optimize;"))

;; Run db-optimize on the writer connection, retrying up to 5 times
;; with a delay of 5 between attempts (see retry-on-error).
(define-method (datastore-optimize (datastore ))
  (define (optimize-on-writer-thread)
    (call-with-worker-thread
     (slot-ref datastore 'worker-writer-thread-channel)
     (lambda (db)
       (db-optimize db
                    (slot-ref datastore 'database-file)
                    (slot-ref datastore 'metrics-registry)))))

  (retry-on-error optimize-on-writer-thread
                  #:times 5
                  #:delay 5))

(define-method (datastore-spawn-fibers (datastore ))
  (spawn-fiber
   (lambda ()
     (while #t
       (sleep (* 60 5)) ; 5 minutes
       (with-exception-handler
           (lambda (exn)
             (simple-format
(current-error-port)
              "exception when performing WAL checkpoint: ~A\n"
              exn))
         (lambda ()
           (datastore-optimize datastore))
         #:unwind? #t)))
   #:parallel? #t))

;; Create the gauge metrics for builds, build results, setup failures
;; and allocation plan entries in the datastore's registry, and set
;; their initial values from the corresponding count queries.
;; Always returns #t.
(define-method (datastore-initialise-metrics! (datastore ))
  (define registry
    (slot-ref datastore 'metrics-registry))

  (let ((builds-total
         (make-gauge-metric registry
                            "builds_total"
                            #:labels '(system)))
        (build-results-total
         (make-gauge-metric registry
                            "build_results_total"
                            #:labels '(agent_id result)))
        (setup-failures-total
         (make-gauge-metric registry
                            "setup_failures_total"
                            #:labels '(agent_id reason)))
        (build-allocation-plan-total
         (make-gauge-metric registry
                            "build_allocation_plan_total"
                            #:labels '(agent_id))))

    (for-each (match-lambda
                ((system . count)
                 (metric-set builds-total
                             count
                             #:label-values
                             `((system . ,system)))))
              (datastore-count-builds datastore))

    (for-each (match-lambda
                (((agent-id result) . count)
                 (metric-set build-results-total
                             count
                             #:label-values
                             `((agent_id . ,agent-id)
                               (result . ,result)))))
              (datastore-count-build-results datastore))

    (for-each (match-lambda
                (((agent-id reason) . count)
                 (metric-set setup-failures-total
                             count
                             #:label-values
                             `((agent_id . ,agent-id)
                               (reason . ,reason)))))
              (datastore-count-setup-failures datastore))

    (for-each (match-lambda
                ((agent-id . count)
                 (metric-set build-allocation-plan-total
                             count
                             #:label-values
                             `((agent_id . ,agent-id)))))
              (datastore-count-build-allocation-plan-entries datastore)))
  #t)

;; Set the "datastore_bytes" and "datastore_wal_bytes" gauges (created
;; on first use) to the current sizes of the database file and its
;; "-wal" file.  Always returns #t.
(define-method (datastore-update-metrics! (datastore ))
  (let* ((db-filename
          (slot-ref datastore 'database-file))
         (db-wal-filename
          (string-append db-filename "-wal"))

         (registry
          (slot-ref datastore 'metrics-registry))
         (db-bytes
          (or (metrics-registry-fetch-metric registry
                                             "datastore_bytes")
              (make-gauge-metric
               registry "datastore_bytes"
               #:docstring "Size of the SQLite database file")))
         (db-wal-bytes
          (or (metrics-registry-fetch-metric registry
                                             "datastore_wal_bytes")
              (make-gauge-metric
               registry "datastore_wal_bytes"
               #:docstring "Size of the SQLite Write Ahead Log file"))))

    (metric-set db-bytes (stat:size (stat db-filename)))
    (metric-set db-wal-bytes (stat:size (stat db-wal-filename))))
  #t)

;; Call THUNK and return all of its values.  When the datastore has a
;; metrics registry, the elapsed real time is observed in the histogram
;; "datastore_<THING>_duration_seconds" (created on first use).
(define (call-with-time-tracking datastore thing thunk)
  (define registry (slot-ref datastore 'metrics-registry))
  (define metric-name
    (string-append "datastore_" thing "_duration_seconds"))

  (if registry
      (let* ((metric
              (or (metrics-registry-fetch-metric registry metric-name)
                  (make-histogram-metric registry metric-name)))
             (start-time (get-internal-real-time)))
        ;; Use call-with-values rather than a single `let' binding so
        ;; that multiple return values (which
        ;; datastore-call-with-transaction forwards) are not dropped.
        (call-with-values thunk
          (lambda vals
            (metric-observe
             metric
             ;; exact->inexact for the same reason as the delay loggers
             ;; in sqlite-datastore: guile-prometheus would otherwise
             ;; output the sum in exact form, including the /q
             (exact->inexact
              (/ (- (get-internal-real-time) start-time)
                 internal-time-units-per-second)))
            (apply values vals))))
      (thunk)))

;; Set to the transaction procedure while one is running, so that
;; nested datastore-call-with-transaction calls do not BEGIN again.
(define %current-transaction-proc
  (make-parameter #f))

(define* (datastore-call-with-transaction datastore proc
                                          #:key
                                          readonly?
                                          duration-metric-name)
  (define (run-proc-within-transaction db)
    (if (%current-transaction-proc)
        (proc db) ; already in transaction
        (begin
          (sqlite-exec db "BEGIN TRANSACTION;")
          (with-exception-handler
              (lambda (exn)
                (simple-format (current-error-port)
                               "error: sqlite rolling back transaction\n")
                (sqlite-exec db "ROLLBACK TRANSACTION;")
                (raise-exception exn))
            (lambda ()
              (call-with-values
                  (lambda ()
                    (parameterize ((%current-transaction-proc proc))
                      (proc db)))
                (lambda vals
                  (sqlite-exec db "COMMIT TRANSACTION;")
                  (apply values vals))))))))

  (call-with-worker-thread
   (slot-ref datastore (if readonly?
'worker-reader-thread-channel
                           'worker-writer-thread-channel))
   (lambda (db)
     (if duration-metric-name
         (call-with-time-tracking
          datastore
          duration-metric-name
          (lambda ()
            (run-proc-within-transaction db)))
         (run-proc-within-transaction db)))))

;; Look up the agent with id UUID.  Returns an alist holding its
;; description, or #f when there is no such agent.
(define-method (datastore-find-agent (datastore ) uuid)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((stmt (sqlite-prepare
                  db
                  " SELECT description FROM agents WHERE id = :id"
                  #:cache? #t)))
       (sqlite-bind-arguments stmt #:id uuid)
       (let ((agent
              (match (sqlite-map
                      (match-lambda
                        (#(description)
                         (list (cons 'description description))))
                      stmt)
                (() #f)
                ((agent) agent))))
         (sqlite-reset stmt)
         agent)))))

;; Return the id of the agent called NAME, or #f when there is none.
(define-method (datastore-find-agent-by-name (datastore ) name)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((stmt (sqlite-prepare
                  db
                  " SELECT id FROM agents WHERE name = :name"
                  #:cache? #t)))
       (sqlite-bind-arguments stmt #:name name)
       (let ((agent-id
              (match (sqlite-map
                      (match-lambda
                        (#(id) id))
                      stmt)
                (() #f)
                ((agent) agent))))
         (sqlite-reset stmt)
         agent-id)))))

;; Store TOKEN in the dynamic_auth_tokens table.
(define-method (datastore-insert-dynamic-auth-token (datastore ) token)
  (call-with-worker-thread
   (slot-ref datastore 'worker-writer-thread-channel)
   (lambda (db)
     (let ((stmt (sqlite-prepare
                  db
                  " INSERT INTO dynamic_auth_tokens (token) VALUES (:token)"
                  #:cache? #t)))
       (sqlite-bind-arguments stmt #:token token)
       (sqlite-step stmt)
       (sqlite-reset stmt)))))

;; Return #t when TOKEN is present in dynamic_auth_tokens, #f otherwise.
(define-method (datastore-dynamic-auth-token-exists? (datastore ) token)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((stmt (sqlite-prepare
                  db
                  " SELECT 1 FROM dynamic_auth_tokens WHERE token = :token"
                  #:cache? #t)))
       (sqlite-bind-arguments stmt #:token token)
       (let ((exists?
              (match (sqlite-map
                      (match-lambda
                        (#(1) #t))
                      stmt)
                ((#t) #t)
                (() #f))))
         (sqlite-reset stmt)
         exists?)))))

;; Return the tags of agent AGENT-ID as a vector of (key . value) pairs.
(define-method (datastore-fetch-agent-tags (datastore ) agent-id)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((stmt (sqlite-prepare
                  db
                  " SELECT key, value FROM tags INNER JOIN agent_tags ON tags.id = agent_tags.tag_id WHERE agent_tags.agent_id = :agent_id"
                  #:cache? #t)))
       (sqlite-bind-arguments stmt #:agent_id agent-id)
       (let ((tags
              (list->vector
               (sqlite-fold
                (lambda (row acc)
                  (match row
                    (#(key value)
                     (cons (cons key value) acc))))
                '()
                stmt))))
         (sqlite-reset stmt)
         tags)))))

;; Insert a new agent through insert-agent.  Always returns #t.
(define-method (datastore-new-agent (datastore )
                                    uuid
                                    name
                                    description)
  (call-with-worker-thread
   (slot-ref datastore 'worker-writer-thread-channel)
   (lambda (db)
     (insert-agent db uuid name description)))
  #t)

;; Return every agent, ordered by id, as a list of alists with the
;; keys uuid and description.
(define-method (datastore-list-agents (datastore ))
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let* ((stmt (sqlite-prepare
                   db
                   " SELECT id, description FROM agents ORDER BY id"
                   #:cache? #t))
            (agents
             (sqlite-map
              (match-lambda
                (#(id description)
                 (list (cons 'uuid id)
                       (cons 'description description))))
              stmt)))
       (sqlite-reset stmt)
       agents))))

;; Store PASSWORD for agent AGENT-UUID through insert-agent-password.
;; Always returns #t.
(define-method (datastore-new-agent-password (datastore )
                                             agent-uuid
                                             password)
  (call-with-worker-thread
   (slot-ref datastore 'worker-writer-thread-channel)
   (lambda (db)
     (insert-agent-password db agent-uuid password)))
  #t)

(define-method (datastore-agent-password-exists? (datastore )
                                                 uuid
                                                 password)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT 1 FROM agent_passwords \
WHERE agent_id = :agent_id AND password = :password"
             #:cache?
#t)))
       (sqlite-bind-arguments statement
                              #:agent_id uuid
                              #:password password)
       (let ((result
              (match (sqlite-step statement)
                (#f #f)
                (#(1) #t))))
         (sqlite-reset statement)
         result)))))

;; Return the list of passwords stored for the agent with id UUID.
(define-method (datastore-agent-list-passwords (datastore ) uuid)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((stmt (sqlite-prepare
                  db
                  " SELECT password FROM agent_passwords WHERE agent_id = :agent_id"
                  #:cache? #t)))
       (sqlite-bind-arguments stmt #:agent_id uuid)
       (let ((passwords
              (sqlite-map
               (lambda (row)
                 (match row
                   (#(password) password)))
               stmt)))
         (sqlite-reset stmt)
         passwords)))))

(define-method (datastore-replace-agent-tags (datastore )
                                             agent-id
                                             tags)
  (datastore-call-with-transaction
   datastore
   (lambda (db)
     (let ((delete-agent-tags-statement
            (sqlite-prepare
             db
             " DELETE FROM agent_tags WHERE agent_id = :agent_id"
             #:cache? #t))
           (insert-tag-statement
            (sqlite-prepare
             db
             " INSERT INTO tags (\"key\", \"value\") VALUES (:tagkey, :tagvalue)"
             #:cache? #t))
           (find-tag-statement
            (sqlite-prepare
             db
             " SELECT id FROM tags WHERE key = :tag_key AND value = :tag_value"
             #:cache? #t))
           (agent-tags-statement
            (sqlite-prepare
             db
             " INSERT INTO agent_tags (agent_id, tag_id) VALUES (:agent_id, :tag_id)"
             #:cache?
#t)))

       (define (tag->id key value)
         (sqlite-bind-arguments find-tag-statement
                                #:tag_key key
                                #:tag_value value)
         (let ((result
                (match (sqlite-step find-tag-statement)
                  (#(id) id)
                  (#f
                   (sqlite-bind-arguments insert-tag-statement
                                          #:tagkey key
                                          #:tagvalue value)
                   (sqlite-step insert-tag-statement)
                   (sqlite-reset insert-tag-statement)
                   (last-insert-rowid db)))))
           (sqlite-reset find-tag-statement)
           result))

       (define (insert-tag key value)
         (sqlite-bind-arguments agent-tags-statement
                                #:agent_id agent-id
                                #:tag_id (tag->id key value))
         (sqlite-step agent-tags-statement)
         (sqlite-reset agent-tags-statement))

       (sqlite-bind-arguments delete-agent-tags-statement
                              #:agent_id agent-id)
       (sqlite-step delete-agent-tags-statement)
       (sqlite-reset delete-agent-tags-statement)

       (for-each
        (match-lambda
          ((('key . key) ('value . value))
           (insert-tag key value))
          ((key . value)
           (insert-tag key value)))
        (if (vector? tags)
            (vector->list tags)
            tags)))))
  #t)

;; Store DERIVATION (via insert-derivation-and-return-outputs) inside a
;; transaction timed under "store_derivation".  Always returns #t.
(define-method (datastore-store-derivation (datastore ) derivation)
  (datastore-call-with-transaction
   datastore
   (lambda (db)
     (insert-derivation-and-return-outputs db derivation))
   #:duration-metric-name "store_derivation")
  #t)

;; Return the names of the derivations reachable through the inputs of
;; DERIVATION (excluding DERIVATION itself) for which no non-canceled
;; build exists, for the same system, of any derivation sharing one of
;; their outputs.
(define-method (datastore-list-related-derivations-with-no-build-for-outputs
                (datastore )
                derivation)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " WITH RECURSIVE related_derivations(id) AS ( VALUES(:derivation_id) UNION SELECT derivation_outputs.derivation_id FROM derivation_outputs INNER JOIN derivation_inputs ON derivation_outputs.id = derivation_inputs.derivation_output_id INNER JOIN related_derivations ON related_derivations.id = derivation_inputs.derivation_id ) SELECT derivations.name FROM related_derivations INNER JOIN derivations ON related_derivations.id = derivations.id WHERE related_derivations.id != :derivation_id AND NOT EXISTS ( SELECT 1 FROM builds INNER JOIN derivation_outputs AS other_derivation_derivation_outputs ON
other_derivation_derivation_outputs.derivation_id = builds.derivation_id INNER JOIN derivations AS other_derivations ON other_derivation_derivation_outputs.derivation_id = other_derivations.id INNER JOIN derivation_outputs ON derivation_outputs.output_id = other_derivation_derivation_outputs.output_id WHERE derivation_outputs.derivation_id = related_derivations.id AND other_derivations.system_id = derivations.system_id AND builds.canceled = 0 ) "
             #:cache? #t)))
       (sqlite-bind-arguments statement
                              #:derivation_id
                              (db-find-derivation-id db derivation))
       (let ((result
              (sqlite-map
               (match-lambda
                 (#(derivation) derivation))
               statement)))
         (sqlite-reset statement)
         result)))))

;; List failed builds that have no successful build for any of their
;; outputs and that block at least one unprocessed, non-canceled build.
;;
;; ARGS is (SYSTEM #:key include-cancelled?).  When SYSTEM is true,
;; only builds for that system are returned; when INCLUDE-CANCELLED? is
;; true, canceled builds are considered as well.
;;
;; Returns a list of alists with the keys uuid, derivation_name and
;; blocked_count, ordered by blocked count (descending), then
;; derivation name, then uuid.
(define-method (datastore-list-failed-builds-with-blocking-count
                (datastore )
                . args)
  (apply
   (lambda* (system #:key include-cancelled?)
     (call-with-worker-thread
      (slot-ref datastore 'worker-reader-thread-channel)
      (lambda (db)
        (let ((statement
               (sqlite-prepare
                db
                (string-append
                 " SELECT * FROM ( SELECT builds.uuid, derivations.name, ( WITH RECURSIVE related_derivations(id) AS ( VALUES(builds.derivation_id) UNION SELECT derivation_inputs.derivation_id FROM derivation_outputs INNER JOIN related_derivations ON derivation_outputs.derivation_id = related_derivations.id INNER JOIN derivation_inputs ON derivation_outputs.id = derivation_inputs.derivation_output_id ) SELECT COUNT(DISTINCT blocked_builds.uuid) FROM related_derivations INNER JOIN builds AS blocked_builds ON related_derivations.id = blocked_builds.derivation_id AND blocked_builds.processed = 0 AND blocked_builds.canceled = 0 ) AS blocking_count FROM builds INNER JOIN derivations ON derivations.id = builds.derivation_id LEFT JOIN build_results ON builds.id = build_results.build_id WHERE ( ( builds.processed = 1 AND build_results.result = 'failure'"
                 (if include-cancelled?
                     " ) OR ( builds.canceled = 1"
                     "")
                 " ) )"
                 (if system
                     " AND derivations.system_id = :system_id"
                     "")
                 " AND NOT EXISTS ( SELECT 1 FROM derivation_outputs INNER JOIN derivation_outputs AS other_build_derivation_outputs ON derivation_outputs.output_id = other_build_derivation_outputs.output_id INNER JOIN builds AS other_builds ON other_build_derivation_outputs.derivation_id = other_builds.derivation_id INNER JOIN build_results AS other_build_results ON other_builds.id = other_build_results.build_id WHERE derivation_outputs.derivation_id = builds.derivation_id AND other_build_results.result = 'success' ) ) AS data WHERE blocking_count > 0 ORDER BY 3 DESC, 2, 1")
                #:cache? #t)))

          ;; The keyword must match the :system_id placeholder used in
          ;; the SQL above; it used to be #:system, which names no
          ;; parameter of the statement.
          (when system
            (sqlite-bind-arguments statement
                                   #:system_id
                                   (db-system->system-id db system)))

          (let ((result
                 (sqlite-map
                  (match-lambda
                    (#(uuid derivation-name blocked-count)
                     `((uuid . ,uuid)
                       (derivation_name . ,derivation-name)
                       (blocked_count . ,blocked-count))))
                  statement)))
            (sqlite-reset statement)
            result)))))
   args))

(define-method (datastore-list-builds-for-derivation-recursive-inputs
                (datastore )
                derivation)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " WITH RECURSIVE related_derivations(id) AS ( VALUES(:derivation_id) UNION SELECT derivation_outputs.derivation_id FROM derivation_outputs INNER JOIN derivation_inputs ON derivation_outputs.id = derivation_inputs.derivation_output_id INNER JOIN related_derivations ON related_derivations.id = derivation_inputs.derivation_id ) SELECT builds.uuid FROM builds INNER JOIN related_derivations ON related_derivations.id = builds.derivation_id"
             #:cache?
#t)))
       (sqlite-bind-arguments statement
                              #:derivation_id
                              (db-find-derivation-id db derivation))
       (let ((result
              (sqlite-map
               (match-lambda
                 (#(uuid) uuid))
               statement)))
         (sqlite-reset statement)
         result)))))

;; Associate TAGS (a list or vector of (key . value) pairs) with the
;; build BUILD-UUID, creating rows in the tags table for pairs that do
;; not exist yet.  Always returns #t.
(define-method (datastore-insert-build-tags (datastore )
                                            build-uuid
                                            tags)
  (call-with-worker-thread
   (slot-ref datastore 'worker-writer-thread-channel)
   (lambda (db)
     (let ((insert-tag-statement
            (sqlite-prepare
             db
             " INSERT INTO tags (\"key\", \"value\") VALUES (:tagkey, :tagvalue)"
             #:cache? #t))
           (find-tag-statement
            (sqlite-prepare
             db
             " SELECT id FROM tags WHERE key = :tag_key AND value = :tag_value"
             #:cache? #t))
           (build-tags-statement
            (sqlite-prepare
             db
             " INSERT INTO build_tags (build_id, tag_id) VALUES (:build_id, :tag_id)"
             #:cache? #t))
           ;; The build id is the same for every tag, so look it up
           ;; once rather than once per tag.
           (build-id (db-find-build-id db build-uuid)))

       (define (tag->id key value)
         (sqlite-bind-arguments find-tag-statement
                                #:tag_key key
                                #:tag_value value)
         (let ((result
                (match (sqlite-step find-tag-statement)
                  (#(id) id)
                  (#f
                   (sqlite-bind-arguments insert-tag-statement
                                          #:tagkey key
                                          #:tagvalue value)
                   (sqlite-step insert-tag-statement)
                   (sqlite-reset insert-tag-statement)
                   (last-insert-rowid db)))))
           (sqlite-reset find-tag-statement)
           result))

       (for-each
        (match-lambda
          ((key . value)
           (sqlite-bind-arguments build-tags-statement
                                  #:build_id build-id
                                  #:tag_id (tag->id key value))
           (sqlite-step build-tags-statement)
           (sqlite-reset build-tags-statement)))
        (if (vector? tags)
            (vector->list tags)
            tags)))))
  #t)

;; Mark the build UUID as canceled.  Always returns #t.
(define-method (datastore-cancel-build (datastore ) uuid)
  (call-with-worker-thread
   (slot-ref datastore 'worker-writer-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " UPDATE builds SET canceled = 1 WHERE uuid = :uuid"
             #:cache? #t)))
       (sqlite-bind-arguments statement #:uuid uuid)
       (sqlite-step statement)
       (sqlite-reset statement))))
  #t)

;; Delete the allocation plan entries of build UUID and, when rows were
;; actually deleted, refresh the "build_allocation_plan_total" gauge
;; from the current per-agent counts.  Always returns #t.
(define-method (datastore-remove-build-from-allocation-plan
                (datastore )
                uuid)
  (define (update-build-allocation-plan-metrics)
    (let ((allocation-plan-metric
           (metrics-registry-fetch-metric
            (slot-ref datastore 'metrics-registry)
            "build_allocation_plan_total")))
      (for-each
       (match-lambda
         ((agent-id . count)
          (metric-set allocation-plan-metric
                      count
                      #:label-values
                      `((agent_id . ,agent-id)))))
       (datastore-count-build-allocation-plan-entries datastore))))

  (call-with-worker-thread
   (slot-ref datastore 'worker-writer-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " DELETE FROM build_allocation_plan WHERE build_id = :build_id"
             #:cache? #t)))
       (sqlite-bind-arguments statement
                              #:build_id (db-find-build-id db uuid))
       (sqlite-step statement)
       (sqlite-reset statement)

       ;; Numeric test instead of eq?, which is not meant for numbers
       (unless (zero? (changes-count db))
         (update-build-allocation-plan-metrics)))))
  #t)

(define-method (datastore-count-build-results (datastore ))
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT agent_id, result, COUNT(*) FROM build_results GROUP BY agent_id, result"
             #:cache?
#t)))
       (let ((result
              (sqlite-map
               (match-lambda
                 (#(agent_id result count)
                  (cons (list agent_id result) count)))
               statement)))
         (sqlite-reset statement)
         result)))))

;; Private helper: like db-find-build-id, but raise an error when there
;; is no build with BUILD-UUID.  The statements below used to fail in
;; that case too (number->string on #f); with bound parameters the
;; failure has to be made explicit.
(define (db-find-build-id/required db build-uuid)
  (or (db-find-build-id db build-uuid)
      (error "build not found" build-uuid)))

;; Record RESULT for build BUILD-UUID as reported by AGENT-ID.
;; FAILURE-REASON may be #f, in which case NULL is stored.
;; All values are bound as statement parameters rather than spliced
;; into the SQL text.  Always returns #t.
(define-method (datastore-insert-build-result (datastore )
                                              build-uuid
                                              agent-id
                                              result
                                              failure-reason)
  (call-with-worker-thread
   (slot-ref datastore 'worker-writer-thread-channel)
   (lambda (db)
     (let ((build-id (db-find-build-id/required db build-uuid))
           (statement
            (sqlite-prepare
             db
             (string-append
              " INSERT INTO build_results ( build_id, agent_id, result, failure_reason ) VALUES (:build_id, :agent_id, :result, "
              (if failure-reason ":failure_reason" "NULL")
              ")")
             #:cache? #t)))
       (if failure-reason
           (sqlite-bind-arguments statement
                                  #:build_id build-id
                                  #:agent_id agent-id
                                  #:result result
                                  #:failure_reason failure-reason)
           (sqlite-bind-arguments statement
                                  #:build_id build-id
                                  #:agent_id agent-id
                                  #:result result))
       (sqlite-step statement)
       (sqlite-reset statement))))
  #t)

;; Delete the allocation of build BUILD-UUID to AGENT-ID.
;; Always returns #t.
(define-method (datastore-remove-build-allocation (datastore )
                                                  build-uuid
                                                  agent-id)
  (call-with-worker-thread
   (slot-ref datastore 'worker-writer-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " DELETE FROM allocated_builds WHERE build_id = :build_id AND agent_id = :agent_id"
             #:cache? #t)))
       (sqlite-bind-arguments statement
                              #:build_id
                              (db-find-build-id/required db build-uuid)
                              #:agent_id agent-id)
       (sqlite-step statement)
       (sqlite-reset statement))))
  #t)

;; Mark build BUILD-UUID as processed, also storing END-TIME (a string)
;; when it is not #f.  Always returns #t.
(define-method (datastore-mark-build-as-processed (datastore )
                                                  build-uuid
                                                  end-time)
  (call-with-worker-thread
   (slot-ref datastore 'worker-writer-thread-channel)
   (lambda (db)
     (let ((build-id (db-find-build-id/required db build-uuid))
           (statement
            (sqlite-prepare
             db
             (string-append
              " UPDATE builds SET processed = 1 "
              (if end-time
                  ", end_time = :end_time"
                  "")
              " WHERE id = :build_id")
             #:cache? #t)))
       (if end-time
           (sqlite-bind-arguments statement
                                  #:build_id build-id
                                  #:end_time end-time)
           (sqlite-bind-arguments statement
                                  #:build_id build-id))
       (sqlite-step statement)
       (sqlite-reset statement))))
  #t)

;; Remove from unbuilt_outputs every output of the derivation built by
;; BUILD-UUID.  Returns #t.
(define-method (datastore-delete-relevant-outputs-from-unbuilt-outputs
                (datastore )
                build-uuid)
  (call-with-worker-thread
   (slot-ref datastore 'worker-writer-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " DELETE FROM unbuilt_outputs WHERE output_id IN ( SELECT derivation_outputs.output_id FROM derivation_outputs INNER JOIN builds ON builds.derivation_id = derivation_outputs.derivation_id WHERE builds.id = :build_id )"
             #:cache? #t)))
       (sqlite-bind-arguments statement
                              #:build_id
                              (db-find-build-id db build-uuid))
       (sqlite-step statement)
       (sqlite-reset statement)
       #t))))

;; Store OUTPUT-METADATA for build BUILD-UUID.  OUTPUT-METADATA is a
;; list of alists with the string keys "name", "hash", "size" and
;; "references" (a vector of strings, stored joined with spaces).
;; All rows are inserted with a single statement whose values are bound
;; positionally; nothing is inserted for an empty list.  Returns #t.
(define-method (datastore-store-output-metadata (datastore )
                                                build-uuid
                                                output-metadata)
  (call-with-worker-thread
   (slot-ref datastore 'worker-writer-thread-channel)
   (lambda (db)
     (define (name->output-id name)
       (let ((statement
              (sqlite-prepare
               db
               " SELECT derivation_outputs.id FROM derivation_outputs INNER JOIN builds ON builds.derivation_id = derivation_outputs.derivation_id WHERE builds.uuid = :build_uuid AND derivation_outputs.name = :name"
               #:cache? #t)))
         (sqlite-bind-arguments statement
                                #:build_uuid build-uuid
                                #:name name)
         ;; Reset the cached statement before inspecting the row, so it
         ;; is reusable even when the output does not exist.
         (let ((row (sqlite-step statement)))
           (sqlite-reset statement)
           (match row
             (#(id) id)
             (#f
              (error "unknown derivation output for build"
                     build-uuid name))))))

     (unless (null? output-metadata)
       (let* ((build-id (db-find-build-id/required db build-uuid))
              ;; Five values per row, in column order
              (arguments
               (append-map
                (lambda (output)
                  (list build-id
                        (name->output-id (assoc-ref output "name"))
                        (assoc-ref output "hash")
                        (assoc-ref output "size")
                        (string-join
                         (vector->list
                          (assoc-ref output "references"))
                         " ")))
                output-metadata))
              ;; The SQL text depends on the number of rows, so do not
              ;; cache the statement; finalize it after use instead.
              (statement
               (sqlite-prepare
                db
                (string-append
                 " INSERT INTO output_metadata (build_id, derivation_output_id, hash, size, store_references) VALUES "
                 (string-join
                  (make-list (length output-metadata)
                             "(?, ?, ?, ?, ?)")
                  ", "))
                #:cache? #f)))
         (apply sqlite-bind-arguments statement arguments)
         (sqlite-step statement)
         (sqlite-finalize statement)))
     #t)))

;; Record that AGENT-ID started build BUILD-UUID (start time is the
;; database's datetime('now')) and queue a "build-started" hook event,
;; both in one transaction.  Always returns #t.
(define-method (datastore-store-build-start (datastore )
                                            build-uuid
                                            agent-id)
  (define (handle-inserting-unprocessed-hook-event db build-uuid agent-id)
    (insert-unprocessed-hook-event
     db
     "build-started"
     (list build-uuid agent-id)))

  (datastore-call-with-transaction
   datastore
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " INSERT INTO build_starts ( build_id, agent_id, start_time ) VALUES (:build_id, :agent_id, datetime('now'))"
             #:cache? #t)))
       (sqlite-bind-arguments statement
                              #:build_id
                              (db-find-build-id/required db build-uuid)
                              #:agent_id agent-id)
       (sqlite-step statement)
       (sqlite-reset statement))

     (handle-inserting-unprocessed-hook-event db build-uuid agent-id)))
  #t)

;; Return the recorded starts of build BUILD-UUID, most recent first,
;; as alists with the keys start-time (parsed with strptime) and
;; agent-id.
(define-method (datastore-find-build-starts (datastore ) build-uuid)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT start_time, agent_id FROM build_starts WHERE build_id =
:build_id ORDER BY start_time DESC"
             #:cache? #t)))
       (sqlite-bind-arguments statement
                              #:build_id
                              (db-find-build-id db build-uuid))
       (let ((result
              (sqlite-map
               (match-lambda
                 (#(start_time agent_id)
                  `((start-time . ,(match (strptime "%F %T" start_time)
                                     ((parts . _) parts)))
                    (agent-id . ,agent_id))))
               statement)))
         (sqlite-reset statement)
         result)))))

;; Delete the allocation of BUILD-ID to AGENT-ID and insert a setup
;; failure with FAILURE-REASON for it.  Returns the row id of the new
;; setup failure.
(define (insert-setup-failure-and-remove-allocation
         db build-id agent-id failure-reason)
  (let ((delete-statement
         (sqlite-prepare
          db
          " DELETE FROM allocated_builds WHERE build_id = :build_id AND agent_id = :agent_id"
          #:cache? #t))
        (insert-statement
         (sqlite-prepare
          db
          " INSERT INTO setup_failures ( build_id, agent_id, failure_reason ) VALUES (:build_id, :agent_id, :failure_reason)"
          #:cache? #t)))
    (sqlite-bind-arguments delete-statement
                           #:build_id build-id
                           #:agent_id agent-id)
    (sqlite-step delete-statement)
    (sqlite-reset delete-statement)

    (sqlite-bind-arguments insert-statement
                           #:build_id build-id
                           #:agent_id agent-id
                           #:failure_reason failure-reason)
    (sqlite-step insert-statement)
    (sqlite-reset insert-statement)

    (last-insert-rowid db)))

;; Record a "missing_inputs" setup failure of build BUILD-UUID on
;; AGENT-ID together with MISSING-INPUTS (a list of store paths), queue
;; a "build-missing-inputs" hook event in the same transaction, and
;; increment the "setup_failures_total" metric.  Always returns #t.
(define-method (datastore-store-setup-failure/missing-inputs
                (datastore )
                build-uuid
                agent-id
                missing-inputs)
  (define (insert-missing-inputs db setup-failure-id missing-inputs)
    ;; One multi-row INSERT with positionally bound values; skipped
    ;; when there is nothing to insert.
    (unless (null? missing-inputs)
      (let ((statement
             (sqlite-prepare
              db
              (string-append
               " INSERT INTO setup_failure_missing_inputs ( setup_failure_id, missing_input_store_path ) VALUES "
               (string-join
                (make-list (length missing-inputs) "(?, ?)")
                ", "))
              #:cache? #f)))
        (apply sqlite-bind-arguments
               statement
               (append-map
                (lambda (missing-input)
                  (list setup-failure-id missing-input))
                missing-inputs))
        (sqlite-step statement)
        (sqlite-finalize statement))))

  (define (handle-inserting-unprocessed-hook-event db build-uuid missing-inputs)
    (insert-unprocessed-hook-event
     db
     "build-missing-inputs"
     (list build-uuid missing-inputs)))

  (datastore-call-with-transaction
   datastore
   (lambda (db)
     (let* ((build-id (db-find-build-id/required db build-uuid))
            (setup-failure-id
             (insert-setup-failure-and-remove-allocation db
                                                         build-id
                                                         agent-id
                                                         "missing_inputs")))
       (insert-missing-inputs db setup-failure-id missing-inputs))

     ;; TODO This logic should be part of the coordinator, but it's here to be
     ;; inside the transaction
     (handle-inserting-unprocessed-hook-event db build-uuid missing-inputs))
   #:duration-metric-name "store_setup_failure_missing_inputs")

  (metric-increment
   (metrics-registry-fetch-metric
    (slot-ref datastore 'metrics-registry)
    "setup_failures_total")
   #:label-values
   `((agent_id . ,agent-id)
     (reason . "missing_inputs")))
  #t)

;; Return the missing input store paths recorded for SETUP-FAILURE-ID.
(define-method (datastore-list-setup-failure-missing-inputs
                (datastore )
                setup-failure-id)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT missing_input_store_path FROM setup_failure_missing_inputs WHERE setup_failure_id = :id"
             #:cache? #t)))
       (sqlite-bind-arguments statement #:id setup-failure-id)
       (let ((result
              (sqlite-map
               (match-lambda
                 (#(missing-input) missing-input))
               statement)))
         (sqlite-reset statement)
         result)))))

;; Record a setup failure with FAILURE-REASON for build BUILD-UUID on
;; AGENT-ID, removing the allocation, and increment the
;; "setup_failures_total" metric.  Always returns #t.
(define-method (datastore-store-setup-failure (datastore )
                                              build-uuid
                                              agent-id
                                              failure-reason)
  (call-with-worker-thread
   (slot-ref datastore 'worker-writer-thread-channel)
   (lambda (db)
     (insert-setup-failure-and-remove-allocation
      db
      (db-find-build-id/required db build-uuid)
      agent-id
      failure-reason)

     (metric-increment
      (metrics-registry-fetch-metric
       (slot-ref datastore 'metrics-registry)
       "setup_failures_total")
      #:label-values
      `((agent_id . ,agent-id)
        (reason . ,failure-reason)))))
  #t)

;; Return a list of (system . count) pairs giving the number of builds
;; per system.
(define-method (datastore-count-builds (datastore ))
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT derivations.system_id, COUNT(*) FROM builds INNER JOIN derivations ON builds.derivation_id = derivations.id GROUP BY derivations.system_id"
             #:cache? #t)))
       (let ((result
              (sqlite-map
               (match-lambda
                 (#(system-id count)
                  (cons (db-system-id->system db system-id)
                        count)))
               statement)))
         (sqlite-reset statement)
         result)))))

(define-method (datastore-for-each-build (datastore ) proc)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT uuid FROM builds ORDER BY id"
             #:cache?
#t)))
       (let loop ((row (sqlite-step statement)))
         (match row
           (#(uuid)
            (proc uuid)
            (loop (sqlite-step statement)))
           (#f
            (sqlite-reset statement)
            #t)))))))

;; Return the integer id of the build identified by UUID, or #f when no such
;; build exists.
(define (db-find-build-id db uuid)
  (let ((statement
         (sqlite-prepare
          db
          " SELECT id FROM builds WHERE uuid = :uuid"
          #:cache? #t)))
    (sqlite-bind-arguments statement #:uuid uuid)
    (let ((result (match (sqlite-step statement)
                    (#f #f)
                    (#(id) id))))
      (sqlite-reset statement)
      result)))

;; Convert a row of (uuid, derivation name, priority, processed, canceled,
;; created_at, end_time) into the association list describing a build.
;; Shared by datastore-find-build and datastore-list-builds so both return
;; exactly the same shape.
(define (build-details-row->alist row)
  (define (flag->boolean value message)
    ;; The flag columns are expected to hold 0 or 1, anything else is an
    ;; error.
    (cond
     ((= 0 value) #f)
     ((= 1 value) #t)
     (else (error message))))

  (define (parse-timestamp value)
    ;; Only string values are parsed, any other value becomes #f.
    (if (string? value)
        (match (strptime "%F %T" value)
          ((parts . _) parts))
        #f))

  (match row
    (#(uuid derivation-name priority processed canceled created-at end-time)
     `((uuid . ,uuid)
       (derivation-name . ,derivation-name)
       (priority . ,priority)
       (processed . ,(flag->boolean processed "unknown processed value"))
       (canceled . ,(flag->boolean canceled "unknown canceled value"))
       (created-at . ,(parse-timestamp created-at))
       (end-time . ,(parse-timestamp end-time))))))

;; Return the details of the build identified by UUID as an association
;; list, or #f when there is no such build.
(define-method (datastore-find-build (datastore ) uuid)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT uuid, derivations.name, priority, processed, canceled, created_at, end_time FROM builds INNER JOIN derivations ON derivations.id = builds.derivation_id WHERE uuid = :uuid"
             #:cache? #t)))
       (sqlite-bind-arguments statement #:uuid uuid)
       (let ((result (match (sqlite-step statement)
                       ;; No row: report "not found" rather than failing
                       ;; with a match error.
                       (#f #f)
                       (row (build-details-row->alist row)))))
         (sqlite-reset statement)
         result)))))

;; List builds, ordered by uuid, as association lists.  Keyword arguments:
;;   #:tags / #:not-tags        (key . value) pairs or bare keys that builds
;;                              must / must not be tagged with
;;   #:systems / #:not-systems  systems to include / exclude
;;   #:processed / #:canceled   #t or #f to filter, 'unset (default) to ignore
;;   #:after-id                 only return builds with a uuid greater than this
;;   #:limit                    maximum number of rows (default 1000), #f for
;;                              no limit
(define-method (datastore-list-builds (datastore ) . rest)
  (define* (list-builds #:key
                        (tags '())
                        (not-tags '())
                        (systems '())
                        (not-systems '())
                        (processed 'unset)
                        (canceled 'unset)
                        (after-id #f)
                        (limit 1000))
    (call-with-worker-thread
     (slot-ref datastore 'worker-reader-thread-channel)
     (lambda (db)
       ;; Translate TAG into an SQL condition on tag_string, or #f when no
       ;; matching tag exists in the database.
       (define tag->expression
         (let ((statement
                (sqlite-prepare
                 db
                 " SELECT id FROM tags WHERE key = :key AND value = :value"
                 #:cache? #t))
               (key-statement
                (sqlite-prepare
                 db
                 " SELECT id FROM tags WHERE key = :key"
                 #:cache? #t)))
           (lambda (tag not?)
             (define (tag-id->expression id)
               (simple-format #f "tag_string ~A '%,~A,%'"
                              (if not? "NOT LIKE" "LIKE")
                              id))

             (match tag
               ((key . value)
                (sqlite-bind-arguments statement #:key key #:value value)
                (let ((result (match (sqlite-step statement)
                                (#(id) (tag-id->expression id))
                                (#f #f))))
                  (sqlite-reset statement)
                  result))
               (key
                (sqlite-bind-arguments key-statement #:key key)
                (let ((tag-ids (sqlite-map (match-lambda (#(id) id))
                                           key-statement)))
                  (sqlite-reset key-statement)
                  ;; An unknown key must behave like an unknown (key . value)
                  ;; pair; emitting "()" would be invalid SQL.
                  (if (null? tag-ids)
                      #f
                      (string-append
                       "("
                       (string-join (map tag-id->expression tag-ids)
                                    (if not? " AND " " OR "))
                       ")"))))))))

       (let ((tag-expressions
              (map (lambda (tag) (tag->expression tag #f)) tags))
             (not-tag-expressions
              (filter-map (lambda (tag) (tag->expression tag #t))
                          not-tags)))
         ;; If one of the requested tags doesn't exist, nothing can be tagged
         ;; to it, so just return nothing
         (if (memq #f tag-expressions)
             '()
             (let* ((conditions
                     (append
                      tag-expressions
                      not-tag-expressions
                      (if (null? systems)
                          '()
                          (list
                           (string-append
                            "("
                            (string-join
                             (map (lambda (system)
                                    (simple-format
                                     #f "derivations.system_id = ~A"
                                     (db-system->system-id db system)))
                                  systems)
                             " OR ")
                            ")")))
                      (map (lambda (system)
                             (simple-format
                              #f "derivations.system_id != ~A"
                              (db-system->system-id db system)))
                           not-systems)
                      (cond
                       ((eq? processed #t) '("processed = 1"))
                       ((eq? processed #f) '("processed = 0"))
                       (else '()))
                      (cond
                       ((eq? canceled #t) '("canceled = 1"))
                       ((eq? canceled #f) '("canceled = 0"))
                       (else '()))
                      (if after-id
                          '("uuid > :after_id")
                          '())))
                    (statement
                     (sqlite-prepare
                      db
                      (string-append
                       " SELECT uuid, derivations.name, priority, processed, canceled, created_at, end_time FROM builds INNER JOIN derivations ON builds.derivation_id = derivations.id LEFT JOIN ( SELECT build_id, (',' || group_concat(tag_id) || ',') AS tag_string FROM build_tags GROUP BY build_id ) AS all_build_tags ON builds.id = all_build_tags.build_id "
                       ;; Emit WHERE whenever there is at least one
                       ;; condition, so that #:after-id on its own still
                       ;; produces the :after_id parameter bound below.
                       (if (null? conditions)
                           ""
                           (string-append
                            "WHERE\n"
                            (string-join conditions " AND ")
                            "\n"))
                       "ORDER BY uuid ASC\n"
                       (if limit
                           (string-append
                            "LIMIT " (number->string limit) "\n")
                           ""))
                      ;; The SQL text varies with the arguments, so this
                      ;; statement is not cached and is finalized below.
                      #:cache? #f)))
               (when after-id
                 (sqlite-bind-arguments statement #:after_id after-id))
               (let ((result (sqlite-map build-details-row->alist
                                         statement)))
                 (sqlite-finalize statement)
                 result)))))))

  (apply list-builds rest))

(define-method (datastore-fetch-build-tags (datastore ) build-uuid)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT key, value FROM tags INNER JOIN build_tags ON tags.id = build_tags.tag_id WHERE build_tags.build_id = :build_id"
             #:cache? #t)))
       (sqlite-bind-arguments
        statement
        #:build_id (db-find-build-id db build-uuid))
       (let ((result
              (list->vector
               (sqlite-fold
                (lambda (row result)
                  (match row
                    (#(key value)
                     `((,key .
,value) ,@result))))
                '()
                statement))))
         (sqlite-reset statement)
         result)))))

;; Return the stored result for the build identified by BUILD-UUID as an
;; association list with the keys agent_id, result and failure_reason, or #f
;; when no result row exists.
(define-method (datastore-find-build-result (datastore ) build-uuid)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT agent_id, result, failure_reason FROM build_results WHERE build_id = :build_id"
             #:cache? #t)))
       (sqlite-bind-arguments
        statement
        #:build_id (db-find-build-id db build-uuid))
       (let ((result
              (match (sqlite-step statement)
                (#(agent_id result failure_reason)
                 `((agent_id . ,agent_id)
                   (result . ,result)
                   (failure_reason . ,failure_reason)))
                (#f #f))))
         (sqlite-reset statement)
         result)))))

;; Return the system of the derivation built by the build identified by
;; BUILD-UUID, or #f when the query yields no row (for example because the
;; build does not exist).
(define-method (datastore-find-build-derivation-system (datastore )
                                                       build-uuid)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT systems.system FROM builds INNER JOIN derivations ON builds.derivation_id = derivations.id INNER JOIN systems ON derivations.system_id = systems.id WHERE builds.id = :build_id"
             #:cache? #t)))
       (sqlite-bind-arguments
        statement
        #:build_id (db-find-build-id db build-uuid))
       (let ((system (match (sqlite-step statement)
                       ;; Without this clause a missing build raised a match
                       ;; error and left the cached statement un-reset.
                       (#f #f)
                       (#(system) system))))
         (sqlite-reset statement)
         system)))))

(define-method (datastore-list-builds-for-output (datastore ) output)
  (call-with-time-tracking
   datastore
   "list_builds_for_output"
   (lambda ()
     (call-with-worker-thread
      (slot-ref datastore 'worker-reader-thread-channel)
      (lambda (db)
        (let ((statement
               (sqlite-prepare
                db
                " SELECT uuid, derivations.name, priority, processed, canceled, result FROM builds INNER JOIN derivations ON builds.derivation_id = derivations.id INNER JOIN derivation_outputs ON builds.derivation_id = derivation_outputs.derivation_id LEFT JOIN build_results ON builds.id = build_results.build_id WHERE derivation_outputs.output_id = :output_id"
                #:cache?
#t)))
          (sqlite-bind-arguments
           statement
           #:output_id (db-output->output-id db output))
          (let ((result
                 (sqlite-map
                  (match-lambda
                    (#(uuid derivation priority processed canceled result)
                     `((uuid . ,uuid)
                       (derivation . ,derivation)
                       (priority . ,priority)
                       (processed . ,(cond
                                      ((= 0 processed) #f)
                                      ((= 1 processed) #t)
                                      (else
                                       (error "unknown value"))))
                       (canceled . ,(cond
                                     ((= 0 canceled) #f)
                                     ((= 1 canceled) #t)
                                     (else
                                      (error "unknown value"))))
                       (result . ,result))))
                  statement)))
            (sqlite-reset statement)
            result)))))))

;; List the builds (uuid and derivation name) whose derivation produces
;; OUTPUT and is for SYSTEM.  Canceled builds are left out unless
;; #:include-canceled? is true.
(define-method (datastore-list-builds-for-output-and-system
                (datastore )
                . rest)
  (define* (list-builds output system #:key include-canceled?)
    (call-with-worker-thread
     (slot-ref datastore 'worker-reader-thread-channel)
     (lambda (db)
       (define canceled-filter
         (if include-canceled?
             ""
             " AND builds.canceled = 0"))

       (define stmt
         (sqlite-prepare
          db
          (string-append
           " SELECT uuid, derivations.name FROM builds INNER JOIN derivation_outputs ON builds.derivation_id = derivation_outputs.derivation_id INNER JOIN derivations ON builds.derivation_id = derivations.id WHERE derivation_outputs.output_id = :output_id AND derivations.system_id = :system_id"
           canceled-filter)
          #:cache? #t))

       (sqlite-bind-arguments
        stmt
        #:output_id (db-output->output-id db output)
        #:system_id (db-system->system-id db system))
       (let ((builds
              (sqlite-map
               (match-lambda
                 (#(build-uuid derivation-name)
                  `((uuid . ,build-uuid)
                    (derivation . ,derivation-name))))
               stmt)))
         (sqlite-reset stmt)
         builds))))

  (apply list-builds rest))

(define-method (datastore-count-builds-for-derivation (datastore ) . rest)
  (apply
   (lambda* (derivation #:key (include-canceled? #t))
     (call-with-worker-thread
      (slot-ref datastore 'worker-reader-thread-channel)
      (lambda (db)
        (let ((statement
               (sqlite-prepare
                db
                (string-append
                 " SELECT COUNT(*) FROM builds INNER JOIN derivations ON builds.derivation_id = derivations.id WHERE derivations.name = :derivation"
                 (if include-canceled?
                     ""
                     " AND canceled = 0"))
                #:cache?
#t)))
          (sqlite-bind-arguments statement #:derivation derivation)
          (let ((result (match (sqlite-step statement)
                          (#(x) x))))
            (sqlite-reset statement)
            result)))))
   rest))

;; Run the sqitch deployment against this datastore's database file.
(define-method (datastore-update (datastore ))
  (let ((database-file (slot-ref datastore 'database-file)))
    (run-sqitch database-file))
  #t)

;; Count setup failures grouped by agent and failure reason.  Each element
;; of the returned list is ((agent-id failure-reason) . count).
(define-method (datastore-count-setup-failures (datastore ))
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let* ((stmt
             (sqlite-prepare
              db
              " SELECT agent_id, failure_reason, COUNT(*) FROM setup_failures GROUP BY agent_id, failure_reason"
              #:cache? #t))
            (counts
             (sqlite-map
              (match-lambda
                (#(agent-id reason total)
                 (cons (list agent-id reason) total)))
              stmt)))
       (sqlite-reset stmt)
       counts))))

;; List the setup failures recorded for the build identified by BUILD-UUID,
;; each as an association list with the keys id, agent-id and
;; failure-reason.
(define-method (datastore-list-setup-failures-for-build (datastore )
                                                        build-uuid)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (define stmt
       (sqlite-prepare
        db
        " SELECT id, agent_id, failure_reason FROM setup_failures WHERE build_id = :build_id"
        #:cache? #t))

     (sqlite-bind-arguments
      stmt
      #:build_id (db-find-build-id db build-uuid))
     (let ((failures
            (sqlite-map
             (match-lambda
               (#(failure-id agent reason)
                `((id . ,failure-id)
                  (agent-id . ,agent)
                  (failure-reason . ,reason))))
             stmt)))
       (sqlite-reset stmt)
       failures))))

(define-method (datastore-fetch-setup-failures (datastore ))
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT setup_failures.id, builds.uuid, agent_id, failure_reason FROM setup_failures INNER JOIN builds ON builds.id = setup_failures.build_id WHERE builds.processed = 0 AND builds.canceled = 0 AND builds.id NOT IN ( SELECT build_id FROM allocated_builds )"
             #:cache? #t)))
       (let ((result
              (sqlite-fold
               (lambda (row result)
                 (match row
                   (#(id build-id agent-id failure-reason)
                    (let ((failures-for-build-id
                           (or (hash-ref result build-id)
                               '())))
                      (hash-set!
                       result
                       build-id
                       (cons `((id . ,id)
                               (agent-id .
,agent-id)
                               (failure-reason . ,failure-reason))
                             failures-for-build-id)))))
                 result)
               (make-hash-table)
               statement)))
         (sqlite-reset statement)
         result)))))

;; List every build marked as processed, each as an association list with
;; the keys uuid, derivation-name and priority.
(define-method (datastore-list-processed-builds (datastore ))
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let* ((stmt
             (sqlite-prepare
              db
              " SELECT uuid, derivations.name, priority FROM builds INNER JOIN derivations ON builds.derivation_id = derivations.id WHERE processed = 1"
              #:cache? #t))
            (rows
             (sqlite-map
              (lambda (row)
                (match row
                  (#(build-uuid name build-priority)
                   `((uuid . ,build-uuid)
                     (derivation-name . ,name)
                     (priority . ,build-priority)))))
              stmt)))
       (sqlite-reset stmt)
       rows))))

;; List builds that are neither processed nor canceled, are not deferred
;; into the future and have no allocated_builds entry, highest priority
;; first.  Each build is an association list with the keys uuid,
;; derivation-name and priority.
(define-method (datastore-list-unprocessed-builds (datastore ))
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let* ((stmt
             (sqlite-prepare
              db
              " SELECT uuid, derivations.name, priority FROM builds INNER JOIN derivations ON builds.derivation_id = derivations.id WHERE processed = 0 AND canceled = 0 AND ( deferred_until IS NULL OR deferred_until < datetime('now') ) AND builds.id NOT IN ( SELECT build_id FROM allocated_builds ) ORDER BY priority DESC"
              #:cache? #t))
            (rows
             (sqlite-map
              (lambda (row)
                (match row
                  (#(build-uuid name build-priority)
                   `((uuid . ,build-uuid)
                     (derivation-name . ,name)
                     (priority . ,build-priority)))))
              stmt)))
       (sqlite-reset stmt)
       rows))))

(define-method (datastore-find-first-unallocated-deferred-build
                (datastore ))
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT uuid, derivations.name, priority, created_at, deferred_until FROM builds INNER JOIN derivations ON builds.derivation_id = derivations.id WHERE processed = 0 AND canceled = 0 AND deferred_until IS NOT NULL AND builds.id NOT IN (SELECT build_id FROM build_allocation_plan) ORDER BY deferred_until ASC LIMIT 1"
             #:cache?
#t)))
       (let ((result
              (match (sqlite-step statement)
                (#(uuid derivation_name priority created_at deferred_until)
                 `((uuid . ,uuid)
                   (derivation-name . ,derivation_name)
                   (priority . ,priority)
                   (created-at . ,(if (string? created_at)
                                      (string->date created_at
                                                    "~Y-~m-~d ~H:~M:~S")
                                      #f))
                   (deferred-until . ,(if (string? deferred_until)
                                          (string->date deferred_until
                                                        "~Y-~m-~d ~H:~M:~S")
                                          #f))))
                (#f #f))))
         (sqlite-reset statement)
         result)))))

;; Rebuild the temporary unprocessed_builds table and, inside a single
;; transaction, return two values: a hash table mapping build uuids to their
;; propagated priority, and the list of uuids of the builds in
;; unprocessed_builds.  When CREATED-AFTER is true, only builds with
;; created_at >= CREATED-AFTER are considered.
(define-method (datastore-fetch-build-ids-and-propagated-priorities-for-unprocessed-builds
                (datastore )
                created-after)
  ;; Drop and recreate temp.unprocessed_builds with the ids of the builds
  ;; that are unprocessed, not canceled, not deferred into the future, not
  ;; waiting on an unbuilt input and not present in allocated_builds.
  (define (populate-unprocessed-builds-table db)
    (sqlite-exec
     db
     "
DROP TABLE IF EXISTS temp.unprocessed_builds")
    (let ((statement
           (sqlite-prepare
            db
            (string-append
             " CREATE TEMP TABLE unprocessed_builds AS SELECT id FROM builds WHERE processed = 0 AND canceled = 0 AND ( deferred_until IS NULL OR deferred_until < datetime('now') ) "
             ;; NOTE(review): CREATED-AFTER is interpolated without quoting,
             ;; so a plain timestamp string would not be a valid SQL literal
             ;; here -- confirm what callers pass.
             (if created-after
                 (simple-format
                  #f
                  "AND created_at >= ~A\n" created-after)
                 "")
             " AND NOT EXISTS ( SELECT 1 FROM derivation_inputs INNER JOIN derivation_outputs ON derivation_inputs.derivation_output_id = derivation_outputs.id INNER JOIN unbuilt_outputs ON unbuilt_outputs.output_id = derivation_outputs.output_id WHERE builds.derivation_id = derivation_inputs.derivation_id ) EXCEPT SELECT build_id FROM allocated_builds "))))
      (sqlite-step statement)
      ;; This statement is prepared without #:cache?, so it is finalized
      ;; rather than reset.
      (sqlite-finalize statement)
      #t))

  ;; Return the uuids of the builds listed in unprocessed_builds.
  (define (query-unprocessed-builds-table db)
    (let ((statement
           (sqlite-prepare
            db
            " SELECT builds.uuid FROM unprocessed_builds INNER JOIN builds ON builds.id = unprocessed_builds.id"
            #:cache?
            #t)))
      (let ((result (sqlite-map
                     (match-lambda
                       (#(uuid) uuid))
                     statement)))
        (sqlite-reset statement)
        result)))

  ;; Starting from the builds in unprocessed_builds, recursively follow
  ;; derivation outputs to the unprocessed builds that take them as inputs,
  ;; carrying the larger of the two priorities along.  Return a hash table
  ;; from build uuid to the maximum priority reached.
  (define (fetch-propagated-priorities-for-unprocessed-builds db)
    (let ((statement
           (sqlite-prepare
            db
            " WITH RECURSIVE builds_with_derived_priority( id, derivation_id, derived_priority ) AS ( SELECT builds.id, builds.derivation_id, builds.priority FROM builds INNER JOIN unprocessed_builds ON builds.id = unprocessed_builds.id UNION SELECT builds.id, builds.derivation_id, max(builds.priority, builds_with_derived_priority.derived_priority) FROM builds_with_derived_priority INNER JOIN derivation_outputs ON builds_with_derived_priority.derivation_id = derivation_outputs.derivation_id INNER JOIN derivation_outputs AS all_derivation_outputs ON all_derivation_outputs.output_id = derivation_outputs.output_id INNER JOIN derivation_inputs ON derivation_inputs.derivation_output_id = all_derivation_outputs.id INNER JOIN builds ON builds.processed = 0 AND builds.derivation_id = derivation_inputs.derivation_id ) SELECT builds.uuid, MAX(derived_priority) FROM builds_with_derived_priority INNER JOIN builds ON builds.id = builds_with_derived_priority.id WHERE builds.processed = 0 GROUP BY builds.uuid"
            #:cache? #t)))
      (let ((result (sqlite-fold
                     (lambda (row result)
                       (match row
                         (#(uuid derived-priority)
                          (hash-set! result uuid derived-priority)))
                       result)
                     (make-hash-table 10000)
                     statement)))
        (sqlite-reset statement)
        result)))

  ;; The table must be populated before either query runs; all three steps
  ;; share one transaction.
  (datastore-call-with-transaction
   datastore
   (lambda (db)
     (populate-unprocessed-builds-table db)
     (let ((propagated-priorities
            (fetch-propagated-priorities-for-unprocessed-builds db))
           (unprocessed-build-ids
            (query-unprocessed-builds-table db)))
       (values propagated-priorities unprocessed-build-ids)))
   #:readonly?
   #t))

;; Record an unprocessed hook event, using the writer worker thread.
(define-method (datastore-insert-unprocessed-hook-event (datastore )
                                                        event
                                                        arguments)
  (call-with-worker-thread
   (slot-ref datastore 'worker-writer-thread-channel)
   (lambda (db)
     (insert-unprocessed-hook-event db event arguments))))

;; Insert a row into unprocessed_hook_events for EVENT.  ARGUMENTS is stored
;; in its `write' representation so it can be read back later.
(define (insert-unprocessed-hook-event db event arguments)
  (let ((statement
         (sqlite-prepare
          db
          " INSERT INTO unprocessed_hook_events (event, arguments) VALUES (:event, :arguments)"
          #:cache? #t)))
    (sqlite-bind-arguments
     statement
     #:event event
     #:arguments (call-with-output-string
                   (lambda (port)
                     (write arguments port))))
    (sqlite-step statement)
    (sqlite-reset statement))
  #t)

;; Count unprocessed hook events per event name.  Each element of the
;; returned list is an association list with the keys event and count.
(define-method (datastore-count-unprocessed-hook-events (datastore ))
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT event, COUNT(*) FROM unprocessed_hook_events GROUP BY event"
             #:cache? #t)))
       (let ((counts (sqlite-map
                      (match-lambda
                        (#(event count)
                         `((event . ,event)
                           (count . ,count))))
                      statement)))
         (sqlite-reset statement)
         counts)))))

;; List up to LIMIT unprocessed hook events named EVENT (a symbol).
(define-method (datastore-list-unprocessed-hook-events (datastore )
                                                       event
                                                       limit)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT id, event, arguments FROM unprocessed_hook_events WHERE event = :event LIMIT :limit"
             #:cache?
#t)))
       (sqlite-bind-arguments
        statement
        #:event (symbol->string event)
        #:limit limit)
       (let ((events
              (sqlite-map
               (match-lambda
                 (#(id event arguments)
                  (list id
                        (string->symbol event)
                        (call-with-input-string
                            arguments
                          (lambda (port)
                            (read port))))))
               statement)))
         (sqlite-reset statement)
         events)))))

;; Delete the unprocessed hook event with the numeric ID.
(define-method (datastore-delete-unprocessed-hook-event (datastore ) id)
  (call-with-worker-thread
   (slot-ref datastore 'worker-writer-thread-channel)
   (lambda (db)
     (sqlite-exec
      db
      (string-append
       " DELETE FROM unprocessed_hook_events WHERE id = "
       (number->string id))))))

;; Count build allocation plan entries per agent, as a list of
;; (agent-id . count) pairs.
(define-method (datastore-count-build-allocation-plan-entries (datastore ))
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT agent_id, COUNT(*) FROM build_allocation_plan GROUP BY agent_id"
             #:cache? #t)))
       (let ((result
              (sqlite-map
               (match-lambda
                 (#(agent_id count)
                  (cons agent_id count)))
               statement)))
         (sqlite-reset statement)
         result)))))

;; Replace the whole build allocation plan with PLANNED-BUILDS, a list of
;; (build-uuid agent-id ordering) entries, then set the
;; "build_allocation_plan_total" metric for every agent returned by
;; datastore-list-agents (0 for agents with no planned builds).
(define-method (datastore-replace-build-allocation-plan (datastore )
                                                        planned-builds)
  (define (clear-current-plan db)
    (sqlite-exec
     db
     "DELETE FROM build_allocation_plan"))

  (define (insert-new-plan db planned-builds)
    (sqlite-exec
     db
     (string-append
      " INSERT INTO build_allocation_plan (build_id, agent_id, ordering) VALUES "
      (string-join
       (map (match-lambda
              ((build-uuid agent-id ordering)
               (simple-format
                #f
                "('~A', '~A', ~A)"
                (db-find-build-id db build-uuid)
                agent-id
                ordering)))
            planned-builds)
       ", ")
      ";")))

  ;; Clearing and inserting happen in one transaction, so the plan is never
  ;; observed half replaced.  An empty plan only clears the table, as
  ;; "VALUES ;" would not be valid SQL.
  (datastore-call-with-transaction
   datastore
   (lambda (db)
     (clear-current-plan db)
     (unless (null? planned-builds)
       (insert-new-plan db planned-builds)))
   #:duration-metric-name "replace_build_allocation_plan")

  ;; Count the planned builds per agent in one pass.  A hash table avoids the
  ;; previous per-entry list search, and an agent id missing from the agent
  ;; list no longer raises an error after the plan has been committed.
  (let ((counts-by-agent (make-hash-table))
        (metric
         (metrics-registry-fetch-metric
          (slot-ref datastore 'metrics-registry)
          "build_allocation_plan_total")))
    (for-each
     (match-lambda
       ((_ agent-id _)
        (hash-set! counts-by-agent
                   agent-id
                   (+ 1 (hash-ref counts-by-agent agent-id 0)))))
     planned-builds)

    (for-each
     (lambda (agent-details)
       (let ((agent-id (assq-ref agent-details 'uuid)))
         (metric-set metric
                     (hash-ref counts-by-agent agent-id 0)
                     #:label-values
                     `((agent_id . ,agent-id)))))
     (datastore-list-agents datastore)))
  #t)

;; Count allocated builds per agent, as a list of (agent-id . count) pairs.
(define-method (datastore-count-allocated-builds (datastore ))
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT agent_id, COUNT(*) FROM allocated_builds GROUP BY agent_id"
             #:cache? #t)))
       (let ((result
              (sqlite-map
               (match-lambda
                 (#(agent_id count)
                  (cons agent_id count)))
               statement)))
         (sqlite-reset statement)
         result)))))

;; Return the systems requested by the agent AGENT-ID, ordered by system id.
(define-method (datastore-agent-requested-systems (datastore ) agent-id)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT system_id FROM build_allocation_agent_requested_systems WHERE agent_id = :agent_id ORDER BY system_id ASC"
             #:cache? #t)))
       (sqlite-bind-arguments
        statement
        #:agent_id agent-id)
       (let ((result
              (sqlite-map
               (match-lambda
                 (#(system-id)
                  (db-system-id->system db system-id)))
               statement)))
         (sqlite-reset statement)
         result)))))

(define-method (datastore-update-agent-requested-systems (datastore )
                                                         agent-id
                                                         systems)
  (define update-not-needed?
    (equal?
(sort systems stringsystem-id db system) (db-insert-system db system)))) systems) ", ") ";")) #t)))) (define-method (datastore-fetch-build-to-allocate (datastore ) agent-id) (datastore-call-with-transaction datastore (lambda (db) (let ((statement (sqlite-prepare db ;; This needs to guard against the plan being out of date " SELECT builds.uuid, derivations.name FROM builds INNER JOIN build_allocation_plan ON builds.id = build_allocation_plan.build_id INNER JOIN derivations ON builds.derivation_id = derivations.id INNER JOIN build_allocation_agent_requested_systems ON build_allocation_agent_requested_systems.agent_id = :agent_id AND build_allocation_agent_requested_systems.system_id = derivations.system_id WHERE build_allocation_plan.agent_id = :agent_id AND builds.processed = 0 AND builds.canceled = 0 AND builds.id NOT IN (SELECT build_id FROM allocated_builds) ORDER BY build_allocation_plan.ordering DESC" #:cache? #t)) (output-conflicts-statement (sqlite-prepare db " SELECT 1 FROM derivation_outputs AS build_derivation_outputs INNER JOIN allocated_builds ON allocated_builds.agent_id = :agent_id INNER JOIN builds AS allocated_build_details ON allocated_build_details.id = allocated_builds.build_id INNER JOIN derivation_outputs AS allocated_builds_derivation_outputs ON allocated_build_details.derivation_id = allocated_builds_derivation_outputs.derivation_id WHERE build_derivation_outputs.derivation_id = :derivation_id AND build_derivation_outputs.output_id = allocated_builds_derivation_outputs.output_id" #:cache? #t))) (sqlite-bind-arguments statement #:agent_id agent-id) (let ((builds (sqlite-fold (lambda (row result) (cons (match row (#f #f) (#(uuid derivation_name) `((uuid . ,uuid) ;; TODO Switch this to derivation_name (derivation-name . 
,derivation_name))))
                       result))
                   '()
                   statement)))
         (sqlite-reset statement)

         ;; Pick the first planned build whose outputs do not overlap with
         ;; the outputs of a build already allocated to this agent.
         (find (lambda (build-details)
                 ;; Both parameters of the conflict query must be bound: an
                 ;; unbound :agent_id is NULL, which matches no allocated
                 ;; build.  The alist built above uses the key
                 ;; `derivation-name', so that is the key to look up here.
                 (sqlite-bind-arguments
                  output-conflicts-statement
                  #:agent_id agent-id
                  #:derivation_id (db-find-derivation-id
                                   db
                                   (assq-ref build-details
                                             'derivation-name)))
                 (let ((result (sqlite-step output-conflicts-statement)))
                   (sqlite-reset output-conflicts-statement)
                   (match result
                     (#f #t)
                     (_ #f))))
               builds))))
   #:readonly? #t))

;; Record the builds identified by BUILD-UUIDS as allocated to AGENT-ID.
;; Does nothing when BUILD-UUIDS is empty, since an INSERT with an empty
;; VALUES list is not valid SQL.
(define-method (datastore-insert-to-allocated-builds (datastore )
                                                     agent-id
                                                     build-uuids)
  (call-with-worker-thread
   (slot-ref datastore 'worker-writer-thread-channel)
   (lambda (db)
     (unless (null? build-uuids)
       (sqlite-exec
        db
        (string-append
         " INSERT INTO allocated_builds (build_id, agent_id) VALUES "
         (string-join
          (map (lambda (build-uuid)
                 (simple-format
                  #f
                  "(~A, '~A')"
                  (db-find-build-id db build-uuid)
                  agent-id))
               build-uuids)
          ", ")
         ";"))))))

;; Remove the builds identified by BUILD-UUIDS from the build allocation
;; plan.  Does nothing when BUILD-UUIDS is empty.
(define-method (datastore-remove-builds-from-plan (datastore ) build-uuids)
  (call-with-worker-thread
   (slot-ref datastore 'worker-writer-thread-channel)
   (lambda (db)
     (unless (null? build-uuids)
       (sqlite-exec
        db
        (string-append
         " DELETE FROM build_allocation_plan WHERE build_id IN ("
         (string-join
          (map (lambda (build-uuid)
                 (number->string (db-find-build-id db build-uuid)))
               build-uuids)
          ", ")
         ")"))))))

(define-method (datastore-select-allocated-builds (datastore ) agent-id)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT builds.uuid, derivations.name FROM allocated_builds INNER JOIN builds ON allocated_builds.build_id = builds.id INNER JOIN derivations ON builds.derivation_id = derivations.id WHERE agent_id = :agent_id"
             #:cache? #t)))
       (sqlite-bind-arguments
        statement
        #:agent_id agent-id)
       (let ((result
              (sqlite-fold
               (lambda (row result)
                 (cons (match row
                         (#(uuid derivation_name)
                          `((uuid . ,uuid)
                            ;; TODO Switch this to derivation_name
                            (derivation-name .
,derivation_name))))
                       result))
               '()
               statement)))
         (sqlite-reset statement)
         result)))))

;; List up to LIMIT builds from the allocation plan of AGENT-ID, in plan
;; order, as association lists with the keys uuid and derivation-name.
(define-method (datastore-list-allocation-plan-builds (datastore )
                                                      agent-id
                                                      limit)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (define stmt
       (sqlite-prepare
        db
        ;; This needs to guard against the plan being out of date
        " SELECT builds.uuid, derivations.name FROM builds INNER JOIN derivations ON builds.derivation_id = derivations.id INNER JOIN build_allocation_plan ON builds.id = build_allocation_plan.build_id WHERE build_allocation_plan.agent_id = :agent_id AND builds.processed = 0 AND builds.id NOT IN (SELECT build_id FROM allocated_builds) ORDER BY build_allocation_plan.ordering ASC LIMIT :limit"
        #:cache? #t))

     (sqlite-bind-arguments stmt
                            #:agent_id agent-id
                            #:limit limit)
     (let ((planned
            (sqlite-map
             (lambda (row)
               (match row
                 (#(build-uuid name)
                  `((uuid . ,build-uuid)
                    (derivation-name . ,name)))))
             stmt)))
       (sqlite-reset stmt)
       planned))))

(define-method (datastore-list-agent-builds (datastore ) agent-id)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT builds.uuid, derivations.name, builds.priority FROM builds INNER JOIN derivations ON builds.derivation_id = derivations.id INNER JOIN allocated_builds ON builds.id = allocated_builds.build_id WHERE allocated_builds.agent_id = :agent_id"
             #:cache? #t)))
       (sqlite-bind-arguments
        statement
        #:agent_id agent-id)
       (let ((builds
              (sqlite-map
               (match-lambda
                 (#(uuid derivation_name priority)
                  `((uuid . ,uuid)
                    ;; TODO Switch this to derivation_name
                    (derivation-name . ,derivation_name)
                    (priority .
,priority))))
               statement)))
         (sqlite-reset statement)
         builds)))))

;; Return the id of the agent that BUILD-UUID is allocated to or that
;; reported a result for it, or #f when neither table has a matching row.
(define-method (datastore-agent-for-build (datastore ) build-uuid)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (define stmt
       (sqlite-prepare
        db
        " SELECT agent_id FROM allocated_builds WHERE allocated_builds.build_id = :build_id UNION SELECT agent_id FROM build_results WHERE build_results.build_id = :build_id"
        #:cache? #t))

     (sqlite-bind-arguments
      stmt
      #:build_id (db-find-build-id db build-uuid))
     (let ((agent (match (sqlite-step stmt)
                    (#f #f)
                    (#(agent-id) agent-id))))
       (sqlite-reset stmt)
       agent))))

;; Open the SQLite database file DATABASE, read-write unless #:write? is
;; false.  A missing file is first created by running sqitch.
(define* (db-open database #:key (write? #t))
  (unless (file-exists? database)
    (run-sqitch database))

  (sqlite-open database
               (logior (if write?
                           SQLITE_OPEN_READWRITE
                           SQLITE_OPEN_READONLY)
                       SQLITE_OPEN_NOMUTEX)))

;; Deploy the database schema to DATABASE-FILE with sqitch, exiting the
;; process with status 1 if the command fails.
(define (run-sqitch database-file)
  (let* ((working-directory
          ;; if sqitch.conf exists (which it should when developing),
          ;; just use the current directory as the chdir value. Otherwise
          ;; use the directory which should contain the right files after
          ;; installation.
          (if (file-exists? "sqitch.conf")
              (getcwd)
              (string-append (dirname (%config 'sqitch-plan))
                             "/sqlite")))
         (registry-name
          (basename (if (string-suffix? ".db" database-file)
                        (string-drop-right database-file 3)
                        database-file)))
         (registry
          (string-append (canonicalize-path (dirname database-file))
                         "/"
                         registry-name
                         "_sqitch_registry.db"))
         (command
          (list (%config 'sqitch)
                "deploy"
                "--db-client" (%config 'sqitch-sqlite)
                "--chdir" working-directory
                "--plan-file" (%config 'sqitch-plan)
                "--registry" registry
                (string-append "db:sqlite:" database-file))))
    (simple-format #t "running command: ~A\n"
                   (string-join command))
    (let ((status (apply system* command)))
      (unless (zero? status)
        (simple-format
         (current-error-port)
         "error: sqitch command failed\n")
        (exit 1)))))

(define (changes-count db)
  (let ((statement
         (sqlite-prepare
          db
          "SELECT changes();"
          #:cache?
#t)))
    (let ((count (vector-ref (sqlite-step statement)
                             0)))
      (sqlite-reset statement)
      count)))

;; Return the value of SQLite's last_insert_rowid() for DB.
(define (last-insert-rowid db)
  (let* ((stmt
          (sqlite-prepare
           db
           "SELECT last_insert_rowid();"
           #:cache? #t))
         (rowid (vector-ref (sqlite-step stmt) 0)))
    (sqlite-reset stmt)
    rowid))

;; Return a list of (output-name . derivation-output-id) pairs for the
;; derivation named DERIVATION-NAME.
(define (select-derivation-outputs db derivation-name)
  (define stmt
    (sqlite-prepare
     db
     " SELECT name, id FROM derivation_outputs WHERE derivation_id = :derivation_id"
     #:cache? #t))

  (sqlite-bind-arguments
   stmt
   #:derivation_id (db-find-derivation-id db derivation-name))
  (let ((pairs
         (sqlite-map
          (lambda (row)
            (match row
              (#(output-name id) (cons output-name id))))
          stmt)))
    (sqlite-reset stmt)
    pairs))

;; Return the integer id of the derivation called NAME, or #f when it is
;; not in the database.
(define (db-find-derivation-id db name)
  (define stmt
    (sqlite-prepare
     db
     " SELECT id FROM derivations WHERE name = :name"
     #:cache? #t))

  (sqlite-bind-arguments stmt #:name name)
  (let ((derivation-id (match (sqlite-step stmt)
                         (#(id) id)
                         (#f #f))))
    (sqlite-reset stmt)
    derivation-id))

(define (db-find-derivation db name)
  (let ((statement
         (sqlite-prepare
          db
          " SELECT systems.system, fixed_output FROM derivations INNER JOIN systems ON systems.id = derivations.system_id WHERE name = :name"
          #:cache? #t)))
    (sqlite-bind-arguments
     statement
     #:name name)
    (let ((result
           (match (sqlite-step statement)
             (#f #f)
             (#(system fixed_output)
              `((system . ,system)
                (fixed-output? . ,(cond
                                   ((eq? fixed_output 0) #f)
                                   ((eq?
fixed_output 1) #t)
                                   (else fixed_output))))))))
      (sqlite-reset statement)
      result)))

;; Look up the derivation called NAME on the reader worker thread; see
;; db-find-derivation for the returned value.
(define-method (datastore-find-derivation (datastore ) name)
  (define (find-derivation db)
    (db-find-derivation db name))

  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   find-derivation))

;; Return the outputs of the derivation DERIVATION-NAME as a list of
;; association lists with the keys name and output, or #f when there are
;; none.
(define-method (datastore-find-derivation-outputs (datastore )
                                                  derivation-name)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (define stmt
       (sqlite-prepare
        db
        " SELECT name, outputs.output FROM derivation_outputs INNER JOIN outputs ON derivation_outputs.output_id = outputs.id WHERE derivation_id = :derivation_id"
        #:cache? #t))

     (sqlite-bind-arguments
      stmt
      #:derivation_id (db-find-derivation-id db derivation-name))
     (let ((outputs
            (sqlite-map
             (lambda (row)
               (match row
                 (#(output-name path)
                  `((name . ,output-name)
                    (output . ,path)))))
             stmt)))
       (sqlite-reset stmt)
       ;; An empty list is reported as #f.
       (and (pair? outputs) outputs)))))

(define-method (datastore-list-unbuilt-derivation-outputs (datastore )
                                                          derivation-name)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT derivation_outputs.name, derivation_outputs.output_id FROM derivation_outputs INNER JOIN unbuilt_outputs ON derivation_outputs.output_id = unbuilt_outputs.output_id WHERE derivation_id = :derivation_id"
             #:cache? #t)))
       (sqlite-bind-arguments
        statement
        #:derivation_id (db-find-derivation-id db derivation-name))
       (let ((result (sqlite-map
                      (match-lambda
                        (#(name output)
                         `((name . ,name)
                           (output .
,output))))
                      statement)))
         (sqlite-reset statement)
         result)))))

;; List the outputs of the build identified by BUILD-UUID.  Each output is
;; an association list with the keys name, output, hash, size and
;; references; references is a vector obtained by splitting the stored
;; string on spaces, or #f when nothing is stored.
(define-method (datastore-list-build-outputs (datastore ) build-uuid)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (define (references->vector store-references)
       (and store-references
            (list->vector
             (string-split store-references #\space))))

     (define stmt
       (sqlite-prepare
        db
        " SELECT name, output, hash, size, store_references FROM builds INNER JOIN derivation_outputs ON builds.derivation_id = derivation_outputs.derivation_id LEFT JOIN output_metadata ON output_metadata.derivation_output_id = derivation_outputs.id AND output_metadata.build_id = builds.id WHERE builds.id = :build_id"
        #:cache? #t))

     (sqlite-bind-arguments
      stmt
      #:build_id (db-find-build-id db build-uuid))
     (let ((outputs
            (sqlite-map
             (lambda (row)
               (match row
                 (#(output-name path nar-hash nar-size store-references)
                  `((name . ,output-name)
                    (output . ,path)
                    (hash . ,nar-hash)
                    (size . ,nar-size)
                    (references
                     . ,(references->vector store-references))))))
             stmt)))
       (sqlite-reset stmt)
       outputs))))

(define-method (datastore-find-derivation-system (datastore )
                                                 derivation-name)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT systems.system FROM derivations INNER JOIN systems ON systems.id = derivations.system_id WHERE name = :name"
             #:cache?
#t)))
       (sqlite-bind-arguments
        statement
        #:name derivation-name)
       (let ((system (match (sqlite-step statement)
                       (#f #f)
                       (#(system) system))))
         (sqlite-reset statement)
         system)))))

;; List the inputs of the derivation DERIVATION-NAME.  Each input is an
;; association list with the keys derivation (the name of the derivation
;; producing the input), output_name and output.
(define-method (datastore-find-derivation-inputs (datastore )
                                                 derivation-name)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (define stmt
       (sqlite-prepare
        db
        " SELECT output_derivations.name, derivation_outputs.name, outputs.output FROM derivations INNER JOIN derivation_inputs ON derivation_inputs.derivation_id = derivations.id INNER JOIN derivation_outputs ON derivation_inputs.derivation_output_id = derivation_outputs.id INNER JOIN derivations AS output_derivations ON output_derivations.id = derivation_outputs.derivation_id INNER JOIN outputs ON derivation_outputs.output_id = outputs.id WHERE derivations.id = :derivation_id"
        #:cache? #t))

     (sqlite-bind-arguments
      stmt
      #:derivation_id (db-find-derivation-id db derivation-name))
     (let ((inputs
            (sqlite-map
             (lambda (row)
               (match row
                 (#(input-derivation input-output-name input-output)
                  `((derivation . ,input-derivation)
                    (output_name . ,input-output-name)
                    (output . ,input-output)))))
             stmt)))
       (sqlite-reset stmt)
       inputs))))

(define-method (datastore-find-derivation-for-output (datastore )
                                                     start-derivation-name
                                                     output)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " WITH RECURSIVE related_derivations(id) AS ( VALUES(:derivation_id) UNION SELECT derivation_outputs.derivation_id FROM derivation_outputs INNER JOIN derivation_inputs ON derivation_outputs.id = derivation_inputs.derivation_output_id INNER JOIN related_derivations ON related_derivations.id = derivation_inputs.derivation_id ) SELECT derivations.name FROM related_derivations INNER JOIN derivations ON derivations.id = related_derivations.id INNER JOIN derivation_outputs ON related_derivations.id = derivation_outputs.derivation_id WHERE output = :output "
             #:cache?
#t))) (sqlite-bind-arguments statement #:derivation_id (db-find-derivation-id db start-derivation-name) #:output output) (let ((result (match (sqlite-step statement) (#f #f) (#(derivation) derivation)))) (sqlite-reset statement) result))))) (define (db-insert-system db system) (let ((statement (sqlite-prepare db " INSERT INTO systems (system) VALUES (:system)" #:cache? #t))) (sqlite-bind-arguments statement #:system system) (sqlite-step statement) (let ((id (last-insert-rowid db))) (sqlite-reset statement) id))) (define (db-system->system-id db system) (let ((statement (sqlite-prepare db " SELECT id FROM systems WHERE system = :system" #:cache? #t))) (sqlite-bind-arguments statement #:system system) (match (sqlite-step statement) (#f #f) (#(id) (sqlite-reset statement) id)))) (define (db-system-id->system db system-id) (let ((statement (sqlite-prepare db " SELECT system FROM systems WHERE id = :id" #:cache? #t))) (sqlite-bind-arguments statement #:id system-id) (match (sqlite-step statement) (#f #f) (#(id) (sqlite-reset statement) id)))) (define (insert-derivation-and-return-outputs db derivation) (define derivation-name (derivation-file-name derivation)) (define (insert-derivation) (let ((derivation-details (db-find-derivation db derivation-name)) (fixed-output? (fixed-output-derivation? derivation))) (if derivation-details (begin (unless (equal? (assq-ref derivation-details 'fixed-output?) fixed-output?) (sqlite-exec db (simple-format #f " UPDATE derivations SET fixed_output = ~A WHERE name = '~A'" (if fixed-output? 1 0) derivation-name))) 0) (let ((statement (sqlite-prepare db " INSERT OR IGNORE INTO derivations (name, system_id, fixed_output) VALUES (:name, :system_id, :fixed_output)" #:cache? #t))) (sqlite-bind-arguments statement #:name derivation-name #:system_id (let ((system (derivation-system derivation))) (or (db-system->system-id db system) (db-insert-system db system))) #:fixed_output (if fixed-output? 
1 0)) (sqlite-step statement) (sqlite-reset statement) (changes-count db))))) (let ((changes (insert-derivation))) (unless (eq? changes 0) (insert-derivation-inputs db derivation-name (derivation-inputs derivation)) (insert-derivation-outputs db derivation-name (derivation-outputs derivation))) (select-derivation-outputs db derivation-name))) (define (insert-derivation-inputs db derivation-name derivation-inputs) (unless (null? derivation-inputs) (let ((derivation-output-ids (append-map (lambda (derivation-input) (let ((output-ids-by-name (insert-derivation-and-return-outputs db (derivation-input-derivation derivation-input)))) (map (lambda (output-name) (assoc-ref output-ids-by-name output-name)) (derivation-input-sub-derivations derivation-input)))) derivation-inputs))) (sqlite-exec db (string-append " INSERT INTO derivation_inputs (derivation_id, derivation_output_id) VALUES " (string-join (map (lambda (derivation-output-id) (simple-format #f "('~A', ~A)" (db-find-derivation-id db derivation-name) derivation-output-id)) derivation-output-ids) ", ") ";"))))) (define (db-insert-output db output) (let ((statement (sqlite-prepare db " INSERT INTO outputs (output) VALUES (:output)" #:cache? #t))) (sqlite-bind-arguments statement #:output output) (sqlite-step statement) (let ((id (last-insert-rowid db))) (sqlite-reset statement) id))) (define (db-output->output-id db output) (let ((statement (sqlite-prepare db " SELECT id FROM outputs WHERE output = :output" #:cache? #t))) (sqlite-bind-arguments statement #:output output) (match (sqlite-step statement) (#f #f) (#(id) (sqlite-reset statement) id)))) (define (insert-derivation-outputs db derivation-name derivation-outputs) (define output-has-successful-build? 
(let ((statement (sqlite-prepare db " SELECT build_results.result FROM derivation_outputs INNER JOIN builds ON builds.derivation_id = derivation_outputs.derivation_id INNER JOIN build_results ON builds.id = build_results.build_id WHERE derivation_outputs.output_id = :output_id" #:cache? #t))) (lambda (output-id) (sqlite-bind-arguments statement #:output_id output-id) (let* ((build-results (sqlite-map (match-lambda (#(result) result)) statement)) (result (if (member "success" build-results) #t #f))) (sqlite-reset statement) result)))) (define insert-into-unbuilt-outputs (let ((statement (sqlite-prepare db " INSERT OR IGNORE INTO unbuilt_outputs (output_id) VALUES (:output_id)" #:cache? #t))) (lambda (output-id) (sqlite-bind-arguments statement #:output_id output-id) (sqlite-step statement) (sqlite-reset statement) #t))) (let ((derivation-outputs-with-ids (map (match-lambda ((name . derivation-output) (let ((output (derivation-output-path derivation-output))) (cons name (or (db-output->output-id db output) (db-insert-output db output)))))) derivation-outputs))) (sqlite-exec db (string-append " INSERT INTO derivation_outputs (derivation_id, name, output_id) VALUES " (string-join (map (match-lambda ((name . output-id) (simple-format #f "('~A', '~A', ~A)" (db-find-derivation-id db derivation-name) name output-id))) derivation-outputs-with-ids) ", ") ";")) (for-each (lambda (output-id) (unless (output-has-successful-build? output-id) (insert-into-unbuilt-outputs output-id))) (map cdr derivation-outputs-with-ids))) #t) (define-method (datastore-insert-build (datastore ) uuid derivation-name priority defer-until) (call-with-worker-thread (slot-ref datastore 'worker-writer-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " INSERT INTO builds (uuid, derivation_id, priority, created_at, deferred_until) VALUES (:uuid, :derivation_id, :priority, datetime('now'), :deferred_until)" #:cache? 
#t))) (sqlite-bind-arguments statement #:uuid uuid #:derivation_id (db-find-derivation-id db derivation-name) #:priority priority #:deferred_until (and=> defer-until (lambda (date) (date->string date "~1 ~3")))) (sqlite-step statement) (sqlite-reset statement)))) #t) (define (insert-agent db uuid name description) (let ((statement (sqlite-prepare db " INSERT INTO agents (id, name, description) VALUES (:id, :name, :description)" #:cache? #t))) (sqlite-bind-arguments statement #:id uuid #:name name #:description description) (sqlite-step statement) (sqlite-reset statement))) (define (insert-agent-password db uuid password) (let ((statement (sqlite-prepare db " INSERT INTO agent_passwords (agent_id, password) VALUES (:agent_id, :password)" #:cache? #t))) (sqlite-bind-arguments statement #:agent_id uuid #:password password) (sqlite-step statement) (sqlite-reset statement)))