(define-module (guix-build-coordinator datastore sqlite) #:use-module (oop goops) #:use-module (srfi srfi-1) #:use-module (ice-9 match) #:use-module (ice-9 threads) #:use-module (sqlite3) #:use-module (prometheus) #:use-module (guix derivations) #:use-module (guix-build-coordinator utils) #:use-module (guix-build-coordinator utils fibers) #:use-module (guix-build-coordinator config) #:use-module (guix-build-coordinator datastore abstract) #:export (sqlite-datastore datastore-update datastore-call-with-transaction datastore-store-derivation datastore-list-related-derivations-with-no-build-for-outputs datastore-list-failed-builds-with-blocking-count datastore-list-builds-for-derivation-recursive-inputs datastore-store-build datastore-count-builds datastore-for-each-build datastore-find-build datastore-find-build-result datastore-find-build-derivation-system datastore-list-builds-for-derivation datastore-count-build-results datastore-store-build-result datastore-list-build-outputs datastore-new-agent datastore-list-agents datastore-find-agent datastore-store-build-start datastore-find-build-starts datastore-count-setup-failures datastore-list-setup-failures-for-build datastore-fetch-setup-failures datastore-store-setup-failure datastore-store-setup-failure/missing-inputs datastore-list-setup-failure-missing-inputs datastore-find-derivation datastore-find-derivation-system datastore-find-derivation-inputs datastore-find-derivation-for-output datastore-find-derivation-outputs datastore-list-builds-for-output datastore-list-builds-for-output-and-system datastore-new-agent-password datastore-agent-password-exists? datastore-list-processed-builds datastore-list-unprocessed-builds datastore-fetch-build-ids-and-propagated-priorities-for-unprocessed-builds datastore-count-unprocessed-hook-events datastore-list-unprocessed-hook-events datastore-delete-unprocessed-hook-event datastore-list-agent-builds datastore-agent-for-build datastore-count-build-allocation-plan-entries datastore-replace-build-allocation-plan datastore-count-allocated-builds datastore-agent-requested-systems datastore-update-agent-requested-systems datastore-allocate-builds-to-agent datastore-list-allocation-plan-builds)) (define-class () database-file worker-reader-thread-channel worker-writer-thread-channel metrics-registry) (define* (sqlite-datastore database-uri #:key update-database? metrics-registry) (define database-file (string-drop database-uri (string-length "sqlite://"))) (when update-database? (run-sqitch database-file)) (let ((db (db-open database-file))) (sqlite-exec db "PRAGMA optimize;") (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);") (sqlite-close db)) (let ((datastore (make ))) (slot-set! datastore 'database-file database-file) (slot-set! datastore 'metrics-registry metrics-registry) (slot-set! datastore 'worker-reader-thread-channel (make-worker-thread-channel (lambda () (let ((db (db-open database-file))) (sqlite-exec db "PRAGMA journal_mode=WAL;") (sqlite-exec db "PRAGMA busy_timeout = 5000;") (list db))) ;; Use a minimum of 2 and a maximum of 8 threads #:parallelism (min (max (current-processor-count) 2) 8))) (slot-set! datastore 'worker-writer-thread-channel (make-worker-thread-channel (lambda () (let ((db (db-open database-file))) (sqlite-exec db "PRAGMA journal_mode=WAL;") (sqlite-exec db "PRAGMA busy_timeout = 5000;") (list db))) ;; SQLite doesn't support parallel writes #:parallelism 1)) datastore)) (define (call-with-time-tracking datastore metric-name thunk) (define registry (slot-ref datastore 'metrics-registry)) (if registry (let* ((metric (or (metrics-registry-fetch-metric registry metric-name) (make-histogram-metric registry metric-name))) (start-time (current-time))) (let ((result (thunk))) (metric-observe metric (- (current-time) start-time)) result)) (thunk))) (define* (datastore-call-with-transaction datastore proc #:key readonly?) (call-with-worker-thread (slot-ref datastore (if readonly? 'worker-reader-thread-channel 'worker-writer-thread-channel)) (lambda (db) (sqlite-exec db "BEGIN TRANSACTION;") (with-exception-handler (lambda (exn) (simple-format (current-error-port) "error: sqlite rolling back transaction\n") (sqlite-exec db "ROLLBACK TRANSACTION;") (raise-exception exn)) (lambda () (call-with-values (lambda () (proc db)) (lambda vals (sqlite-exec db "COMMIT TRANSACTION;") (apply values vals)))))))) (define-method (datastore-find-agent (datastore ) uuid) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT description FROM agents WHERE id = :id" #:cache? #t))) (sqlite-bind-arguments statement #:id uuid) (let ((result (match (sqlite-map (match-lambda (#(description) `((description . ,description)))) statement) (() #f) ((agent) agent)))) (sqlite-reset statement) result))))) (define-method (datastore-new-agent (datastore ) uuid description) (call-with-worker-thread (slot-ref datastore 'worker-writer-thread-channel) (lambda (db) (insert-agent db uuid description))) #t) (define-method (datastore-list-agents (datastore )) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT id, description FROM agents ORDER BY id" #:cache? #t))) (let ((agents (sqlite-map (match-lambda (#(id description) `((uuid . ,id) (description . ,description)))) statement))) (sqlite-reset statement) agents))))) (define-method (datastore-new-agent-password (datastore ) agent-uuid password) (call-with-worker-thread (slot-ref datastore 'worker-writer-thread-channel) (lambda (db) (insert-agent-password db agent-uuid password))) #t) (define-method (datastore-agent-password-exists? (datastore ) uuid password) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT 1 FROM agent_passwords \ WHERE agent_id = :agent_id AND password = :password" #:cache? #t))) (sqlite-bind-arguments statement #:agent_id uuid #:password password) (let ((result (match (sqlite-step statement) (#f #f) (#(1) #t)))) (sqlite-reset statement) result))))) (define-method (datastore-store-derivation (datastore ) derivation) (datastore-call-with-transaction datastore (lambda (db) (insert-derivation-and-return-outputs db derivation))) #t) (define-method (datastore-list-related-derivations-with-no-build-for-outputs (datastore ) derivation) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " WITH RECURSIVE related_derivations(name) AS ( VALUES(:derivation) UNION SELECT derivation_outputs.derivation_name FROM derivation_outputs INNER JOIN derivation_inputs ON derivation_outputs.id = derivation_inputs.derivation_output_id INNER JOIN related_derivations ON related_derivations.name = derivation_inputs.derivation_name ) SELECT related_derivations.name FROM related_derivations INNER JOIN derivations ON related_derivations.name = derivations.name WHERE related_derivations.name != :derivation AND NOT EXISTS ( SELECT 1 FROM builds INNER JOIN derivation_outputs AS other_derivation_derivation_outputs ON other_derivation_derivation_outputs.derivation_name = builds.derivation_name INNER JOIN derivations AS other_derivations ON other_derivation_derivation_outputs.derivation_name = other_derivations.name INNER JOIN derivation_outputs ON derivation_outputs.output = other_derivation_derivation_outputs.output WHERE derivation_outputs.derivation_name = related_derivations.name AND other_derivations.system = derivations.system ) " #:cache? #t))) (sqlite-bind-arguments statement #:derivation derivation) (let ((result (sqlite-map (match-lambda (#(derivation) derivation)) statement))) (sqlite-reset statement) result))))) (define-method (datastore-list-failed-builds-with-blocking-count (datastore )) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT builds.uuid, builds.derivation_name, ( WITH RECURSIVE related_derivations(name) AS ( VALUES(builds.derivation_name) UNION SELECT derivation_inputs.derivation_name FROM derivation_outputs INNER JOIN related_derivations ON derivation_outputs.derivation_name = related_derivations.name INNER JOIN derivation_inputs ON derivation_outputs.id = derivation_inputs.derivation_output_id ) SELECT COUNT(DISTINCT blocked_builds.uuid) FROM related_derivations INNER JOIN builds AS blocked_builds ON related_derivations.name = blocked_builds.derivation_name AND blocked_builds.processed = 0 ) FROM builds INNER JOIN build_results ON builds.uuid = build_results.build_id WHERE builds.processed = 1 AND build_results.result = 'failure' AND NOT EXISTS ( SELECT 1 FROM derivation_outputs INNER JOIN derivation_outputs AS other_build_derivation_outputs ON derivation_outputs.output = other_build_derivation_outputs.output INNER JOIN builds AS other_builds ON other_build_derivation_outputs.derivation_name = other_builds.derivation_name INNER JOIN build_results AS other_build_results ON other_builds.uuid = other_build_results.build_id WHERE derivation_outputs.derivation_name = builds.derivation_name AND other_build_results.result = 'success' ) ORDER BY 3 DESC, 2, 1" #:cache? #t))) (let ((result (sqlite-map (match-lambda (#(uuid derivation-name blocked-count) `((uuid . ,uuid) (derivation_name . ,derivation-name) (blocked_count . ,blocked-count)))) statement))) (sqlite-reset statement) result))))) (define-method (datastore-list-builds-for-derivation-recursive-inputs (datastore ) derivation) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " WITH RECURSIVE related_derivations(name) AS ( VALUES(:derivation) UNION SELECT derivation_outputs.derivation_name FROM derivation_outputs INNER JOIN derivation_inputs ON derivation_outputs.id = derivation_inputs.derivation_output_id INNER JOIN related_derivations ON related_derivations.name = derivation_inputs.derivation_name ) SELECT builds.uuid FROM builds INNER JOIN related_derivations ON related_derivations.name = builds.derivation_name" #:cache? #t))) (sqlite-bind-arguments statement #:derivation derivation) (let ((result (sqlite-map (match-lambda (#(uuid) uuid)) statement))) (sqlite-reset statement) result))))) (define-method (datastore-store-build (datastore ) derivation-name uuid priority tags) (define (insert-tags db build-id tags) (let ((insert-tag-statement (sqlite-prepare db " INSERT INTO tags (\"key\", \"value\") VALUES (:tagkey, :tagvalue)" #:cache? #t)) (find-tag-statement (sqlite-prepare db " SELECT id FROM tags WHERE key = :tag_key AND value = :tag_value" #:cache? #t)) (build-tags-statement (sqlite-prepare db " INSERT INTO build_tags (build_id, tag_id) VALUES (:build_id, :tag_id)" #:cache? #t))) (define (tag->id key value) (sqlite-reset find-tag-statement) (sqlite-bind-arguments find-tag-statement #:tag_key key #:tag_value value) (match (sqlite-step find-tag-statement) (#(id) id) (#f (sqlite-reset insert-tag-statement) (sqlite-bind-arguments insert-tag-statement #:tagkey key #:tagvalue value) (sqlite-step insert-tag-statement) (last-insert-rowid db)))) (for-each (match-lambda ((key . value) (sqlite-reset build-tags-statement) (sqlite-bind-arguments build-tags-statement #:build_id build-id #:tag_id (tag->id key value)) (sqlite-step build-tags-statement))) tags))) (define (handle-inserting-unprocessed-hook-event db build-id) (insert-unprocessed-hook-event db "build-submitted" (list build-id))) (datastore-call-with-transaction datastore (lambda (db) (insert-build db uuid derivation-name priority) (handle-inserting-unprocessed-hook-event db uuid) (unless (null? tags) (insert-tags db uuid tags)))) #t) (define-method (datastore-count-build-results (datastore )) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT agent_id, result, COUNT(*) FROM build_results GROUP BY agent_id, result" #:cache? #t))) (let ((result (sqlite-map (match-lambda (#(agent_id result count) (cons (list agent_id result) count))) statement))) (sqlite-reset statement) result))))) (define-method (datastore-store-build-result (datastore ) build-id agent-id result failure-reason output-metadata end-time) (define (insert-build-result db build-id agent-id result failure-reason) (sqlite-exec db (string-append " INSERT INTO build_results ( build_id, agent_id, result, failure_reason ) VALUES ('" build-id "', '" agent-id "', '" result "', " (if failure-reason (string-append "'" failure-reason "'") "NULL") ")"))) (define (remove-build-allocation db build-id agent-id) (sqlite-exec db (string-append " DELETE FROM allocated_builds WHERE build_id = '" build-id "' AND agent_id = '" agent-id "'"))) (define (mark-build-as-processed db build-id end-time) (sqlite-exec db (string-append " UPDATE builds SET processed = 1 " (if end-time (string-append ", end_time = '" end-time "'") "") " WHERE uuid = '" build-id "'"))) (define (delete-relevant-outputs-from-unbuilt-outputs db build-id) (let ((statement (sqlite-prepare db " DELETE FROM unbuilt_outputs WHERE output IN ( SELECT derivation_outputs.output FROM derivation_outputs INNER JOIN builds ON builds.derivation_name = derivation_outputs.derivation_name WHERE builds.uuid = :build_id )" #:cache? #t))) (sqlite-bind-arguments statement #:build_id build-id) (sqlite-step statement) #t)) (define (store-output-metadata db build-id output-metadata) (define (name->output-id name) (let ((statement (sqlite-prepare db " SELECT derivation_outputs.id FROM derivation_outputs INNER JOIN builds ON builds.derivation_name = derivation_outputs.derivation_name WHERE builds.uuid = :build_id AND derivation_outputs.name = :name" #:cache? #t))) (sqlite-bind-arguments statement #:build_id build-id #:name name) (match (sqlite-step statement) (#(id) (sqlite-reset statement) id)))) (sqlite-exec db (string-append " INSERT INTO output_metadata (build_id, derivation_output_id, hash, size, store_references) VALUES " (string-join (map (lambda (output) (simple-format #f "('~A', ~A, '~A', ~A, '~A')" build-id (name->output-id (assoc-ref output "name")) (assoc-ref output "hash") (assoc-ref output "size") (string-join (vector->list (assoc-ref output "references")) " "))) output-metadata) ", "))) #t) (define (handle-inserting-unprocessed-hook-event db build-id result) (insert-unprocessed-hook-event db (if (string=? result "success") "build-success" "build-failure") (list build-id))) (datastore-call-with-transaction datastore (lambda (db) (insert-build-result db build-id agent-id result failure-reason) (remove-build-allocation db build-id agent-id) (mark-build-as-processed db build-id end-time) ;; This logic should be part of the coordinator, but it's here to be ;; inside the transaction (handle-inserting-unprocessed-hook-event db build-id result) (when (string=? result "success") (delete-relevant-outputs-from-unbuilt-outputs db build-id)) (when output-metadata (store-output-metadata db build-id output-metadata)))) #t) (define-method (datastore-store-build-start (datastore ) build-id agent-id) (define (handle-inserting-unprocessed-hook-event db build-id agent-id) (insert-unprocessed-hook-event db "build-started" (list build-id agent-id))) (datastore-call-with-transaction datastore (lambda (db) (sqlite-exec db (string-append " INSERT INTO build_starts ( build_id, agent_id, start_time ) VALUES ('" build-id "', '" agent-id "', " "datetime('now')" ")")) (handle-inserting-unprocessed-hook-event db build-id agent-id))) #t) (define-method (datastore-find-build-starts (datastore ) build-id) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT start_time, agent_id FROM build_starts WHERE build_id = :build_id ORDER BY start_time DESC" #:cache? #t))) (sqlite-bind-arguments statement #:build_id build-id) (let ((result (sqlite-map (match-lambda (#(start_time agent_id) `((start-time . ,(match (strptime "%F %T" start_time) ((parts . _) parts))) (agent-id . ,agent_id)))) statement))) (sqlite-reset statement) result))))) (define (insert-setup-failure-and-remove-allocation db build-id agent-id failure-reason) (sqlite-exec db (string-append " DELETE FROM allocated_builds WHERE build_id = '" build-id "' AND agent_id = '" agent-id "'")) (sqlite-exec db (string-append " INSERT INTO setup_failures ( build_id, agent_id, failure_reason ) VALUES ('" build-id "', '" agent-id "', '" failure-reason "')")) (last-insert-rowid db)) (define-method (datastore-store-setup-failure/missing-inputs (datastore ) build-id agent-id missing-inputs) (define (insert-missing-inputs db setup-failure-id missing-inputs) (sqlite-exec db (string-append " INSERT INTO setup_failure_missing_inputs ( setup_failure_id, missing_input_store_path ) VALUES " (string-join (map (lambda (missing-input) (simple-format #f "(~A, '~A')" setup-failure-id missing-input)) missing-inputs) ", ")))) (define (handle-inserting-unprocessed-hook-event db build-id missing-inputs) (insert-unprocessed-hook-event db "build-missing-inputs" (list build-id missing-inputs))) (datastore-call-with-transaction datastore (lambda (db) (let ((setup-failure-id (insert-setup-failure-and-remove-allocation db build-id agent-id "missing_inputs"))) (insert-missing-inputs db setup-failure-id missing-inputs)) ;; This logic should be part of the coordinator, but it's here to be ;; inside the transaction (handle-inserting-unprocessed-hook-event db build-id missing-inputs))) #t) (define-method (datastore-list-setup-failure-missing-inputs (datastore ) setup-failure-id) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT missing_input_store_path FROM setup_failure_missing_inputs WHERE setup_failure_id = :id" #:cache? #t))) (sqlite-bind-arguments statement #:id setup-failure-id) (let ((result (sqlite-map (match-lambda (#(missing-input) missing-input)) statement))) (sqlite-reset statement) result))))) (define-method (datastore-store-setup-failure (datastore ) build-id agent-id failure-reason) (call-with-worker-thread (slot-ref datastore 'worker-writer-thread-channel) (lambda (db) (insert-setup-failure-and-remove-allocation db build-id agent-id failure-reason)))) (define-method (datastore-count-builds (datastore )) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT derivations.system, COUNT(*) FROM builds INNER JOIN derivations ON builds.derivation_name = derivations.name GROUP BY derivations.system" #:cache? #t))) (let ((result (sqlite-map (match-lambda (#(system count) (cons system count))) statement))) (sqlite-reset statement) result))))) (define-method (datastore-for-each-build (datastore ) proc) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT uuid FROM builds ORDER BY uuid" #:cache? #t))) (let loop ((row (sqlite-step statement))) (match row (#(uuid) (proc uuid) (loop (sqlite-step statement))) (#f (sqlite-reset statement) #t))))))) (define-method (datastore-find-build (datastore ) uuid) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT uuid, derivation_name, priority, processed, created_at, end_time FROM builds WHERE uuid = :uuid" #:cache? #t))) (sqlite-bind-arguments statement #:uuid uuid) (let ((result (match (sqlite-step statement) (#(uuid derivation_name priority processed created_at end_time) `((uuid . ,uuid) (derivation-name . ,derivation_name) (priority . ,priority) (processed . ,(cond ((= 0 processed) #f) ((= 1 processed) #t) (else (error "unknown processed value")))) (created-at . ,(if (string? created_at) (match (strptime "%F %T" created_at) ((parts . _) parts)) #f)) (end-time . ,(if (string? end_time) (match (strptime "%F %T" end_time) ((parts . _) parts)) #f))))))) (sqlite-reset statement) result))))) (define-method (datastore-find-build-result (datastore ) build-id) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT agent_id, result, failure_reason FROM build_results WHERE build_id = :build_id" #:cache? #t))) (sqlite-bind-arguments statement #:build_id build-id) (let ((result (match (sqlite-step statement) (#(agent_id result failure_reason) `((agent_id . ,agent_id) (result . ,result) (failure_reason . ,failure_reason))) (#f #f)))) (sqlite-reset statement) result))))) (define-method (datastore-find-build-derivation-system (datastore ) build-id) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT derivations.system FROM builds INNER JOIN derivations ON builds.derivation_name = derivations.name WHERE builds.uuid = :build_id" #:cache? #t))) (sqlite-bind-arguments statement #:build_id build-id) (let ((system (match (sqlite-step statement) (#(system) system)))) (sqlite-reset statement) system))))) (define-method (datastore-list-builds-for-output (datastore ) output) (call-with-time-tracking datastore "list_builds_for_output_duration_seconds" (lambda () (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT uuid, builds.derivation_name, priority, processed, result FROM builds INNER JOIN derivation_outputs ON builds.derivation_name = derivation_outputs.derivation_name LEFT JOIN build_results ON builds.uuid = build_results.build_id WHERE derivation_outputs.output = :output" #:cache? #t))) (sqlite-bind-arguments statement #:output output) (let ((result (sqlite-map (match-lambda (#(uuid derivation priority processed result) `((uuid . ,uuid) (derivation . ,derivation) (priority . ,priority) (processed . ,processed) (result . ,result)))) statement))) (sqlite-reset statement) result))))))) (define-method (datastore-list-builds-for-output-and-system (datastore ) output system) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT uuid, builds.derivation_name FROM builds INNER JOIN derivation_outputs ON builds.derivation_name = derivation_outputs.derivation_name INNER JOIN derivations ON builds.derivation_name = derivations.name WHERE derivation_outputs.output = :output AND derivations.system = :system" #:cache? #t))) (sqlite-bind-arguments statement #:output output #:system system) (let ((result (sqlite-map (match-lambda (#(uuid derivation) `((uuid . ,uuid) (derivation . ,derivation)))) statement))) (sqlite-reset statement) result))))) (define-method (datastore-list-builds-for-derivation (datastore ) derivation) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT uuid FROM builds WHERE derivation_name = :derivation" #:cache? #t))) (sqlite-bind-arguments statement #:derivation derivation) (let ((result (sqlite-map (match-lambda (#(uuid) `((uuid . ,uuid)))) statement))) (sqlite-reset statement) result))))) (define-method (datastore-update (datastore )) (run-sqitch (slot-ref datastore 'database-file)) #t) (define-method (datastore-count-setup-failures (datastore )) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT agent_id, failure_reason, COUNT(*) FROM setup_failures GROUP BY agent_id, failure_reason" #:cache? #t))) (let ((result (sqlite-map (match-lambda (#(agent_id failure_reason count) (cons (list agent_id failure_reason) count))) statement))) (sqlite-reset statement) result))))) (define-method (datastore-list-setup-failures-for-build (datastore ) build-id) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT id, agent_id, failure_reason FROM setup_failures WHERE build_id = :build_id" #:cache? #t))) (sqlite-bind-arguments statement #:build_id build-id) (let ((result (sqlite-map (match-lambda (#(id agent-id failure-reason) `((id . ,id) (agent-id . ,agent-id) (failure-reason . ,failure-reason)))) statement))) (sqlite-reset statement) result))))) (define-method (datastore-fetch-setup-failures (datastore )) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT id, build_id, agent_id, failure_reason FROM setup_failures INNER JOIN builds ON builds.uuid = setup_failures.build_id WHERE builds.processed = 0" #:cache? #t))) (let ((result (sqlite-fold (lambda (row result) (match row (#(id build-id agent-id failure-reason) (let ((failures-for-build-id (or (hash-ref result build-id) '()))) (hash-set! result build-id (cons `((id . ,id) (agent-id . ,agent-id) (failure-reason . ,failure-reason)) failures-for-build-id))))) result) (make-hash-table) statement))) (sqlite-reset statement) result))))) (define-method (datastore-list-processed-builds (datastore )) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT uuid, derivation_name, priority FROM builds WHERE processed = 1" #:cache? #t))) (let ((builds (sqlite-map (match-lambda (#(uuid derivation_name priority) `((uuid . ,uuid) (derivation-name . ,derivation_name) (priority . ,priority)))) statement))) (sqlite-reset statement) builds))))) (define-method (datastore-list-unprocessed-builds (datastore )) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT uuid, derivation_name, priority FROM builds WHERE processed = 0 ORDER BY priority DESC" #:cache? #t))) (let ((builds (sqlite-map (match-lambda (#(uuid derivation_name priority) `((uuid . ,uuid) (derivation-name . ,derivation_name) (priority . ,priority)))) statement))) (sqlite-reset statement) builds))))) (define-method (datastore-fetch-build-ids-and-propagated-priorities-for-unprocessed-builds (datastore ) created-after) (define (populate-unprocessed-builds-table db) (sqlite-exec db " DROP TABLE IF EXISTS temp.unprocessed_builds") (let ((statement (sqlite-prepare db (string-append " CREATE TEMP TABLE unprocessed_builds AS SELECT uuid FROM builds WHERE processed = 0 " (if created-after (simple-format #f "AND created_at >= ~A\n" created-after) "") " AND NOT EXISTS ( SELECT 1 FROM derivation_inputs INNER JOIN derivation_outputs ON derivation_inputs.derivation_output_id = derivation_outputs.id INNER JOIN unbuilt_outputs ON unbuilt_outputs.output = derivation_outputs.output WHERE builds.derivation_name = derivation_inputs.derivation_name ) EXCEPT SELECT build_id FROM allocated_builds ")))) (sqlite-step statement) (sqlite-finalize statement) #t)) (define (query-unprocessed-builds-table db) (let ((statement (sqlite-prepare db (string-append " SELECT * FROM unprocessed_builds")))) (let ((builds (sqlite-map (match-lambda (#(uuid) uuid)) statement))) (sqlite-finalize statement) builds))) (define (fetch-propagated-priorities-for-unprocessed-builds db) (let ((statement (sqlite-prepare db " WITH RECURSIVE builds_with_derived_priority( uuid, derivation_name, derived_priority ) AS ( SELECT builds.uuid, builds.derivation_name, builds.priority FROM builds INNER JOIN unprocessed_builds ON builds.uuid = unprocessed_builds.uuid UNION SELECT builds.uuid, builds.derivation_name, max(builds.priority, builds_with_derived_priority.derived_priority) FROM builds_with_derived_priority INNER JOIN derivation_inputs ON builds_with_derived_priority.derivation_name = derivation_inputs.derivation_name INNER JOIN derivation_outputs ON derivation_inputs.derivation_output_id = derivation_outputs.id INNER JOIN derivation_outputs AS all_derivation_outputs ON all_derivation_outputs.output = derivation_outputs.output INNER JOIN builds ON builds.processed = 0 AND builds.derivation_name = all_derivation_outputs.derivation_name ) SELECT builds_with_derived_priority.uuid, MAX(derived_priority) FROM builds_with_derived_priority INNER JOIN builds ON builds.uuid = builds_with_derived_priority.uuid WHERE builds.processed = 0 GROUP BY builds_with_derived_priority.uuid" #:cache? #t))) (let ((result (sqlite-fold (lambda (row result) (match row (#(uuid derived-priority) (hash-set! result uuid derived-priority))) result) (make-hash-table 10000) statement))) result))) (datastore-call-with-transaction datastore (lambda (db) (populate-unprocessed-builds-table db) (let ((propagated-priorities (fetch-propagated-priorities-for-unprocessed-builds db)) (unprocessed-build-ids (query-unprocessed-builds-table db))) (values propagated-priorities unprocessed-build-ids))) #:readonly? #t)) (define (insert-unprocessed-hook-event db event arguments) (let ((statement (sqlite-prepare db " INSERT INTO unprocessed_hook_events (event, arguments) VALUES (:event, :arguments)"))) (sqlite-bind-arguments statement #:event event #:arguments (call-with-output-string (lambda (port) (write arguments port)))) (sqlite-step statement) (sqlite-reset statement)) #t) (define-method (datastore-count-unprocessed-hook-events (datastore )) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT event, COUNT(*) FROM unprocessed_hook_events GROUP BY event" #:cache? #t))) (let ((counts (sqlite-map (match-lambda (#(event count) `((event . ,event) (count . ,count)))) statement))) (sqlite-reset statement) counts))))) (define-method (datastore-list-unprocessed-hook-events (datastore ) event limit) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT id, event, arguments FROM unprocessed_hook_events WHERE event = :event LIMIT :limit" #:cache? #t))) (sqlite-bind-arguments statement #:event (symbol->string event) #:limit limit) (let ((events (sqlite-map (match-lambda (#(id event arguments) (list id (string->symbol event) (call-with-input-string arguments (lambda (port) (read port)))))) statement))) (sqlite-reset statement) events))))) (define-method (datastore-delete-unprocessed-hook-event (datastore ) id) (call-with-worker-thread (slot-ref datastore 'worker-writer-thread-channel) (lambda (db) (sqlite-exec db (string-append " DELETE FROM unprocessed_hook_events WHERE id = " (number->string id)))))) (define-method (datastore-count-build-allocation-plan-entries (datastore )) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT agent_id, COUNT(*) FROM build_allocation_plan GROUP BY agent_id" #:cache? #t))) (let ((result (sqlite-map (match-lambda (#(agent_id count) (cons agent_id count))) statement))) (sqlite-reset statement) result))))) (define-method (datastore-replace-build-allocation-plan (datastore ) planned-builds) (define (clear-current-plan db) (sqlite-exec db "DELETE FROM build_allocation_plan")) (define (insert-new-plan db planned-builds) (sqlite-exec db (string-append " INSERT INTO build_allocation_plan (build_id, agent_id, ordering) VALUES " (string-join (map (match-lambda ((build-id agent-id ordering) (simple-format #f "('~A', '~A', ~A)" build-id agent-id ordering))) planned-builds) ", ") ";"))) (call-with-time-tracking datastore "replace_build_allocation_plan_duration_seconds" (lambda () (datastore-call-with-transaction datastore (lambda (db) (clear-current-plan db) (unless (null? planned-builds) (insert-new-plan db planned-builds)))))) #t) (define-method (datastore-count-allocated-builds (datastore )) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT agent_id, COUNT(*) FROM allocated_builds GROUP BY agent_id" #:cache? #t))) (let ((result (sqlite-map (match-lambda (#(agent_id count) (cons agent_id count))) statement))) (sqlite-reset statement) result))))) (define-method (datastore-agent-requested-systems (datastore ) agent-id) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT system FROM build_allocation_agent_requested_systems WHERE agent_id = :agent_id ORDER BY system ASC" #:cache? #t))) (sqlite-bind-arguments statement #:agent_id agent-id) (let ((result (sqlite-map (match-lambda (#(system) system)) statement))) (sqlite-reset statement) result))))) (define-method (datastore-update-agent-requested-systems (datastore ) agent-id systems) (define update-not-needed? (equal? (sort systems string) agent-id systems count) (define (fetch-build db) (let ((statement (sqlite-prepare db ;; This needs to guard against the plan being out of date (simple-format #f " SELECT builds.uuid, builds.derivation_name FROM builds INNER JOIN build_allocation_plan ON builds.uuid = build_allocation_plan.build_id INNER JOIN derivations ON builds.derivation_name = derivations.name WHERE build_allocation_plan.agent_id = :agent_id AND builds.processed = 0 AND builds.uuid NOT IN (SELECT build_id FROM allocated_builds) AND derivations.system IN (~A) AND NOT EXISTS ( SELECT 1 FROM derivation_outputs AS build_derivation_outputs INNER JOIN allocated_builds ON allocated_builds.agent_id = :agent_id INNER JOIN builds AS allocated_build_details ON allocated_build_details.uuid = allocated_builds.build_id INNER JOIN derivation_outputs AS allocated_builds_derivation_outputs ON allocated_build_details.derivation_name = allocated_builds_derivation_outputs.derivation_name WHERE build_derivation_outputs.derivation_name = builds.derivation_name AND build_derivation_outputs.output = allocated_builds_derivation_outputs.output ) ORDER BY build_allocation_plan.ordering ASC LIMIT 1" (string-join (map (lambda (system) (string-append "'" system "'")) systems) ","))))) (sqlite-bind-arguments statement #:agent_id agent-id) (let ((build (match (sqlite-step statement) (#f #f) (#(uuid derivation_name) `((uuid . ,uuid) (derivation-name . ,derivation_name)))))) (sqlite-finalize statement) build))) (define (insert-to-allocated-builds db agent-id build-ids) (sqlite-exec db (string-append " INSERT INTO allocated_builds (build_id, agent_id) VALUES " (string-join (map (lambda (build-id) (simple-format #f "('~A', '~A')" build-id agent-id)) build-ids) ", ") ";"))) (define (remove-builds-from-plan db build-ids) (sqlite-exec db (string-append " DELETE FROM build_allocation_plan WHERE build_id IN (" (string-join (map (lambda (build-id) (string-append "'" build-id "'")) build-ids) ", ") ")"))) (define (allocate-one-build db agent-id) (let ((build-details (fetch-build db))) (if build-details (let ((build-id (assq-ref build-details 'uuid))) (insert-to-allocated-builds db agent-id (list build-id)) (remove-builds-from-plan db (list build-id)) build-details) #f))) (define (allocate-several-builds db agent-id count) (let loop ((builds '())) (if (= (length builds) count) builds (let ((build-details (allocate-one-build db agent-id))) (if build-details (loop (cons build-details builds)) builds))))) (datastore-call-with-transaction datastore (lambda (db) (allocate-several-builds db agent-id count)))) (define-method (datastore-list-allocation-plan-builds (datastore ) agent-id limit) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db ;; This needs to guard against the plan being out of date " SELECT builds.uuid, builds.derivation_name FROM builds INNER JOIN build_allocation_plan ON builds.uuid = build_allocation_plan.build_id WHERE build_allocation_plan.agent_id = :agent_id AND builds.processed = 0 AND builds.uuid NOT IN (SELECT build_id FROM allocated_builds) ORDER BY build_allocation_plan.ordering ASC LIMIT :limit" #:cache? #t))) (sqlite-bind-arguments statement #:agent_id agent-id #:limit limit) (let ((builds (sqlite-map (match-lambda (#(uuid derivation_name) `((uuid . ,uuid) (derivation-name . ,derivation_name)))) statement))) (sqlite-reset statement) builds))))) (define-method (datastore-list-agent-builds (datastore ) agent-id) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT builds.uuid, builds.derivation_name, builds.priority FROM builds INNER JOIN allocated_builds ON builds.uuid = allocated_builds.build_id WHERE allocated_builds.agent_id = :agent_id" #:cache? #t))) (sqlite-bind-arguments statement #:agent_id agent-id) (let ((builds (sqlite-map (match-lambda (#(uuid derivation_name priority) `((uuid . ,uuid) (derivation-name . ,derivation_name) (priority . ,priority)))) statement))) (sqlite-reset statement) builds))))) (define-method (datastore-agent-for-build (datastore ) build-id) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT agent_id FROM allocated_builds WHERE allocated_builds.build_id = :build_id UNION SELECT agent_id FROM build_results WHERE build_results.build_id = :build_id" #:cache? #t))) (sqlite-bind-arguments statement #:build_id build-id) (let ((result (match (sqlite-step statement) (#(agent-id) agent-id) (#f #f)))) (sqlite-reset statement) result))))) (define (db-open database) (define flags (list SQLITE_OPEN_READWRITE SQLITE_OPEN_NOMUTEX)) (unless (file-exists? database) (run-sqitch database)) (sqlite-open database (apply logior flags))) (define (run-sqitch database-file) (let ((command (list (%config 'sqitch) "deploy" "--db-client" (%config 'sqitch-sqlite) ;; if sqitch.conf exists (which it should when developing), ;; just use the current directory as the chdir value. Otherwise ;; use the directory which should contain the right files after ;; installation. "--chdir" (if (file-exists? "sqitch.conf") (getcwd) (string-append (dirname (%config 'sqitch-plan)) "/sqlite")) "--plan-file" (%config 'sqitch-plan) "--registry" (string-append (if (string-suffix? ".db" database-file) (string-drop-right database-file 3) database-file) "_sqitch_registry.db") (string-append "db:sqlite:" database-file)))) (simple-format #t "running command: ~A\n" (string-join command)) (unless (zero? (apply system* command)) (simple-format (current-error-port) "error: sqitch command failed\n") (exit 1)))) (define (changes-count db) (let ((statement (sqlite-prepare db "SELECT changes();" #:cache? #t))) (let ((count (vector-ref (sqlite-step statement) 0))) (sqlite-reset statement) count))) (define (last-insert-rowid db) (let ((statement (sqlite-prepare db "SELECT last_insert_rowid();" #:cache? #t))) (let ((id (vector-ref (sqlite-step statement) 0))) (sqlite-reset statement) id))) (define (select-derivation-outputs db derivation-name) (let ((statement (sqlite-prepare db " SELECT name, id FROM derivation_outputs WHERE derivation_name = :derivation_name" #:cache? #t))) (sqlite-bind-arguments statement #:derivation_name derivation-name) (let ((outputs (sqlite-map (match-lambda (#(name output-id) (cons name output-id))) statement))) (sqlite-reset statement) outputs))) (define (db-find-derivation db name) (let ((statement (sqlite-prepare db " SELECT system, fixed_output FROM derivations WHERE name = :name" #:cache? #t))) (sqlite-bind-arguments statement #:name name) (let ((result (match (sqlite-step statement) (#f #f) (#(system fixed_output) `((system . ,system) (fixed-output? . ,(cond ((eq? fixed_output 0) #f) ((eq? fixed_output 1) #t) (else fixed_output)))))))) (sqlite-reset statement) result))) (define-method (datastore-find-derivation (datastore ) name) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (db-find-derivation db name)))) (define-method (datastore-find-derivation-outputs (datastore ) derivation-name) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT name, output FROM derivation_outputs WHERE derivation_name = :derivation_name" #:cache? #t))) (sqlite-bind-arguments statement #:derivation_name derivation-name) (let ((result (sqlite-map (match-lambda (#(name output) `((name . ,name) (output . ,output)))) statement))) (sqlite-reset statement) result))))) (define-method (datastore-list-build-outputs (datastore ) build-id) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT name, output, hash, size, store_references FROM builds INNER JOIN derivation_outputs ON builds.derivation_name = derivation_outputs.derivation_name LEFT JOIN output_metadata ON output_metadata.derivation_output_id = derivation_outputs.id AND output_metadata.build_id = builds.uuid WHERE builds.uuid = :build_id" #:cache? #t))) (sqlite-bind-arguments statement #:build_id build-id) (let ((result (sqlite-map (match-lambda (#(name output hash size store_references) `((name . ,name) (output . ,output) (hash . ,hash) (size . ,size) (references . ,(and store_references (list->vector (string-split store_references #\space))))))) statement))) (sqlite-reset statement) result))))) (define-method (datastore-find-derivation-system (datastore ) derivation-name) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT system FROM derivations WHERE name = :name" #:cache? #t))) (sqlite-bind-arguments statement #:name derivation-name) (let ((system (match (sqlite-step statement) (#(system) system)))) (sqlite-reset statement) system))))) (define-method (datastore-find-derivation-inputs (datastore ) derivation-name) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " SELECT derivation_outputs.derivation_name, derivation_outputs.name, derivation_outputs.output FROM derivation_inputs INNER JOIN derivation_outputs ON derivation_inputs.derivation_output_id = derivation_outputs.id WHERE derivation_inputs.derivation_name = :derivation_name" #:cache? #t))) (sqlite-bind-arguments statement #:derivation_name derivation-name) (let ((result (sqlite-map (match-lambda (#(derivation output-name output) `((derivation . ,derivation) (output_name . ,output-name) (output . ,output)))) statement))) (sqlite-reset statement) result))))) (define-method (datastore-find-derivation-for-output (datastore ) start-derivation-name output) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db " WITH RECURSIVE related_derivations(name) AS ( VALUES(:derivation) UNION SELECT derivation_outputs.derivation_name FROM derivation_outputs INNER JOIN derivation_inputs ON derivation_outputs.id = derivation_inputs.derivation_output_id INNER JOIN related_derivations ON related_derivations.name = derivation_inputs.derivation_name ) SELECT related_derivations.name FROM related_derivations INNER JOIN derivation_outputs ON related_derivations.name = derivation_outputs.derivation_name WHERE output = :output " #:cache? #t))) (sqlite-bind-arguments statement #:derivation start-derivation-name #:output output) (let ((result (match (sqlite-step statement) (#f #f) (#(derivation) derivation)))) (sqlite-reset statement) result))))) (define (insert-derivation-and-return-outputs db derivation) (define derivation-name (derivation-file-name derivation)) (define (insert-derivation) (let ((derivation-details (db-find-derivation db derivation-name)) (fixed-output? (fixed-output-derivation? derivation))) (if derivation-details (begin (unless (equal? (assq-ref derivation-details 'fixed-output?) fixed-output?) (sqlite-exec db (simple-format #f " UPDATE derivations SET fixed_output = ~A WHERE name = '~A'" (if fixed-output? 1 0) derivation-name))) 0) (let ((statement (sqlite-prepare db " INSERT OR IGNORE INTO derivations (name, system, fixed_output) VALUES (:name, :system, :fixed_output)" #:cache? #t))) (sqlite-bind-arguments statement #:name derivation-name #:system (derivation-system derivation) #:fixed_output (if fixed-output? 1 0)) (sqlite-step statement) (sqlite-reset statement) (changes-count db))))) (let ((changes (insert-derivation))) (unless (eq? changes 0) (insert-derivation-inputs db derivation-name (derivation-inputs derivation)) (insert-derivation-outputs db derivation-name (derivation-outputs derivation))) (select-derivation-outputs db derivation-name))) (define (insert-derivation-inputs db derivation-name derivation-inputs) (unless (null? derivation-inputs) (let ((derivation-output-ids (append-map (lambda (derivation-input) (let ((output-ids-by-name (insert-derivation-and-return-outputs db (derivation-input-derivation derivation-input)))) (map (lambda (output-name) (assoc-ref output-ids-by-name output-name)) (derivation-input-sub-derivations derivation-input)))) derivation-inputs))) (sqlite-exec db (string-append " INSERT INTO derivation_inputs (derivation_name, derivation_output_id) VALUES " (string-join (map (lambda (derivation-output-id) (simple-format #f "('~A', ~A)" derivation-name derivation-output-id)) derivation-output-ids) ", ") ";"))))) (define (insert-derivation-outputs db derivation-name derivation-outputs) (begin (sqlite-exec db (string-append " INSERT INTO derivation_outputs (derivation_name, name, output) VALUES " (string-join (map (match-lambda ((name . derivation-output) (simple-format #f "('~A', '~A', '~A')" derivation-name name (derivation-output-path derivation-output)))) derivation-outputs) ", ") ";")) (sqlite-exec db (string-append " INSERT OR IGNORE INTO unbuilt_outputs (output) SELECT * FROM (VALUES " (string-join (map (match-lambda ((name . derivation-output) (simple-format #f "('~A')" (derivation-output-path derivation-output)))) derivation-outputs) ", ") ") AS outputs WHERE NOT EXISTS ( SELECT 1 FROM builds INNER JOIN build_results ON builds.uuid = build_results.build_id INNER JOIN derivation_outputs ON builds.derivation_name = derivation_outputs.derivation_name WHERE build_results.result = 'success' AND derivation_outputs.output = outputs.column1 )")))) (define (insert-build db uuid derivation-name priority) (let ((statement (sqlite-prepare db " INSERT INTO builds (uuid, derivation_name, priority, created_at) VALUES (:uuid, :derivation_name, :priority, datetime('now'))" #:cache? #t))) (sqlite-bind-arguments statement #:uuid uuid #:derivation_name derivation-name #:priority priority) (sqlite-step statement) (sqlite-reset statement))) (define (insert-agent db uuid description) (let ((statement (sqlite-prepare db " INSERT INTO agents (id, description) VALUES (:id, :description)" #:cache? #t))) (sqlite-bind-arguments statement #:id uuid #:description description) (sqlite-step statement) (sqlite-reset statement))) (define (insert-agent-password db uuid password) (let ((statement (sqlite-prepare db " INSERT INTO agent_passwords (agent_id, password) VALUES (:agent_id, :password)" #:cache? #t))) (sqlite-bind-arguments statement #:agent_id uuid #:password password) (sqlite-step statement) (sqlite-reset statement)))