(define-module (guix-build-coordinator datastore sqlite) #:use-module (oop goops) #:use-module (srfi srfi-1) #:use-module (srfi srfi-19) #:use-module (ice-9 match) #:use-module (ice-9 format) #:use-module (ice-9 threads) #:use-module (ice-9 exceptions) #:use-module (sqlite3) #:use-module (fibers) #:use-module (prometheus) #:use-module (guix derivations) #:use-module (guix-build-coordinator utils) #:use-module (guix-build-coordinator utils fibers) #:use-module (guix-build-coordinator config) #:use-module (guix-build-coordinator datastore abstract) #:export (sqlite-datastore datastore-optimize datastore-spawn-fibers datastore-initialise-metrics! datastore-update-metrics! datastore-update datastore-call-with-transaction datastore-store-derivation datastore-build-exists-for-derivation-outputs? datastore-list-related-derivations-with-no-build-for-outputs datastore-list-failed-builds-with-blocking-count datastore-list-builds-for-derivation-recursive-inputs datastore-insert-build datastore-cancel-build datastore-count-builds datastore-for-each-build datastore-find-build datastore-list-builds datastore-insert-build-tags datastore-fetch-build-tags datastore-find-build-result datastore-find-build-derivation-system datastore-count-builds-for-derivation datastore-count-build-results datastore-insert-build-result datastore-update-unprocessed-builds-for-build-success datastore-remove-build-allocation datastore-mark-build-as-processed datastore-delete-relevant-outputs-from-unbuilt-outputs datastore-store-output-metadata datastore-list-unbuilt-derivation-outputs datastore-list-build-outputs datastore-new-agent datastore-list-agents datastore-set-agent-active datastore-find-agent datastore-find-agent-by-name datastore-insert-dynamic-auth-token datastore-dynamic-auth-token-exists? 
datastore-fetch-agent-tags datastore-store-build-start datastore-find-build-starts datastore-count-setup-failures datastore-list-setup-failures-for-build datastore-fetch-setup-failures datastore-store-setup-failure datastore-store-setup-failure/missing-inputs datastore-list-setup-failure-missing-inputs datastore-find-derivation datastore-find-derivation-system datastore-find-derivation-inputs datastore-find-derivation-for-output datastore-find-derivation-outputs datastore-list-builds-for-output datastore-list-builds-for-output-and-system datastore-new-agent-password datastore-agent-password-exists? datastore-agent-list-passwords datastore-replace-agent-tags datastore-list-processed-builds datastore-list-unprocessed-builds datastore-find-first-unallocated-deferred-build datastore-fetch-prioritised-unprocessed-builds datastore-insert-unprocessed-hook-event datastore-count-unprocessed-hook-events datastore-list-unprocessed-hook-events datastore-delete-unprocessed-hook-event datastore-list-agent-builds datastore-agent-for-build datastore-count-build-allocation-plan-entries datastore-replace-build-allocation-plan datastore-remove-build-from-allocation-plan datastore-count-allocated-builds datastore-agent-requested-systems datastore-update-agent-requested-systems datastore-fetch-build-to-allocate datastore-insert-to-allocated-builds datastore-remove-builds-from-plan datastore-select-allocated-builds datastore-list-allocation-plan-builds)) (define-class () database-file worker-reader-thread-channel worker-writer-thread-channel metrics-registry) (define* (sqlite-datastore database-uri #:key update-database? metrics-registry) (define database-file (string-drop database-uri (string-length "sqlite://"))) (when update-database? (run-sqitch database-file)) (let ((db (db-open database-file))) (sqlite-exec db "PRAGMA journal_mode=WAL;") (sqlite-exec db "PRAGMA optimize;") (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);") (sqlite-close db)) (let ((datastore (make ))) (slot-set! 
datastore 'database-file database-file)
    (slot-set! datastore 'metrics-registry metrics-registry)

    ;; Writer channel: each worker opens its own read/write connection and
    ;; runs db-optimize on it just before the connection is closed.
    (slot-set!
     datastore
     'worker-writer-thread-channel
     (make-worker-thread-channel
      (lambda ()
        (let ((db (db-open database-file)))
          (sqlite-exec db "PRAGMA busy_timeout = 5000;")
          (sqlite-exec db "PRAGMA synchronous = NORMAL;")
          (sqlite-exec db "PRAGMA temp_store = MEMORY;")
          (sqlite-exec db "PRAGMA foreign_keys = ON;")
          (list db)))
      #:destructor
      (lambda (db)
        (db-optimize db database-file metrics-registry)
        (sqlite-close db))
      #:lifetime 500

      ;; SQLite doesn't support parallel writes
      #:parallelism 1
      #:delay-logger
      (let ((delay-metric
             (make-histogram-metric metrics-registry
                                    "datastore_write_delay_seconds")))
        (lambda (seconds-delayed)
          (metric-observe
           delay-metric
           ;; TODO exact->inexact to work around
           ;; a bug in guile-prometheus where
           ;; the metric sum will output in the
           ;; exact form including the /q
           (exact->inexact seconds-delayed))
          (log-delay "datastore write" seconds-delayed)
          (when (> seconds-delayed 1)
            (format
             (current-error-port)
             "warning: database write delayed by ~1,2f seconds~%"
             seconds-delayed))))))

    ;; Reader channel: connections are opened with #:write? #f.
    (slot-set!
     datastore
     'worker-reader-thread-channel
     (make-worker-thread-channel
      (lambda ()
        (let ((db (db-open database-file #:write? #f)))
          (sqlite-exec db "PRAGMA temp_store = MEMORY;")
          (sqlite-exec db "PRAGMA busy_timeout = 5000;")
          (sqlite-exec db "PRAGMA cache_size = -16000;")
          (list db)))
      #:destructor
      (let ((reader-thread-destructor-counter
             (make-gauge-metric metrics-registry
                                "datastore_reader_thread_close_total")))
        (lambda (db)
          (metric-increment reader-thread-destructor-counter)
          (sqlite-close db)))
      #:lifetime 50000

      ;; Use a minimum of 2 and a maximum of 8 threads
      #:parallelism
      (min (max (current-processor-count) 2) 8)
      #:delay-logger
      (let ((delay-metric
             (make-histogram-metric metrics-registry
                                    "datastore_read_delay_seconds")))
        (lambda (seconds-delayed)
          (metric-observe
           delay-metric
           ;; TODO exact->inexact to work around
           ;; a bug in guile-prometheus where
           ;; the metric sum will output in the
           ;; exact form including the /q
           (exact->inexact seconds-delayed))
          (log-delay "datastore read" seconds-delayed)
          (when (> seconds-delayed 1)
            (format
             (current-error-port)
             "warning: database read delayed by ~1,2f seconds~%"
             seconds-delayed))))))

    datastore))

;; Run routine maintenance on the open connection DB for the database stored
;; in DB-FILENAME.  When the "-wal" file is larger than 5 MiB, a
;; wal_checkpoint(TRUNCATE) is issued first; "PRAGMA optimize" (with
;; analysis_limit=1000) is then always run.  Both steps are timed through
;; call-with-duration-metric against METRICS-REGISTRY.
(define (db-optimize db db-filename metrics-registry)
  (define (wal-size)
    (let ((db-wal-filename (string-append db-filename "-wal")))
      ;; Treat an absent WAL file as empty rather than letting stat raise,
      ;; so the optimize step below still runs.
      (if (file-exists? db-wal-filename)
          (stat:size (stat db-wal-filename))
          0)))

  (define MiB (* (expt 2 20) 1.))
  (define wal-size-threshold (* 5 MiB))

  (let ((checkpoint-duration-metric-name
         "datastore_wal_checkpoint_duration_seconds"))
    (when (> (wal-size) wal-size-threshold)
      (call-with-duration-metric
       metrics-registry
       checkpoint-duration-metric-name
       (lambda ()
         (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);"))))
    (call-with-duration-metric
     metrics-registry
     "datastore_optimize_duration_seconds"
     (lambda ()
       (sqlite-exec db " PRAGMA analysis_limit=1000; PRAGMA optimize;")))))

;; Run db-optimize on a writer connection of DATASTORE, wrapped in
;; retry-on-error with #:times 5 and #:delay 5.
(define-method (datastore-optimize (datastore ))
  (retry-on-error
   (lambda ()
     (call-with-worker-thread
      (slot-ref datastore 'worker-writer-thread-channel)
      (lambda (db)
        (db-optimize db
                     (slot-ref datastore 'database-file)
                     (slot-ref datastore 'metrics-registry)))))
   #:times 5
   #:delay 5))

(define-method
(datastore-spawn-fibers (datastore ))
  ;; Spawn a fiber that loops forever: sleep five minutes, then call
  ;; datastore-optimize.  Exceptions from the optimize call are reported on
  ;; the current error port and the loop carries on.
  (spawn-fiber
   (lambda ()
     (while #t
       (sleep (* 60 5)) ; 5 minutes
       (with-exception-handler
           (lambda (exn)
             (simple-format
              (current-error-port)
              "exception when performing WAL checkpoint: ~A\n"
              exn))
         (lambda ()
           (datastore-optimize datastore))
         #:unwind? #t)))
   #:parallel? #t))

;; Create the builds_total, build_results_total and setup_failures_total
;; gauges in the datastore's metrics registry and set them from the three
;; datastore-count-* queries, which are evaluated via letpar&.  Returns #t.
(define-method (datastore-initialise-metrics! (datastore ))
  (define registry
    (slot-ref datastore 'metrics-registry))

  (let ((builds-total
         (make-gauge-metric registry
                            "builds_total"
                            #:labels '(system)))
        (build-results-total
         (make-gauge-metric registry
                            "build_results_total"
                            #:labels '(agent_id result)))
        (setup-failures-total
         (make-gauge-metric registry
                            "setup_failures_total"
                            #:labels '(agent_id reason))))

    (letpar& ((build-counts
               (datastore-count-builds datastore))
              (build-result-counts
               (datastore-count-build-results datastore))
              (setup-failure-counts
               (datastore-count-setup-failures datastore)))

      (for-each
       (match-lambda
         ((system . count)
          (metric-set builds-total
                      count
                      #:label-values
                      `((system . ,system)))))
       build-counts)

      (for-each
       (match-lambda
         (((agent-id result) . count)
          (metric-set build-results-total
                      count
                      #:label-values
                      `((agent_id . ,agent-id)
                        (result . ,result)))))
       build-result-counts)

      (for-each
       (match-lambda
         (((agent-id reason) . count)
          (metric-set setup-failures-total
                      count
                      #:label-values
                      `((agent_id . ,agent-id)
                        (reason . ,reason)))))
       setup-failure-counts)))
  #t)

(define-method (datastore-update-metrics!
(datastore ))
  ;; Set the datastore_bytes and datastore_wal_bytes gauges (created on first
  ;; use) to the current sizes of the database file and its "-wal" file.
  ;; Returns #t.
  (let* ((db-filename (slot-ref datastore 'database-file))
         (db-wal-filename (string-append db-filename "-wal"))
         (registry (slot-ref datastore 'metrics-registry))
         (db-bytes
          (or (metrics-registry-fetch-metric registry "datastore_bytes")
              (make-gauge-metric
               registry "datastore_bytes"
               #:docstring "Size of the SQLite database file")))
         (db-wal-bytes
          (or (metrics-registry-fetch-metric registry "datastore_wal_bytes")
              (make-gauge-metric
               registry "datastore_wal_bytes"
               #:docstring "Size of the SQLite Write Ahead Log file"))))

    (metric-set db-bytes (stat:size (stat db-filename)))
    (metric-set db-wal-bytes (stat:size (stat db-wal-filename))))
  #t)

;; Call THUNK and, when DATASTORE has a metrics registry, record the elapsed
;; real time in the histogram "datastore_<THING>_duration_seconds" (created
;; on first use).  Every value returned by THUNK is passed through.
(define (call-with-time-tracking datastore thing thunk)
  (define registry (slot-ref datastore 'metrics-registry))
  (define metric-name
    (string-append "datastore_" thing "_duration_seconds"))

  (if registry
      (let* ((metric
              (or (metrics-registry-fetch-metric registry metric-name)
                  (make-histogram-metric registry metric-name)))
             (start-time (get-internal-real-time)))
        ;; call-with-values rather than a single `let' binding, so that a
        ;; thunk returning several (or zero) values is not truncated.
        (call-with-values thunk
          (lambda vals
            (metric-observe
             metric
             (/ (- (get-internal-real-time) start-time)
                internal-time-units-per-second))
            (apply values vals))))
      (thunk)))

;; call-with-worker-thread, passing a #:duration-logger that forwards PROC
;; and the reported duration to log-delay.
(define (call-with-worker-thread/delay-logging channel proc)
  (call-with-worker-thread channel
                           proc
                           #:duration-logger
                           (lambda (duration)
                             (log-delay proc duration))))

;; Holds the procedure of the transaction currently being run, or #f when
;; no transaction is in progress.
(define %current-transaction-proc
  (make-parameter #f))

;; Run PROC with a database connection inside a transaction and return
;; whatever PROC returns.  The reader channel is used when READONLY? is
;; true, otherwise the writer channel.  When DURATION-METRIC-NAME is given
;; the transaction is timed through call-with-time-tracking.  If a
;; transaction is already in progress (per %current-transaction-proc), PROC
;; is simply called with the connection.  On an exception the transaction is
;; rolled back and the exception re-raised.
(define* (datastore-call-with-transaction datastore proc
                                          #:key
                                          readonly?
                                          duration-metric-name)
  (define (run-proc-within-transaction db)
    (if (%current-transaction-proc)
        (proc db) ; already in transaction
        (begin
          (sqlite-exec db "BEGIN TRANSACTION;")
          (with-exception-handler
              (lambda (exn)
                (simple-format (current-error-port)
                               "error: sqlite rolling back transaction\n")
                (sqlite-exec db "ROLLBACK TRANSACTION;")
                (raise-exception exn))
            (lambda ()
              (call-with-values
                  (lambda ()
                    (parameterize ((%current-transaction-proc proc))
                      (call-with-delay-logging proc #:args (list db))))
                (lambda vals
                  (sqlite-exec db "COMMIT TRANSACTION;")
                  (apply values vals))))))))

  (match (call-with-worker-thread
          (slot-ref datastore
                    (if readonly?
                        'worker-reader-thread-channel
                        'worker-writer-thread-channel))
          (lambda (db)
            (let ((start-time (get-internal-real-time)))
              (call-with-values
                  (lambda ()
                    (if duration-metric-name
                        (call-with-time-tracking
                         datastore
                         duration-metric-name
                         (lambda ()
                           (run-proc-within-transaction db)))
                        (run-proc-within-transaction db)))
                (lambda vals
                  (let ((duration-seconds
                         (/ (- (get-internal-real-time) start-time)
                            internal-time-units-per-second)))
                    ;; Only write transactions are reported as slow.
                    (when (and (not readonly?)
                               (> duration-seconds 2))
                      (display
                       (format
                        #f
                        "warning: ~a:\n took ~4f seconds in transaction\n"
                        proc
                        duration-seconds)
                       (current-error-port)))

                    ;; The duration travels back with the results so it can
                    ;; be logged after the worker call has returned.
                    (cons duration-seconds vals)))))))
    ((duration vals ...)
     (log-delay proc duration)
     (apply values vals))))

(define-method (datastore-find-agent (datastore ) uuid) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT description FROM agents WHERE id = :id" #:cache? #t))) (sqlite-bind-arguments statement #:id uuid) (let ((result (match (sqlite-map (match-lambda (#(description) `((description .
,description)))) statement) (() #f) ((agent) agent)))) (sqlite-reset statement) result)))))

;; Return the id of the agent whose name is NAME, or #f when no agent has
;; that name.
(define-method (datastore-find-agent-by-name (datastore ) name)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((query
            (sqlite-prepare
             db
             " SELECT id FROM agents WHERE name = :name"
             #:cache? #t)))
       (sqlite-bind-arguments query #:name name)
       (let* ((ids (sqlite-map (match-lambda (#(id) id))
                               query))
              (found (match ids
                       (() #f)
                       ((agent) agent))))
         (sqlite-reset query)
         found)))))

;; Record TOKEN in the dynamic_auth_tokens table.
(define-method (datastore-insert-dynamic-auth-token (datastore ) token)
  (call-with-worker-thread
   (slot-ref datastore 'worker-writer-thread-channel)
   (lambda (db)
     (let ((insert
            (sqlite-prepare
             db
             " INSERT INTO dynamic_auth_tokens (token) VALUES (:token)"
             #:cache? #t)))
       (sqlite-bind-arguments insert #:token token)
       (sqlite-step insert)
       (sqlite-reset insert)))))

;; Return #t when TOKEN is present in dynamic_auth_tokens, #f otherwise.
(define-method (datastore-dynamic-auth-token-exists? (datastore ) token)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((query
            (sqlite-prepare
             db
             " SELECT 1 FROM dynamic_auth_tokens WHERE token = :token"
             #:cache? #t)))
       (sqlite-bind-arguments query #:token token)
       (let* ((rows (sqlite-map (match-lambda (#(1) #t))
                                query))
              (present? (match rows
                          ((#t) #t)
                          (() #f))))
         (sqlite-reset query)
         present?)))))

(define-method (datastore-fetch-agent-tags (datastore ) agent-id) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT key, value FROM tags INNER JOIN agent_tags ON tags.id = agent_tags.tag_id WHERE agent_tags.agent_id = :agent_id" #:cache? #t))) (sqlite-bind-arguments statement #:agent_id agent-id) (let ((result (list->vector (sqlite-fold (lambda (row result) (match row (#(key value) `((,key .
,value) ,@result)))) '() statement)))) (sqlite-reset statement) result)))))

;; Insert a new agent row via insert-agent.  Returns #t.
(define-method (datastore-new-agent (datastore ) uuid name description)
  (call-with-worker-thread
   (slot-ref datastore 'worker-writer-thread-channel)
   (lambda (db)
     (insert-agent db uuid name description)))
  #t)

;; Return every agent, ordered by id, as a list of alists with the keys
;; uuid, name, description and active (a boolean).
(define-method (datastore-list-agents (datastore ))
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT id, name, description, active FROM agents ORDER BY id"
             #:cache? #t)))
       (let ((agents
              (sqlite-map
               (match-lambda
                 (#(id name description active)
                  `((uuid . ,id)
                    (name . ,name)
                    (description . ,description)
                    (active . ,(eq? active 1)))))
               statement)))
         (sqlite-reset statement)
         agents)))))

;; Set the active flag of the agent AGENT-UUID to ACTIVE?, which must be a
;; boolean (an error is raised otherwise).  Returns ACTIVE?.
(define-method (datastore-set-agent-active (datastore ) agent-uuid active?)
  (unless (boolean? active?)
    (error "datastore-set-agent-active called with non-boolean"))

  (call-with-worker-thread
   (slot-ref datastore 'worker-writer-thread-channel)
   (lambda (db)
     ;; This statement is not cached, so it is finalized rather than reset.
     (let ((statement
            (sqlite-prepare
             db
             " UPDATE agents SET active = :active WHERE id = :uuid"
             #:cache? #f)))
       (sqlite-bind-arguments statement
                              #:uuid agent-uuid
                              #:active (if active? 1 0))
       (sqlite-step statement)
       (sqlite-finalize statement))))
  active?)

;; Store PASSWORD for the agent AGENT-UUID via insert-agent-password.
;; Returns #t.
(define-method (datastore-new-agent-password (datastore ) agent-uuid password)
  (call-with-worker-thread
   (slot-ref datastore 'worker-writer-thread-channel)
   (lambda (db)
     (insert-agent-password db agent-uuid password)))
  #t)

;; Return #t when PASSWORD is stored for the agent UUID, #f otherwise.
(define-method (datastore-agent-password-exists? (datastore ) uuid password)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     ;; The query is written as one plain literal: a backslash followed by a
     ;; space is not a valid string escape.
     (let ((statement
            (sqlite-prepare
             db
             " SELECT 1 FROM agent_passwords WHERE agent_id = :agent_id AND password = :password"
             #:cache? #t)))
       (sqlite-bind-arguments statement
                              #:agent_id uuid
                              #:password password)
       (let ((result
              (match (sqlite-step statement)
                (#f #f)
                (#(1) #t))))
         (sqlite-reset statement)
         result)))))

;; Return the list of passwords stored for the agent UUID.
(define-method (datastore-agent-list-passwords (datastore ) uuid)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT password FROM agent_passwords WHERE agent_id = :agent_id"
             #:cache? #t)))
       (sqlite-bind-arguments statement #:agent_id uuid)
       (let ((result
              (sqlite-map
               (match-lambda
                 (#(password) password))
               statement)))
         (sqlite-reset statement)
         result)))))

(define-method (datastore-replace-agent-tags (datastore ) agent-id tags) (datastore-call-with-transaction datastore (lambda (db) (let ((delete-agent-tags-statement (sqlite-prepare db " DELETE FROM agent_tags WHERE agent_id = :agent_id" #:cache? #t)) (insert-tag-statement (sqlite-prepare db " INSERT INTO tags (\"key\", \"value\") VALUES (:tagkey, :tagvalue)" #:cache? #t)) (find-tag-statement (sqlite-prepare db " SELECT id FROM tags WHERE key = :tag_key AND value = :tag_value" #:cache? #t)) (agent-tags-statement (sqlite-prepare db " INSERT INTO agent_tags (agent_id, tag_id) VALUES (:agent_id, :tag_id)" #:cache?
#t))) (define (tag->id key value) (sqlite-bind-arguments find-tag-statement #:tag_key key #:tag_value value) (let ((result (match (sqlite-step find-tag-statement) (#(id) id) (#f (sqlite-bind-arguments insert-tag-statement #:tagkey key #:tagvalue value) (sqlite-step insert-tag-statement) (sqlite-reset insert-tag-statement) (last-insert-rowid db))))) (sqlite-reset find-tag-statement) result)) (define (insert-tag key value) (sqlite-bind-arguments agent-tags-statement #:agent_id agent-id #:tag_id (tag->id key value)) (sqlite-step agent-tags-statement) (sqlite-reset agent-tags-statement)) (sqlite-bind-arguments delete-agent-tags-statement #:agent_id agent-id) (sqlite-step delete-agent-tags-statement) (sqlite-reset delete-agent-tags-statement) (for-each (match-lambda ((('key . key) ('value . value)) (insert-tag key value)) ((key . value) (insert-tag key value))) (if (vector? tags) (vector->list tags) tags))))) #t)

;; Module-level hash table; it is emptied each time a derivation is stored.
(define %derivation-outputs-cache
  (make-hash-table))

;; Store DERIVATION through insert-derivation-and-return-outputs inside a
;; transaction timed under the "store_derivation" metric name, then empty
;; %derivation-outputs-cache.  Returns #t.
(define-method (datastore-store-derivation (datastore ) derivation)
  (let ((metric-name "store_derivation"))
    (datastore-call-with-transaction
     datastore
     (lambda (db)
       (insert-derivation-and-return-outputs db derivation)
       (hash-clear! %derivation-outputs-cache))
     #:duration-metric-name metric-name)
    #t))

(define-method (datastore-build-exists-for-derivation-outputs? (datastore ) derivation) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT 1 FROM derivation_outputs INNER JOIN derivation_outputs AS all_derivation_outputs ON all_derivation_outputs.output_id = derivation_outputs.output_id INNER JOIN derivations ON derivations.id = all_derivation_outputs.derivation_id INNER JOIN builds ON builds.derivation_id = derivations.id WHERE derivation_outputs.derivation_id = :derivation_id AND (SELECT system_id FROM derivations WHERE id = :derivation_id) = derivations.system_id AND builds.canceled = 0 " #:cache?
#t))) (sqlite-bind-arguments statement #:derivation_id (db-find-derivation-id db derivation)) (let ((result (sqlite-step statement))) (sqlite-reset statement) (if result #t #f)))))) (define-method (datastore-list-related-derivations-with-no-build-for-outputs (datastore ) derivation) (define (get-input-derivations-with-no-builds db derivation-id) (let ((statement (sqlite-prepare db " SELECT derivations.id FROM derivation_inputs INNER JOIN derivation_outputs ON derivation_inputs.derivation_output_id = derivation_outputs.id INNER JOIN derivations ON derivations.id = derivation_outputs.derivation_id WHERE derivation_inputs.derivation_id = :derivation_id AND NOT EXISTS ( SELECT 1 FROM builds INNER JOIN derivation_outputs AS other_derivation_derivation_outputs ON other_derivation_derivation_outputs.derivation_id = builds.derivation_id INNER JOIN derivations AS other_derivations ON other_derivation_derivation_outputs.derivation_id = other_derivations.id INNER JOIN derivation_outputs AS all_other_derivation_derivation_outputs ON all_other_derivation_derivation_outputs.output_id = other_derivation_derivation_outputs.output_id WHERE all_other_derivation_derivation_outputs.derivation_id = derivations.id AND other_derivations.system_id = derivations.system_id AND builds.canceled = 0 ) " #:cache? #t))) (sqlite-bind-arguments statement #:derivation_id derivation-id) (let ((result (sqlite-map (match-lambda (#(derivation-id) derivation-id)) statement))) (sqlite-reset statement) result))) (define (get-derivation-name db derivation-id) (let ((statement (sqlite-prepare db " SELECT name FROM derivations WHERE id = :id" #:cache? 
#t))) (sqlite-bind-arguments statement #:id derivation-id) (let ((result (match (sqlite-step statement) (#(name) name)))) (sqlite-reset statement) result))) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let loop ((derivation-ids (list (db-find-derivation-id db derivation))) (result '())) (let ((new-ids (delete-duplicates! (append-map! (lambda (derivation-id) (get-input-derivations-with-no-builds db derivation-id)) derivation-ids)))) (if (null? new-ids) (map (lambda (derivation-id) (get-derivation-name db derivation-id)) (delete-duplicates! result)) (loop new-ids (append! result new-ids))))))))

;; List failed builds that are blocking other builds.
;;
;; ARGS is (SYSTEM #:key INCLUDE-CANCELLED?).  SYSTEM, when not #f, restricts
;; the result to derivations for that system; INCLUDE-CANCELLED? also counts
;; cancelled builds as failed.  Builds for which another build of the same
;; outputs succeeded are excluded, as are builds with a blocking count of 0.
;;
;; Returns a list of alists with the keys uuid, derivation_name and
;; blocked_count, ordered by blocking count (descending), then derivation
;; name, then uuid.
(define-method (datastore-list-failed-builds-with-blocking-count (datastore ) . args)
  (apply
   (lambda* (system #:key include-cancelled?)
     (call-with-worker-thread
      (slot-ref datastore 'worker-reader-thread-channel)
      (lambda (db)
        (let ((statement
               (sqlite-prepare
                db
                (string-append
                 " SELECT * FROM ( SELECT builds.uuid, derivations.name, ( WITH RECURSIVE related_derivations(id) AS ( VALUES(builds.derivation_id) UNION SELECT derivation_inputs.derivation_id FROM derivation_outputs INNER JOIN related_derivations ON derivation_outputs.derivation_id = related_derivations.id INNER JOIN derivation_inputs ON derivation_outputs.id = derivation_inputs.derivation_output_id ) SELECT COUNT(DISTINCT blocked_builds.uuid) FROM related_derivations INNER JOIN builds AS blocked_builds ON related_derivations.id = blocked_builds.derivation_id AND blocked_builds.processed = 0 AND blocked_builds.canceled = 0 ) AS blocking_count FROM builds INNER JOIN derivations ON derivations.id = builds.derivation_id LEFT JOIN build_results ON builds.id = build_results.build_id WHERE ( ( builds.processed = 1 AND build_results.result = 'failure'"
                 (if include-cancelled?
                     " ) OR ( builds.canceled = 1"
                     "")
                 " ) )"
                 (if system
                     " AND derivations.system_id = :system_id"
                     "")
                 " AND NOT EXISTS ( SELECT 1 FROM derivation_outputs INNER JOIN derivation_outputs AS other_build_derivation_outputs ON derivation_outputs.output_id = other_build_derivation_outputs.output_id INNER JOIN builds AS other_builds ON other_build_derivation_outputs.derivation_id = other_builds.derivation_id INNER JOIN build_results AS other_build_results ON other_builds.id = other_build_results.build_id WHERE derivation_outputs.derivation_id = builds.derivation_id AND other_build_results.result = 'success' ) ) AS data WHERE blocking_count > 0 ORDER BY 3 DESC, 2, 1")
                #:cache? #t)))

          ;; The keyword must name the placeholder used in the SQL above
          ;; (:system_id); the placeholder only exists when SYSTEM is given.
          (when system
            (sqlite-bind-arguments
             statement
             #:system_id (db-system->system-id db system)))

          (let ((result
                 (sqlite-map
                  (match-lambda
                    (#(uuid derivation-name blocked-count)
                     `((uuid . ,uuid)
                       (derivation_name . ,derivation-name)
                       (blocked_count . ,blocked-count))))
                  statement)))
            (sqlite-reset statement)
            result)))))
   args))

(define-method (datastore-list-builds-for-derivation-recursive-inputs (datastore ) derivation) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " WITH RECURSIVE related_derivations(id) AS ( VALUES(:derivation_id) UNION SELECT derivation_outputs.derivation_id FROM derivation_outputs INNER JOIN derivation_inputs ON derivation_outputs.id = derivation_inputs.derivation_output_id INNER JOIN related_derivations ON related_derivations.id = derivation_inputs.derivation_id ) SELECT builds.uuid FROM builds INNER JOIN related_derivations ON related_derivations.id = builds.derivation_id" #:cache?
#t))) (sqlite-bind-arguments statement #:derivation_id (db-find-derivation-id db derivation)) (let ((result (sqlite-map (match-lambda (#(uuid) uuid)) statement))) (sqlite-reset statement) result)))))

;; Attach TAGS to the build BUILD-UUID.  TAGS is a list or vector of
;; (key . value) pairs; tags that do not exist yet are created.  Returns #t.
(define-method (datastore-insert-build-tags (datastore ) build-uuid tags)
  (call-with-worker-thread
   (slot-ref datastore 'worker-writer-thread-channel)
   (lambda (db)
     (let ((insert-tag-statement
            (sqlite-prepare
             db
             " INSERT INTO tags (\"key\", \"value\") VALUES (:tagkey, :tagvalue)"
             #:cache? #t))
           (find-tag-statement
            (sqlite-prepare
             db
             " SELECT id FROM tags WHERE key = :tag_key AND value = :tag_value"
             #:cache? #t))
           (build-tags-statement
            (sqlite-prepare
             db
             " INSERT INTO build_tags (build_id, tag_id) VALUES (:build_id, :tag_id)"
             #:cache? #t))
           ;; The build id is the same for every tag, so look it up once
           ;; instead of once per tag.
           (build-id (db-find-build-id db build-uuid)))

       ;; Return the id of the tag KEY=VALUE, inserting it when missing.
       (define (tag->id key value)
         (sqlite-bind-arguments find-tag-statement
                                #:tag_key key
                                #:tag_value value)
         (let ((result
                (match (sqlite-step find-tag-statement)
                  (#(id) id)
                  (#f
                   (sqlite-bind-arguments insert-tag-statement
                                          #:tagkey key
                                          #:tagvalue value)
                   (sqlite-step insert-tag-statement)
                   (sqlite-reset insert-tag-statement)
                   (last-insert-rowid db)))))
           (sqlite-reset find-tag-statement)
           result))

       (for-each
        (match-lambda
          ((key . value)
           (sqlite-bind-arguments build-tags-statement
                                  #:build_id build-id
                                  #:tag_id (tag->id key value))
           (sqlite-step build-tags-statement)
           (sqlite-reset build-tags-statement)))
        (if (vector? tags)
            (vector->list tags)
            tags)))))
  #t)

(define-method (datastore-cancel-build (datastore ) uuid) (call-with-worker-thread (slot-ref datastore 'worker-writer-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " UPDATE builds SET canceled = 1 WHERE uuid = :uuid" #:cache? #t))) (sqlite-bind-arguments statement #:uuid uuid) (sqlite-step statement) (sqlite-reset statement)) (let ((statement (sqlite-prepare db " DELETE FROM unprocessed_builds_with_derived_priorities WHERE build_id = ( SELECT id FROM builds WHERE uuid = :uuid )" #:cache?
#t))) (sqlite-bind-arguments statement #:uuid uuid) (sqlite-step statement) (sqlite-reset statement)))) #t)

;; Delete the allocation plan entry for the build UUID.  When a row was
;; actually deleted, the per-agent "build_allocation_plan_total" metric is
;; refreshed from datastore-count-build-allocation-plan-entries.  Returns #t.
(define-method (datastore-remove-build-from-allocation-plan (datastore ) uuid)
  (define (refresh-plan-metrics)
    (let ((plan-metric
           (metrics-registry-fetch-metric
            (slot-ref datastore 'metrics-registry)
            "build_allocation_plan_total")))
      (for-each
       (match-lambda
         ((agent-id . count)
          (metric-set plan-metric
                      count
                      #:label-values `((agent_id . ,agent-id)))))
       (datastore-count-build-allocation-plan-entries datastore))))

  (call-with-worker-thread/delay-logging
   (slot-ref datastore 'worker-writer-thread-channel)
   (lambda (db)
     (let ((delete-entry
            (sqlite-prepare
             db
             " DELETE FROM build_allocation_plan WHERE build_id = :build_id"
             #:cache? #t)))
       (sqlite-bind-arguments delete-entry
                              #:build_id (db-find-build-id db uuid))
       (sqlite-step delete-entry)
       (sqlite-reset delete-entry)

       ;; Nothing to refresh when the build was not in the plan.
       (if (eq? 0 (changes-count db))
           *unspecified*
           (refresh-plan-metrics)))))
  #t)

(define-method (datastore-count-build-results (datastore )) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT agent_id, result, COUNT(*) FROM build_results GROUP BY agent_id, result" #:cache?
#t))) (let ((result (sqlite-map (match-lambda (#(agent_id result count) (cons (list agent_id result) count))) statement))) (sqlite-reset statement) result)))))

;; Record the outcome of the build BUILD-UUID as reported by AGENT-ID.
;; RESULT is the result string; FAILURE-REASON is a string, or #f to store
;; NULL.  Raises an error when no build id is found for BUILD-UUID.
;; Returns #t.
(define-method (datastore-insert-build-result (datastore ) build-uuid agent-id result failure-reason)
  (call-with-worker-thread/delay-logging
   (slot-ref datastore 'worker-writer-thread-channel)
   (lambda (db)
     (let ((build-id (db-find-build-id db build-uuid)))
       ;; Fail loudly on an unknown build instead of binding a non-number.
       (unless (number? build-id)
         (error "datastore-insert-build-result: no build id for"
                build-uuid))

       ;; Values are bound as parameters rather than spliced into the SQL
       ;; text, so quotes in agent-supplied strings cannot alter the
       ;; statement.
       (let ((statement
              (sqlite-prepare
               db
               " INSERT INTO build_results ( build_id, agent_id, result, failure_reason ) VALUES (:build_id, :agent_id, :result, :failure_reason)"
               #:cache? #t)))
         (sqlite-bind-arguments statement
                                #:build_id build-id
                                #:agent_id agent-id
                                #:result result
                                ;; #f is bound as NULL
                                #:failure_reason failure-reason)
         (sqlite-step statement)
         (sqlite-reset statement)))))
  #t)

(define-method (datastore-update-unprocessed-builds-for-build-success (datastore ) build-uuid) (define (all-inputs-built? db build-id) (let ((statement (sqlite-prepare db " SELECT 1 FROM derivation_inputs INNER JOIN derivation_outputs ON derivation_inputs.derivation_output_id = derivation_outputs.id INNER JOIN unbuilt_outputs ON unbuilt_outputs.output_id = derivation_outputs.output_id INNER JOIN builds ON builds.derivation_id = derivation_inputs.derivation_id WHERE builds.id = :build_id LIMIT 1" #:cache?
#t))) (sqlite-bind-arguments statement #:build_id build-id) (let ((result (match (sqlite-step statement) (#f #t) (#(1) #f)))) (sqlite-reset statement) result))) (call-with-worker-thread/delay-logging (slot-ref datastore 'worker-writer-thread-channel) (lambda (db) (let ((builds-statement (sqlite-prepare db " SELECT DISTINCT unprocessed_builds.id FROM builds INNER JOIN derivation_outputs ON builds.derivation_id = derivation_outputs.derivation_id INNER JOIN derivation_outputs AS all_derivation_outputs ON all_derivation_outputs.output_id = derivation_outputs.output_id INNER JOIN derivation_inputs ON derivation_inputs.derivation_output_id = all_derivation_outputs.id INNER JOIN builds AS unprocessed_builds ON unprocessed_builds.processed = 0 AND unprocessed_builds.derivation_id = derivation_inputs.derivation_id INNER JOIN unprocessed_builds_with_derived_priorities ON unprocessed_builds_with_derived_priorities.build_id = unprocessed_builds.id AND unprocessed_builds_with_derived_priorities.all_inputs_built = 0 WHERE builds.id = :build_id" #:cache? #t)) (update-statement (sqlite-prepare db " UPDATE unprocessed_builds_with_derived_priorities SET all_inputs_built = 1 WHERE build_id = :build_id" #:cache? #t))) (sqlite-bind-arguments builds-statement #:build_id (db-find-build-id db build-uuid)) (sqlite-fold (lambda (row result) (match row (#(build-id) (when (all-inputs-built? 
db build-id) (sqlite-bind-arguments update-statement #:build_id build-id) (sqlite-step update-statement) (sqlite-reset update-statement)))) #f) #f builds-statement) (sqlite-reset builds-statement)))))

;; Delete the allocation of the build BUILD-UUID to AGENT-ID.  Raises an
;; error when no build id is found for BUILD-UUID.  Returns #t.
(define-method (datastore-remove-build-allocation (datastore ) build-uuid agent-id)
  (call-with-worker-thread/delay-logging
   (slot-ref datastore 'worker-writer-thread-channel)
   (lambda (db)
     (let ((build-id (db-find-build-id db build-uuid)))
       (unless (number? build-id)
         (error "datastore-remove-build-allocation: no build id for"
                build-uuid))

       ;; Bound parameters instead of string concatenation, so AGENT-ID
       ;; cannot change the shape of the statement.
       (let ((statement
              (sqlite-prepare
               db
               " DELETE FROM allocated_builds WHERE build_id = :build_id AND agent_id = :agent_id"
               #:cache? #t)))
         (sqlite-bind-arguments statement
                                #:build_id build-id
                                #:agent_id agent-id)
         (sqlite-step statement)
         (sqlite-reset statement)))))
  #t)

;; Mark the build BUILD-UUID as processed, also storing END-TIME when it is
;; not #f, and drop the build from
;; unprocessed_builds_with_derived_priorities.  Raises an error when no
;; build id is found for BUILD-UUID.  Returns #t.
(define-method (datastore-mark-build-as-processed (datastore ) build-uuid end-time)
  (call-with-worker-thread/delay-logging
   (slot-ref datastore 'worker-writer-thread-channel)
   (lambda (db)
     (let ((build-id (db-find-build-id db build-uuid)))
       (unless (number? build-id)
         (error "datastore-mark-build-as-processed: no build id for"
                build-uuid))

       ;; Two fixed statements: end_time is only touched when END-TIME is
       ;; given.  All values are bound, never spliced into the SQL text.
       (let ((statement
              (sqlite-prepare
               db
               (if end-time
                   " UPDATE builds SET processed = 1 , end_time = :end_time WHERE id = :build_id"
                   " UPDATE builds SET processed = 1 WHERE id = :build_id")
               #:cache? #t)))
         (if end-time
             (sqlite-bind-arguments statement
                                    #:end_time end-time
                                    #:build_id build-id)
             (sqlite-bind-arguments statement
                                    #:build_id build-id))
         (sqlite-step statement)
         (sqlite-reset statement)))

     (let ((statement
            (sqlite-prepare
             db
             " DELETE FROM unprocessed_builds_with_derived_priorities WHERE build_id = ( SELECT id FROM builds WHERE uuid = :uuid )"
             #:cache? #t)))
       (sqlite-bind-arguments statement #:uuid build-uuid)
       (sqlite-step statement)
       (sqlite-reset statement))))
  #t)

(define-method (datastore-delete-relevant-outputs-from-unbuilt-outputs (datastore ) build-uuid) (call-with-worker-thread/delay-logging (slot-ref datastore 'worker-writer-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " DELETE FROM unbuilt_outputs WHERE output_id IN ( SELECT derivation_outputs.output_id FROM derivation_outputs INNER JOIN builds ON builds.derivation_id = derivation_outputs.derivation_id WHERE builds.id = :build_id )" #:cache?
#t))) (sqlite-bind-arguments statement #:build_id (db-find-build-id db build-uuid)) (sqlite-step statement) (sqlite-reset statement) #t))))

;; Insert one output_metadata row per entry of OUTPUT-METADATA for the build
;; identified by BUILD-UUID.  Each entry is an alist read with the string
;; keys "name", "hash", "size" and "references" (a vector of strings, stored
;; space-separated).  Returns #t.
;;
;; All rows still go in through a single INSERT statement, but every value
;; is bound positionally instead of being formatted into the SQL text.  An
;; empty OUTPUT-METADATA list is a no-op (it used to produce invalid SQL).
(define-method (datastore-store-output-metadata
                (datastore )
                build-uuid output-metadata)
  (call-with-worker-thread/delay-logging
   (slot-ref datastore 'worker-writer-thread-channel)
   (lambda (db)
     (define (name->output-id name)
       (let ((statement
              (sqlite-prepare
               db
               " SELECT derivation_outputs.id FROM derivation_outputs INNER JOIN builds ON builds.derivation_id = derivation_outputs.derivation_id WHERE builds.uuid = :build_uuid AND derivation_outputs.name = :name"
               #:cache? #t)))
         (sqlite-bind-arguments statement
                                #:build_uuid build-uuid
                                #:name name)
         ;; Reset before inspecting the row, so the cached statement is
         ;; reusable even when the lookup fails.
         (let ((row (sqlite-step statement)))
           (sqlite-reset statement)
           (match row
             (#(id) id)
             (#f (error "no derivation output for build:"
                        build-uuid name))))))

     ;; The five values of one row, in column order.
     (define (output->values build-id output)
       (list build-id
             (name->output-id (assoc-ref output "name"))
             (assoc-ref output "hash")
             (assoc-ref output "size")
             (string-join
              (vector->list (assoc-ref output "references"))
              " ")))

     (unless (null? output-metadata)
       (let* ((build-id (or (db-find-build-id db build-uuid)
                            (error "build not found:" build-uuid)))
              (rows (map (lambda (output)
                           (output->values build-id output))
                         output-metadata))
              ;; The statement text depends on the number of rows, so it is
              ;; not cached and is finalized after use.
              (statement
               (sqlite-prepare
                db
                (string-append
                 "
INSERT INTO output_metadata (
  build_id, derivation_output_id, hash, size, store_references
) VALUES "
                 (string-join
                  (map (const "(?, ?, ?, ?, ?)") rows)
                  ", "))
                #:cache? #f)))
         ;; Non-keyword arguments are bound to the ? placeholders in order.
         (apply sqlite-bind-arguments statement (concatenate rows))
         (sqlite-step statement)
         (sqlite-finalize statement)))
     #t)))

;; Record in build_starts that AGENT-ID started the build identified by
;; BUILD-UUID, with start_time set to datetime('now'), and queue a
;; "build-started" hook event in the same transaction.  Returns #t.
;;
;; AGENT-ID is bound as a parameter rather than concatenated into the SQL.
(define-method (datastore-store-build-start
                (datastore )
                build-uuid agent-id)
  (datastore-call-with-transaction
   datastore
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             "
INSERT INTO build_starts (build_id, agent_id, start_time)
VALUES (:build_id, :agent_id, datetime('now'))"
             #:cache? #t)))
       (sqlite-bind-arguments
        statement
        #:build_id (or (db-find-build-id db build-uuid)
                       (error "build not found:" build-uuid))
        #:agent_id agent-id)
       (sqlite-step statement)
       (sqlite-reset statement))
     (insert-unprocessed-hook-event db
                                    "build-started"
                                    (list build-uuid agent-id))))
  #t)

(define-method (datastore-find-build-starts (datastore ) build-uuid) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT start_time, agent_id FROM build_starts WHERE
build_id = :build_id ORDER BY start_time DESC" #:cache? #t))) (sqlite-bind-arguments statement #:build_id (db-find-build-id db build-uuid)) (let ((result (sqlite-map (match-lambda (#(start_time agent_id) `((start-time . ,(match (strptime "%F %T" start_time) ((parts . _) parts))) (agent-id . ,agent_id)))) statement))) (sqlite-reset statement) result)))))

;; Delete the allocated_builds row for BUILD-ID and AGENT-ID, insert a
;; setup_failures row with FAILURE-REASON, and return the rowid of the
;; inserted row.
;;
;; All three values are bound as statement parameters; AGENT-ID and
;; FAILURE-REASON used to be spliced into the SQL text.
(define (insert-setup-failure-and-remove-allocation
         db build-id agent-id failure-reason)
  ;; The string-built version failed on a non-numeric BUILD-ID (through
  ;; number->string); keep failing loudly rather than binding it.
  (unless (number? build-id)
    (error "invalid build id:" build-id))

  (let ((statement
         (sqlite-prepare
          db
          "
DELETE FROM allocated_builds
WHERE build_id = :build_id AND agent_id = :agent_id"
          #:cache? #t)))
    (sqlite-bind-arguments statement
                           #:build_id build-id
                           #:agent_id agent-id)
    (sqlite-step statement)
    (sqlite-reset statement))

  (let ((statement
         (sqlite-prepare
          db
          "
INSERT INTO setup_failures (build_id, agent_id, failure_reason)
VALUES (:build_id, :agent_id, :failure_reason)"
          #:cache? #t)))
    (sqlite-bind-arguments statement
                           #:build_id build-id
                           #:agent_id agent-id
                           #:failure_reason failure-reason)
    (sqlite-step statement)
    (sqlite-reset statement))

  (last-insert-rowid db))

;; Record a "missing_inputs" setup failure of AGENT-ID for the build
;; identified by BUILD-UUID: inside one transaction, store the failure, one
;; setup_failure_missing_inputs row per element of the MISSING-INPUTS list,
;; and a "build-missing-inputs" hook event.  Afterwards increment the
;; "setup_failures_total" metric.  Returns #t.
(define-method (datastore-store-setup-failure/missing-inputs
                (datastore )
                build-uuid agent-id missing-inputs)
  ;; One bound INSERT per store path.  This runs inside the transaction
  ;; below, and an empty MISSING-INPUTS list simply inserts nothing (it used
  ;; to produce invalid SQL).
  (define (insert-missing-inputs db setup-failure-id missing-inputs)
    (let ((statement
           (sqlite-prepare
            db
            "
INSERT INTO setup_failure_missing_inputs (
  setup_failure_id, missing_input_store_path
) VALUES (:setup_failure_id, :missing_input_store_path)"
            #:cache? #t)))
      (for-each
       (lambda (missing-input)
         (sqlite-bind-arguments
          statement
          #:setup_failure_id setup-failure-id
          #:missing_input_store_path missing-input)
         (sqlite-step statement)
         (sqlite-reset statement))
       missing-inputs)))

  (define (handle-inserting-unprocessed-hook-event db build-uuid missing-inputs)
    (insert-unprocessed-hook-event
     db
     "build-missing-inputs"
     (list build-uuid missing-inputs)))

  (datastore-call-with-transaction
   datastore
   (lambda (db)
     (let* ((build-id (db-find-build-id db build-uuid))
            (setup-failure-id
             (insert-setup-failure-and-remove-allocation db
                                                         build-id
                                                         agent-id
                                                         "missing_inputs")))
       (insert-missing-inputs db setup-failure-id missing-inputs))

     ;; TODO This logic should be part of the coordinator, but it's here to be
     ;; inside the transaction
     (handle-inserting-unprocessed-hook-event db build-uuid missing-inputs))
   #:duration-metric-name "store_setup_failure_missing_inputs")

  (metric-increment
   (metrics-registry-fetch-metric
    (slot-ref datastore 'metrics-registry)
    "setup_failures_total")
   #:label-values
   `((agent_id . ,agent-id)
     (reason . "missing_inputs")))
  #t)

;; Return the list of missing_input_store_path values stored for
;; SETUP-FAILURE-ID.
(define-method (datastore-list-setup-failure-missing-inputs
                (datastore )
                setup-failure-id)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT missing_input_store_path FROM setup_failure_missing_inputs WHERE setup_failure_id = :id"
             #:cache? #t)))
       (sqlite-bind-arguments statement #:id setup-failure-id)
       (let ((result (sqlite-map
                      (match-lambda
                        (#(missing-input) missing-input))
                      statement)))
         (sqlite-reset statement)
         result)))))

;; Store a setup failure with FAILURE-REASON for the build identified by
;; BUILD-UUID and AGENT-ID, dropping the matching allocation, then increment
;; the "setup_failures_total" metric.  Returns #t.
(define-method (datastore-store-setup-failure
                (datastore )
                build-uuid agent-id failure-reason)
  (call-with-worker-thread
   (slot-ref datastore 'worker-writer-thread-channel)
   (lambda (db)
     (insert-setup-failure-and-remove-allocation
      db
      (db-find-build-id db build-uuid)
      agent-id
      failure-reason)
     (metric-increment
      (metrics-registry-fetch-metric
       (slot-ref datastore 'metrics-registry)
       "setup_failures_total")
      #:label-values
      `((agent_id . ,agent-id)
        (reason . ,failure-reason)))))
  #t)

;; Return a list of (SYSTEM . COUNT) pairs, one per derivations.system_id
;; that has builds, with the system id mapped through
;; db-system-id->system.
(define-method (datastore-count-builds (datastore ))
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT derivations.system_id, COUNT(*) FROM builds INNER JOIN derivations ON builds.derivation_id = derivations.id GROUP BY derivations.system_id"
             #:cache? #t)))
       (let ((result
              (sqlite-map
               (match-lambda
                 (#(system-id count)
                  (cons (db-system-id->system db system-id)
                        count)))
               statement)))
         (sqlite-reset statement)
         result)))))

(define-method (datastore-for-each-build (datastore ) proc) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT uuid FROM builds ORDER BY id" #:cache?
#t))) (let loop ((row (sqlite-step statement))) (match row (#(uuid) (proc uuid) (loop (sqlite-step statement))) (#f (sqlite-reset statement) #t)))))))

;; Return the integer id of the build with UUID, or #f when there is no
;; such build.
(define (db-find-build-id db uuid)
  (let ((statement
         (sqlite-prepare
          db
          " SELECT id FROM builds WHERE uuid = :uuid"
          #:cache? #t)))
    (sqlite-bind-arguments statement #:uuid uuid)
    (let ((result
           (match (sqlite-step statement)
             (#f #f)
             (#(id) id))))
      (sqlite-reset statement)
      result)))

;; Convert a row of (uuid, derivation name, priority, processed, canceled,
;; created_at, end_time) into the alist returned by datastore-find-build
;; and datastore-list-builds.  The 0/1 flag columns become booleans (any
;; other value is an error); timestamp columns that are strings are parsed
;; with strptime "%F %T", anything else becomes #f.
(define (build-details-row->alist row)
  (define (flag->boolean value message)
    (cond
     ((= 0 value) #f)
     ((= 1 value) #t)
     (else (error message))))

  (define (parse-timestamp value)
    (if (string? value)
        (match (strptime "%F %T" value)
          ((parts . _) parts))
        #f))

  (match row
    (#(uuid derivation_name priority processed canceled created_at end_time)
     `((uuid . ,uuid)
       (derivation-name . ,derivation_name)
       (priority . ,priority)
       (processed . ,(flag->boolean processed "unknown processed value"))
       (canceled . ,(flag->boolean canceled "unknown canceled value"))
       (created-at . ,(parse-timestamp created_at))
       (end-time . ,(parse-timestamp end_time))))))

;; Return the details alist for the build with UUID, or #f when no such
;; build exists (matching db-find-build-id; previously a missing build
;; raised a match error and left the cached statement un-reset).
(define-method (datastore-find-build (datastore ) uuid)
  (call-with-worker-thread/delay-logging
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT uuid, derivations.name, priority, processed, canceled, created_at, end_time FROM builds INNER JOIN derivations ON derivations.id = builds.derivation_id WHERE uuid = :uuid"
             #:cache? #t)))
       (sqlite-bind-arguments statement #:uuid uuid)
       ;; Reset before decoding, so a decoding error cannot leave the
       ;; cached statement mid-step.
       (let ((row (sqlite-step statement)))
         (sqlite-reset statement)
         (and row (build-details-row->alist row)))))))

;; List builds as details alists, ordered by uuid.  Keyword arguments:
;;   #:tags / #:not-tags        tags given as (key . value) pairs or bare keys
;;   #:systems / #:not-systems  systems to include / exclude
;;   #:processed / #:canceled   #t or #f to filter, 'unset (default) for no filter
;;   #:after-id                 only builds whose uuid is greater than this
;;   #:limit                    maximum row count (default 1000), #f for none
(define-method (datastore-list-builds (datastore ) . rest)
  (define* (list-builds #:key
                        (tags '())
                        (not-tags '())
                        (systems '())
                        (not-systems '())
                        (processed 'unset)
                        (canceled 'unset)
                        (after-id #f)
                        (limit 1000))
    (call-with-worker-thread
     (slot-ref datastore 'worker-reader-thread-channel)
     (lambda (db)
       ;; ID comes from the tags table, so formatting it into the SQL
       ;; text is safe.
       (define (tag-id->expression id not?)
         (simple-format #f "tag_string ~A '%,~A,%'"
                        (if not? "NOT LIKE" "LIKE")
                        id))

       ;; Return a SQL condition for TAG, or #f when no matching tag row
       ;; exists.
       (define tag->expression
         (let ((statement
                (sqlite-prepare
                 db
                 " SELECT id FROM tags WHERE key = :key AND value = :value"
                 #:cache? #t))
               (key-statement
                (sqlite-prepare
                 db
                 " SELECT id FROM tags WHERE key = :key"
                 #:cache? #t)))
           (lambda (tag not?)
             (match tag
               ((key . value)
                (sqlite-bind-arguments statement
                                       #:key key
                                       #:value value)
                (let ((result
                       (match (sqlite-step statement)
                         (#(id) (tag-id->expression id not?))
                         (#f #f))))
                  (sqlite-reset statement)
                  result))
               (key
                (sqlite-bind-arguments key-statement #:key key)
                (let ((tag-ids
                       (sqlite-map (match-lambda (#(id) id))
                                   key-statement)))
                  (sqlite-reset key-statement)
                  ;; An unknown key used to yield "()", which is not
                  ;; valid SQL; treat it like an unknown (key . value).
                  (if (null? tag-ids)
                      #f
                      (string-append
                       "("
                       (string-join
                        (map (lambda (id)
                               (tag-id->expression id not?))
                             tag-ids)
                        (if not? " AND " " OR "))
                       ")"))))))))

       (let ((tag-expressions
              (map (lambda (tag)
                     (tag->expression tag #f))
                   tags))
             (not-tag-expressions
              (filter-map (lambda (tag)
                            (tag->expression tag #t))
                          not-tags)))

         ;; If one of the requested tags doesn't exist, nothing can be tagged to
         ;; it, so just return nothing
         (if (memq #f tag-expressions)
             '()
             (let* ((conditions
                     (append
                      tag-expressions
                      not-tag-expressions
                      (if (null? systems)
                          '()
                          (list
                           (string-append
                            "("
                            (string-join
                             (map (lambda (system)
                                    (simple-format
                                     #f "derivations.system_id = ~A"
                                     (db-system->system-id db system)))
                                  systems)
                             " OR ")
                            ")")))
                      (map (lambda (system)
                             (simple-format
                              #f "derivations.system_id != ~A"
                              (db-system->system-id db system)))
                           not-systems)
                      (cond
                       ((eq? processed #t) '("processed = 1"))
                       ((eq? processed #f) '("processed = 0"))
                       (else '()))
                      (cond
                       ((eq? canceled #t) '("canceled = 1"))
                       ((eq? canceled #f) '("canceled = 0"))
                       (else '()))
                      (if after-id
                          '("uuid > :after_id")
                          '())))
                    ;; Emit WHERE exactly when there is a condition.  The
                    ;; old check ignored AFTER-ID, so paging an unfiltered
                    ;; list bound :after_id on a statement without it.
                    (statement
                     (sqlite-prepare
                      db
                      (string-append
                       " SELECT uuid, derivations.name, priority, processed, canceled, created_at, end_time FROM builds INNER JOIN derivations ON builds.derivation_id = derivations.id LEFT JOIN ( SELECT build_id, (',' || group_concat(tag_id) || ',') AS tag_string FROM build_tags GROUP BY build_id ) AS all_build_tags ON builds.id = all_build_tags.build_id "
                       (if (null? conditions)
                           ""
                           (string-append
                            "WHERE\n"
                            (string-join conditions " AND ")
                            "\n"))
                       "ORDER BY uuid ASC\n"
                       (if limit
                           (string-append
                            "LIMIT " (number->string limit) "\n")
                           ""))
                      #:cache? #f)))

               (when after-id
                 (sqlite-bind-arguments statement
                                        #:after_id after-id))

               (let ((result
                      (sqlite-map build-details-row->alist statement)))
                 ;; The statement is not cached, so release it here.
                 (sqlite-finalize statement)
                 result)))))))

  (apply list-builds rest))

(define-method (datastore-fetch-build-tags (datastore ) build-uuid) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT key, value FROM tags INNER JOIN build_tags ON tags.id = build_tags.tag_id WHERE build_tags.build_id = :build_id" #:cache? #t))) (sqlite-bind-arguments statement #:build_id (db-find-build-id db build-uuid)) (let ((result (list->vector (sqlite-fold (lambda (row result) (match row (#(key value) `((,key .
,value) ,@result)))) '() statement)))) (sqlite-reset statement) result)))))

;; Look up the build_results row for the build identified by BUILD-UUID.
;; Returns an alist with the keys agent_id, result and failure_reason, or
;; #f when there is no row.
(define-method (datastore-find-build-result
                (datastore )
                build-uuid)
  (define (row->details row)
    (match row
      (#f #f)
      (#(agent_id result failure_reason)
       (list (cons 'agent_id agent_id)
             (cons 'result result)
             (cons 'failure_reason failure_reason)))))

  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((query
            (sqlite-prepare
             db
             " SELECT agent_id, result, failure_reason FROM build_results WHERE build_id = :build_id"
             #:cache? #t)))
       (sqlite-bind-arguments query
                              #:build_id (db-find-build-id db build-uuid))
       (let ((details (row->details (sqlite-step query))))
         (sqlite-reset query)
         details)))))

;; Return the systems.system value of the derivation built by the build
;; identified by BUILD-UUID.  There is no clause for a missing row, so an
;; unknown build raises a match error.
(define-method (datastore-find-build-derivation-system
                (datastore )
                build-uuid)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((query
            (sqlite-prepare
             db
             " SELECT systems.system FROM builds INNER JOIN derivations ON builds.derivation_id = derivations.id INNER JOIN systems ON derivations.system_id = systems.id WHERE builds.id = :build_id"
             #:cache? #t)))
       (sqlite-bind-arguments query
                              #:build_id (db-find-build-id db build-uuid))
       (let ((system-name
              (match (sqlite-step query)
                (#(system) system))))
         (sqlite-reset query)
         system-name)))))

(define-method (datastore-list-builds-for-output (datastore ) output) (call-with-time-tracking datastore "list_builds_for_output" (lambda () (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT uuid, derivations.name, priority, processed, canceled, result FROM builds INNER JOIN derivations ON builds.derivation_id = derivations.id INNER JOIN derivation_outputs ON builds.derivation_id = derivation_outputs.derivation_id LEFT JOIN build_results ON builds.id = build_results.build_id WHERE derivation_outputs.output_id = :output_id" #:cache?
#t))) (sqlite-bind-arguments statement #:output_id (db-output->output-id db output)) (let ((result (sqlite-map (match-lambda (#(uuid derivation priority processed canceled result) `((uuid . ,uuid) (derivation . ,derivation) (priority . ,priority) (processed . ,(cond ((= 0 processed) #f) ((= 1 processed) #t) (else (error "unknown value")))) (canceled . ,(cond ((= 0 canceled) #f) ((= 1 canceled) #t) (else (error "unknown value")))) (result . ,result)))) statement))) (sqlite-reset statement) result)))))))

;; List the builds whose derivation produces OUTPUT and is for SYSTEM, as
;; alists with the keys uuid and derivation.  Called as
;;   (datastore-list-builds-for-output-and-system
;;     datastore output system #:include-canceled? BOOL)
;; Canceled builds are left out unless #:include-canceled? is true.
(define-method (datastore-list-builds-for-output-and-system
                (datastore )
                . rest)
  (define* (list-matching-builds output system #:key include-canceled?)
    (call-with-worker-thread
     (slot-ref datastore 'worker-reader-thread-channel)
     (lambda (db)
       (let* ((canceled-filter
               (if include-canceled?
                   ""
                   " AND builds.canceled = 0"))
              (query
               (sqlite-prepare
                db
                (string-append
                 " SELECT uuid, derivations.name FROM builds INNER JOIN derivation_outputs ON builds.derivation_id = derivation_outputs.derivation_id INNER JOIN derivations ON builds.derivation_id = derivations.id WHERE derivation_outputs.output_id = :output_id AND derivations.system_id = :system_id"
                 canceled-filter)
                #:cache? #t)))
         (sqlite-bind-arguments
          query
          #:output_id (db-output->output-id db output)
          #:system_id (db-system->system-id db system))
         (let ((builds
                (sqlite-map
                 (match-lambda
                   (#(uuid derivation)
                    (list (cons 'uuid uuid)
                          (cons 'derivation derivation))))
                 query)))
           (sqlite-reset query)
           builds)))))

  (apply list-matching-builds rest))

(define-method (datastore-count-builds-for-derivation (datastore ) . rest) (apply (lambda* (derivation #:key (include-canceled? #t)) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db (string-append " SELECT COUNT(*) FROM builds INNER JOIN derivations ON builds.derivation_id = derivations.id WHERE derivations.name = :derivation" (if include-canceled? "" " AND canceled = 0")) #:cache?
#t))) (sqlite-bind-arguments statement #:derivation derivation) (let ((result (match (sqlite-step statement) (#(x) x)))) (sqlite-reset statement) result))))) rest))

;; Run sqitch against the datastore's database file and return #t.
(define-method (datastore-update (datastore ))
  (let ((database-file (slot-ref datastore 'database-file)))
    (run-sqitch database-file)
    #t))

;; Count setup failures grouped by agent and reason.  Returns a list of
;; ((AGENT-ID FAILURE-REASON) . COUNT) entries.
(define-method (datastore-count-setup-failures (datastore ))
  (define (row->entry row)
    (match row
      (#(agent_id failure_reason count)
       (cons (list agent_id failure_reason)
             count))))

  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let* ((query
             (sqlite-prepare
              db
              " SELECT agent_id, failure_reason, COUNT(*) FROM setup_failures GROUP BY agent_id, failure_reason"
              #:cache? #t))
            (counts (sqlite-map row->entry query)))
       (sqlite-reset query)
       counts))))

;; List the setup failures stored for the build identified by BUILD-UUID,
;; each as an alist with the keys id, agent-id and failure-reason.
(define-method (datastore-list-setup-failures-for-build
                (datastore )
                build-uuid)
  (define (row->failure row)
    (match row
      (#(id agent-id failure-reason)
       (list (cons 'id id)
             (cons 'agent-id agent-id)
             (cons 'failure-reason failure-reason)))))

  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((query
            (sqlite-prepare
             db
             " SELECT id, agent_id, failure_reason FROM setup_failures WHERE build_id = :build_id"
             #:cache? #t)))
       (sqlite-bind-arguments query
                              #:build_id (db-find-build-id db build-uuid))
       (let ((failures (sqlite-map row->failure query)))
         (sqlite-reset query)
         failures)))))

(define-method (datastore-fetch-setup-failures (datastore )) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT setup_failures.id, builds.uuid, agent_id, failure_reason FROM setup_failures INNER JOIN builds ON builds.id = setup_failures.build_id WHERE builds.processed = 0 AND builds.canceled = 0 AND builds.id NOT IN ( SELECT build_id FROM allocated_builds )" #:cache? #t))) (let ((result (sqlite-fold (lambda (row result) (match row (#(id build-id agent-id failure-reason) (let ((failures-for-build-id (or (hash-ref result build-id) '()))) (hash-set! result build-id (cons `((id . ,id) (agent-id .
,agent-id) (failure-reason . ,failure-reason)) failures-for-build-id))))) result) (make-hash-table) statement))) (sqlite-reset statement) result)))))

;; List every build with processed = 1 as an alist with the keys uuid,
;; derivation-name and priority.
(define-method (datastore-list-processed-builds (datastore ))
  (define (row->build row)
    (match row
      (#(uuid derivation_name priority)
       (list (cons 'uuid uuid)
             (cons 'derivation-name derivation_name)
             (cons 'priority priority)))))

  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let* ((query
             (sqlite-prepare
              db
              " SELECT uuid, derivations.name, priority FROM builds INNER JOIN derivations ON builds.derivation_id = derivations.id WHERE processed = 1"
              #:cache? #t))
            (processed-builds (sqlite-map row->build query)))
       (sqlite-reset query)
       processed-builds))))

;; List builds that are not processed, not canceled, not deferred into the
;; future and not in allocated_builds, highest priority first.  Each build
;; is an alist with the keys uuid, derivation-name and priority.
(define-method (datastore-list-unprocessed-builds (datastore ))
  (define (row->build row)
    (match row
      (#(uuid derivation_name priority)
       (list (cons 'uuid uuid)
             (cons 'derivation-name derivation_name)
             (cons 'priority priority)))))

  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let* ((query
             (sqlite-prepare
              db
              " SELECT uuid, derivations.name, priority FROM builds INNER JOIN derivations ON builds.derivation_id = derivations.id WHERE processed = 0 AND canceled = 0 AND ( deferred_until IS NULL OR deferred_until < datetime('now') ) AND builds.id NOT IN ( SELECT build_id FROM allocated_builds ) ORDER BY priority DESC"
              #:cache? #t))
            (pending-builds (sqlite-map row->build query)))
       (sqlite-reset query)
       pending-builds))))

(define-method (datastore-find-first-unallocated-deferred-build (datastore )) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT uuid, derivations.name, priority, created_at, deferred_until FROM builds INNER JOIN derivations ON builds.derivation_id = derivations.id WHERE processed = 0 AND canceled = 0 AND deferred_until IS NOT NULL AND builds.id NOT IN (SELECT build_id FROM build_allocation_plan) ORDER BY deferred_until ASC LIMIT 1" #:cache?
#t))) (let ((result (match (sqlite-step statement) (#(uuid derivation_name priority created_at deferred_until) `((uuid . ,uuid) (derivation-name . ,derivation_name) (priority . ,priority) (created-at . ,(if (string? created_at) (string->date created_at "~Y-~m-~d ~H:~M:~S") #f)) (deferred-until . ,(if (string? deferred_until) (string->date deferred_until "~Y-~m-~d ~H:~M:~S") #f)))) (#f #f)))) (sqlite-reset statement) result)))))

;; Return the uuids of unprocessed builds that have all_inputs_built = 1
;; and no allocated_builds row.  Rows are read in ascending
;; derived_priority order and consed onto the accumulator, so the returned
;; list is in the reverse of that order.
(define-method (datastore-fetch-prioritised-unprocessed-builds
                (datastore ))
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let* ((query
             (sqlite-prepare
              db
              " SELECT builds.uuid FROM unprocessed_builds_with_derived_priorities INNER JOIN builds ON build_id = builds.id WHERE all_inputs_built = 1 AND NOT EXISTS ( SELECT 1 FROM allocated_builds WHERE unprocessed_builds_with_derived_priorities.build_id = allocated_builds.build_id ) ORDER BY derived_priority ASC"
              #:cache? #t))
            (uuids
             (sqlite-fold
              (lambda (row uuids-so-far)
                (cons (vector-ref row 0) uuids-so-far))
              '()
              query)))
       (sqlite-reset query)
       uuids))))

;; Queue EVENT with ARGUMENTS in unprocessed_hook_events, on the writer
;; worker thread.
(define-method (datastore-insert-unprocessed-hook-event
                (datastore )
                event
                arguments)
  (let ((writer-channel
         (slot-ref datastore 'worker-writer-thread-channel)))
    (call-with-worker-thread/delay-logging
     writer-channel
     (lambda (connection)
       (insert-unprocessed-hook-event connection event arguments)))))

(define (insert-unprocessed-hook-event db event arguments) (let ((statement (sqlite-prepare db " INSERT INTO unprocessed_hook_events (event, arguments) VALUES (:event, :arguments)" #:cache?
#t))) (sqlite-bind-arguments statement #:event event #:arguments (call-with-output-string (lambda (port) (write arguments port)))) (sqlite-step statement) (sqlite-reset statement)) #t)

;; Count the queued hook events per event name.  Returns a list of alists
;; with the keys event and count.
(define-method (datastore-count-unprocessed-hook-events (datastore ))
  (define (row->count row)
    (match row
      (#(event count)
       (list (cons 'event event)
             (cons 'count count)))))

  (call-with-worker-thread/delay-logging
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let* ((query
             (sqlite-prepare
              db
              " SELECT event, COUNT(*) FROM unprocessed_hook_events GROUP BY event"
              #:cache? #t))
            (event-counts (sqlite-map row->count query)))
       (sqlite-reset query)
       event-counts))))

;; Return up to LIMIT queued hook events named by the symbol EVENT, each as
;; a list (ID EVENT-SYMBOL ARGUMENTS), where ARGUMENTS is the stored text
;; parsed back with `read'.
(define-method (datastore-list-unprocessed-hook-events
                (datastore )
                event
                limit)
  (define (row->event-details row)
    (match row
      (#(id event-name serialised-arguments)
       (list id
             (string->symbol event-name)
             (call-with-input-string serialised-arguments read)))))

  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((query
            (sqlite-prepare
             db
             " SELECT id, event, arguments FROM unprocessed_hook_events WHERE event = :event LIMIT :limit"
             #:cache? #t)))
       (sqlite-bind-arguments query
                              #:event (symbol->string event)
                              #:limit limit)
       (let ((pending-events (sqlite-map row->event-details query)))
         (sqlite-reset query)
         pending-events)))))

;; Delete the queued hook event with ID.  Returns #t.
(define-method (datastore-delete-unprocessed-hook-event
                (datastore )
                id)
  (define (delete-event db)
    (let ((deletion
           (sqlite-prepare
            db
            " DELETE FROM unprocessed_hook_events WHERE id = :id"
            #:cache? #t)))
      (sqlite-bind-arguments deletion #:id id)
      (sqlite-step deletion)
      (sqlite-reset deletion)))

  (call-with-worker-thread
   (slot-ref datastore 'worker-writer-thread-channel)
   delete-event)
  #t)

(define-method (datastore-count-build-allocation-plan-entries (datastore )) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT agent_id, COUNT(*) FROM build_allocation_plan GROUP BY agent_id" #:cache?
#t))) (let ((result (sqlite-map (match-lambda (#(agent_id count) (cons agent_id count))) statement))) (sqlite-reset statement) result)))))

;; Replace the contents of build_allocation_plan with PLANNED-BUILDS, a
;; list of (BUILD-UUID AGENT-ID ORDERING) entries, inside one transaction,
;; then set the "build_allocation_plan_total" metric for every agent
;; returned by datastore-list-agents.  Returns #t.
;;
;; Build uuids are still resolved to ids through the reader thread before
;; the transaction starts, but rows are now inserted through a prepared
;; statement with bound parameters instead of a string-built INSERT.
(define-method (datastore-replace-build-allocation-plan
                (datastore )
                planned-builds)
  ;; (BUILD-ID AGENT-ID ORDERING) for each planned build.
  (define planned-rows
    (if (null? planned-builds)
        '()
        (call-with-worker-thread
         (slot-ref datastore 'worker-reader-thread-channel)
         (lambda (db)
           (map (match-lambda
                  ((build-uuid agent-id ordering)
                   (list (or (db-find-build-id db build-uuid)
                             (error "build not found:" build-uuid))
                         agent-id
                         ordering)))
                planned-builds)))))

  (define (clear-current-plan db)
    (sqlite-exec db "DELETE FROM build_allocation_plan"))

  (define (insert-new-plan db)
    (let ((statement
           (sqlite-prepare
            db
            "
INSERT INTO build_allocation_plan (build_id, agent_id, ordering)
VALUES (:build_id, :agent_id, :ordering)"
            #:cache? #t)))
      (for-each
       (match-lambda
         ((build-id agent-id ordering)
          (sqlite-bind-arguments statement
                                 #:build_id build-id
                                 #:agent_id agent-id
                                 #:ordering ordering)
          (sqlite-step statement)
          (sqlite-reset statement)))
       planned-rows)))

  (datastore-call-with-transaction
   datastore
   (lambda (db)
     (clear-current-plan db)
     (unless (null? planned-rows)
       (insert-new-plan db)))
   #:duration-metric-name "replace_build_allocation_plan")

  ;; Count planned builds per agent in a hash table.  Agents without
  ;; planned builds are reported as 0, and a planned agent id missing from
  ;; the agent list no longer aborts after the plan has been committed.
  (let ((counts-by-agent (make-hash-table))
        (metric
         (metrics-registry-fetch-metric
          (slot-ref datastore 'metrics-registry)
          "build_allocation_plan_total")))
    (for-each
     (match-lambda
       ((_ agent-id _)
        (hash-set! counts-by-agent
                   agent-id
                   (+ 1 (hash-ref counts-by-agent agent-id 0)))))
     planned-builds)

    (for-each
     (lambda (agent-details)
       (let ((agent-id (assq-ref agent-details 'uuid)))
         (metric-set metric
                     (hash-ref counts-by-agent agent-id 0)
                     #:label-values
                     `((agent_id . ,agent-id)))))
     (datastore-list-agents datastore)))

  #t)

(define-method (datastore-count-allocated-builds (datastore )) (call-with-worker-thread/delay-logging (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT agent_id, COUNT(*) FROM allocated_builds GROUP BY agent_id" #:cache?
#t))) (let ((result (sqlite-map (match-lambda (#(agent_id count) (cons agent_id count))) statement))) (sqlite-reset statement) result)))))

;; Return the systems requested by AGENT-ID, read from
;; build_allocation_agent_requested_systems in ascending system_id order
;; and mapped through db-system-id->system.
(define-method (datastore-agent-requested-systems
                (datastore )
                agent-id)
  (call-with-worker-thread/delay-logging
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT system_id FROM build_allocation_agent_requested_systems WHERE agent_id = :agent_id ORDER BY system_id ASC"
             #:cache? #t)))
       (sqlite-bind-arguments statement #:agent_id agent-id)
       (let ((result
              (sqlite-map
               (match-lambda
                 (#(system-id)
                  (db-system-id->system db system-id)))
               statement)))
         (sqlite-reset statement)
         result)))))

;; NOTE(review): the definition below looks truncated -- the text between
;; "(sort systems string" and "system-id db system)" appears to be missing
;; and the parentheses no longer balance.  It is left exactly as found;
;; restore it from version control before relying on this section.
(define-method (datastore-update-agent-requested-systems (datastore ) agent-id systems) (define update-not-needed? (equal? (sort systems stringsystem-id db system) (db-insert-system db system)))) systems) ", ") ";")) #t))))

;; Pick the next build AGENT-ID should be allocated.  Walks the agent's
;; allocation plan entries in ascending ordering, restricted to builds
;; that are unprocessed, not canceled, not already in allocated_builds and
;; whose derivation system is one the agent requested.  Candidates that
;; share an output with a build already allocated to the agent are skipped.
;; Returns an alist with the keys uuid and derivation-name, or #f.
(define-method (datastore-fetch-build-to-allocate
                (datastore )
                agent-id)
  (datastore-call-with-transaction
   datastore
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             ;; This needs to guard against the plan being out of date
             " SELECT builds.uuid, derivations.id, derivations.name FROM builds INNER JOIN build_allocation_plan ON builds.id = build_allocation_plan.build_id INNER JOIN derivations ON builds.derivation_id = derivations.id INNER JOIN build_allocation_agent_requested_systems ON build_allocation_agent_requested_systems.agent_id = :agent_id AND build_allocation_agent_requested_systems.system_id = derivations.system_id WHERE build_allocation_plan.agent_id = :agent_id AND builds.processed = 0 AND builds.canceled = 0 AND builds.id NOT IN (SELECT build_id FROM allocated_builds) ORDER BY build_allocation_plan.ordering ASC"
             #:cache? #t))
           (output-conflicts-statement
            (sqlite-prepare
             db
             " SELECT 1 FROM derivation_outputs AS build_derivation_outputs INNER JOIN allocated_builds ON allocated_builds.agent_id = :agent_id INNER JOIN builds AS allocated_build_details ON allocated_build_details.id = allocated_builds.build_id INNER JOIN derivation_outputs AS allocated_builds_derivation_outputs ON allocated_build_details.derivation_id = allocated_builds_derivation_outputs.derivation_id WHERE build_derivation_outputs.derivation_id = :derivation_id AND build_derivation_outputs.output_id = allocated_builds_derivation_outputs.output_id"
             #:cache? #t)))

       (define (get-build-to-allocate)
         (let loop ((build-details (sqlite-step statement)))
           (match build-details
             (#f #f)
             (#(uuid derivation-id derivation-name)
              (sqlite-bind-arguments output-conflicts-statement
                                     #:agent_id agent-id
                                     #:derivation_id derivation-id)
              ;; A row here means an output conflict with a build already
              ;; allocated to this agent.
              (let ((result (sqlite-step output-conflicts-statement)))
                (sqlite-reset output-conflicts-statement)
                (if (eq? #f result)
                    `((uuid . ,uuid)
                      ;; TODO Change this to derivation_name
                      (derivation-name . ,derivation-name))
                    (loop (sqlite-step statement))))))))

       (sqlite-bind-arguments statement #:agent_id agent-id)

       (let ((result (get-build-to-allocate)))
         (sqlite-reset statement)
         result)))
   #:readonly? #t))

;; Insert an allocated_builds row for AGENT-ID and each of BUILD-UUIDS,
;; using a single INSERT statement.
;;
;; AGENT-ID is bound as the :agent_id parameter instead of being formatted
;; into the SQL; the build ids are integers from db-find-build-id.  An
;; empty BUILD-UUIDS list is a no-op (it used to produce invalid SQL), and
;; an unknown uuid raises an error.
(define-method (datastore-insert-to-allocated-builds
                (datastore )
                agent-id build-uuids)
  (call-with-worker-thread
   (slot-ref datastore 'worker-writer-thread-channel)
   (lambda (db)
     (unless (null? build-uuids)
       ;; The statement text depends on the number of rows, so it is not
       ;; cached and is finalized after use.
       (let ((statement
              (sqlite-prepare
               db
               (string-append
                " INSERT INTO allocated_builds (build_id, agent_id) VALUES "
                (string-join
                 (map (lambda (build-uuid)
                        (simple-format
                         #f "(~A, :agent_id)"
                         (or (db-find-build-id db build-uuid)
                             (error "build not found:" build-uuid))))
                      build-uuids)
                 ", ")
                ";")
               #:cache? #f)))
         (sqlite-bind-arguments statement #:agent_id agent-id)
         (sqlite-step statement)
         (sqlite-finalize statement))))))

;; Delete the build_allocation_plan rows for BUILD-UUIDS.  Only integer
;; build ids from db-find-build-id are placed in the SQL text.
(define-method (datastore-remove-builds-from-plan
                (datastore )
                build-uuids)
  (call-with-worker-thread
   (slot-ref datastore 'worker-writer-thread-channel)
   (lambda (db)
     (sqlite-exec
      db
      (string-append
       " DELETE FROM build_allocation_plan WHERE build_id IN ("
       (string-join
        (map (lambda (build-uuid)
               (number->string (db-find-build-id db build-uuid)))
             build-uuids)
        ", ")
       ")")))))

(define-method (datastore-select-allocated-builds (datastore ) agent-id)
  (call-with-worker-thread/delay-logging
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT builds.uuid, derivations.name FROM allocated_builds INNER JOIN builds ON allocated_builds.build_id = builds.id INNER JOIN derivations ON builds.derivation_id = derivations.id WHERE agent_id = :agent_id"
             #:cache? #t)))
       (sqlite-bind-arguments statement #:agent_id agent-id)
       (let ((result
              (sqlite-fold
               (lambda (row result)
                 (cons (match row
                         (#(uuid derivation_name)
                          `((uuid . ,uuid)
                            ;; TODO Switch this to derivation_name
                            (derivation-name .
,derivation_name)))) result)) '() statement))) (sqlite-reset statement) result)))))

;; List up to LIMIT builds from AGENT-ID's allocation plan, in ascending
;; plan ordering, skipping builds that are processed or already in
;; allocated_builds.  Each build is an alist with the keys uuid and
;; derivation-name.
;;
;; The line comment is back on its own line; in the collapsed text it ran
;; to the end of the line and swallowed the SQL string and everything
;; after it.
(define-method (datastore-list-allocation-plan-builds
                (datastore )
                agent-id limit)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             ;; This needs to guard against the plan being out of date
             " SELECT builds.uuid, derivations.name FROM builds INNER JOIN derivations ON builds.derivation_id = derivations.id INNER JOIN build_allocation_plan ON builds.id = build_allocation_plan.build_id WHERE build_allocation_plan.agent_id = :agent_id AND builds.processed = 0 AND builds.id NOT IN (SELECT build_id FROM allocated_builds) ORDER BY build_allocation_plan.ordering ASC LIMIT :limit"
             #:cache? #t)))
       (sqlite-bind-arguments statement
                              #:agent_id agent-id
                              #:limit limit)
       (let ((builds
              (sqlite-map
               (match-lambda
                 (#(uuid derivation_name)
                  `((uuid . ,uuid)
                    (derivation-name . ,derivation_name))))
               statement)))
         (sqlite-reset statement)
         builds)))))

(define-method (datastore-list-agent-builds (datastore ) agent-id)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT builds.uuid, derivations.name, builds.priority FROM builds INNER JOIN derivations ON builds.derivation_id = derivations.id INNER JOIN allocated_builds ON builds.id = allocated_builds.build_id WHERE allocated_builds.agent_id = :agent_id"
             #:cache? #t)))
       (sqlite-bind-arguments statement #:agent_id agent-id)
       (let ((builds
              (sqlite-map
               (match-lambda
                 (#(uuid derivation_name priority)
                  `((uuid . ,uuid)
                    ;; TODO Switch this to derivation_name
                    (derivation-name . ,derivation_name)
                    (priority .
,priority)))) statement))) (sqlite-reset statement) builds)))))

;; Return the agent_id associated with the build identified by BUILD-UUID,
;; taken from the first row of the union of allocated_builds and
;; build_results, or #f when neither table has a row for the build.
(define-method (datastore-agent-for-build
                (datastore )
                build-uuid)
  (call-with-worker-thread
   (slot-ref datastore 'worker-reader-thread-channel)
   (lambda (db)
     (let ((statement
            (sqlite-prepare
             db
             " SELECT agent_id FROM allocated_builds WHERE allocated_builds.build_id = :build_id UNION SELECT agent_id FROM build_results WHERE build_results.build_id = :build_id"
             #:cache? #t)))
       (sqlite-bind-arguments statement
                              #:build_id (db-find-build-id db build-uuid))
       (let ((result
              (match (sqlite-step statement)
                (#(agent-id) agent-id)
                (#f #f))))
         (sqlite-reset statement)
         result)))))

;; Open the SQLite file DATABASE and return the connection.  The file is
;; opened read-write unless #:write? is #f, always with
;; SQLITE_OPEN_NOMUTEX.  When the file does not exist, run-sqitch is called
;; on it first.
(define* (db-open database
                  #:key (write? #t))
  (define flags
    (cons (if write?
              SQLITE_OPEN_READWRITE
              SQLITE_OPEN_READONLY)
          (list SQLITE_OPEN_NOMUTEX)))

  (unless (file-exists? database)
    (run-sqitch database))

  (sqlite-open database (apply logior flags)))

;; Run "sqitch deploy" against DATABASE-FILE, printing the command first.
;; The registry database is placed next to DATABASE-FILE and named after
;; it, with any ".db" suffix replaced by "_sqitch_registry.db".  If the
;; command exits non-zero, print an error and exit the process with
;; status 1.
;;
;; The comment block inside the command list is back on its own lines; in
;; the collapsed text it commented out every argument that followed it.
(define (run-sqitch database-file)
  (let ((command
         (list (%config 'sqitch)
               "deploy"
               "--db-client" (%config 'sqitch-sqlite)
               ;; if sqitch.conf exists (which it should when developing),
               ;; just use the current directory as the chdir value. Otherwise
               ;; use the directory which should contain the right files after
               ;; installation.
               "--chdir" (if (file-exists? "sqitch.conf")
                             (getcwd)
                             (string-append
                              (dirname (%config 'sqitch-plan))
                              "/sqlite"))
               "--plan-file" (%config 'sqitch-plan)
               "--registry" (string-append
                             (canonicalize-path
                              (dirname database-file))
                             "/"
                             (basename
                              (if (string-suffix? ".db" database-file)
                                  (string-drop-right database-file 3)
                                  database-file))
                             "_sqitch_registry.db")
               (string-append "db:sqlite:" database-file))))
    (simple-format #t "running command: ~A\n"
                   (string-join command))
    (unless (zero? (apply system* command))
      (simple-format
       (current-error-port)
       "error: sqitch command failed\n")
      (exit 1))))

(define (changes-count db) (let ((statement (sqlite-prepare db "SELECT changes();" #:cache?
#t))) (let ((count (vector-ref (sqlite-step statement) 0))) (sqlite-reset statement) count))) (define (last-insert-rowid db) (let ((statement (sqlite-prepare db "SELECT last_insert_rowid();" #:cache? #t))) (let ((id (vector-ref (sqlite-step statement) 0))) (sqlite-reset statement) id))) (define (select-derivation-outputs db derivation-name) (let ((statement (sqlite-prepare db " SELECT name, id FROM derivation_outputs WHERE derivation_id = :derivation_id" #:cache? #t))) (sqlite-bind-arguments statement #:derivation_id (db-find-derivation-id db derivation-name)) (let ((outputs (sqlite-map (match-lambda (#(name output-id) (cons name output-id))) statement))) (sqlite-reset statement) outputs))) (define (db-find-derivation-id db name) (let ((statement (sqlite-prepare db " SELECT id FROM derivations WHERE name = :name" #:cache? #t))) (sqlite-bind-arguments statement #:name name) (let ((result (match (sqlite-step statement) (#f #f) (#(id) id)))) (sqlite-reset statement) result))) (define (db-find-derivation db name) (let ((statement (sqlite-prepare db " SELECT systems.system, fixed_output FROM derivations INNER JOIN systems ON systems.id = derivations.system_id WHERE name = :name" #:cache? #t))) (sqlite-bind-arguments statement #:name name) (let ((result (match (sqlite-step statement) (#f #f) (#(system fixed_output) `((system . ,system) (fixed-output? . ,(cond ((eq? fixed_output 0) #f) ((eq? 
fixed_output 1) #t) (else fixed_output)))))))) (sqlite-reset statement) result))) (define-method (datastore-find-derivation (datastore ) name) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (db-find-derivation db name)))) (define-method (datastore-find-derivation-outputs (datastore ) derivation-name) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT name, outputs.output FROM derivation_outputs INNER JOIN outputs ON derivation_outputs.output_id = outputs.id WHERE derivation_id = :derivation_id" #:cache? #t))) (sqlite-bind-arguments statement #:derivation_id (db-find-derivation-id db derivation-name)) (let ((result (sqlite-map (match-lambda (#(name output) `((name . ,name) (output . ,output)))) statement))) (sqlite-reset statement) (if (null? result) #f result)))))) (define-method (datastore-list-unbuilt-derivation-outputs (datastore ) derivation-name) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT derivation_outputs.name, outputs.output FROM derivation_outputs INNER JOIN outputs ON derivation_outputs.output_id = outputs.id INNER JOIN unbuilt_outputs ON derivation_outputs.output_id = unbuilt_outputs.output_id WHERE derivation_id = :derivation_id" #:cache? #t))) (sqlite-bind-arguments statement #:derivation_id (db-find-derivation-id db derivation-name)) (let ((result (sqlite-map (match-lambda (#(name output) `((name . ,name) (output . 
,output)))) statement))) (sqlite-reset statement) result))))) (define-method (datastore-list-build-outputs (datastore ) build-uuid) (call-with-worker-thread/delay-logging (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT name, outputs.output, hash, size, store_references FROM builds INNER JOIN derivation_outputs ON builds.derivation_id = derivation_outputs.derivation_id INNER JOIN outputs ON derivation_outputs.output_id = outputs.id LEFT JOIN output_metadata ON output_metadata.derivation_output_id = derivation_outputs.id AND output_metadata.build_id = builds.id WHERE builds.id = :build_id" #:cache? #t))) (sqlite-bind-arguments statement #:build_id (db-find-build-id db build-uuid)) (let ((result (sqlite-map (match-lambda (#(name output hash size store_references) `((name . ,name) (output . ,output) (hash . ,hash) (size . ,size) (references . ,(and store_references (list->vector (string-split store_references #\space))))))) statement))) (sqlite-reset statement) result))))) (define-method (datastore-find-derivation-system (datastore ) derivation-name) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT systems.system FROM derivations INNER JOIN systems ON systems.id = derivations.system_id WHERE name = :name" #:cache? 
#t))) (sqlite-bind-arguments statement #:name derivation-name) (let ((system (match (sqlite-step statement) (#f #f) (#(system) system)))) (sqlite-reset statement) system))))) (define-method (datastore-find-derivation-inputs (datastore ) derivation-name) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT output_derivations.name, derivation_outputs.name, outputs.output FROM derivations INNER JOIN derivation_inputs ON derivation_inputs.derivation_id = derivations.id INNER JOIN derivation_outputs ON derivation_inputs.derivation_output_id = derivation_outputs.id INNER JOIN derivations AS output_derivations ON output_derivations.id = derivation_outputs.derivation_id INNER JOIN outputs ON derivation_outputs.output_id = outputs.id WHERE derivations.id = :derivation_id" #:cache? #t))) (sqlite-bind-arguments statement #:derivation_id (db-find-derivation-id db derivation-name)) (let ((result (sqlite-map (match-lambda (#(derivation output-name output) `((derivation . ,derivation) (output_name . ,output-name) (output . ,output)))) statement))) (sqlite-reset statement) result))))) (define-method (datastore-find-derivation-for-output (datastore ) start-derivation-name output) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " WITH RECURSIVE related_derivations(id) AS ( VALUES(:derivation_id) UNION SELECT derivation_outputs.derivation_id FROM derivation_outputs INNER JOIN derivation_inputs ON derivation_outputs.id = derivation_inputs.derivation_output_id INNER JOIN related_derivations ON related_derivations.id = derivation_inputs.derivation_id ) SELECT derivations.name FROM related_derivations INNER JOIN derivations ON derivations.id = related_derivations.id INNER JOIN derivation_outputs ON related_derivations.id = derivation_outputs.derivation_id WHERE derivation_outputs.output_id = :output_id " #:cache? 
#t))) (sqlite-bind-arguments statement #:derivation_id (db-find-derivation-id db start-derivation-name) #:output_id (db-output->output-id db output)) (let ((result (match (sqlite-step statement) (#f #f) (#(derivation) derivation)))) (sqlite-reset statement) result))))) (define (db-insert-system db system) (let ((statement (sqlite-prepare db " INSERT INTO systems (system) VALUES (:system)" #:cache? #t))) (sqlite-bind-arguments statement #:system system) (sqlite-step statement) (let ((id (last-insert-rowid db))) (sqlite-reset statement) id))) (define (db-system->system-id db system) (let ((statement (sqlite-prepare db " SELECT id FROM systems WHERE system = :system" #:cache? #t))) (sqlite-bind-arguments statement #:system system) (match (sqlite-step statement) (#f #f) (#(id) (sqlite-reset statement) id)))) (define (db-system-id->system db system-id) (let ((statement (sqlite-prepare db " SELECT system FROM systems WHERE id = :id" #:cache? #t))) (sqlite-bind-arguments statement #:id system-id) (match (sqlite-step statement) (#f #f) (#(id) (sqlite-reset statement) id)))) (define (insert-derivation-and-return-outputs db derivation) (define derivation-name (derivation-file-name derivation)) (define (maybe-fix-fixed-output-field derivation-details) (let ((fixed-output? (fixed-output-derivation? derivation))) (unless (equal? (assq-ref derivation-details 'fixed-output?) fixed-output?) (let ((statement (sqlite-prepare db " UPDATE derivations SET fixed_output = :fixed_output WHERE name = :name" #:cache? #t))) (sqlite-bind-arguments statement #:name derivation-name #:fixed_output (if fixed-output? 1 0)))))) (define (insert-derivation) (let ((statement (sqlite-prepare db " INSERT INTO derivations (name, system_id, fixed_output) VALUES (:name, :system_id, :fixed_output)" #:cache? 
#t))) (sqlite-bind-arguments statement #:name derivation-name #:system_id (let ((system (derivation-system derivation))) (or (db-system->system-id db system) (db-insert-system db system))) #:fixed_output (if (fixed-output-derivation? derivation) 1 0)) (sqlite-step statement) (sqlite-reset statement) (last-insert-rowid db))) (or (hash-ref %derivation-outputs-cache derivation-name) (let ((derivation-details (db-find-derivation db derivation-name))) (if derivation-details (begin (maybe-fix-fixed-output-field derivation-details) (let ((derivation-outputs (select-derivation-outputs db derivation-name))) (hash-set! %derivation-outputs-cache derivation-name derivation-outputs) derivation-outputs)) (let ((derivation-id (insert-derivation))) (call-with-delay-logging insert-derivation-inputs #:args (list db derivation-id (derivation-inputs derivation))) (let ((derivation-outputs (call-with-delay-logging insert-derivation-outputs #:args (list db derivation-id (derivation-outputs derivation))))) (hash-set! %derivation-outputs-cache derivation-name derivation-outputs) derivation-outputs)))))) (define (insert-derivation-inputs db derivation-id derivation-inputs) (unless (null? derivation-inputs) (let ((derivation-output-ids (append-map (lambda (derivation-input) (let ((output-ids-by-name (call-with-delay-logging insert-derivation-and-return-outputs #:args (list db (derivation-input-derivation derivation-input))))) (map (lambda (output-name) (assoc-ref output-ids-by-name output-name)) (derivation-input-sub-derivations derivation-input)))) derivation-inputs))) (let ((statement (sqlite-prepare db " INSERT INTO derivation_inputs (derivation_id, derivation_output_id) VALUES (:derivation_id, :derivation_output_id)" #:cache? 
#t))) (for-each (lambda (derivation-output-id) (sqlite-bind-arguments statement #:derivation_id derivation-id #:derivation_output_id derivation-output-id) (sqlite-step statement) (sqlite-reset statement)) derivation-output-ids))))) (define (db-insert-output db output) (let ((statement (sqlite-prepare db " INSERT INTO outputs (output) VALUES (:output)" #:cache? #t))) (sqlite-bind-arguments statement #:output output) (sqlite-step statement) (let ((id (last-insert-rowid db))) (sqlite-reset statement) id))) (define (db-output->output-id db output) (let ((statement (sqlite-prepare db " SELECT id FROM outputs WHERE output = :output" #:cache? #t))) (sqlite-bind-arguments statement #:output output) (let ((result (match (sqlite-step statement) (#f #f) (#(id) id)))) (sqlite-reset statement) result))) (define (insert-derivation-outputs db derivation-id derivation-outputs) (define output-has-successful-build? (let ((statement (sqlite-prepare db " SELECT build_results.result FROM derivation_outputs INNER JOIN builds ON builds.derivation_id = derivation_outputs.derivation_id INNER JOIN build_results ON builds.id = build_results.build_id WHERE derivation_outputs.output_id = :output_id" #:cache? #t))) (lambda (output-id) (sqlite-bind-arguments statement #:output_id output-id) (let* ((build-results (sqlite-map (match-lambda (#(result) result)) statement)) (result (if (member "success" build-results) #t #f))) (sqlite-reset statement) result)))) (define insert-into-unbuilt-outputs (let ((statement (sqlite-prepare db " INSERT OR IGNORE INTO unbuilt_outputs (output_id) VALUES (:output_id)" #:cache? #t))) (lambda (output-id) (sqlite-bind-arguments statement #:output_id output-id) (sqlite-step statement) (sqlite-reset statement) #t))) (let ((derivation-outputs-with-ids (map (match-lambda ((name . 
derivation-output) (let ((output (derivation-output-path derivation-output))) (cons name (or (db-output->output-id db output) (db-insert-output db output)))))) derivation-outputs)) (statement (sqlite-prepare db " INSERT INTO derivation_outputs (derivation_id, name, output_id) VALUES (:derivation_id, :name, :output_id)" #:cache? #t))) (let ((result (map (match-lambda ((name . output-id) (sqlite-bind-arguments statement #:derivation_id derivation-id #:name name #:output_id output-id) (sqlite-step statement) (sqlite-reset statement) (cons name (last-insert-rowid db)))) derivation-outputs-with-ids))) (for-each (lambda (output-id) (unless (output-has-successful-build? output-id) (insert-into-unbuilt-outputs output-id))) (map cdr derivation-outputs-with-ids)) result))) (define-method (datastore-insert-build (datastore ) . rest) (define (insert-build db drv-name uuid priority defer-until) (let ((statement (sqlite-prepare db " INSERT INTO builds (uuid, derivation_id, priority, created_at, deferred_until) VALUES (:uuid, :derivation_id, :priority, datetime('now'), :deferred_until) RETURNING id" #:cache? 
#t))) (sqlite-bind-arguments statement #:uuid uuid #:derivation_id (call-with-delay-logging db-find-derivation-id #:args (list db drv-name)) #:priority priority #:deferred_until (and=> defer-until (lambda (date) (date->string date "~1 ~3")))) (match (sqlite-step statement) (#(id) (sqlite-reset statement) id)))) (define (get-derived-priority db build-id) (let ((statement (sqlite-prepare db " SELECT max(dependent_unprocessed_builds_with_derived_priorities.derived_priority) FROM builds INNER JOIN derivation_outputs ON builds.derivation_id = derivation_outputs.derivation_id INNER JOIN derivation_outputs AS all_derivation_outputs ON all_derivation_outputs.output_id = derivation_outputs.output_id INNER JOIN derivation_inputs ON derivation_inputs.derivation_output_id = all_derivation_outputs.id INNER JOIN builds AS dependent_builds ON dependent_builds.processed = 0 AND dependent_builds.canceled = 0 AND dependent_builds.derivation_id = derivation_inputs.derivation_id INNER JOIN unprocessed_builds_with_derived_priorities AS dependent_unprocessed_builds_with_derived_priorities ON dependent_builds.id = dependent_unprocessed_builds_with_derived_priorities.build_id WHERE builds.id = :build_id" #:cache? #t))) (sqlite-bind-arguments statement #:build_id build-id) (let ((result (match (sqlite-step statement) (#(#f) #f) (#(derived-priority) derived-priority)))) (sqlite-reset statement) result))) (define (all-inputs-built? db build-id) (let ((statement (sqlite-prepare db " SELECT 1 FROM derivation_inputs INNER JOIN derivation_outputs ON derivation_inputs.derivation_output_id = derivation_outputs.id INNER JOIN unbuilt_outputs ON unbuilt_outputs.output_id = derivation_outputs.output_id INNER JOIN builds ON builds.derivation_id = derivation_inputs.derivation_id WHERE builds.id = :build_id LIMIT 1" #:cache? 
#t))) (sqlite-bind-arguments statement #:build_id build-id) (let ((result (match (sqlite-step statement) (#f #t) (#(1) #f)))) (sqlite-reset statement) result))) (define (insert-unprocessed-builds-with-derived-priorities-entry db build-id derived-priority all-inputs-built?) (let ((statement (sqlite-prepare db " INSERT INTO unprocessed_builds_with_derived_priorities (build_id, derived_priority, all_inputs_built) VALUES (:build_id, :derived_priority, :all_inputs_built)" #:cache? #t))) (sqlite-bind-arguments statement #:build_id build-id #:derived_priority derived-priority #:all_inputs_built (if all-inputs-built? 1 0)) (sqlite-step statement) (sqlite-reset statement))) (define (update-unprocessed-builds-with-derived-priorities db build-id derived-priority) ;; Recursively find builds for all missing outputs that this build takes ;; as inputs. These builds should have a derived priority of at least the ;; derived priority of this build (let ((find-builds-statement (sqlite-prepare db " WITH RECURSIVE relevant_builds (id) AS ( VALUES (:build_id) UNION SELECT builds.id FROM relevant_builds INNER JOIN builds AS relevant_builds_full ON relevant_builds.id = relevant_builds_full.id INNER JOIN derivation_inputs ON relevant_builds_full.derivation_id = derivation_inputs.derivation_id INNER JOIN derivation_outputs ON derivation_inputs.derivation_output_id = derivation_outputs.id INNER JOIN unbuilt_outputs ON unbuilt_outputs.output_id = derivation_outputs.output_id INNER JOIN derivation_outputs AS all_derivation_outputs ON all_derivation_outputs.output_id = unbuilt_outputs.output_id INNER JOIN builds ON builds.processed = 0 AND builds.derivation_id = all_derivation_outputs.derivation_id ) SELECT build_id FROM relevant_builds INNER JOIN unprocessed_builds_with_derived_priorities ON relevant_builds.id = unprocessed_builds_with_derived_priorities.build_id WHERE unprocessed_builds_with_derived_priorities.derived_priority < :derived_priority" #:cache? 
#t)) (update-derived-priority-statement (sqlite-prepare db " UPDATE unprocessed_builds_with_derived_priorities SET derived_priority = :derived_priority WHERE build_id = :build_id" #:cache? #t))) (let ((builds-to-update (call-with-delay-logging (lambda () (sqlite-map (lambda (row) (vector-ref row 0)) find-builds-statement))))) (sqlite-reset find-builds-statement) (map (lambda (id) (sqlite-bind-arguments update-derived-priority-statement #:build_id id #:derived_priority derived-priority) (sqlite-step update-derived-priority-statement) (sqlite-reset update-derived-priority-statement)) builds-to-update)))) (apply (lambda* (uuid drv-name priority defer-until #:key skip-updating-other-build-derived-priorities) (call-with-worker-thread/delay-logging (slot-ref datastore 'worker-writer-thread-channel) (lambda (db) (let* ((build-id (insert-build db drv-name uuid priority defer-until)) (derived-priority (max (or (call-with-delay-logging get-derived-priority #:args (list db build-id)) priority) priority)) (all-inputs-built? (all-inputs-built? db build-id))) (insert-unprocessed-builds-with-derived-priorities-entry db build-id derived-priority all-inputs-built?) (unless (or all-inputs-built? skip-updating-other-build-derived-priorities) (call-with-delay-logging update-unprocessed-builds-with-derived-priorities #:args (list db build-id derived-priority))))))) rest) #t) (define (insert-agent db uuid name description) (let ((statement (sqlite-prepare db " INSERT INTO agents (id, name, description) VALUES (:id, :name, :description)" #:cache? #t))) (sqlite-bind-arguments statement #:id uuid #:name name #:description description) (sqlite-step statement) (sqlite-reset statement))) (define (insert-agent-password db uuid password) (let ((statement (sqlite-prepare db " INSERT INTO agent_passwords (agent_id, password) VALUES (:agent_id, :password)" #:cache? #t))) (sqlite-bind-arguments statement #:agent_id uuid #:password password) (sqlite-step statement) (sqlite-reset statement)))