aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--guix-build-coordinator/agent-messaging/http/server.scm13
-rw-r--r--guix-build-coordinator/client-communication.scm26
-rw-r--r--guix-build-coordinator/coordinator.scm227
-rw-r--r--guix-build-coordinator/datastore/sqlite.scm259
-rw-r--r--guix-build-coordinator/utils/fibers.scm21
5 files changed, 299 insertions, 247 deletions
diff --git a/guix-build-coordinator/agent-messaging/http/server.scm b/guix-build-coordinator/agent-messaging/http/server.scm
index 51ac6ac..2b325d7 100644
--- a/guix-build-coordinator/agent-messaging/http/server.scm
+++ b/guix-build-coordinator/agent-messaging/http/server.scm
@@ -986,12 +986,13 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
#:code 200
#:headers '((content-type . (text/plain))
(vary . (accept))))
- (lambda (port)
- (write-metrics (build-coordinator-metrics-registry
- build-coordinator)
- port)
- (write-metrics plain-metrics-registry
- port))))))
+ (call-with-output-string
+ (lambda (port)
+ (write-metrics (build-coordinator-metrics-registry
+ build-coordinator)
+ port)
+ (write-metrics plain-metrics-registry
+ port)))))))
(_
(render-json
"not-found"
diff --git a/guix-build-coordinator/client-communication.scm b/guix-build-coordinator/client-communication.scm
index 354cb31..47a289d 100644
--- a/guix-build-coordinator/client-communication.scm
+++ b/guix-build-coordinator/client-communication.scm
@@ -32,6 +32,7 @@
#:use-module (json)
#:use-module (logging logger)
#:use-module (gcrypt random)
+ #:use-module (prometheus)
#:use-module (web uri)
#:use-module (web client)
#:use-module (web request)
@@ -100,6 +101,14 @@
(json-string->scm (utf8->string raw-body))
'()))
+ (define read-drv-error-count-metric
+ (or (metrics-registry-fetch-metric
+ (build-coordinator-metrics-registry build-coordinator)
+ "read_derivation_error_total")
+ (make-counter-metric
+ (build-coordinator-metrics-registry build-coordinator)
+ "read_derivation_error_total")))
+
(define (controller-thunk)
(match method-and-path-components
(('GET "build" uuid)
@@ -514,8 +523,11 @@
(retry-on-error
(lambda ()
(read-drv/substitute derivation-file))
- #:times 5
- #:delay 5))
+ #:times 2
+ #:delay 3
+ #:error-hook
+ (lambda _
+ (metric-increment read-drv-error-count-metric))))
(lambda args
(backtrace))))
#:unwind? #t))
@@ -533,6 +545,10 @@
body "ensure-all-related-derivation-outputs-have-builds")
'(#:ensure-all-related-derivation-outputs-have-builds? #t)
'())
+ ,@(if (assoc-ref
+ body "skip-updating-derived-priorities")
+ '(#:skip-updating-derived-priorities? #t)
+ '())
,@(if (assoc-ref body "tags")
`(#:tags
,(map
@@ -773,7 +789,8 @@
ensure-all-related-derivation-outputs-have-builds?
tags
#:key
- defer-until)
+ defer-until
+ skip-updating-derived-priorities?)
(send-request coordinator-uri
'POST
"/builds"
@@ -791,6 +808,9 @@
,@(if ensure-all-related-derivation-outputs-have-builds?
'((ensure-all-related-derivation-outputs-have-builds . #t))
'())
+ ,@(if skip-updating-derived-priorities?
+ '((skip-updating-derived-priorities . #t))
+ '())
,@(if (null? tags)
'()
`((tags . ,(list->vector tags))))
diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm
index 59891d1..a7fb664 100644
--- a/guix-build-coordinator/coordinator.scm
+++ b/guix-build-coordinator/coordinator.scm
@@ -436,11 +436,6 @@
;; The logger assumes this
(set-port-encoding! (current-output-port) "UTF-8")
- ;; Work around my broken with-store/non-blocking in Guix
- (let ((socket-file (%daemon-socket-uri)))
- (%daemon-socket-uri
- (string-append "file://" socket-file)))
-
(with-exception-handler
(lambda (exn)
(simple-format #t "failed enabling core dumps: ~A\n" exn))
@@ -510,16 +505,21 @@
(make-worker-thread-channel
(const '())
#:name "utility"
- #:parallelism 6
+ #:parallelism 10
#:delay-logger
- (lambda (seconds-delayed)
- (log-delay "utility thread channel"
- seconds-delayed)
- (when (> seconds-delayed 0.1)
- (format
- (current-error-port)
- "warning: utility thread channel delayed by ~1,2f seconds~%"
- seconds-delayed))))))
+ (let ((delay-metric
+ (make-histogram-metric
+ (build-coordinator-metrics-registry build-coordinator)
+ "utility_thread_pool_delay_seconds")))
+ (lambda (seconds-delayed)
+ (log-delay "utility thread channel"
+ seconds-delayed)
+ (metric-observe delay-metric seconds-delayed)
+ (when (> seconds-delayed 0.1)
+ (format
+ (current-error-port)
+ "warning: utility thread channel delayed by ~1,2f seconds~%"
+ seconds-delayed)))))))
(let ((finished? (make-condition)))
(call-with-sigint
@@ -612,6 +612,7 @@
(ignore-if-build-for-derivation-exists? #f)
(ignore-if-build-for-outputs-exists? #f)
(ensure-all-related-derivation-outputs-have-builds? #f)
+ skip-updating-derived-priorities?
(tags '())
defer-until
(read-drv read-derivation-from-file*))
@@ -624,39 +625,15 @@
#:include-canceled? #f)
0))
- (define (build-for-output-already-exists?)
- ;; Handle the derivation not existing in the database here, so that adding
- ;; it to the database isn't required for this code to work
- (let* ((system-from-database (datastore-find-derivation-system datastore
- derivation-file))
-
- (derivation-exists-in-database? (not (eq? #f system-from-database)))
-
- (derivation
- (if derivation-exists-in-database?
- #f ; unnecessary to fetch derivation
- ;; TODO Read the derivation in a separate thread
- (with-fibers-port-timeouts
- (lambda ()
- (call-with-delay-logging read-drv
- #:threshold 10
- #:args (list derivation-file)))
- #:timeout 240)))
-
- (system
- (or system-from-database
- (derivation-system derivation)))
-
- (outputs
- (if derivation-exists-in-database?
- (datastore-find-derivation-outputs datastore
- derivation-file)
- (map
- (match-lambda
- ((name . output)
- `((name . ,name)
- (output . ,(derivation-output-path output)))))
- (derivation-outputs derivation)))))
+ (define (build-for-output-already-exists/with-derivation? derivation)
+ (let ((system (derivation-system derivation))
+ (outputs
+ (map
+ (match-lambda
+ ((name . output)
+ `((name . ,name)
+ (output . ,(derivation-output-path output)))))
+ (derivation-outputs derivation))))
(any
(lambda (output-details)
(let ((builds-for-output
@@ -668,13 +645,40 @@
(not (null? builds-for-output))))
outputs)))
- (define (check-whether-to-store-build)
+ (define (build-for-output-already-exists?)
+ (let ((system (datastore-find-derivation-system datastore
+ derivation-file)))
+ (if (eq? #f system) ; derivation does not exist in database?
+ (build-for-output-already-exists/with-derivation?
+ (with-fibers-port-timeouts
+ (lambda ()
+ (call-with-delay-logging read-drv
+ #:threshold 10
+ #:args (list derivation-file)))
+ #:timeout 240))
+ (any
+ (lambda (output-details)
+ (let ((builds-for-output
+ (datastore-list-builds-for-output-and-system
+ datastore
+ (assq-ref output-details 'output)
+ system
+ #:include-canceled? #f)))
+ (not (null? builds-for-output))))
+ (datastore-find-derivation-outputs datastore
+ derivation-file)))))
+
+ (define* (check-whether-to-store-build #:optional derivation)
(cond
((and ignore-if-build-for-derivation-exists?
(build-for-derivation-exists?))
'((no-build-submitted . build-already-exists-for-this-derivation)))
((and ignore-if-build-for-outputs-exists?
- (call-with-delay-logging build-for-output-already-exists?))
+ (if derivation
+ (call-with-delay-logging
+ build-for-output-already-exists/with-derivation?
+ #:args (list derivation))
+ (call-with-delay-logging build-for-output-already-exists?)))
'((no-build-submitted . build-already-exists-for-a-output)))
(else
'continue)))
@@ -704,18 +708,20 @@
(or requested-uuid
(random-v4-uuid)))
- (define (build-perform-datastore-changes derivations-lacking-builds)
+ (define (build-perform-datastore-changes derivation derivations-lacking-builds)
(lambda (_)
;; Check again now, since new builds could have been added since the
;; checks were made before the start of the transaction.
- (match (check-whether-to-store-build)
+ (match (check-whether-to-store-build derivation)
('continue
;; Actually create a build, do this first so the derived priorities
;; for the builds inserted below are informed by this build.
(store-build derivation-file
build-id
priority
- tags)
+ tags
+ #:skip-updating-other-build-derived-priorities
+ skip-updating-derived-priorities?)
(for-each
(match-lambda
@@ -768,65 +774,68 @@
(match (check-whether-to-store-build)
('continue
- ;; Store the derivation first, so that listing related derivations
- ;; with no builds works
- (unless (datastore-find-derivation datastore derivation-file)
- (datastore-store-derivation
- datastore
- ;; TODO Read the derivation in a separate thread
- (with-fibers-port-timeouts
- (lambda ()
- (call-with-delay-logging read-drv
- #:threshold 10
- #:args (list derivation-file)))
- #:timeout 30)))
-
- (let ((related-derivations-lacking-builds
- (if ensure-all-related-derivation-outputs-have-builds?
- (datastore-list-related-derivations-with-no-build-for-outputs
+ (let ((drv
+ ;; If the derivation is missing from the database, read it and
+ ;; enter it in to the database, so that listing related
+ ;; derivations with no builds works
+ (if (datastore-find-derivation datastore derivation-file)
+ #f
+ (with-fibers-port-timeouts
+ (lambda ()
+ (call-with-delay-logging read-drv
+ #:threshold 10
+ #:args (list derivation-file)))
+ #:timeout 30))))
+ (when drv
+ (datastore-store-derivation datastore drv))
+
+ (let ((related-derivations-lacking-builds
+ (if ensure-all-related-derivation-outputs-have-builds?
+ (datastore-list-related-derivations-with-no-build-for-outputs
+ datastore
+ derivation-file)
+ '())))
+ (match (datastore-call-with-transaction
datastore
- derivation-file)
- '())))
- (match (datastore-call-with-transaction
- datastore
- (build-perform-datastore-changes
- ;; Do this here so it doesn't take time in the writer thread
- (map
- (lambda (drv)
- ;; Generate the UUID's outside the transaction to save
- ;; time too.
- (cons drv (random-v4-uuid)))
- related-derivations-lacking-builds))
- #:duration-metric-name
- "store_build")
- (#t ; build submitted
- (build-coordinator-prompt-hook-processing-for-event
- build-coordinator
- 'build-submitted)
-
- (build-coordinator-send-event
- build-coordinator
- 'build-submitted
- `((id . ,build-id)
- (derivation . ,derivation-file)
- (priority . ,priority)
- (tags
- . ,(list->vector
+ (build-perform-datastore-changes
+ drv
+ ;; Do this here so it doesn't take time in the writer thread
(map
- (match-lambda
- ((key . value)
- `((key . ,key)
- (value . ,value))))
- (if (vector? tags)
- (vector->list tags)
- tags))))
- (defer_until . ,defer-until)))
-
- (trigger-build-allocation build-coordinator)
-
- `((build-submitted . ,build-id)))
- (stop-condition
- stop-condition))))
+ (lambda (related-drv)
+ ;; Generate the UUIDs outside the transaction to save
+ ;; time too.
+ (cons related-drv (random-v4-uuid)))
+ related-derivations-lacking-builds))
+ #:duration-metric-name
+ "store_build")
+ (#t ; build submitted
+ (build-coordinator-prompt-hook-processing-for-event
+ build-coordinator
+ 'build-submitted)
+
+ (build-coordinator-send-event
+ build-coordinator
+ 'build-submitted
+ `((id . ,build-id)
+ (derivation . ,derivation-file)
+ (priority . ,priority)
+ (tags
+ . ,(list->vector
+ (map
+ (match-lambda
+ ((key . value)
+ `((key . ,key)
+ (value . ,value))))
+ (if (vector? tags)
+ (vector->list tags)
+ tags))))
+ (defer_until . ,defer-until)))
+
+ (trigger-build-allocation build-coordinator)
+
+ `((build-submitted . ,build-id)))
+ (stop-condition
+ stop-condition)))))
(stop-condition
stop-condition)))))
diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm
index c3b62b2..ac04362 100644
--- a/guix-build-coordinator/datastore/sqlite.scm
+++ b/guix-build-coordinator/datastore/sqlite.scm
@@ -107,7 +107,9 @@
(define-class <sqlite-datastore> (<abstract-datastore>)
database-file
worker-reader-thread-channel
+ worker-reader-thread-proc-vector
worker-writer-thread-channel
+ worker-writer-thread-proc-vector
metrics-registry)
(define* (sqlite-datastore database-uri
@@ -137,124 +139,129 @@
(slot-set! datastore 'database-file database-file)
(slot-set! datastore 'metrics-registry metrics-registry)
- (slot-set!
- datastore
- 'worker-writer-thread-channel
- (make-worker-thread-channel
- (lambda ()
- (let ((db
- (db-open database-file)))
- (sqlite-exec db "PRAGMA busy_timeout = 5000;")
- (sqlite-exec db "PRAGMA synchronous = NORMAL;")
- (sqlite-exec db "PRAGMA temp_store = MEMORY;")
- (sqlite-exec db "PRAGMA foreign_keys = ON;")
-
- (list db)))
- #:name "ds write"
- #:destructor
- (let ((writer-thread-destructor-counter
- (make-gauge-metric metrics-registry
- "datastore_writer_thread_close_total")))
- (lambda (db)
- (db-optimize db
- database-file
- metrics-registry
- #:maybe-truncate-wal? #f)
-
- (metric-increment writer-thread-destructor-counter)
- (sqlite-close db)))
- #:lifetime 500
- #:expire-on-exception? #t
-
- ;; SQLite doesn't support parallel writes
- #:parallelism 1
- #:delay-logger (let ((delay-metric
- (make-histogram-metric
- metrics-registry
- "datastore_write_delay_seconds")))
- (lambda (seconds-delayed)
- (metric-observe delay-metric
- ;; TODO exact->inexact to work around
- ;; a bug in guile-prometheus where
- ;; the metric sum will output in the
- ;; exact form including the /q
- (exact->inexact seconds-delayed))
- (log-delay "datastore write" seconds-delayed)
- (when (> seconds-delayed 1)
- (format
- (current-error-port)
- "warning: database write delayed by ~1,2f seconds~%"
- seconds-delayed))))
- #:duration-logger
- (lambda (duration proc)
- (when (> duration 10)
- (format
- (current-error-port)
- "warning: database write took ~1,2f seconds (~a)~%"
- duration
- proc)))
- #:log-exception? worker-thread-log-exception?))
-
- ;; Make sure the worker thread has initialised, and created the in memory
- ;; tables
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
- (const #t))
-
- (slot-set!
- datastore
- 'worker-reader-thread-channel
- (make-worker-thread-channel
- (lambda ()
- (let ((db
- (db-open database-file #:write? #f)))
- (sqlite-exec db "PRAGMA temp_store = MEMORY;")
- (sqlite-exec db "PRAGMA busy_timeout = 4000;")
- (sqlite-exec db "PRAGMA cache_size = -16000;")
-
- (list db)))
- #:name "ds read"
- #:destructor
- (let ((reader-thread-destructor-counter
- (make-gauge-metric metrics-registry
- "datastore_reader_thread_close_total")))
- (lambda (db)
- (metric-increment reader-thread-destructor-counter)
- (sqlite-close db)))
- #:lifetime 50000
- #:expire-on-exception? #t
-
- ;; Use a minimum of 8 and a maximum of 16 threads
- #:parallelism
- (min (max (current-processor-count)
- 8)
- 16)
- #:delay-logger (let ((delay-metric
- (make-histogram-metric
+ (let ((channel
+ proc-vector
+ (make-worker-thread-channel
+ (lambda ()
+ (let ((db
+ (db-open database-file)))
+ (sqlite-exec db "PRAGMA busy_timeout = 0;")
+ (sqlite-exec db "PRAGMA synchronous = NORMAL;")
+ (sqlite-exec db "PRAGMA temp_store = MEMORY;")
+ (sqlite-exec db "PRAGMA foreign_keys = ON;")
+
+ (list db)))
+ #:name "ds write"
+ #:destructor
+ (let ((writer-thread-destructor-counter
+ (make-gauge-metric metrics-registry
+ "datastore_writer_thread_close_total")))
+ (lambda (db)
+ (db-optimize db
+ database-file
metrics-registry
- "datastore_read_delay_seconds")))
- (lambda (seconds-delayed)
- (metric-observe delay-metric
- ;; TODO exact->inexact to work around
- ;; a bug in guile-prometheus where
- ;; the metric sum will output in the
- ;; exact form including the /q
- (exact->inexact seconds-delayed))
- (log-delay "datastore read" seconds-delayed)
- (when (> seconds-delayed 1)
- (format
- (current-error-port)
- "warning: database read delayed by ~1,2f seconds~%"
- seconds-delayed))))
- #:duration-logger
- (lambda (duration proc)
- (when (> duration 30)
- (format
- (current-error-port)
- "warning: database read took ~1,2f seconds (~a)~%"
- duration
- proc)))
- #:log-exception? worker-thread-log-exception?))
+ #:maybe-truncate-wal? #f)
+
+ (metric-increment writer-thread-destructor-counter)
+ (sqlite-close db)))
+ #:lifetime 500
+ #:expire-on-exception? #t
+
+ ;; SQLite doesn't support parallel writes
+ #:parallelism 1
+ #:delay-logger (let ((delay-metric
+ (make-histogram-metric
+ metrics-registry
+ "datastore_write_delay_seconds")))
+ (lambda (seconds-delayed)
+ (metric-observe delay-metric
+ ;; TODO exact->inexact to work around
+ ;; a bug in guile-prometheus where
+ ;; the metric sum will output in the
+ ;; exact form including the /q
+ (exact->inexact seconds-delayed))
+ (log-delay "datastore write" seconds-delayed)
+ (when (> seconds-delayed 1)
+ (format
+ (current-error-port)
+ "warning: database write delayed by ~1,2f seconds~%"
+ seconds-delayed))))
+ #:duration-logger
+ (lambda (duration proc)
+ (when (> duration 10)
+ (format
+ (current-error-port)
+ "warning: database write took ~1,2f seconds (~a)~%"
+ duration
+ proc)))
+ #:log-exception? worker-thread-log-exception?)))
+
+ (slot-set! datastore
+ 'worker-writer-thread-channel
+ channel)
+ (slot-set! datastore
+ 'worker-writer-thread-proc-vector
+ proc-vector))
+
+ (let ((channel
+ proc-vector
+ (make-worker-thread-channel
+ (lambda ()
+ (let ((db
+ (db-open database-file #:write? #f)))
+ (sqlite-exec db "PRAGMA temp_store = MEMORY;")
+ (sqlite-exec db "PRAGMA busy_timeout = 0;")
+ (sqlite-exec db "PRAGMA cache_size = -16000;")
+
+ (list db)))
+ #:name "ds read"
+ #:destructor
+ (let ((reader-thread-destructor-counter
+ (make-gauge-metric metrics-registry
+ "datastore_reader_thread_close_total")))
+ (lambda (db)
+ (metric-increment reader-thread-destructor-counter)
+ (sqlite-close db)))
+ #:lifetime 50000
+ #:expire-on-exception? #t
+
+ ;; Use a minimum of 8 and a maximum of 16 threads
+ #:parallelism
+ (min (max (current-processor-count)
+ 8)
+ 16)
+ #:delay-logger (let ((delay-metric
+ (make-histogram-metric
+ metrics-registry
+ "datastore_read_delay_seconds")))
+ (lambda (seconds-delayed)
+ (metric-observe delay-metric
+ ;; TODO exact->inexact to work around
+ ;; a bug in guile-prometheus where
+ ;; the metric sum will output in the
+ ;; exact form including the /q
+ (exact->inexact seconds-delayed))
+ (log-delay "datastore read" seconds-delayed)
+ (when (> seconds-delayed 1)
+ (format
+ (current-error-port)
+ "warning: database read delayed by ~1,2f seconds~%"
+ seconds-delayed))))
+ #:duration-logger
+ (lambda (duration proc)
+ (when (> duration 30)
+ (format
+ (current-error-port)
+ "warning: database read took ~1,2f seconds (~a)~%"
+ duration
+ proc)))
+ #:log-exception? worker-thread-log-exception?)))
+ (slot-set! datastore
+ 'worker-reader-thread-channel
+ channel)
+ (slot-set! datastore
+ 'worker-reader-thread-proc-vector
+ proc-vector))
datastore))
@@ -288,11 +295,6 @@
metrics-registry
checkpoint-duration-metric-name
(lambda ()
- (if (> (wal-size) extreme-wal-size-threshold)
- ;; Since the WAL is really getting too big, wait for much longer
- (sqlite-exec db "PRAGMA busy_timeout = 300000;")
- (sqlite-exec db "PRAGMA busy_timeout = 20;"))
-
(let* ((statement
(sqlite-prepare
db
@@ -314,8 +316,6 @@
modified-page-count
pages-moved-to-db)
#t))))))
- (sqlite-exec db "PRAGMA busy_timeout = 5000;")
-
result)))
#t))
@@ -352,6 +352,17 @@ PRAGMA optimize;")
(spawn-fiber
(lambda ()
(while #t
+ (sleep 20)
+ (vector-for-each
+ (lambda (i proc)
+ (simple-format (current-error-port)
+ "reader thread ~A running: ~A\n"
+ i proc))
+ (slot-ref datastore 'worker-reader-thread-proc-vector)))))
+
+ (spawn-fiber
+ (lambda ()
+ (while #t
(sleep (* 60 10)) ; 10 minutes
(with-exception-handler
(lambda (exn)
diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm
index 2c7307f..a064175 100644
--- a/guix-build-coordinator/utils/fibers.scm
+++ b/guix-build-coordinator/utils/fibers.scm
@@ -51,6 +51,8 @@
(name "unnamed"))
"Return a channel used to offload work to a dedicated thread. ARGS are the
arguments of the worker thread procedure."
+ (define thread-proc-vector
+ (make-vector parallelism #f))
(define (initializer/safe)
(let ((args
@@ -74,7 +76,7 @@ arguments of the worker thread procedure."
args
;; never give up, just keep retrying
(begin
- (sleep 5)
+ (sleep 1)
(initializer/safe)))))
(define (destructor/safe args)
@@ -100,10 +102,10 @@ arguments of the worker thread procedure."
(or success?
#t
(begin
- (sleep 5)
+ (sleep 1)
(destructor/safe args)))))
- (define (process channel args)
+ (define (process thread-index channel args)
(let loop ((current-lifetime lifetime))
(let ((exception?
(match (get-message channel)
@@ -124,6 +126,9 @@ arguments of the worker thread procedure."
internal-time-units-per-second)
exn))
(lambda ()
+ (vector-set! thread-proc-vector
+ thread-index
+ proc)
(with-throw-handler #t
(lambda ()
(call-with-values
@@ -149,6 +154,10 @@ arguments of the worker thread procedure."
(put-message reply
response)
+ (vector-set! thread-proc-vector
+ thread-index
+ #f)
+
(match response
(('worker-thread-error duration _)
(when duration-logger
@@ -188,7 +197,7 @@ arguments of the worker thread procedure."
"worker-thread-channel: exception: ~A\n" exn))
(lambda ()
(parameterize ((%worker-thread-args args))
- (process channel args)))
+ (process thread-index channel args)))
#:unwind? #t)
(when destructor
@@ -196,7 +205,9 @@ arguments of the worker thread procedure."
(init (initializer/safe))))))
(iota parallelism))
- channel))
+
+ (values channel
+ thread-proc-vector)))
(define &worker-thread-timeout
(make-exception-type '&worker-thread-timeout