aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2025-02-08 15:17:49 +0000
committerChristopher Baines <mail@cbaines.net>2025-02-08 16:41:09 +0000
commit40140d2e9051734ea773b9787f77d0bf2bf69e18 (patch)
treed74614071b0166f6f91699ccd5872763120b18a4
parenteaf68e1ab8798fb0e30424ab8b339b901e2fee1f (diff)
downloadbuild-coordinator-40140d2e9051734ea773b9787f77d0bf2bf69e18.tar
build-coordinator-40140d2e9051734ea773b9787f77d0bf2bf69e18.tar.gz
Move part of the build-success operation to a background job
As it can be slow.
-rw-r--r--guix-build-coordinator/coordinator.scm154
-rw-r--r--guix-build-coordinator/datastore.scm3
-rw-r--r--guix-build-coordinator/datastore/sqlite.scm161
-rw-r--r--sqitch/pg/deploy/background-jobs-queue.sql7
-rw-r--r--sqitch/pg/revert/background-jobs-queue.sql7
-rw-r--r--sqitch/pg/verify/background-jobs-queue.sql7
-rw-r--r--sqitch/sqitch.plan1
-rw-r--r--sqitch/sqlite/deploy/background-jobs-queue.sql11
-rw-r--r--sqitch/sqlite/revert/background-jobs-queue.sql7
-rw-r--r--sqitch/sqlite/verify/background-jobs-queue.sql7
10 files changed, 333 insertions, 32 deletions
diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm
index b0b0581..8a20b97 100644
--- a/guix-build-coordinator/coordinator.scm
+++ b/guix-build-coordinator/coordinator.scm
@@ -45,11 +45,14 @@
#:use-module (logging port-log)
#:use-module (gcrypt random)
#:use-module (fibers)
+ #:use-module (fibers timers)
#:use-module (fibers channels)
#:use-module (fibers scheduler)
#:use-module (fibers conditions)
+ #:use-module (fibers operations)
#:use-module (knots)
#:use-module (knots timeout)
+ #:use-module (knots parallelism)
#:use-module (knots thread-pool)
#:use-module (prometheus)
#:use-module (guix store)
@@ -130,6 +133,9 @@
(hooks build-coordinator-hooks)
(hook-condvars build-coordinator-hook-condvars
set-build-coordinator-hook-condvars!)
+ (background-job-conditions
+ build-coordinator-background-job-conditions
+ set-build-coordinator-background-job-conditions!)
(metrics-registry build-coordinator-metrics-registry)
(allocation-strategy build-coordinator-allocation-strategy)
(trigger-build-allocation
@@ -556,6 +562,10 @@
(datastore-spawn-fibers
(build-coordinator-datastore build-coordinator))
+ (set-build-coordinator-background-job-conditions!
+ build-coordinator
+ (start-background-job-processing-fibers build-coordinator))
+
(spawn-fiber-to-watch-for-deferred-builds build-coordinator)
(spawn-build-allocation-plan-management-fiber build-coordinator)
@@ -1866,10 +1876,11 @@
(list build-id))
(when success?
- (datastore-delete-relevant-outputs-from-unbuilt-outputs
+ (datastore-insert-background-job
datastore
- build-id)
- (datastore-update-unprocessed-builds-for-build-success
+ 'build-success
+ (list build-id))
+ (datastore-delete-relevant-outputs-from-unbuilt-outputs
datastore
build-id)
(datastore-store-output-metadata
@@ -1894,6 +1905,11 @@
'build-success
'build-failure))
+ (when success?
+ (build-coordinator-trigger-background-job-processing
+ build-coordinator
+ 'build-success))
+
(build-coordinator-send-event
build-coordinator
(if success?
@@ -1963,3 +1979,135 @@
;; Trigger build allocation, so that the allocator can handle this setup
;; failure
(trigger-build-allocation build-coordinator))
+
+(define (build-coordinator-trigger-background-job-processing
+ build-coordinator
+ type)
+ (let ((condition
+ (assq-ref (build-coordinator-background-job-conditions
+ build-coordinator)
+ type)))
+ (unless condition
+ (error
+ (simple-format #f "unknown condition ~A" type)))
+ (signal-reusable-condition! condition)))
+
+(define (start-background-job-processing-fibers build-coordinator)
+ (define %background-job-duration-histogram-buckets
+ (list 0.1 0.25 0.5 1 2.5 5 10 15 30 45 60 120 240 (inf)))
+
+ (define* (start-job-fibers type proc #:key (parallelism 1))
+ (let ((coordination-channel
+ (make-channel))
+ (condition
+ (make-reusable-condition))
+ (process-in-fiber
+ (fiberize
+ (lambda args
+ (call-with-duration-metric
+ (build-coordinator-metrics-registry build-coordinator)
+ "coordinator_background_job_duration_seconds"
+ (lambda ()
+ (apply proc args))
+ #:labels '(job)
+ #:label-values `((job . ,type))
+ #:buckets %background-job-duration-histogram-buckets))
+ #:parallelism parallelism))
+ (job-exception-counter-metric
+ (make-counter-metric
+ (build-coordinator-metrics-registry build-coordinator)
+ "coordinator_background_job_failures_total"
+ #:labels '(job))))
+
+ (define (process id . args)
+ (spawn-fiber
+ (lambda ()
+ (let loop ((retry 0))
+ (log-msg (build-coordinator-logger build-coordinator)
+ 'DEBUG
+ "processing " type " background job (id: "
+ id ", args: " args ", retry: " retry ")")
+ (let ((success?
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg (build-coordinator-logger build-coordinator)
+ 'WARN
+ type " background job error (id: "
+ id "): " exn)
+ #f)
+ (lambda ()
+ (apply process-in-fiber args))
+ #:unwind? #t)))
+ (if success?
+ (begin
+ (datastore-delete-background-job
+ (build-coordinator-datastore build-coordinator)
+ id)
+ (put-message coordination-channel
+ (list 'job-finished id)))
+ (begin
+ (metric-increment job-exception-counter-metric
+ #:label-values `((job . ,type)))
+ (sleep 30)
+ (loop (+ 1 retry)))))))))
+
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (let ((job-details
+ (datastore-select-background-jobs
+ (build-coordinator-datastore build-coordinator)
+ type
+ #:limit (* 2 parallelism))))
+
+ (unless (null? job-details)
+ (put-message coordination-channel
+ (list 'process job-details)))
+
+ (reusable-condition-wait condition
+ #:timeout 30)))))
+
+ (spawn-fiber
+ (lambda ()
+ (let loop ((running-job-ids '()))
+ (match (get-message coordination-channel)
+ (('process jobs)
+ (let* ((job-ids (map (lambda (job)
+ (assq-ref job 'id))
+ jobs))
+ (new-ids
+ (lset-difference = job-ids running-job-ids))
+ (jobs-to-start
+ (take new-ids
+ (min
+ (- parallelism
+ (length running-job-ids))
+ (length new-ids)))))
+ (for-each (lambda (job)
+ (apply process
+ (assq-ref job 'id)
+ (assq-ref job 'args)))
+ (filter
+ (lambda (job-details)
+ (member (assq-ref job-details 'id)
+ jobs-to-start))
+ jobs))
+ (loop (append running-job-ids
+ jobs-to-start))))
+ (('job-finished id)
+ ;; Maybe not very efficient, but should work
+ (signal-reusable-condition! condition)
+ (log-msg (build-coordinator-logger build-coordinator)
+ 'DEBUG type
+ " background job " id
+ " finished successfully")
+ (loop (delete id running-job-ids)))))))
+
+ condition))
+
+ `((build-success . ,(start-job-fibers
+ 'build-success
+ (lambda (build-id)
+ (datastore-update-unprocessed-builds-for-build-success
+ (build-coordinator-datastore build-coordinator)
+ build-id))))))
diff --git a/guix-build-coordinator/datastore.scm b/guix-build-coordinator/datastore.scm
index 3aaf2a7..d933e44 100644
--- a/guix-build-coordinator/datastore.scm
+++ b/guix-build-coordinator/datastore.scm
@@ -92,6 +92,9 @@
(re-export datastore-fetch-build-to-allocate)
(re-export datastore-check-if-derivation-conflicts?)
(re-export datastore-insert-to-allocated-builds)
+(re-export datastore-insert-background-job)
+(re-export datastore-delete-background-job)
+(re-export datastore-select-background-jobs)
(define* (database-uri->datastore database
#:key
diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm
index 9ca213a..e5367c6 100644
--- a/guix-build-coordinator/datastore/sqlite.scm
+++ b/guix-build-coordinator/datastore/sqlite.scm
@@ -104,7 +104,10 @@
datastore-update-agent-requested-systems
datastore-fetch-build-to-allocate
datastore-check-if-derivation-conflicts?
- datastore-insert-to-allocated-builds))
+ datastore-insert-to-allocated-builds
+ datastore-insert-background-job
+ datastore-delete-background-job
+ datastore-select-background-jobs))
(define-class <sqlite-datastore> (<abstract-datastore>)
database-file
@@ -1893,13 +1896,11 @@ LIMIT 1"
(#f #t)
(#(1) #f))))
- (call-with-writer-thread/delay-logging
- datastore
- (lambda (db)
- (let ((builds-statement
- (sqlite-prepare
- db
- "
+ (define (all-build-ids db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
SELECT DISTINCT unprocessed_builds.id
FROM builds
INNER JOIN derivation_outputs
@@ -1915,32 +1916,51 @@ INNER JOIN unprocessed_builds_with_derived_priorities
ON unprocessed_builds_with_derived_priorities.build_id = unprocessed_builds.id
AND unprocessed_builds_with_derived_priorities.all_inputs_built = 0
WHERE builds.id = :build_id"
- #:cache? #t))
+ #:cache? #t)))
- (update-statement
- (sqlite-prepare
- db
- "
+ (sqlite-bind-arguments
+ statement
+ #:build_id (db-find-build-id db build-uuid))
+
+ (let ((result
+ (sqlite-fold
+ (lambda (row result)
+ (match row
+ (#(build-id)
+ (if (all-inputs-built? db build-id)
+ (cons build-id result)
+ result))))
+ '()
+ statement)))
+ (sqlite-reset statement)
+
+ result)))
+
+ (let ((build-ids
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
+ (lambda (db)
+ (all-build-ids db)))))
+ (call-with-writer-thread/delay-logging
+ datastore
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
UPDATE unprocessed_builds_with_derived_priorities
SET all_inputs_built = 1
WHERE build_id = :build_id"
- #:cache? #t)))
+ #:cache? #t)))
- (sqlite-bind-arguments builds-statement
- #:build_id (db-find-build-id db build-uuid))
-
- (sqlite-fold
- (lambda (row result)
- (match row
- (#(build-id)
- (when (all-inputs-built? db build-id)
- (sqlite-bind-arguments update-statement
- #:build_id build-id)
- (sqlite-step-and-reset update-statement))))
- #f)
- #f
- builds-statement)
- (sqlite-reset builds-statement)))))
+ (for-each
+ (lambda (build-id)
+ (sqlite-bind-arguments statement
+ #:build_id build-id)
+ (sqlite-step-and-reset statement))
+ build-ids)
+
+ #t)))))
(define-method (datastore-remove-build-allocation
(datastore <sqlite-datastore>)
@@ -4461,3 +4481,86 @@ VALUES (:agent_id, :password)"
#:password password)
(sqlite-step-and-reset statement)))
+
+(define-method (datastore-insert-background-job
+ (datastore <sqlite-datastore>)
+ type
+ args)
+ (call-with-writer-thread
+ datastore
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+INSERT INTO background_jobs_queue (type, args)
+VALUES (:type, :args)
+RETURNING id"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:type (symbol->string type)
+ #:args (call-with-output-string
+ (lambda (port)
+ (write args port))))
+
+ (match (sqlite-step-and-reset statement)
+ (#(id) id))))))
+
+(define-method (datastore-delete-background-job
+ (datastore <sqlite-datastore>)
+ id)
+ (call-with-writer-thread
+ datastore
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+DELETE FROM background_jobs_queue WHERE id = :id"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:id id)
+
+ (sqlite-step-and-reset statement))
+ #t)
+ #:priority? #t))
+
+(define-method (datastore-select-background-jobs
+ (datastore <sqlite-datastore>)
+ .
+ args)
+ (apply
+ (lambda* (type #:key (limit 1))
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT id, args
+FROM background_jobs_queue
+WHERE type = :type
+ORDER BY id ASC
+LIMIT :limit"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:type (symbol->string type)
+ #:limit limit)
+
+ (let ((result
+ (sqlite-map
+ (match-lambda
+ (#(id args)
+ `((id . ,id)
+ (args . ,(call-with-input-string args read)))))
+ statement)))
+ (sqlite-reset statement)
+ result)))))
+ args))
diff --git a/sqitch/pg/deploy/background-jobs-queue.sql b/sqitch/pg/deploy/background-jobs-queue.sql
new file mode 100644
index 0000000..63dd8d4
--- /dev/null
+++ b/sqitch/pg/deploy/background-jobs-queue.sql
@@ -0,0 +1,7 @@
+-- Deploy guix-build-coordinator:background-jobs-queue to pg
+
+BEGIN;
+
+-- XXX Add DDLs here.
+
+COMMIT;
diff --git a/sqitch/pg/revert/background-jobs-queue.sql b/sqitch/pg/revert/background-jobs-queue.sql
new file mode 100644
index 0000000..46b0761
--- /dev/null
+++ b/sqitch/pg/revert/background-jobs-queue.sql
@@ -0,0 +1,7 @@
+-- Revert guix-build-coordinator:background-jobs-queue from pg
+
+BEGIN;
+
+-- XXX Add DDLs here.
+
+COMMIT;
diff --git a/sqitch/pg/verify/background-jobs-queue.sql b/sqitch/pg/verify/background-jobs-queue.sql
new file mode 100644
index 0000000..a36097e
--- /dev/null
+++ b/sqitch/pg/verify/background-jobs-queue.sql
@@ -0,0 +1,7 @@
+-- Verify guix-build-coordinator:background-jobs-queue on pg
+
+BEGIN;
+
+-- XXX Add verifications here.
+
+ROLLBACK;
diff --git a/sqitch/sqitch.plan b/sqitch/sqitch.plan
index cfb5b9d..83de06b 100644
--- a/sqitch/sqitch.plan
+++ b/sqitch/sqitch.plan
@@ -46,3 +46,4 @@ agent_status_add_processor_count 2023-03-24T09:28:47Z Chris <chris@felis> # Add
remove_build_allocation_plan 2023-04-23T19:50:23Z Chris <chris@felis> # Remove build_allocation_plan
system_uptime 2023-05-05T18:18:35Z Chris <chris@felis> # Add system uptime
build_starts_index 2023-11-24T16:30:13Z Chris <chris@felis> # build_starts index
+background-jobs-queue 2025-02-06T10:49:08Z Chris <chris@fang> # Add background_jobs_queue
diff --git a/sqitch/sqlite/deploy/background-jobs-queue.sql b/sqitch/sqlite/deploy/background-jobs-queue.sql
new file mode 100644
index 0000000..1cb35f0
--- /dev/null
+++ b/sqitch/sqlite/deploy/background-jobs-queue.sql
@@ -0,0 +1,11 @@
+-- Deploy guix-build-coordinator:background-jobs-queue to sqlite
+
+BEGIN;
+
+CREATE TABLE background_jobs_queue (
+ id INTEGER PRIMARY KEY,
+ type TEXT NOT NULL,
+ args TEXT NOT NULL
+);
+
+COMMIT;
diff --git a/sqitch/sqlite/revert/background-jobs-queue.sql b/sqitch/sqlite/revert/background-jobs-queue.sql
new file mode 100644
index 0000000..10ef1ff
--- /dev/null
+++ b/sqitch/sqlite/revert/background-jobs-queue.sql
@@ -0,0 +1,7 @@
+-- Revert guix-build-coordinator:background-jobs-queue from sqlite
+
+BEGIN;
+
+-- XXX Add DDLs here.
+
+COMMIT;
diff --git a/sqitch/sqlite/verify/background-jobs-queue.sql b/sqitch/sqlite/verify/background-jobs-queue.sql
new file mode 100644
index 0000000..1cf9965
--- /dev/null
+++ b/sqitch/sqlite/verify/background-jobs-queue.sql
@@ -0,0 +1,7 @@
+-- Verify guix-build-coordinator:background-jobs-queue on sqlite
+
+BEGIN;
+
+-- XXX Add verifications here.
+
+ROLLBACK;