diff options
Diffstat (limited to 'guix-build-coordinator')
-rw-r--r-- | guix-build-coordinator/agent-messaging/http/server.scm | 68 | ||||
-rw-r--r-- | guix-build-coordinator/agent.scm | 15 | ||||
-rw-r--r-- | guix-build-coordinator/build-allocator.scm | 69 | ||||
-rw-r--r-- | guix-build-coordinator/client-communication.scm | 74 | ||||
-rw-r--r-- | guix-build-coordinator/coordinator.scm | 603 | ||||
-rw-r--r-- | guix-build-coordinator/datastore.scm | 8 | ||||
-rw-r--r-- | guix-build-coordinator/datastore/abstract.scm | 5 | ||||
-rw-r--r-- | guix-build-coordinator/datastore/sqlite.scm | 542 | ||||
-rw-r--r-- | guix-build-coordinator/hooks.scm | 166 | ||||
-rw-r--r-- | guix-build-coordinator/utils.scm | 11 | ||||
-rw-r--r-- | guix-build-coordinator/utils/fibers.scm | 31 |
11 files changed, 865 insertions, 727 deletions
diff --git a/guix-build-coordinator/agent-messaging/http/server.scm b/guix-build-coordinator/agent-messaging/http/server.scm index 85ba0d4..2b325d7 100644 --- a/guix-build-coordinator/agent-messaging/http/server.scm +++ b/guix-build-coordinator/agent-messaging/http/server.scm @@ -135,7 +135,6 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." (define (http-agent-messaging-start-server port host secret-key-base build-coordinator - chunked-request-channel output-hash-channel) (define plain-metrics-registry (make-metrics-registry)) @@ -183,7 +182,6 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." body secret-key-base build-coordinator - chunked-request-channel output-hash-channel update-managed-metrics! plain-metrics-registry))) @@ -614,7 +612,6 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." body secret-key-base build-coordinator - chunked-request-channel output-hash-channel update-managed-metrics! plain-metrics-registry) @@ -794,18 +791,9 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." body))) (call-with-output-file tmp-output-file-name (lambda (output-port) - ;; Older agents may still attempt to use chunked encoding - ;; for this request - (if (member '(chunked) (request-transfer-encoding request)) - (call-with-worker-thread - chunked-request-channel - (lambda () - (dump-port body-port - output-port - (request-content-length request)))) - (dump-port body-port - output-port - (request-content-length request)))))) + (dump-port body-port + output-port + (request-content-length request))))) (rename-file tmp-output-file-name output-file-name) (no-content)) @@ -852,20 +840,12 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." "deleting " tmp-output-file-name) (delete-file tmp-output-file-name)) - (if (member '(chunked) (request-transfer-encoding request)) - ;; Older agents may use chunked encoding for this request - (call-with-worker-thread - chunked-request-channel - (lambda () - (receive-file body - #f - tmp-output-file-name))) - (let ((content-length - (request-content-length request))) - (when (> content-length 0) - (receive-file body - content-length - tmp-output-file-name)))) + (let ((content-length + (request-content-length request))) + (when (> content-length 0) + (receive-file body + content-length + tmp-output-file-name))) (if (file-exists? output-file-name) (render-json @@ -945,19 +925,10 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." "deleting " output-file-name) (delete-file output-file-name)) - (if (member '(chunked) (request-transfer-encoding request)) - ;; Older agents may use chunked encoding for this request - (call-with-worker-thread - chunked-request-channel - (lambda () - (receive-file body - #f - tmp-output-file-name - #:append? #t))) - (receive-file body - (request-content-length request) - tmp-output-file-name - #:append? #t))) + (receive-file body + content-length + tmp-output-file-name + #:append? #t)) (if (file-exists? output-file-name) (render-json @@ -1015,12 +986,13 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." #:code 200 #:headers '((content-type . (text/plain)) (vary . (accept)))) - (lambda (port) - (write-metrics (build-coordinator-metrics-registry - build-coordinator) - port) - (write-metrics plain-metrics-registry - port)))))) + (call-with-output-string + (lambda (port) + (write-metrics (build-coordinator-metrics-registry + build-coordinator) + port) + (write-metrics plain-metrics-registry + port))))))) (_ (render-json "not-found" diff --git a/guix-build-coordinator/agent.scm b/guix-build-coordinator/agent.scm index a64a61c..7afd549 100644 --- a/guix-build-coordinator/agent.scm +++ b/guix-build-coordinator/agent.scm @@ -77,7 +77,8 @@ derivation-substitute-urls non-derivation-substitute-urls metrics-file - max-1min-load-average) + max-1min-load-average + timestamp-log-output?) (define lgr (make <logger>)) (define port-log (make <port-log> #:port (current-output-port) @@ -85,9 +86,11 @@ ;; In guile-lib v0.2.8 onwards, the formatter is ;; called with more arguments (lambda args ; lvl, time, str - (format #f "~a (~5a): ~a~%" - (strftime "%F %H:%M:%S" (localtime - (second args))) + (format #f "~a(~5a): ~a~%" + (if timestamp-log-output? + (strftime "%F %H:%M:%S " (localtime + (second args))) + "") (first args) (third args))))) @@ -642,7 +645,7 @@ unable to query substitute servers without caching")) (has-substiutes-no-cache? non-derivation-substitute-urls file)) - #:timeout (* 60 1000))) + #:timeout 60)) #:times 20 #:delay (random 15)) #f))) @@ -704,7 +707,7 @@ but the guix-daemon claims it's unavailable" ((current-build-output-port log-port)) (build-things fetch-substitute-store missing-paths))) - #:timeout (* 60 10 1000))) + #:timeout (* 60 10))) (lambda (key . args) (log-msg lgr 'ERROR "exception when fetching missing paths " diff --git a/guix-build-coordinator/build-allocator.scm b/guix-build-coordinator/build-allocator.scm index f73c457..588f213 100644 --- a/guix-build-coordinator/build-allocator.scm +++ b/guix-build-coordinator/build-allocator.scm @@ -352,20 +352,13 @@ #:label-values `((system . ,system))))) counts))) - ;; Go through the setup failures and look specifically at the - ;; missing_inputs ones. Eliminate any missing_inputs failures if all the - ;; missing inputs appear to have been built successfully, and update the + ;; Go through the setup failures and update the ;; derived-build-priorities-hash to reflect the priorities of builds based ;; on the builds that would be "unblocked" if they were completed. (for-each (lambda (setup-failure-build-id) (let ((setup-failures - (filter - (lambda (setup-failure) - (string=? "missing_inputs" - (assq-ref setup-failure 'failure-reason))) - (hash-ref setup-failures-hash - setup-failure-build-id))) + (hash-ref setup-failures-hash setup-failure-build-id)) (setup-failure-build-derived-priority (hash-ref derived-build-priorities-hash setup-failure-build-id))) @@ -425,31 +418,28 @@ setup-failures-hash)) (let ((result - (append-map + (map (lambda (agent-id) (log "considering builds for" agent-id) (let ((builds-sorted-by-derived-priority (sort-list (filter (filter-builds-for-agent agent-id) builds) (build-sorting-function-for-agent agent-id)))) - (if (null? builds-sorted-by-derived-priority) - '() - (let ((final-ordered-builds - (concatenate - (map sort-priority-sublist - (limit-processed-sublists - (break-builds-in-to-priority-sublists - builds-sorted-by-derived-priority)))))) - (let ((builds-for-agent - (limit-planned-builds final-ordered-builds))) - (map (lambda (build-id ordering) - (list build-id - agent-id - ordering)) - (map (lambda (build) - (assq-ref build 'uuid)) - builds-for-agent) - (iota (length builds-for-agent)))))))) + (cons + agent-id + (if (null? builds-sorted-by-derived-priority) + '() + (let ((final-ordered-builds + (concatenate + (map sort-priority-sublist + (limit-processed-sublists + (break-builds-in-to-priority-sublists + builds-sorted-by-derived-priority)))))) + (let ((builds-for-agent + (limit-planned-builds final-ordered-builds))) + (map (lambda (build) + (assq-ref build 'uuid)) + builds-for-agent))))))) (map (lambda (agent) (assq-ref agent 'uuid)) agents)))) @@ -538,7 +528,10 @@ (assq-ref setup-failure 'failure-reason))) (cond ((string=? failure-reason "missing_inputs") - #f) + ;; This problem might go away, but just don't try the same agent + ;; again for now. + (string=? (assq-ref setup-failure 'agent-id) + agent-id)) ((string=? failure-reason "could_not_delete_outputs") ;; This problem might go away, but just don't try the same agent ;; again for now. @@ -588,7 +581,7 @@ counts))) (let ((result - (append-map + (map (lambda (agent-id) (log "considering builds for" agent-id) (let* ((filter-proc @@ -600,7 +593,8 @@ (if (or (and planned-builds-for-agent-limit (>= count planned-builds-for-agent-limit)) (null? potential-build-ids)) - build-ids ;; highest priority last + (reverse build-ids) ;; highest priority last, so + ;; reverse (let ((potential-build (first potential-build-ids))) (if (filter-proc potential-build) (loop (+ 1 count) @@ -610,18 +604,7 @@ (loop count build-ids (cdr potential-build-ids)))))))) - (if (null? build-ids) - '() - (let ((build-ids-count - (length build-ids))) - (map (lambda (build-id ordering) - (list build-id - agent-id - ordering)) - build-ids - (iota build-ids-count - build-ids-count - -1)))))) + (cons agent-id build-ids))) (map (lambda (agent) (assq-ref agent 'uuid)) agents)))) diff --git a/guix-build-coordinator/client-communication.scm b/guix-build-coordinator/client-communication.scm index 46535e4..47a289d 100644 --- a/guix-build-coordinator/client-communication.scm +++ b/guix-build-coordinator/client-communication.scm @@ -32,6 +32,7 @@ #:use-module (json) #:use-module (logging logger) #:use-module (gcrypt random) + #:use-module (prometheus) #:use-module (web uri) #:use-module (web client) #:use-module (web request) @@ -65,7 +66,8 @@ (define (start-client-request-server secret-key-base host port - build-coordinator) + build-coordinator + utility-thread-pool-channel) (run-server/patched (lambda (request body) (log-msg (build-coordinator-logger build-coordinator) @@ -80,7 +82,8 @@ (uri-path (request-uri request)))) body secret-key-base - build-coordinator))) + build-coordinator + utility-thread-pool-channel))) #:host host #:port port)) @@ -88,7 +91,8 @@ method-and-path-components raw-body secret-key-base - build-coordinator) + build-coordinator + utility-thread-pool-channel) (define datastore (build-coordinator-datastore build-coordinator)) @@ -97,6 +101,14 @@ (json-string->scm (utf8->string raw-body)) '())) + (define read-drv-error-count-metric + (or (metrics-registry-fetch-metric + (build-coordinator-metrics-registry build-coordinator) + "read_derivation_error_total") + (make-counter-metric + (build-coordinator-metrics-registry build-coordinator) + "read_derivation_error_total"))) + (define (controller-thunk) (match method-and-path-components (('GET "build" uuid) @@ -292,8 +304,8 @@ (render-json `((build_allocation_plan . ,(list->vector - (datastore-list-allocation-plan-builds - datastore + (build-coordinator-list-allocation-plan-builds + build-coordinator agent-id))))))) (('POST "agent" agent-id "passwords") (let ((password (new-agent-password @@ -461,13 +473,33 @@ (simple-format #f "derivation must be a string: ~A\n" derivation)))) + (unless (derivation-path? derivation-file) + (raise-exception + (make-exception-with-message + "invalid derivation path"))) + + (string-for-each + (lambda (c) + (unless (or (char-alphabetic? c) + (char-numeric? c) + (member c '(#\+ #\- #\. #\_ #\? #\=))) + (raise-exception + (make-exception-with-message + (simple-format #f "invalid character in derivation name: ~A" + c))))) + (store-path-base derivation-file)) + (define (read-drv/substitute derivation-file) (with-store/non-blocking store (unless (valid-path? store derivation-file) (substitute-derivation store derivation-file #:substitute-urls substitute-urls))) - (read-derivation-from-file* derivation-file)) + ;; Read the derivation in a thread to avoid blocking fibers + (call-with-worker-thread + utility-thread-pool-channel + (lambda () + (read-derivation-from-file* derivation-file)))) (let ((submit-build-result (call-with-delay-logging @@ -484,17 +516,18 @@ 'WARN "exception substituting derivation " derivation-file ": " exn) - - (if (null? (or substitute-urls '())) - ;; Try again - (read-drv/substitute derivation-file) - (read-derivation-through-substitutes - derivation-file - substitute-urls))) + (raise-exception exn)) (lambda () (with-throw-handler #t (lambda () - (read-drv/substitute derivation-file)) + (retry-on-error + (lambda () + (read-drv/substitute derivation-file)) + #:times 2 + #:delay 3 + #:error-hook + (lambda _ + (metric-increment read-drv-error-count-metric)))) (lambda args (backtrace)))) #:unwind? #t)) @@ -512,6 +545,10 @@ body "ensure-all-related-derivation-outputs-have-builds") '(#:ensure-all-related-derivation-outputs-have-builds? #t) '()) + ,@(if (assoc-ref + body "skip-updating-derived-priorities") + '(#:skip-updating-derived-priorities? #t) + '()) ,@(if (assoc-ref body "tags") `(#:tags ,(map @@ -535,8 +572,7 @@ datastore (lambda (_) (let ((allocation-plan-counts - (datastore-count-build-allocation-plan-entries - datastore))) + (build-coordinator-allocation-plan-stats build-coordinator))) `((state_id . ,(build-coordinator-get-state-id build-coordinator)) (agents . ,(list->vector (map @@ -753,7 +789,8 @@ ensure-all-related-derivation-outputs-have-builds? tags #:key - defer-until) + defer-until + skip-updating-derived-priorities?) (send-request coordinator-uri 'POST "/builds" @@ -771,6 +808,9 @@ ,@(if ensure-all-related-derivation-outputs-have-builds? '((ensure-all-related-derivation-outputs-have-builds . #t)) '()) + ,@(if skip-updating-derived-priorities? + '((skip-updating-derived-priorities . #t)) + '()) ,@(if (null? tags) '() `((tags . ,(list->vector tags)))) diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm index 34b05c9..a7fb664 100644 --- a/guix-build-coordinator/coordinator.scm +++ b/guix-build-coordinator/coordinator.scm @@ -25,6 +25,7 @@ #:use-module (srfi srfi-11) #:use-module (srfi srfi-19) #:use-module (srfi srfi-26) + #:use-module (srfi srfi-43) #:use-module (srfi srfi-71) #:use-module (ice-9 ftw) #:use-module (ice-9 vlist) @@ -98,6 +99,10 @@ build-coordinator-prompt-hook-processing-for-event start-hook-processing-threads + build-coordinator-allocation-plan-stats + build-coordinator-trigger-build-allocation + build-coordinator-list-allocation-plan-builds + build-output-file-location build-log-file-destination build-log-file-location @@ -117,7 +122,8 @@ (define-record-type <build-coordinator> (make-build-coordinator-record datastore hooks metrics-registry - allocation-strategy logger) + allocation-strategy allocator-channel + logger) build-coordinator? (datastore build-coordinator-datastore) (hooks build-coordinator-hooks) @@ -125,8 +131,10 @@ set-build-coordinator-hook-condvars!) (metrics-registry build-coordinator-metrics-registry) (allocation-strategy build-coordinator-allocation-strategy) - (allocator-thread build-coordinator-allocator-thread - set-build-coordinator-allocator-thread!) + (trigger-build-allocation + build-coordinator-trigger-build-allocation + set-build-coordinator-trigger-build-allocation!) + (allocator-channel build-coordinator-allocator-channel) (logger build-coordinator-logger) (events-channel build-coordinator-events-channel set-build-coordinator-events-channel!) @@ -364,7 +372,8 @@ (not (client-error? exn)))))) hooks (allocation-strategy - basic-build-allocation-strategy)) + basic-build-allocation-strategy) + (timestamp-log-output? #t)) (and (or (list? hooks) (begin (simple-format @@ -398,9 +407,11 @@ ;; In guile-lib v0.2.8 onwards, the formatter is ;; called with more arguments (lambda args ; lvl, time, str - (format #f "~a (~5a): ~a~%" - (strftime "%F %H:%M:%S" (localtime - (second args))) + (format #f "~a(~5a): ~a~%" + (if timestamp-log-output? + (strftime "%F %H:%M:%S " (localtime + (second args))) + "") (first args) (third args))))) (build-coordinator @@ -408,6 +419,7 @@ hooks metrics-registry allocation-strategy + (make-channel) lgr))) (add-handler! lgr port-log) @@ -424,11 +436,6 @@ ;; The logger assumes this (set-port-encoding! (current-output-port) "UTF-8") - ;; Work around my broken with-store/non-blocking in Guix - (let ((socket-file (%daemon-socket-uri))) - (%daemon-socket-uri - (string-append "file://" socket-file))) - (with-exception-handler (lambda (exn) (simple-format #t "failed enabling core dumps: ~A\n" exn)) @@ -458,7 +465,7 @@ (when update-datastore? (datastore-update (build-coordinator-datastore build-coordinator))) - (set-build-coordinator-allocator-thread! + (set-build-coordinator-trigger-build-allocation! build-coordinator (make-build-allocator-thread build-coordinator)) @@ -490,31 +497,29 @@ ;; Create some worker thread channels, which need to be created prior ;; to run-fibers being called. - (let ((chunked-request-channel - ;; There are fibers issues when trying to read the chunked - ;; requests, so do this in dedicated threads. + (let ((output-hash-channel + (make-output-hash-channel + build-coordinator)) + + (utility-thread-pool-channel (make-worker-thread-channel (const '()) - #:name "chunked request" - #:parallelism 16 - #:log-exception? - (lambda (exn) - (not - (chunked-input-ended-prematurely-error? - exn))) + #:name "utility" + #:parallelism 10 #:delay-logger - (lambda (seconds-delayed) - (log-delay "chunked request channel" - seconds-delayed) - (when (> seconds-delayed 0.1) - (format - (current-error-port) - "warning: chunked request channel delayed by ~1,2f seconds~%" - seconds-delayed))))) - - (output-hash-channel - (make-output-hash-channel - build-coordinator))) + (let ((delay-metric + (make-histogram-metric + (build-coordinator-metrics-registry build-coordinator) + "utility_thread_pool_delay_seconds"))) + (lambda (seconds-delayed) + (log-delay "utility thread channel" + seconds-delayed) + (metric-observe delay-metric seconds-delayed) + (when (> seconds-delayed 0.1) + (format + (current-error-port) + "warning: utility thread channel delayed by ~1,2f seconds~%" + seconds-delayed))))))) (let ((finished? (make-condition))) (call-with-sigint @@ -550,6 +555,8 @@ (spawn-fiber-to-watch-for-deferred-builds build-coordinator) + (spawn-build-allocation-plan-management-fiber build-coordinator) + (set-build-coordinator-scheduler! build-coordinator (current-scheduler)) @@ -575,7 +582,6 @@ host secret-key-base build-coordinator - chunked-request-channel output-hash-channel) (log-msg 'INFO "listening on " host ":" port)))) @@ -584,7 +590,8 @@ secret-key-base (uri-host client-communication-uri) (uri-port client-communication-uri) - build-coordinator) + build-coordinator + utility-thread-pool-channel) ;; Guile seems to just stop listening on ports, so try to ;; monitor that internally and just quit if it happens @@ -605,6 +612,7 @@ (ignore-if-build-for-derivation-exists? #f) (ignore-if-build-for-outputs-exists? #f) (ensure-all-related-derivation-outputs-have-builds? #f) + skip-updating-derived-priorities? (tags '()) defer-until (read-drv read-derivation-from-file*)) @@ -617,45 +625,15 @@ #:include-canceled? #f) 0)) - (define (build-for-output-already-exists?) - ;; Handle the derivation not existing in the database here, so that adding - ;; it to the database isn't required for this code to work - (let* ((system-from-database (datastore-find-derivation-system datastore - derivation-file)) - - (derivation-exists-in-database? (not (eq? #f system-from-database))) - - (derivation - (if derivation-exists-in-database? - #f ; unnecessary to fetch derivation - ;; Bit of a hack, but offload reading the derivation to a - ;; thread so that it doesn't block the fibers thread, since - ;; local I/O doesn't cooperate with fibers - (datastore-call-with-transaction - datastore - (lambda _ - (with-fibers-port-timeouts - (lambda () - (call-with-delay-logging read-drv - #:threshold 10 - #:args (list derivation-file))) - #:timeout 240)) - #:readonly? #t))) - - (system - (or system-from-database - (derivation-system derivation))) - - (outputs - (if derivation-exists-in-database? - (datastore-find-derivation-outputs datastore - derivation-file) - (map - (match-lambda - ((name . output) - `((name . ,name) - (output . ,(derivation-output-path output))))) - (derivation-outputs derivation))))) + (define (build-for-output-already-exists/with-derivation? derivation) + (let ((system (derivation-system derivation)) + (outputs + (map + (match-lambda + ((name . output) + `((name . ,name) + (output . ,(derivation-output-path output))))) + (derivation-outputs derivation)))) (any (lambda (output-details) (let ((builds-for-output @@ -667,13 +645,40 @@ (not (null? builds-for-output)))) outputs))) - (define (check-whether-to-store-build) + (define (build-for-output-already-exists?) + (let ((system (datastore-find-derivation-system datastore + derivation-file))) + (if (eq? #f system) ; derivation does not exist in database? + (build-for-output-already-exists/with-derivation? + (with-fibers-port-timeouts + (lambda () + (call-with-delay-logging read-drv + #:threshold 10 + #:args (list derivation-file))) + #:timeout 240)) + (any + (lambda (output-details) + (let ((builds-for-output + (datastore-list-builds-for-output-and-system + datastore + (assq-ref output-details 'output) + system + #:include-canceled? #f))) + (not (null? builds-for-output)))) + (datastore-find-derivation-outputs datastore + derivation-file))))) + + (define* (check-whether-to-store-build #:optional derivation) (cond ((and ignore-if-build-for-derivation-exists? (build-for-derivation-exists?)) '((no-build-submitted . build-already-exists-for-this-derivation))) ((and ignore-if-build-for-outputs-exists? - (call-with-delay-logging build-for-output-already-exists?)) + (if derivation + (call-with-delay-logging + build-for-output-already-exists/with-derivation? + #:args (list derivation)) + (call-with-delay-logging build-for-output-already-exists?))) '((no-build-submitted . build-already-exists-for-a-output))) (else 'continue))) @@ -703,18 +708,20 @@ (or requested-uuid (random-v4-uuid))) - (define (build-perform-datastore-changes derivations-lacking-builds) + (define (build-perform-datastore-changes derivation derivations-lacking-builds) (lambda (_) ;; Check again now, since new builds could have been added since the ;; checks were made before the start of the transaction. - (match (check-whether-to-store-build) + (match (check-whether-to-store-build derivation) ('continue ;; Actually create a build, do this first so the derived priorities ;; for the builds inserted below are informed by this build. (store-build derivation-file build-id priority - tags) + tags + #:skip-updating-other-build-derived-priorities + skip-updating-derived-priorities?) (for-each (match-lambda @@ -752,66 +759,83 @@ (build-coordinator-metrics-registry build-coordinator) "coordinator_submit_build_duration_seconds" (lambda () + (unless (derivation-path? derivation-file) + (raise-exception + (make-client-error 'invalid-derivation-file))) + + (string-for-each + (lambda (c) + (unless (or (char-alphabetic? c) + (char-numeric? c) + (member c '(#\+ #\- #\. #\_ #\? #\=))) + (raise-exception + (make-client-error 'invalid-character-in-derivation-file)))) + (store-path-base derivation-file)) + (match (check-whether-to-store-build) ('continue - ;; Store the derivation first, so that listing related derivations - ;; with no builds works - (unless (datastore-find-derivation datastore derivation-file) - (datastore-store-derivation - datastore - (with-fibers-port-timeouts - (lambda () - (call-with-delay-logging read-drv - #:threshold 10 - #:args (list derivation-file))) - #:timeout 30))) + (let ((drv + ;; If the dervation is missing from the database, read it and + ;; enter it in to the database, so that listing related + ;; derivations with no builds works + (if (datastore-find-derivation datastore derivation-file) + #f + (with-fibers-port-timeouts + (lambda () + (call-with-delay-logging read-drv + #:threshold 10 + #:args (list derivation-file))) + #:timeout 30)))) + (when drv + (datastore-store-derivation datastore drv)) - (let ((related-derivations-lacking-builds - (if ensure-all-related-derivation-outputs-have-builds? - (datastore-list-related-derivations-with-no-build-for-outputs + (let ((related-derivations-lacking-builds + (if ensure-all-related-derivation-outputs-have-builds? + (datastore-list-related-derivations-with-no-build-for-outputs + datastore + derivation-file) + '()))) + (match (datastore-call-with-transaction datastore - derivation-file) - '()))) - (match (datastore-call-with-transaction - datastore - (build-perform-datastore-changes - ;; Do this here so it doesn't take time in the writer thread - (map - (lambda (drv) - ;; Generate the UUID's outside the transaction to save - ;; time too. - (cons drv (random-v4-uuid))) - related-derivations-lacking-builds)) - #:duration-metric-name - "store_build") - (#t ; build submitted - (build-coordinator-prompt-hook-processing-for-event - build-coordinator - 'build-submitted) - - (build-coordinator-send-event - build-coordinator - 'build-submitted - `((id . ,build-id) - (derivation . ,derivation-file) - (priority . ,priority) - (tags - . ,(list->vector + (build-perform-datastore-changes + drv + ;; Do this here so it doesn't take time in the writer thread (map - (match-lambda - ((key . value) - `((key . ,key) - (value . ,value)))) - (if (vector? tags) - (vector->list tags) - tags)))) - (defer_until . ,defer-until))) - - (trigger-build-allocation build-coordinator) - - `((build-submitted . ,build-id))) - (stop-condition - stop-condition)))) + (lambda (related-drv) + ;; Generate the UUID's outside the transaction to save + ;; time too. + (cons related-drv (random-v4-uuid))) + related-derivations-lacking-builds)) + #:duration-metric-name + "store_build") + (#t ; build submitted + (build-coordinator-prompt-hook-processing-for-event + build-coordinator + 'build-submitted) + + (build-coordinator-send-event + build-coordinator + 'build-submitted + `((id . ,build-id) + (derivation . ,derivation-file) + (priority . ,priority) + (tags + . ,(list->vector + (map + (match-lambda + ((key . value) + `((key . ,key) + (value . ,value)))) + (if (vector? tags) + (vector->list tags) + tags)))) + (defer_until . ,defer-until))) + + (trigger-build-allocation build-coordinator) + + `((build-submitted . ,build-id))) + (stop-condition + stop-condition))))) (stop-condition stop-condition))))) @@ -841,7 +865,9 @@ (make-transaction-rollback-exception 'skipped-as-build-required-by-another))) - (datastore-remove-build-from-allocation-plan datastore uuid) + (build-coordinator-remove-build-from-allocation-plan + build-coordinator + uuid) (datastore-cancel-build datastore uuid) (datastore-insert-unprocessed-hook-event datastore "build-canceled" @@ -979,7 +1005,7 @@ (processor_count . ,processor-count)))) (define (trigger-build-allocation build-coordinator) - ((build-coordinator-allocator-thread build-coordinator))) + ((build-coordinator-trigger-build-allocation build-coordinator))) (define (build-coordinator-prompt-hook-processing-for-event build-coordinator event-name) @@ -1000,16 +1026,20 @@ (allocator-proc datastore #:metrics-registry (build-coordinator-metrics-registry build-coordinator))))) - (datastore-replace-build-allocation-plan datastore new-plan) - - (let ((build-count-per-agent - (datastore-count-build-allocation-plan-entries - datastore))) - (build-coordinator-send-event - build-coordinator - "allocation-plan-update" - `((allocation_plan_counts . ,build-count-per-agent))))) - #t) + (build-coordinator-replace-build-allocation-plan + build-coordinator + new-plan) + + (build-coordinator-send-event + build-coordinator + "allocation-plan-update" + `((allocation_plan_counts + . ,(map + (match-lambda + ((agent-id . builds) + (cons agent-id (length builds)))) + new-plan)))) + #t)) (define (make-build-allocator-thread build-coordinator) (define mtx (make-mutex)) @@ -1081,31 +1111,213 @@ exn) (exit 1)) (lambda () - (let ((build-allocation-plan-total - (make-gauge-metric - (build-coordinator-metrics-registry build-coordinator) - "build_allocation_plan_total" - #:labels '(agent_id)))) - (with-time-logging - "allocator initialise metrics" - (for-each (match-lambda - ((agent-id . count) - (metric-set build-allocation-plan-total - count - #:label-values - `((agent_id . ,agent-id))))) - (datastore-count-build-allocation-plan-entries - (build-coordinator-datastore build-coordinator))))) - (update-build-allocation-plan-loop))))) trigger-build-allocation) +(define (spawn-build-allocation-plan-management-fiber coordinator) + (define allocation-plan '()) + + (define allocation-plan-metric + (make-gauge-metric + (build-coordinator-metrics-registry coordinator) + "build_allocation_plan_total" + #:labels '(agent_id))) + + (define (update-build-allocation-plan-metrics!) + (for-each + (match-lambda + ((agent-id . builds) + (metric-set allocation-plan-metric + (length builds) + #:label-values + `((agent_id . ,agent-id))))) + allocation-plan) + #t) + + (spawn-fiber + (lambda () + (while #t + (with-exception-handler + (lambda (exn) + (simple-format (current-error-port) + "exception in allocation plan fiber: ~A\n" + exn)) + (lambda () + (with-throw-handler #t + (lambda () + (match (get-message (build-coordinator-allocator-channel coordinator)) + (('stats reply) + (put-message + reply + (map + (match-lambda + ((agent-id . builds) + (cons agent-id (length builds)))) + allocation-plan))) + (('fetch-agent-plan agent-id reply) + (put-message + reply + (or (list-copy (assoc-ref allocation-plan agent-id)) + '()))) + (('fetch-plan reply) + (put-message reply + (map (match-lambda + ((agent-id . builds) + (cons agent-id + (list-copy builds)))) + allocation-plan))) + (('remove-build uuid reply) + (set! + allocation-plan + (map + (match-lambda + ((agent-id . builds) + (cons agent-id + (remove + (lambda (build-uuid) + (string=? build-uuid uuid)) + builds)))) + allocation-plan)) + + (put-message reply #t)) + (('replace new-plan reply) + + (set! allocation-plan new-plan) + + (update-build-allocation-plan-metrics!) + + (put-message reply #t)))) + (lambda _ + (backtrace)))) + #:unwind? #t))))) + +(define (build-coordinator-allocation-plan-stats coordinator) + (let ((reply (make-channel))) + (put-message (build-coordinator-allocator-channel coordinator) + (list 'stats reply)) + (get-message reply))) + +(define (build-coordinator-fetch-agent-allocation-plan coordinator agent-id) + (let ((reply (make-channel))) + (put-message (build-coordinator-allocator-channel coordinator) + (list 'fetch-agent-plan agent-id reply)) + (get-message reply))) + +(define (build-coordinator-allocation-plan coordinator) + (let ((reply (make-channel))) + (put-message (build-coordinator-allocator-channel coordinator) + (list 'fetch-plan reply)) + (get-message reply))) + +(define (build-coordinator-build-in-allocation-plan? coordinator uuid) + (any + (match-lambda + ((agent-id . build-uuids) + (->bool (member uuid build-uuids string=?)))) + (build-coordinator-allocation-plan coordinator))) + +(define (build-coordinator-remove-build-from-allocation-plan coordinator uuid) + (let ((reply (make-channel))) + (put-message (build-coordinator-allocator-channel coordinator) + (list 'remove-build uuid reply)) + (get-message reply))) + +(define (build-coordinator-replace-build-allocation-plan coordinator plan) + (let ((reply (make-channel))) + (put-message (build-coordinator-allocator-channel coordinator) + (list 'replace plan reply)) + (get-message reply))) + +(define (build-coordinator-fetch-build-to-allocate coordinator + agent-id) + (define datastore + (build-coordinator-datastore coordinator)) + + (let ((all-planned-builds + (build-coordinator-fetch-agent-allocation-plan coordinator + agent-id))) + + (log-msg (build-coordinator-logger coordinator) + 'DEBUG + "fetching build to allocate to " agent-id ", " + (length all-planned-builds) " to consider") + + (let loop ((planned-builds all-planned-builds)) + (if (null? planned-builds) + #f + (match (datastore-fetch-build-to-allocate datastore + (first planned-builds)) + (#f + (log-msg (build-coordinator-logger coordinator) + 'DEBUG + agent-id ": ignoring " (first planned-builds)) + (loop (cdr planned-builds))) + (#(uuid derivation-id derivation-name derived_priority) + + (if (datastore-check-if-derivation-conflicts? + datastore + agent-id + derivation-id) + (begin + (log-msg (build-coordinator-logger coordinator) + 'DEBUG + agent-id ": " uuid " conflicts with allocated build") + (loop (cdr planned-builds))) + (begin + (log-msg (build-coordinator-logger coordinator) + 'DEBUG + agent-id ": " uuid " selected") + + `((uuid . ,uuid) + (derivation_name . ,derivation-name) + (derived_priority . ,derived_priority)))))))))) + +(define* (build-coordinator-list-allocation-plan-builds coordinator + agent-id + #:key limit) + (define (take* lst i) + (if (< (length lst) i) + lst + (take lst i))) + + (define datastore + (build-coordinator-datastore coordinator)) + + (let ((build-ids + (build-coordinator-fetch-agent-allocation-plan coordinator + agent-id))) + (filter-map + (lambda (build-id) + (match (datastore-fetch-build-to-allocate datastore build-id) + (#(uuid derivation_id derivation_name derived_priority) + (let ((build-details (datastore-find-build datastore uuid))) + `((uuid . ,uuid) + (derivation_name . ,derivation_name) + (system . ,(datastore-find-build-derivation-system + datastore + uuid)) + (priority . ,(assq-ref build-details 'priority)) + (derived_priority . ,derived_priority) + (tags . ,(vector-map + (lambda (_ tag) + (match tag + ((key . value) + `((key . ,key) + (value . ,value))))) + (datastore-fetch-build-tags + datastore + uuid)))))) + (#f #f))) + (if limit + (take* build-ids limit) + build-ids)))) + (define (spawn-fiber-to-watch-for-deferred-builds coordinator) (spawn-fiber (lambda () + (sleep 10) (while #t - (sleep 60) ; 1 minute (with-exception-handler (lambda (exn) (simple-format (current-error-port) @@ -1113,14 +1325,21 @@ exn)) (lambda () (let ((first-deferred-build - (datastore-find-first-unallocated-deferred-build - (build-coordinator-datastore coordinator)))) - (when (and first-deferred-build - (time<=? (date->time-utc - (assq-ref first-deferred-build 'deferred-until)) - (current-time))) - (simple-format #t "guix-build-coordinator: triggering build allocation for deferred build: ~A\n" (assq-ref first-deferred-build 'uuid)) - (trigger-build-allocation coordinator)))) + (datastore-find-deferred-build + (build-coordinator-datastore coordinator) + (lambda (build-details) + (build-coordinator-build-in-allocation-plan? + coordinator + (assq-ref build-details 'uuid)))))) + (if (and first-deferred-build + (time<=? (date->time-utc + (assq-ref first-deferred-build 'deferred-until)) + (current-time))) + (begin + (simple-format #t "guix-build-coordinator: triggering build allocation for deferred build: ~A\n" (assq-ref first-deferred-build 'uuid)) + (trigger-build-allocation coordinator) + (sleep 10)) + (sleep 60)))) #:unwind? #t))) #:parallel? #t)) @@ -1268,12 +1487,18 @@ (datastore-list-unprocessed-hook-events datastore event-name - (+ 1 (length in-progress-ids))))) - (find - (match-lambda - ((id rest ...) - (not (member id in-progress-ids)))) - potential-jobs))) + (+ 1 (length in-progress-ids)))) + (job + (find + (match-lambda + ((id rest ...) + (not (member id in-progress-ids)))) + potential-jobs))) + (log-msg + (build-coordinator-logger build-coordinator) + 'DEBUG + event-name " work queue, got job " job) + job)) (lambda (id event arguments) (process-event id event arguments handler)) #:name (symbol->string event-name)))) @@ -1299,11 +1524,13 @@ (build-coordinator-datastore build-coordinator)) (define (allocate-one-build agent-id) - (let ((build-details (datastore-fetch-build-to-allocate datastore agent-id))) + (let ((build-details (build-coordinator-fetch-build-to-allocate + build-coordinator agent-id))) (if build-details (let ((build-id (assq-ref build-details 'uuid))) (datastore-insert-to-allocated-builds datastore agent-id (list build-id)) - (datastore-remove-builds-from-plan datastore (list build-id)) + (build-coordinator-remove-build-from-allocation-plan + build-coordinator build-id) build-details) #f))) @@ -1335,20 +1562,6 @@ (let ((new-builds (allocate-several-builds agent (- target-count start-count)))) - (unless (null? new-builds) - (let ((allocation-plan-metric - (metrics-registry-fetch-metric - (slot-ref datastore 'metrics-registry) - "build_allocation_plan_total"))) - (for-each - (match-lambda - ((agent-id . count) - (metric-set allocation-plan-metric - count - #:label-values - `((agent_id . ,agent-id))))) - (datastore-count-build-allocation-plan-entries datastore)))) - ;; Previously allocate builds just returned newly allocated ;; builds, but if max-builds is provided, return all the ;; builds. This means the agent can handle this in a idempotent diff --git a/guix-build-coordinator/datastore.scm b/guix-build-coordinator/datastore.scm index a29a993..dc4fec6 100644 --- a/guix-build-coordinator/datastore.scm +++ b/guix-build-coordinator/datastore.scm @@ -68,7 +68,7 @@ (re-export datastore-count-builds-for-derivation) (re-export datastore-list-processed-builds) (re-export datastore-list-unprocessed-builds) -(re-export datastore-find-first-unallocated-deferred-build) +(re-export datastore-find-deferred-build) (re-export datastore-fetch-prioritised-unprocessed-builds) (re-export datastore-insert-unprocessed-hook-event) (re-export datastore-count-unprocessed-hook-events) @@ -86,16 +86,12 @@ (re-export datastore-list-builds-for-output) (re-export datastore-list-builds-for-output-and-system) (re-export datastore-agent-for-build) -(re-export datastore-count-build-allocation-plan-entries) -(re-export datastore-replace-build-allocation-plan) -(re-export datastore-remove-build-from-allocation-plan) (re-export datastore-count-allocated-builds) (re-export datastore-agent-requested-systems) (re-export datastore-update-agent-requested-systems) (re-export datastore-fetch-build-to-allocate) +(re-export datastore-check-if-derivation-conflicts?) (re-export datastore-insert-to-allocated-builds) -(re-export datastore-remove-builds-from-plan) -(re-export datastore-list-allocation-plan-builds) (define* (database-uri->datastore database #:key diff --git a/guix-build-coordinator/datastore/abstract.scm b/guix-build-coordinator/datastore/abstract.scm index 68fc654..799485e 100644 --- a/guix-build-coordinator/datastore/abstract.scm +++ b/guix-build-coordinator/datastore/abstract.scm @@ -9,8 +9,7 @@ datastore-new-agent datastore-new-agent-password datastore-list-agent-builds - datastore-agent-password-exists? - datastore-list-allocation-plan-builds)) + datastore-agent-password-exists?)) (define-class <abstract-datastore> ()) @@ -24,5 +23,3 @@ (define-generic datastore-agent-password-exists?) (define-generic datastore-agent-list-unprocessed-builds) (define-generic datastore-list-agent-builds) -(define-generic datastore-agent-replace-build-allocation-plan) -(define-generic datastore-list-allocation-plan-builds) diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm index 96751a5..ac04362 100644 --- a/guix-build-coordinator/datastore/sqlite.scm +++ b/guix-build-coordinator/datastore/sqlite.scm @@ -87,7 +87,7 @@ datastore-replace-agent-tags datastore-list-processed-builds datastore-list-unprocessed-builds - datastore-find-first-unallocated-deferred-build + datastore-find-deferred-build datastore-fetch-prioritised-unprocessed-builds datastore-insert-unprocessed-hook-event datastore-count-unprocessed-hook-events @@ -96,21 +96,20 @@ datastore-delete-unprocessed-hook-event datastore-list-agent-builds datastore-agent-for-build - datastore-count-build-allocation-plan-entries datastore-replace-build-allocation-plan - datastore-remove-build-from-allocation-plan datastore-count-allocated-builds datastore-agent-requested-systems datastore-update-agent-requested-systems datastore-fetch-build-to-allocate - datastore-insert-to-allocated-builds - datastore-remove-builds-from-plan - datastore-list-allocation-plan-builds)) + datastore-check-if-derivation-conflicts? + datastore-insert-to-allocated-builds)) (define-class <sqlite-datastore> (<abstract-datastore>) database-file worker-reader-thread-channel + worker-reader-thread-proc-vector worker-writer-thread-channel + worker-writer-thread-proc-vector metrics-registry) (define* (sqlite-datastore database-uri @@ -140,138 +139,129 @@ (slot-set! datastore 'database-file database-file) (slot-set! datastore 'metrics-registry metrics-registry) - (slot-set! - datastore - 'worker-writer-thread-channel - (make-worker-thread-channel - (lambda () - (let ((db - (db-open database-file))) - (sqlite-exec db "PRAGMA busy_timeout = 5000;") - (sqlite-exec db "PRAGMA synchronous = NORMAL;") - (sqlite-exec db "PRAGMA temp_store = MEMORY;") - (sqlite-exec db "PRAGMA foreign_keys = ON;") - - (sqlite-exec db "ATTACH DATABASE 'file:/mem?vfs=memdb' AS mem;") - - (sqlite-exec - db - " -CREATE TABLE IF NOT EXISTS mem.build_allocation_plan ( - build_id INTEGER NOT NULL, - agent_id TEXT NOT NULL, - ordering INTEGER NOT NULL, - PRIMARY KEY (agent_id, build_id) -);") - - (list db))) - #:name "ds write" - #:destructor - (let ((writer-thread-destructor-counter - (make-gauge-metric metrics-registry - "datastore_writer_thread_close_total"))) - (lambda (db) - (db-optimize db - database-file - metrics-registry - #:maybe-truncate-wal? #f) - - (metric-increment writer-thread-destructor-counter) - (sqlite-close db))) - #:lifetime 500 - #:expire-on-exception? #t - - ;; SQLite doesn't support parallel writes - #:parallelism 1 - #:delay-logger (let ((delay-metric - (make-histogram-metric - metrics-registry - "datastore_write_delay_seconds"))) - (lambda (seconds-delayed) - (metric-observe delay-metric - ;; TODO exact->inexact to work around - ;; a bug in guile-prometheus where - ;; the metric sum will output in the - ;; exact form including the /q - (exact->inexact seconds-delayed)) - (log-delay "datastore write" seconds-delayed) - (when (> seconds-delayed 1) - (format - (current-error-port) - "warning: database write delayed by ~1,2f seconds~%" - seconds-delayed)))) - #:duration-logger - (lambda (duration proc) - (when (> duration 10) - (format - (current-error-port) - "warning: database write took ~1,2f seconds (~a)~%" - duration - proc))) - #:log-exception? worker-thread-log-exception?)) - - ;; Make sure the worker thread has initialised, and created the in memory - ;; tables - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) - (const #t)) - - (slot-set! - datastore - 'worker-reader-thread-channel - (make-worker-thread-channel - (lambda () - (let ((db - (db-open database-file #:write? #f))) - (sqlite-exec db "PRAGMA temp_store = MEMORY;") - (sqlite-exec db "PRAGMA busy_timeout = 4000;") - (sqlite-exec db "PRAGMA cache_size = -16000;") - - (sqlite-exec db "ATTACH DATABASE 'file:/mem?vfs=memdb' AS mem;") - - (list db))) - #:name "ds read" - #:destructor - (let ((reader-thread-destructor-counter - (make-gauge-metric metrics-registry - "datastore_reader_thread_close_total"))) - (lambda (db) - (metric-increment reader-thread-destructor-counter) - (sqlite-close db))) - #:lifetime 50000 - #:expire-on-exception? #t - - ;; Use a minimum of 8 and a maximum of 16 threads - #:parallelism - (min (max (current-processor-count) - 8) - 16) - #:delay-logger (let ((delay-metric - (make-histogram-metric + (let ((channel + proc-vector + (make-worker-thread-channel + (lambda () + (let ((db + (db-open database-file))) + (sqlite-exec db "PRAGMA busy_timeout = 0;") + (sqlite-exec db "PRAGMA synchronous = NORMAL;") + (sqlite-exec db "PRAGMA temp_store = MEMORY;") + (sqlite-exec db "PRAGMA foreign_keys = ON;") + + (list db))) + #:name "ds write" + #:destructor + (let ((writer-thread-destructor-counter + (make-gauge-metric metrics-registry + "datastore_writer_thread_close_total"))) + (lambda (db) + (db-optimize db + database-file metrics-registry - "datastore_read_delay_seconds"))) - (lambda (seconds-delayed) - (metric-observe delay-metric - ;; TODO exact->inexact to work around - ;; a bug in guile-prometheus where - ;; the metric sum will output in the - ;; exact form including the /q - (exact->inexact seconds-delayed)) - (log-delay "datastore read" seconds-delayed) - (when (> seconds-delayed 1) - (format - (current-error-port) - "warning: database read delayed by ~1,2f seconds~%" - seconds-delayed)))) - #:duration-logger - (lambda (duration proc) - (when (> duration 30) - (format - (current-error-port) - "warning: database read took ~1,2f seconds (~a)~%" - duration - proc))) - #:log-exception? worker-thread-log-exception?)) + #:maybe-truncate-wal? #f) + + (metric-increment writer-thread-destructor-counter) + (sqlite-close db))) + #:lifetime 500 + #:expire-on-exception? #t + + ;; SQLite doesn't support parallel writes + #:parallelism 1 + #:delay-logger (let ((delay-metric + (make-histogram-metric + metrics-registry + "datastore_write_delay_seconds"))) + (lambda (seconds-delayed) + (metric-observe delay-metric + ;; TODO exact->inexact to work around + ;; a bug in guile-prometheus where + ;; the metric sum will output in the + ;; exact form including the /q + (exact->inexact seconds-delayed)) + (log-delay "datastore write" seconds-delayed) + (when (> seconds-delayed 1) + (format + (current-error-port) + "warning: database write delayed by ~1,2f seconds~%" + seconds-delayed)))) + #:duration-logger + (lambda (duration proc) + (when (> duration 10) + (format + (current-error-port) + "warning: database write took ~1,2f seconds (~a)~%" + duration + proc))) + #:log-exception? worker-thread-log-exception?))) + + (slot-set! datastore + 'worker-writer-thread-channel + channel) + (slot-set! datastore + 'worker-writer-thread-proc-vector + proc-vector)) + + (let ((channel + proc-vector + (make-worker-thread-channel + (lambda () + (let ((db + (db-open database-file #:write? #f))) + (sqlite-exec db "PRAGMA temp_store = MEMORY;") + (sqlite-exec db "PRAGMA busy_timeout = 0;") + (sqlite-exec db "PRAGMA cache_size = -16000;") + + (list db))) + #:name "ds read" + #:destructor + (let ((reader-thread-destructor-counter + (make-gauge-metric metrics-registry + "datastore_reader_thread_close_total"))) + (lambda (db) + (metric-increment reader-thread-destructor-counter) + (sqlite-close db))) + #:lifetime 50000 + #:expire-on-exception? #t + + ;; Use a minimum of 8 and a maximum of 16 threads + #:parallelism + (min (max (current-processor-count) + 8) + 16) + #:delay-logger (let ((delay-metric + (make-histogram-metric + metrics-registry + "datastore_read_delay_seconds"))) + (lambda (seconds-delayed) + (metric-observe delay-metric + ;; TODO exact->inexact to work around + ;; a bug in guile-prometheus where + ;; the metric sum will output in the + ;; exact form including the /q + (exact->inexact seconds-delayed)) + (log-delay "datastore read" seconds-delayed) + (when (> seconds-delayed 1) + (format + (current-error-port) + "warning: database read delayed by ~1,2f seconds~%" + seconds-delayed)))) + #:duration-logger + (lambda (duration proc) + (when (> duration 30) + (format + (current-error-port) + "warning: database read took ~1,2f seconds (~a)~%" + duration + proc))) + #:log-exception? worker-thread-log-exception?))) + (slot-set! datastore + 'worker-reader-thread-channel + channel) + (slot-set! datastore + 'worker-reader-thread-proc-vector + proc-vector)) datastore)) @@ -305,11 +295,6 @@ CREATE TABLE IF NOT EXISTS mem.build_allocation_plan ( metrics-registry checkpoint-duration-metric-name (lambda () - (if (> (wal-size) extreme-wal-size-threshold) - ;; Since the WAL is really getting too big, wait for much longer - (sqlite-exec db "PRAGMA busy_timeout = 300000;") - (sqlite-exec db "PRAGMA busy_timeout = 20;")) - (let* ((statement (sqlite-prepare db @@ -331,8 +316,6 @@ CREATE TABLE IF NOT EXISTS mem.build_allocation_plan ( modified-page-count pages-moved-to-db) #t)))))) - (sqlite-exec db "PRAGMA busy_timeout = 5000;") - result))) #t)) @@ -369,6 +352,17 @@ PRAGMA optimize;") (spawn-fiber (lambda () (while #t + (sleep 20) + (vector-for-each + (lambda (i proc) + (simple-format (current-error-port) + "reader thread ~A running: ~A\n" + i proc)) + (slot-ref datastore 'worker-reader-thread-proc-vector))))) + + (spawn-fiber + (lambda () + (while #t (sleep (* 60 10)) ; 10 minutes (with-exception-handler (lambda (exn) @@ -1742,42 +1736,6 @@ WHERE build_id = :build_id" #t) rest)) -(define-method (datastore-remove-build-from-allocation-plan - (datastore <sqlite-datastore>) - uuid) - (define (update-build-allocation-plan-metrics) - (let ((allocation-plan-metric - (metrics-registry-fetch-metric - (slot-ref datastore 'metrics-registry) - "build_allocation_plan_total"))) - (for-each - (match-lambda - ((agent-id . count) - (metric-set allocation-plan-metric - count - #:label-values - `((agent_id . ,agent-id))))) - (datastore-count-build-allocation-plan-entries datastore)))) - - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-writer-thread-channel) - (lambda (db) - (let ((statement (sqlite-prepare - db - " -DELETE FROM build_allocation_plan WHERE build_id = :build_id" - #:cache? #t))) - - (sqlite-bind-arguments - statement - #:build_id (db-find-build-id db uuid)) - - (sqlite-step-and-reset statement) - - (unless (= 0 (changes-count db)) - (update-build-allocation-plan-metrics))))) - #t) - (define-method (datastore-count-build-results (datastore <sqlite-datastore>)) (call-with-worker-thread @@ -3024,8 +2982,9 @@ ORDER BY priority DESC" builds))))) -(define-method (datastore-find-first-unallocated-deferred-build - (datastore <sqlite-datastore>)) +(define-method (datastore-find-deferred-build + (datastore <sqlite-datastore>) + select?) (call-with-worker-thread (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) @@ -3040,25 +2999,29 @@ INNER JOIN derivations WHERE processed = 0 AND canceled = 0 AND deferred_until IS NOT NULL - AND builds.id NOT IN (SELECT build_id FROM build_allocation_plan) -ORDER BY deferred_until ASC -LIMIT 1" +ORDER BY deferred_until ASC" #:cache? #t))) - (match (sqlite-step-and-reset statement) - (#(uuid derivation_name priority created_at deferred_until) - `((uuid . ,uuid) - (derivation-name . ,derivation_name) - (priority . ,priority) - (created-at . ,(if (string? created_at) - (string->date created_at - "~Y-~m-~d ~H:~M:~S") - #f)) - (deferred-until . ,(if (string? deferred_until) - (string->date deferred_until - "~Y-~m-~d ~H:~M:~S") - #f)))) - (#f #f)))))) + (let loop ((row (sqlite-step statement))) + (match row + (#(uuid derivation_name priority created_at deferred_until) + (let ((res + (select? + `((uuid . ,uuid) + (derivation-name . ,derivation_name) + (priority . ,priority) + (created-at . ,(if (string? created_at) + (string->date created_at + "~Y-~m-~d ~H:~M:~S") + #f)) + (deferred-until . ,(if (string? deferred_until) + (string->date deferred_until + "~Y-~m-~d ~H:~M:~S") + #f)))))) + (if res + res + (loop (sqlite-step statement))))) + (#f #f))))))) (define-method (datastore-fetch-prioritised-unprocessed-builds (datastore <sqlite-datastore>)) @@ -3230,103 +3193,6 @@ DELETE FROM unprocessed_hook_events WHERE id = :id" (sqlite-step-and-reset statement)))) #t) -(define-method (datastore-count-build-allocation-plan-entries - (datastore <sqlite-datastore>)) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) - (lambda (db) - (let ((statement - (sqlite-prepare - db - " -SELECT agent_id, COUNT(*) -FROM build_allocation_plan -GROUP BY agent_id" - #:cache? #t))) - - (let ((result - (sqlite-map - (match-lambda - (#(agent_id count) - (cons agent_id count))) - statement))) - (sqlite-reset statement) - - result))))) - -(define-method (datastore-replace-build-allocation-plan - (datastore <sqlite-datastore>) - planned-builds) - (define (clear-current-plan db) - (sqlite-exec - db - "DELETE FROM build_allocation_plan")) - - (define insert-sql - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) - (lambda (db) - (string-append - " -INSERT INTO build_allocation_plan (build_id, agent_id, ordering) VALUES " - (string-join - (map (match-lambda - ((build-uuid agent-id ordering) - (simple-format - #f - "('~A', '~A', ~A)" - (db-find-build-id db build-uuid) - agent-id - ordering))) - planned-builds) - ", ") - ";")))) - - (define (insert-new-plan db planned-builds) - (sqlite-exec - db - insert-sql)) - - (datastore-call-with-transaction - datastore - (lambda (db) - (clear-current-plan db) - (unless (null? planned-builds) - (insert-new-plan db planned-builds))) - #:duration-metric-name "replace_build_allocation_plan") - - (let* ((agent-ids - (map (lambda (agent-details) - (assq-ref agent-details 'uuid)) - (datastore-list-agents datastore))) - (counts-by-agent - (make-vector (length agent-ids) 0))) - (for-each - (match-lambda - ((_ agent-id _) - (let ((index (list-index (lambda (list-agent-id) - (string=? agent-id list-agent-id)) - agent-ids))) - (vector-set! counts-by-agent - index - (+ (vector-ref counts-by-agent - index) - 1))))) - planned-builds) - - (let ((metric - (metrics-registry-fetch-metric (slot-ref datastore 'metrics-registry) - "build_allocation_plan_total"))) - (for-each - (lambda (index agent-id) - (metric-set metric - (vector-ref counts-by-agent index) - #:label-values - `((agent_id . ,agent-id)))) - (iota (length agent-ids)) - agent-ids))) - #t) - (define-method (datastore-count-allocated-builds (datastore <sqlite-datastore>)) (call-with-worker-thread/delay-logging @@ -3426,34 +3292,40 @@ INSERT INTO build_allocation_agent_requested_systems (agent_id, system_id) VALUE (define-method (datastore-fetch-build-to-allocate (datastore <sqlite-datastore>) - agent-id) - (datastore-call-with-transaction - datastore + build-id) + (call-with-worker-thread + (slot-ref datastore 'worker-reader-thread-channel) (lambda (db) (let ((statement (sqlite-prepare db - ;; This needs to guard against the plan being out of date " SELECT builds.uuid, derivations.id, derivations.name, unprocessed_builds_with_derived_priorities.derived_priority FROM builds -INNER JOIN build_allocation_plan - ON builds.id = build_allocation_plan.build_id INNER JOIN derivations ON builds.derivation_id = derivations.id -INNER JOIN build_allocation_agent_requested_systems - ON build_allocation_agent_requested_systems.agent_id = :agent_id - AND build_allocation_agent_requested_systems.system_id = derivations.system_id LEFT JOIN unprocessed_builds_with_derived_priorities ON unprocessed_builds_with_derived_priorities.build_id = builds.id -WHERE build_allocation_plan.agent_id = :agent_id +WHERE builds.uuid = :uuid AND builds.processed = 0 AND builds.canceled = 0 - AND builds.id NOT IN (SELECT build_id FROM allocated_builds) -ORDER BY build_allocation_plan.ordering ASC" - #:cache? #t)) - (output-conflicts-statement + AND builds.id NOT IN (SELECT build_id FROM allocated_builds)" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:uuid build-id) + + (sqlite-step-and-reset statement))))) + +(define-method (datastore-check-if-derivation-conflicts? + (datastore <sqlite-datastore>) + agent-id + derivation-id) + (call-with-worker-thread + (slot-ref datastore 'worker-reader-thread-channel) + (lambda (db) + (let ((statement (sqlite-prepare db " @@ -3471,34 +3343,11 @@ WHERE build_derivation_outputs.derivation_id = :derivation_id allocated_builds_derivation_outputs.output_id" #:cache? #t))) - (define (get-build-to-allocate) - (let loop ((build-details (sqlite-step statement))) - (match build-details - (#f #f) - (#(uuid derivation-id derivation-name derived_priority) - - (sqlite-bind-arguments output-conflicts-statement - #:agent_id agent-id - #:derivation_id derivation-id) - - (if (eq? (sqlite-step-and-reset output-conflicts-statement) - #f) - `((uuid . ,uuid) - (derivation_name . ,derivation-name) - (derived_priority . ,derived_priority)) - (loop (sqlite-step statement))))))) - - (sqlite-bind-arguments - statement - #:agent_id agent-id) - - (let ((result (get-build-to-allocate))) - (sqlite-reset statement) - - result))) + (sqlite-bind-arguments statement + #:agent_id agent-id + #:derivation_id derivation-id) - #:readonly? #t - #:duration-metric-name "fetch_builds_to_allocate")) + (->bool (sqlite-step-and-reset statement)))))) (define-method (datastore-insert-to-allocated-builds (datastore <sqlite-datastore>) @@ -3523,25 +3372,6 @@ INSERT INTO allocated_builds (build_id, agent_id) VALUES " ", ") ";"))))) -(define-method (datastore-remove-builds-from-plan - (datastore <sqlite-datastore>) - build-uuids) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) - (lambda (db) - (sqlite-exec - db - (string-append - " -DELETE FROM build_allocation_plan -WHERE build_id IN (" - (string-join - (map (lambda (build-uuid) - (number->string (db-find-build-id db build-uuid))) - build-uuids) - ", ") - ")"))))) - (define-method (datastore-list-allocation-plan-builds (datastore <sqlite-datastore>) . diff --git a/guix-build-coordinator/hooks.scm b/guix-build-coordinator/hooks.scm index bc055c2..030f310 100644 --- a/guix-build-coordinator/hooks.scm +++ b/guix-build-coordinator/hooks.scm @@ -32,8 +32,9 @@ #:use-module (guix config) #:use-module (guix derivations) #:use-module (guix serialization) - #:use-module ((guix utils) #:select (default-keyword-arguments)) - #:use-module (guix build utils) + #:use-module ((guix utils) #:select (default-keyword-arguments + call-with-decompressed-port)) + #:use-module ((guix build utils) #:select (dump-port mkdir-p)) #:use-module (guix-build-coordinator config) #:use-module (guix-build-coordinator utils) #:use-module (guix-build-coordinator datastore) @@ -81,6 +82,20 @@ build-id agent-id) (current-error-port)))) +(define* (default-nar-compressions #:key output-filename nar-size + source-compression source-size) + (define MiB (* (expt 2 20) 1.)) + + (if (eq? 'none source-compression) + ;; If the agent didn't compress the nar, don't change that here + (list 'none) + (let* ((compression-proportion + (/ source-size nar-size))) + (if (or (> compression-proportion 0.95) + (< nar-size (* 0.05 MiB))) + '(none) + (list source-compression))))) + (define* (build-success-publish-hook publish-directory #:key @@ -94,7 +109,8 @@ post-publish-hook combined-post-publish-hook (publish-referenced-derivation-source-files? #t) - derivation-substitute-urls) + derivation-substitute-urls + (nar-compressions-proc default-nar-compressions)) (mkdir-p (string-append publish-directory "/nar/lzip")) (lambda (build-coordinator build-id) @@ -218,8 +234,7 @@ nar-filename)) #:unwind? #t)) - (cons narinfo-filename - nar-filename))))) + narinfo-filename)))) referenced-source-files)))))) (define (process-output drv-name output) @@ -230,12 +245,6 @@ (nar-location (build-output-file-location datastore build-id output-name)) - (nar-filename - (string-append "nar/lzip/" - (basename output-filename))) - (nar-destination - (string-append publish-directory "/" - nar-filename)) (narinfo-filename (string-append (string-take (basename output-filename) 32) ".narinfo")) @@ -246,8 +255,78 @@ (if (skip-publishing-proc narinfo-filename narinfo-directory) #f - (begin - (copy-file nar-location nar-destination) + (let ((compressions + (nar-compressions-proc + #:output-filename output-filename + #:nar-size (assq-ref output 'size) + ;; TODO Don't hardcode this + #:source-compression 'lzip + #:source-size (stat:size (stat nar-location #f))))) + + (for-each + (lambda (compression) + (if (or (and (pair? compression) + (eq? (car compression) 'lzip)) + (eq? compression 'lzip)) + ;; TODO If the agents start uploading uncompressed files + ;; or files compressed differently, this might not be + ;; right + (let ((nar-destination + (string-append publish-directory "/" + "nar/lzip/" + (basename output-filename)))) + (copy-file nar-location nar-destination)) + (let* ((target-compression + (if (pair? compression) + (car compression) + compression)) + ;; TODO This logic should sit elsewhere + (nar-destination + (string-append + publish-directory "/" + "nar/" + (if (eq? compression 'none) + "" + (string-append + (symbol->string target-compression) "/")) + (basename output-filename))) + (temporary-destination + (string-append nar-destination ".tmp"))) + (mkdir-p (dirname temporary-destination)) + (call-with-input-file nar-location + (lambda (source-port) + (call-with-decompressed-port + ;; TODO Don't assume the source compression + 'lzip + source-port + (lambda (decompressed-source-port) + (let ((call-with-compressed-output-port* + (match target-compression + ('gzip + (@ (zlib) call-with-gzip-output-port)) + ('lzip + (@ (lzlib) call-with-lzip-output-port)) + ('zstd + (@ (zstd) call-with-zstd-output-port)) + ('none + (lambda (port proc) + (proc port) + (close-port port)))))) + (when (file-exists? temporary-destination) + (delete-file temporary-destination)) + (apply + call-with-compressed-output-port* + (open-output-file temporary-destination + #:binary #t) + (lambda (compressed-port) + (dump-port decompressed-source-port + compressed-port)) + (if (pair? compression) + (cdr compression) + '()))))))) + (rename-file temporary-destination + nar-destination)))) + compressions) (call-with-output-file narinfo-location (lambda (port) @@ -257,7 +336,23 @@ (assq-ref output 'size) (vector->list (assq-ref output 'references)) - `((lzip ,(stat:size (stat nar-location #f)))) + (map + (lambda (compression-details) + (let* ((compression + (if (pair? compression-details) + (car compression-details) + compression-details)) + ;; TODO This logic should sit elsewhere + (file (string-append + publish-directory "/" + "nar/" + (if (eq? compression 'none) + "" + (string-append + (symbol->string compression) "/")) + (basename output-filename)))) + (list compression (stat:size (stat file #f))))) + compressions) #:system (datastore-find-derivation-system datastore drv-name) @@ -276,18 +371,16 @@ (raise-exception exn)) (lambda () (post-publish-hook publish-directory - narinfo-filename - nar-filename)) + narinfo-filename)) #:unwind? #t)) - (cons narinfo-filename - nar-filename))))) + narinfo-filename)))) (let* ((build-details (datastore-find-build datastore build-id)) (drv-name (assq-ref build-details 'derivation-name)) - (narinfos-and-nars + (narinfos (append (if publish-referenced-derivation-source-files? (process-referenced-derivation-source-files drv-name) @@ -297,23 +390,22 @@ (process-output drv-name output)) (datastore-list-build-outputs datastore build-id))))) (when (and combined-post-publish-hook - (not (null? narinfos-and-nars))) + (not (null? narinfos))) (with-exception-handler (lambda (exn) ;; Rollback narinfo creation, to make this more ;; transactional (for-each - (match-lambda - ((narinfo-filename . _) - (delete-file - (string-append - narinfo-directory "/" narinfo-filename)))) - narinfos-and-nars) + (lambda + (narinfo-filename) + (delete-file + (string-append + narinfo-directory "/" narinfo-filename))) + narinfos) (raise-exception exn)) (lambda () - (combined-post-publish-hook publish-directory - narinfos-and-nars)) + (combined-post-publish-hook publish-directory narinfos)) #:unwind? #t))))) (define* (build-success-s3-publish-hook @@ -544,8 +636,14 @@ output-log-file) (delete-file source-log-file))))) -(define (default-build-missing-inputs-hook build-coordinator - build-id missing-inputs) +(define* (default-build-missing-inputs-hook + build-coordinator + build-id missing-inputs + #:key (submit-build? (lambda* (missing-input + #:key successful-builds + pending-builds) + (and (null? successful-builds) + (null? pending-builds))))) (define datastore (build-coordinator-datastore build-coordinator)) @@ -579,8 +677,9 @@ (not (assq-ref build-details 'processed)) (not (assq-ref build-details 'canceled)))) builds-for-output))) - (if (and (null? successful-builds) - (null? pending-builds)) + (if (submit-build? missing-input + #:successful-builds successful-builds + #:pending-builds pending-builds) (begin (simple-format #t "submitting build for ~A\n" @@ -590,8 +689,7 @@ #:tags (datastore-fetch-build-tags datastore build-id))) - (simple-format #t "~A builds exist for ~A, skipping\n" - (length builds-for-output) + (simple-format #t "skipping submitting build for ~A\n" missing-input))) (begin (simple-format (current-error-port) diff --git a/guix-build-coordinator/utils.scm b/guix-build-coordinator/utils.scm index 0acd62a..d747962 100644 --- a/guix-build-coordinator/utils.scm +++ b/guix-build-coordinator/utils.scm @@ -215,7 +215,7 @@ (define port-write-timeout-error? (record-predicate &port-write-timeout)) -(define* (with-port-timeouts thunk #:key (timeout (* 120 1000))) +(define* (with-port-timeouts thunk #:key (timeout 120)) ;; When the GC runs, it restarts the poll syscall, but the timeout remains ;; unchanged! When the timeout is longer than the time between the syscall @@ -227,8 +227,7 @@ (define (wait port mode) (let ((timeout-internal (+ (get-internal-real-time) - (* internal-time-units-per-second - (/ timeout 1000))))) + (* internal-time-units-per-second timeout)))) (let loop ((poll-value (port-poll port mode poll-timeout-ms))) (if (= poll-value 0) @@ -567,9 +566,9 @@ (define* (store-item->recutils compression file-size) (let ((url (encode-and-join-uri-path `(,@(split-and-decode-uri-path nar-path) - ,@(if compression - (list (symbol->string compression)) - '()) + ,@(if (eq? compression 'none) + '() + (list (symbol->string compression))) ,(basename store-path))))) (format #f "URL: ~a~%Compression: ~a~%~@[FileSize: ~a~%~]" url diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm index 293429c..a064175 100644 --- a/guix-build-coordinator/utils/fibers.scm +++ b/guix-build-coordinator/utils/fibers.scm @@ -51,6 +51,8 @@ (name "unnamed")) "Return a channel used to offload work to a dedicated thread. ARGS are the arguments of the worker thread procedure." + (define thread-proc-vector + (make-vector parallelism #f)) (define (initializer/safe) (let ((args @@ -74,7 +76,7 @@ arguments of the worker thread procedure." args ;; never give up, just keep retrying (begin - (sleep 5) + (sleep 1) (initializer/safe))))) (define (destructor/safe args) @@ -100,10 +102,10 @@ arguments of the worker thread procedure." (or success? #t (begin - (sleep 5) + (sleep 1) (destructor/safe args))))) - (define (process channel args) + (define (process thread-index channel args) (let loop ((current-lifetime lifetime)) (let ((exception? (match (get-message channel) @@ -124,6 +126,9 @@ arguments of the worker thread procedure." internal-time-units-per-second) exn)) (lambda () + (vector-set! thread-proc-vector + thread-index + proc) (with-throw-handler #t (lambda () (call-with-values @@ -149,6 +154,10 @@ arguments of the worker thread procedure." (put-message reply response) + (vector-set! thread-proc-vector + thread-index + #f) + (match response (('worker-thread-error duration _) (when duration-logger @@ -188,7 +197,7 @@ arguments of the worker thread procedure." "worker-thread-channel: exception: ~A\n" exn)) (lambda () (parameterize ((%worker-thread-args args)) - (process channel args))) + (process thread-index channel args))) #:unwind? #t) (when destructor @@ -196,7 +205,9 @@ arguments of the worker thread procedure." (init (initializer/safe)))))) (iota parallelism)) - channel)) + + (values channel + thread-proc-vector))) (define &worker-thread-timeout (make-exception-type '&worker-thread-timeout @@ -447,20 +458,16 @@ If already in the worker thread, call PROC immediately." (define (readable? port) "Test if PORT is writable." - (match (select (vector port) #() #() 0) - ((#() #() #()) #f) - ((#(_) #() #()) #t))) + (= 1 (port-poll port "r" 0))) (define (writable? port) "Test if PORT is writable." - (match (select #() (vector port) #() 0) - ((#() #() #()) #f) - ((#() #(_) #()) #t))) + (= 1 (port-poll port "w" 0))) (define (make-wait-operation ready? schedule-when-ready port port-ready-fd this-procedure) (make-base-operation #f (lambda _ - (and (ready? (port-ready-fd port)) values)) + (and (ready? port) values)) (lambda (flag sched resume) (define (commit) (match (atomic-box-compare-and-swap! flag 'W 'S) |