diff options
-rw-r--r-- | guix-build-coordinator/agent-messaging/http/server.scm | 13 | ||||
-rw-r--r-- | guix-build-coordinator/client-communication.scm | 26 | ||||
-rw-r--r-- | guix-build-coordinator/coordinator.scm | 227 | ||||
-rw-r--r-- | guix-build-coordinator/datastore/sqlite.scm | 259 | ||||
-rw-r--r-- | guix-build-coordinator/utils/fibers.scm | 21 |
5 files changed, 299 insertions, 247 deletions
diff --git a/guix-build-coordinator/agent-messaging/http/server.scm b/guix-build-coordinator/agent-messaging/http/server.scm index 51ac6ac..2b325d7 100644 --- a/guix-build-coordinator/agent-messaging/http/server.scm +++ b/guix-build-coordinator/agent-messaging/http/server.scm @@ -986,12 +986,13 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." #:code 200 #:headers '((content-type . (text/plain)) (vary . (accept)))) - (lambda (port) - (write-metrics (build-coordinator-metrics-registry - build-coordinator) - port) - (write-metrics plain-metrics-registry - port)))))) + (call-with-output-string + (lambda (port) + (write-metrics (build-coordinator-metrics-registry + build-coordinator) + port) + (write-metrics plain-metrics-registry + port))))))) (_ (render-json "not-found" diff --git a/guix-build-coordinator/client-communication.scm b/guix-build-coordinator/client-communication.scm index 354cb31..47a289d 100644 --- a/guix-build-coordinator/client-communication.scm +++ b/guix-build-coordinator/client-communication.scm @@ -32,6 +32,7 @@ #:use-module (json) #:use-module (logging logger) #:use-module (gcrypt random) + #:use-module (prometheus) #:use-module (web uri) #:use-module (web client) #:use-module (web request) @@ -100,6 +101,14 @@ (json-string->scm (utf8->string raw-body)) '())) + (define read-drv-error-count-metric + (or (metrics-registry-fetch-metric + (build-coordinator-metrics-registry build-coordinator) + "read_derivation_error_total") + (make-counter-metric + (build-coordinator-metrics-registry build-coordinator) + "read_derivation_error_total"))) + (define (controller-thunk) (match method-and-path-components (('GET "build" uuid) @@ -514,8 +523,11 @@ (retry-on-error (lambda () (read-drv/substitute derivation-file)) - #:times 5 - #:delay 5)) + #:times 2 + #:delay 3 + #:error-hook + (lambda _ + (metric-increment read-drv-error-count-metric)))) (lambda args (backtrace)))) #:unwind? #t)) @@ -533,6 +545,10 @@ body "ensure-all-related-derivation-outputs-have-builds") '(#:ensure-all-related-derivation-outputs-have-builds? #t) '()) + ,@(if (assoc-ref + body "skip-updating-derived-priorities") + '(#:skip-updating-derived-priorities? #t) + '()) ,@(if (assoc-ref body "tags") `(#:tags ,(map @@ -773,7 +789,8 @@ ensure-all-related-derivation-outputs-have-builds? tags #:key - defer-until) + defer-until + skip-updating-derived-priorities?) (send-request coordinator-uri 'POST "/builds" @@ -791,6 +808,9 @@ ,@(if ensure-all-related-derivation-outputs-have-builds? '((ensure-all-related-derivation-outputs-have-builds . #t)) '()) + ,@(if skip-updating-derived-priorities? + '((skip-updating-derived-priorities . #t)) + '()) ,@(if (null? tags) '() `((tags . ,(list->vector tags)))) diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm index 59891d1..a7fb664 100644 --- a/guix-build-coordinator/coordinator.scm +++ b/guix-build-coordinator/coordinator.scm @@ -436,11 +436,6 @@ ;; The logger assumes this (set-port-encoding! (current-output-port) "UTF-8") - ;; Work around my broken with-store/non-blocking in Guix - (let ((socket-file (%daemon-socket-uri))) - (%daemon-socket-uri - (string-append "file://" socket-file))) - (with-exception-handler (lambda (exn) (simple-format #t "failed enabling core dumps: ~A\n" exn)) @@ -510,16 +505,21 @@ (make-worker-thread-channel (const '()) #:name "utility" - #:parallelism 6 + #:parallelism 10 #:delay-logger - (lambda (seconds-delayed) - (log-delay "utility thread channel" - seconds-delayed) - (when (> seconds-delayed 0.1) - (format - (current-error-port) - "warning: utility thread channel delayed by ~1,2f seconds~%" - seconds-delayed)))))) + (let ((delay-metric + (make-histogram-metric + (build-coordinator-metrics-registry build-coordinator) + "utility_thread_pool_delay_seconds"))) + (lambda (seconds-delayed) + (log-delay "utility thread channel" + seconds-delayed) + (metric-observe delay-metric seconds-delayed) + (when (> seconds-delayed 0.1) + (format + (current-error-port) + "warning: utility thread channel delayed by ~1,2f seconds~%" + seconds-delayed))))))) (let ((finished? (make-condition))) (call-with-sigint @@ -612,6 +612,7 @@ (ignore-if-build-for-derivation-exists? #f) (ignore-if-build-for-outputs-exists? #f) (ensure-all-related-derivation-outputs-have-builds? #f) + skip-updating-derived-priorities? (tags '()) defer-until (read-drv read-derivation-from-file*)) @@ -624,39 +625,15 @@ #:include-canceled? #f) 0)) - (define (build-for-output-already-exists?) - ;; Handle the derivation not existing in the database here, so that adding - ;; it to the database isn't required for this code to work - (let* ((system-from-database (datastore-find-derivation-system datastore - derivation-file)) - - (derivation-exists-in-database? (not (eq? #f system-from-database))) - - (derivation - (if derivation-exists-in-database? - #f ; unnecessary to fetch derivation - ;; TODO Read the derivation in a separate thread - (with-fibers-port-timeouts - (lambda () - (call-with-delay-logging read-drv - #:threshold 10 - #:args (list derivation-file))) - #:timeout 240))) - - (system - (or system-from-database - (derivation-system derivation))) - - (outputs - (if derivation-exists-in-database? - (datastore-find-derivation-outputs datastore - derivation-file) - (map - (match-lambda - ((name . output) - `((name . ,name) - (output . ,(derivation-output-path output))))) - (derivation-outputs derivation))))) + (define (build-for-output-already-exists/with-derivation? derivation) + (let ((system (derivation-system derivation)) + (outputs + (map + (match-lambda + ((name . output) + `((name . ,name) + (output . ,(derivation-output-path output))))) + (derivation-outputs derivation)))) (any (lambda (output-details) (let ((builds-for-output @@ -668,13 +645,40 @@ (not (null? builds-for-output)))) outputs))) - (define (check-whether-to-store-build) + (define (build-for-output-already-exists?) + (let ((system (datastore-find-derivation-system datastore + derivation-file))) + (if (eq? #f system) ; derivation does not exist in database? + (build-for-output-already-exists/with-derivation? + (with-fibers-port-timeouts + (lambda () + (call-with-delay-logging read-drv + #:threshold 10 + #:args (list derivation-file))) + #:timeout 240)) + (any + (lambda (output-details) + (let ((builds-for-output + (datastore-list-builds-for-output-and-system + datastore + (assq-ref output-details 'output) + system + #:include-canceled? #f))) + (not (null? builds-for-output)))) + (datastore-find-derivation-outputs datastore + derivation-file))))) + + (define* (check-whether-to-store-build #:optional derivation) (cond ((and ignore-if-build-for-derivation-exists? (build-for-derivation-exists?)) '((no-build-submitted . build-already-exists-for-this-derivation))) ((and ignore-if-build-for-outputs-exists? - (call-with-delay-logging build-for-output-already-exists?)) + (if derivation + (call-with-delay-logging + build-for-output-already-exists/with-derivation? + #:args (list derivation)) + (call-with-delay-logging build-for-output-already-exists?))) '((no-build-submitted . build-already-exists-for-a-output))) (else 'continue))) @@ -704,18 +708,20 @@ (or requested-uuid (random-v4-uuid))) - (define (build-perform-datastore-changes derivations-lacking-builds) + (define (build-perform-datastore-changes derivation derivations-lacking-builds) (lambda (_) ;; Check again now, since new builds could have been added since the ;; checks were made before the start of the transaction. - (match (check-whether-to-store-build) + (match (check-whether-to-store-build derivation) ('continue ;; Actually create a build, do this first so the derived priorities ;; for the builds inserted below are informed by this build. (store-build derivation-file build-id priority - tags) + tags + #:skip-updating-other-build-derived-priorities + skip-updating-derived-priorities?) (for-each (match-lambda @@ -768,65 +774,68 @@ (match (check-whether-to-store-build) ('continue - ;; Store the derivation first, so that listing related derivations - ;; with no builds works - (unless (datastore-find-derivation datastore derivation-file) - (datastore-store-derivation - datastore - ;; TODO Read the derivation in a separate thread - (with-fibers-port-timeouts - (lambda () - (call-with-delay-logging read-drv - #:threshold 10 - #:args (list derivation-file))) - #:timeout 30))) - - (let ((related-derivations-lacking-builds - (if ensure-all-related-derivation-outputs-have-builds? - (datastore-list-related-derivations-with-no-build-for-outputs + (let ((drv + ;; If the dervation is missing from the database, read it and + ;; enter it in to the database, so that listing related + ;; derivations with no builds works + (if (datastore-find-derivation datastore derivation-file) + #f + (with-fibers-port-timeouts + (lambda () + (call-with-delay-logging read-drv + #:threshold 10 + #:args (list derivation-file))) + #:timeout 30)))) + (when drv + (datastore-store-derivation datastore drv)) + + (let ((related-derivations-lacking-builds + (if ensure-all-related-derivation-outputs-have-builds? + (datastore-list-related-derivations-with-no-build-for-outputs + datastore + derivation-file) + '()))) + (match (datastore-call-with-transaction datastore - derivation-file) - '()))) - (match (datastore-call-with-transaction - datastore - (build-perform-datastore-changes - ;; Do this here so it doesn't take time in the writer thread - (map - (lambda (drv) - ;; Generate the UUID's outside the transaction to save - ;; time too. - (cons drv (random-v4-uuid))) - related-derivations-lacking-builds)) - #:duration-metric-name - "store_build") - (#t ; build submitted - (build-coordinator-prompt-hook-processing-for-event - build-coordinator - 'build-submitted) - - (build-coordinator-send-event - build-coordinator - 'build-submitted - `((id . ,build-id) - (derivation . ,derivation-file) - (priority . ,priority) - (tags - . ,(list->vector + (build-perform-datastore-changes + drv + ;; Do this here so it doesn't take time in the writer thread (map - (match-lambda - ((key . value) - `((key . ,key) - (value . ,value)))) - (if (vector? tags) - (vector->list tags) - tags)))) - (defer_until . ,defer-until))) - - (trigger-build-allocation build-coordinator) - - `((build-submitted . ,build-id))) - (stop-condition - stop-condition)))) + (lambda (related-drv) + ;; Generate the UUID's outside the transaction to save + ;; time too. + (cons related-drv (random-v4-uuid))) + related-derivations-lacking-builds)) + #:duration-metric-name + "store_build") + (#t ; build submitted + (build-coordinator-prompt-hook-processing-for-event + build-coordinator + 'build-submitted) + + (build-coordinator-send-event + build-coordinator + 'build-submitted + `((id . ,build-id) + (derivation . ,derivation-file) + (priority . ,priority) + (tags + . ,(list->vector + (map + (match-lambda + ((key . value) + `((key . ,key) + (value . ,value)))) + (if (vector? tags) + (vector->list tags) + tags)))) + (defer_until . ,defer-until))) + + (trigger-build-allocation build-coordinator) + + `((build-submitted . ,build-id))) + (stop-condition + stop-condition))))) (stop-condition stop-condition))))) diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm index c3b62b2..ac04362 100644 --- a/guix-build-coordinator/datastore/sqlite.scm +++ b/guix-build-coordinator/datastore/sqlite.scm @@ -107,7 +107,9 @@ (define-class <sqlite-datastore> (<abstract-datastore>) database-file worker-reader-thread-channel + worker-reader-thread-proc-vector worker-writer-thread-channel + worker-writer-thread-proc-vector metrics-registry) (define* (sqlite-datastore database-uri @@ -137,124 +139,129 @@ (slot-set! datastore 'database-file database-file) (slot-set! datastore 'metrics-registry metrics-registry) - (slot-set! - datastore - 'worker-writer-thread-channel - (make-worker-thread-channel - (lambda () - (let ((db - (db-open database-file))) - (sqlite-exec db "PRAGMA busy_timeout = 5000;") - (sqlite-exec db "PRAGMA synchronous = NORMAL;") - (sqlite-exec db "PRAGMA temp_store = MEMORY;") - (sqlite-exec db "PRAGMA foreign_keys = ON;") - - (list db))) - #:name "ds write" - #:destructor - (let ((writer-thread-destructor-counter - (make-gauge-metric metrics-registry - "datastore_writer_thread_close_total"))) - (lambda (db) - (db-optimize db - database-file - metrics-registry - #:maybe-truncate-wal? #f) - - (metric-increment writer-thread-destructor-counter) - (sqlite-close db))) - #:lifetime 500 - #:expire-on-exception? #t - - ;; SQLite doesn't support parallel writes - #:parallelism 1 - #:delay-logger (let ((delay-metric - (make-histogram-metric - metrics-registry - "datastore_write_delay_seconds"))) - (lambda (seconds-delayed) - (metric-observe delay-metric - ;; TODO exact->inexact to work around - ;; a bug in guile-prometheus where - ;; the metric sum will output in the - ;; exact form including the /q - (exact->inexact seconds-delayed)) - (log-delay "datastore write" seconds-delayed) - (when (> seconds-delayed 1) - (format - (current-error-port) - "warning: database write delayed by ~1,2f seconds~%" - seconds-delayed)))) - #:duration-logger - (lambda (duration proc) - (when (> duration 10) - (format - (current-error-port) - "warning: database write took ~1,2f seconds (~a)~%" - duration - proc))) - #:log-exception? worker-thread-log-exception?)) - - ;; Make sure the worker thread has initialised, and created the in memory - ;; tables - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) - (const #t)) - - (slot-set! - datastore - 'worker-reader-thread-channel - (make-worker-thread-channel - (lambda () - (let ((db - (db-open database-file #:write? #f))) - (sqlite-exec db "PRAGMA temp_store = MEMORY;") - (sqlite-exec db "PRAGMA busy_timeout = 4000;") - (sqlite-exec db "PRAGMA cache_size = -16000;") - - (list db))) - #:name "ds read" - #:destructor - (let ((reader-thread-destructor-counter - (make-gauge-metric metrics-registry - "datastore_reader_thread_close_total"))) - (lambda (db) - (metric-increment reader-thread-destructor-counter) - (sqlite-close db))) - #:lifetime 50000 - #:expire-on-exception? #t - - ;; Use a minimum of 8 and a maximum of 16 threads - #:parallelism - (min (max (current-processor-count) - 8) - 16) - #:delay-logger (let ((delay-metric - (make-histogram-metric + (let ((channel + proc-vector + (make-worker-thread-channel + (lambda () + (let ((db + (db-open database-file))) + (sqlite-exec db "PRAGMA busy_timeout = 0;") + (sqlite-exec db "PRAGMA synchronous = NORMAL;") + (sqlite-exec db "PRAGMA temp_store = MEMORY;") + (sqlite-exec db "PRAGMA foreign_keys = ON;") + + (list db))) + #:name "ds write" + #:destructor + (let ((writer-thread-destructor-counter + (make-gauge-metric metrics-registry + "datastore_writer_thread_close_total"))) + (lambda (db) + (db-optimize db + database-file metrics-registry - "datastore_read_delay_seconds"))) - (lambda (seconds-delayed) - (metric-observe delay-metric - ;; TODO exact->inexact to work around - ;; a bug in guile-prometheus where - ;; the metric sum will output in the - ;; exact form including the /q - (exact->inexact seconds-delayed)) - (log-delay "datastore read" seconds-delayed) - (when (> seconds-delayed 1) - (format - (current-error-port) - "warning: database read delayed by ~1,2f seconds~%" - seconds-delayed)))) - #:duration-logger - (lambda (duration proc) - (when (> duration 30) - (format - (current-error-port) - "warning: database read took ~1,2f seconds (~a)~%" - duration - proc))) - #:log-exception? worker-thread-log-exception?)) + #:maybe-truncate-wal? #f) + + (metric-increment writer-thread-destructor-counter) + (sqlite-close db))) + #:lifetime 500 + #:expire-on-exception? #t + + ;; SQLite doesn't support parallel writes + #:parallelism 1 + #:delay-logger (let ((delay-metric + (make-histogram-metric + metrics-registry + "datastore_write_delay_seconds"))) + (lambda (seconds-delayed) + (metric-observe delay-metric + ;; TODO exact->inexact to work around + ;; a bug in guile-prometheus where + ;; the metric sum will output in the + ;; exact form including the /q + (exact->inexact seconds-delayed)) + (log-delay "datastore write" seconds-delayed) + (when (> seconds-delayed 1) + (format + (current-error-port) + "warning: database write delayed by ~1,2f seconds~%" + seconds-delayed)))) + #:duration-logger + (lambda (duration proc) + (when (> duration 10) + (format + (current-error-port) + "warning: database write took ~1,2f seconds (~a)~%" + duration + proc))) + #:log-exception? worker-thread-log-exception?))) + + (slot-set! datastore + 'worker-writer-thread-channel + channel) + (slot-set! datastore + 'worker-writer-thread-proc-vector + proc-vector)) + + (let ((channel + proc-vector + (make-worker-thread-channel + (lambda () + (let ((db + (db-open database-file #:write? #f))) + (sqlite-exec db "PRAGMA temp_store = MEMORY;") + (sqlite-exec db "PRAGMA busy_timeout = 0;") + (sqlite-exec db "PRAGMA cache_size = -16000;") + + (list db))) + #:name "ds read" + #:destructor + (let ((reader-thread-destructor-counter + (make-gauge-metric metrics-registry + "datastore_reader_thread_close_total"))) + (lambda (db) + (metric-increment reader-thread-destructor-counter) + (sqlite-close db))) + #:lifetime 50000 + #:expire-on-exception? #t + + ;; Use a minimum of 8 and a maximum of 16 threads + #:parallelism + (min (max (current-processor-count) + 8) + 16) + #:delay-logger (let ((delay-metric + (make-histogram-metric + metrics-registry + "datastore_read_delay_seconds"))) + (lambda (seconds-delayed) + (metric-observe delay-metric + ;; TODO exact->inexact to work around + ;; a bug in guile-prometheus where + ;; the metric sum will output in the + ;; exact form including the /q + (exact->inexact seconds-delayed)) + (log-delay "datastore read" seconds-delayed) + (when (> seconds-delayed 1) + (format + (current-error-port) + "warning: database read delayed by ~1,2f seconds~%" + seconds-delayed)))) + #:duration-logger + (lambda (duration proc) + (when (> duration 30) + (format + (current-error-port) + "warning: database read took ~1,2f seconds (~a)~%" + duration + proc))) + #:log-exception? worker-thread-log-exception?))) + (slot-set! datastore + 'worker-reader-thread-channel + channel) + (slot-set! datastore + 'worker-reader-thread-proc-vector + proc-vector)) datastore)) @@ -288,11 +295,6 @@ metrics-registry checkpoint-duration-metric-name (lambda () - (if (> (wal-size) extreme-wal-size-threshold) - ;; Since the WAL is really getting too big, wait for much longer - (sqlite-exec db "PRAGMA busy_timeout = 300000;") - (sqlite-exec db "PRAGMA busy_timeout = 20;")) - (let* ((statement (sqlite-prepare db @@ -314,8 +316,6 @@ modified-page-count pages-moved-to-db) #t)))))) - (sqlite-exec db "PRAGMA busy_timeout = 5000;") - result))) #t)) @@ -352,6 +352,17 @@ PRAGMA optimize;") (spawn-fiber (lambda () (while #t + (sleep 20) + (vector-for-each + (lambda (i proc) + (simple-format (current-error-port) + "reader thread ~A running: ~A\n" + i proc)) + (slot-ref datastore 'worker-reader-thread-proc-vector))))) + + (spawn-fiber + (lambda () + (while #t (sleep (* 60 10)) ; 10 minutes (with-exception-handler (lambda (exn) diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm index 2c7307f..a064175 100644 --- a/guix-build-coordinator/utils/fibers.scm +++ b/guix-build-coordinator/utils/fibers.scm @@ -51,6 +51,8 @@ (name "unnamed")) "Return a channel used to offload work to a dedicated thread. ARGS are the arguments of the worker thread procedure." + (define thread-proc-vector + (make-vector parallelism #f)) (define (initializer/safe) (let ((args @@ -74,7 +76,7 @@ arguments of the worker thread procedure." args ;; never give up, just keep retrying (begin - (sleep 5) + (sleep 1) (initializer/safe))))) (define (destructor/safe args) @@ -100,10 +102,10 @@ arguments of the worker thread procedure." (or success? #t (begin - (sleep 5) + (sleep 1) (destructor/safe args))))) - (define (process channel args) + (define (process thread-index channel args) (let loop ((current-lifetime lifetime)) (let ((exception? (match (get-message channel) @@ -124,6 +126,9 @@ arguments of the worker thread procedure." internal-time-units-per-second) exn)) (lambda () + (vector-set! thread-proc-vector + thread-index + proc) (with-throw-handler #t (lambda () (call-with-values @@ -149,6 +154,10 @@ arguments of the worker thread procedure." (put-message reply response) + (vector-set! thread-proc-vector + thread-index + #f) + (match response (('worker-thread-error duration _) (when duration-logger @@ -188,7 +197,7 @@ arguments of the worker thread procedure." "worker-thread-channel: exception: ~A\n" exn)) (lambda () (parameterize ((%worker-thread-args args)) - (process channel args))) + (process thread-index channel args))) #:unwind? #t) (when destructor @@ -196,7 +205,9 @@ arguments of the worker thread procedure." (init (initializer/safe)))))) (iota parallelism)) - channel)) + + (values channel + thread-proc-vector))) (define &worker-thread-timeout (make-exception-type '&worker-thread-timeout |