Diffstat (limited to 'guix-build-coordinator')
-rw-r--r--  guix-build-coordinator/agent-messaging/http/server.scm  |  68
-rw-r--r--  guix-build-coordinator/agent.scm                         |  15
-rw-r--r--  guix-build-coordinator/build-allocator.scm               |  69
-rw-r--r--  guix-build-coordinator/client-communication.scm          |  74
-rw-r--r--  guix-build-coordinator/coordinator.scm                   | 603
-rw-r--r--  guix-build-coordinator/datastore.scm                     |   8
-rw-r--r--  guix-build-coordinator/datastore/abstract.scm            |   5
-rw-r--r--  guix-build-coordinator/datastore/sqlite.scm              | 542
-rw-r--r--  guix-build-coordinator/hooks.scm                         | 166
-rw-r--r--  guix-build-coordinator/utils.scm                         |  11
-rw-r--r--  guix-build-coordinator/utils/fibers.scm                  |  31
11 files changed, 865 insertions(+), 727 deletions(-)
diff --git a/guix-build-coordinator/agent-messaging/http/server.scm b/guix-build-coordinator/agent-messaging/http/server.scm
index 85ba0d4..2b325d7 100644
--- a/guix-build-coordinator/agent-messaging/http/server.scm
+++ b/guix-build-coordinator/agent-messaging/http/server.scm
@@ -135,7 +135,6 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
(define (http-agent-messaging-start-server port host secret-key-base
build-coordinator
- chunked-request-channel
output-hash-channel)
(define plain-metrics-registry
(make-metrics-registry))
@@ -183,7 +182,6 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
body
secret-key-base
build-coordinator
- chunked-request-channel
output-hash-channel
update-managed-metrics!
plain-metrics-registry)))
@@ -614,7 +612,6 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
body
secret-key-base
build-coordinator
- chunked-request-channel
output-hash-channel
update-managed-metrics!
plain-metrics-registry)
@@ -794,18 +791,9 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
body)))
(call-with-output-file tmp-output-file-name
(lambda (output-port)
- ;; Older agents may still attempt to use chunked encoding
- ;; for this request
- (if (member '(chunked) (request-transfer-encoding request))
- (call-with-worker-thread
- chunked-request-channel
- (lambda ()
- (dump-port body-port
- output-port
- (request-content-length request))))
- (dump-port body-port
- output-port
- (request-content-length request))))))
+ (dump-port body-port
+ output-port
+ (request-content-length request)))))
(rename-file tmp-output-file-name
output-file-name)
(no-content))
@@ -852,20 +840,12 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
"deleting " tmp-output-file-name)
(delete-file tmp-output-file-name))
- (if (member '(chunked) (request-transfer-encoding request))
- ;; Older agents may use chunked encoding for this request
- (call-with-worker-thread
- chunked-request-channel
- (lambda ()
- (receive-file body
- #f
- tmp-output-file-name)))
- (let ((content-length
- (request-content-length request)))
- (when (> content-length 0)
- (receive-file body
- content-length
- tmp-output-file-name))))
+ (let ((content-length
+ (request-content-length request)))
+ (when (> content-length 0)
+ (receive-file body
+ content-length
+ tmp-output-file-name)))
(if (file-exists? output-file-name)
(render-json
@@ -945,19 +925,10 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
"deleting " output-file-name)
(delete-file output-file-name))
- (if (member '(chunked) (request-transfer-encoding request))
- ;; Older agents may use chunked encoding for this request
- (call-with-worker-thread
- chunked-request-channel
- (lambda ()
- (receive-file body
- #f
- tmp-output-file-name
- #:append? #t)))
- (receive-file body
- (request-content-length request)
- tmp-output-file-name
- #:append? #t)))
+ (receive-file body
+ content-length
+ tmp-output-file-name
+ #:append? #t))
(if (file-exists? output-file-name)
(render-json
@@ -1015,12 +986,13 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
#:code 200
#:headers '((content-type . (text/plain))
(vary . (accept))))
- (lambda (port)
- (write-metrics (build-coordinator-metrics-registry
- build-coordinator)
- port)
- (write-metrics plain-metrics-registry
- port))))))
+ (call-with-output-string
+ (lambda (port)
+ (write-metrics (build-coordinator-metrics-registry
+ build-coordinator)
+ port)
+ (write-metrics plain-metrics-registry
+ port)))))))
(_
(render-json
"not-found"
diff --git a/guix-build-coordinator/agent.scm b/guix-build-coordinator/agent.scm
index a64a61c..7afd549 100644
--- a/guix-build-coordinator/agent.scm
+++ b/guix-build-coordinator/agent.scm
@@ -77,7 +77,8 @@
derivation-substitute-urls
non-derivation-substitute-urls
metrics-file
- max-1min-load-average)
+ max-1min-load-average
+ timestamp-log-output?)
(define lgr (make <logger>))
(define port-log (make <port-log>
#:port (current-output-port)
@@ -85,9 +86,11 @@
;; In guile-lib v0.2.8 onwards, the formatter is
;; called with more arguments
(lambda args ; lvl, time, str
- (format #f "~a (~5a): ~a~%"
- (strftime "%F %H:%M:%S" (localtime
- (second args)))
+ (format #f "~a(~5a): ~a~%"
+ (if timestamp-log-output?
+ (strftime "%F %H:%M:%S " (localtime
+ (second args)))
+ "")
(first args)
(third args)))))
@@ -642,7 +645,7 @@ unable to query substitute servers without caching"))
(has-substiutes-no-cache?
non-derivation-substitute-urls
file))
- #:timeout (* 60 1000)))
+ #:timeout 60))
#:times 20
#:delay (random 15))
#f)))
@@ -704,7 +707,7 @@ but the guix-daemon claims it's unavailable"
((current-build-output-port log-port))
(build-things fetch-substitute-store
missing-paths)))
- #:timeout (* 60 10 1000)))
+ #:timeout (* 60 10)))
(lambda (key . args)
(log-msg lgr 'ERROR
"exception when fetching missing paths "
diff --git a/guix-build-coordinator/build-allocator.scm b/guix-build-coordinator/build-allocator.scm
index f73c457..588f213 100644
--- a/guix-build-coordinator/build-allocator.scm
+++ b/guix-build-coordinator/build-allocator.scm
@@ -352,20 +352,13 @@
#:label-values `((system . ,system)))))
counts)))
- ;; Go through the setup failures and look specifically at the
- ;; missing_inputs ones. Eliminate any missing_inputs failures if all the
- ;; missing inputs appear to have been built successfully, and update the
+ ;; Go through the setup failures and update the
;; derived-build-priorities-hash to reflect the priorities of builds based
;; on the builds that would be "unblocked" if they were completed.
(for-each
(lambda (setup-failure-build-id)
(let ((setup-failures
- (filter
- (lambda (setup-failure)
- (string=? "missing_inputs"
- (assq-ref setup-failure 'failure-reason)))
- (hash-ref setup-failures-hash
- setup-failure-build-id)))
+ (hash-ref setup-failures-hash setup-failure-build-id))
(setup-failure-build-derived-priority
(hash-ref derived-build-priorities-hash
setup-failure-build-id)))
@@ -425,31 +418,28 @@
setup-failures-hash))
(let ((result
- (append-map
+ (map
(lambda (agent-id)
(log "considering builds for" agent-id)
(let ((builds-sorted-by-derived-priority
(sort-list (filter (filter-builds-for-agent agent-id)
builds)
(build-sorting-function-for-agent agent-id))))
- (if (null? builds-sorted-by-derived-priority)
- '()
- (let ((final-ordered-builds
- (concatenate
- (map sort-priority-sublist
- (limit-processed-sublists
- (break-builds-in-to-priority-sublists
- builds-sorted-by-derived-priority))))))
- (let ((builds-for-agent
- (limit-planned-builds final-ordered-builds)))
- (map (lambda (build-id ordering)
- (list build-id
- agent-id
- ordering))
- (map (lambda (build)
- (assq-ref build 'uuid))
- builds-for-agent)
- (iota (length builds-for-agent))))))))
+ (cons
+ agent-id
+ (if (null? builds-sorted-by-derived-priority)
+ '()
+ (let ((final-ordered-builds
+ (concatenate
+ (map sort-priority-sublist
+ (limit-processed-sublists
+ (break-builds-in-to-priority-sublists
+ builds-sorted-by-derived-priority))))))
+ (let ((builds-for-agent
+ (limit-planned-builds final-ordered-builds)))
+ (map (lambda (build)
+ (assq-ref build 'uuid))
+ builds-for-agent)))))))
(map (lambda (agent)
(assq-ref agent 'uuid))
agents))))
@@ -538,7 +528,10 @@
(assq-ref setup-failure 'failure-reason)))
(cond
((string=? failure-reason "missing_inputs")
- #f)
+ ;; This problem might go away, but just don't try the same agent
+ ;; again for now.
+ (string=? (assq-ref setup-failure 'agent-id)
+ agent-id))
((string=? failure-reason "could_not_delete_outputs")
;; This problem might go away, but just don't try the same agent
;; again for now.
@@ -588,7 +581,7 @@
counts)))
(let ((result
- (append-map
+ (map
(lambda (agent-id)
(log "considering builds for" agent-id)
(let* ((filter-proc
@@ -600,7 +593,8 @@
(if (or (and planned-builds-for-agent-limit
(>= count planned-builds-for-agent-limit))
(null? potential-build-ids))
- build-ids ;; highest priority last
+ (reverse build-ids) ;; highest priority last, so
+ ;; reverse
(let ((potential-build (first potential-build-ids)))
(if (filter-proc potential-build)
(loop (+ 1 count)
@@ -610,18 +604,7 @@
(loop count
build-ids
(cdr potential-build-ids))))))))
- (if (null? build-ids)
- '()
- (let ((build-ids-count
- (length build-ids)))
- (map (lambda (build-id ordering)
- (list build-id
- agent-id
- ordering))
- build-ids
- (iota build-ids-count
- build-ids-count
- -1))))))
+ (cons agent-id build-ids)))
(map (lambda (agent)
(assq-ref agent 'uuid))
agents))))
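Note: after this change both allocation strategies return the plan as an association list mapping each agent id to its ordered list of build UUIDs (highest priority first), instead of a flat list of (build-id agent-id ordering) rows. A small sketch of consuming the new shape, with made-up plan data:

    (use-modules (ice-9 match)
                 (srfi srfi-1))

    ;; Hypothetical plan in the new shape.
    (define plan
      '(("agent-a" . ("build-uuid-1" "build-uuid-2"))
        ("agent-b" . ())))

    ;; Per-agent counts, as sent with the allocation-plan-update event.
    (map (match-lambda
           ((agent-id . builds)
            (cons agent-id (length builds))))
         plan)
    ;; => (("agent-a" . 2) ("agent-b" . 0))

    ;; Dropping one build everywhere, as the management fiber does when it
    ;; receives a 'remove-build message.
    (define (plan-without plan uuid)
      (map (match-lambda
             ((agent-id . builds)
              (cons agent-id
                    (remove (lambda (b) (string=? b uuid)) builds))))
           plan))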
diff --git a/guix-build-coordinator/client-communication.scm b/guix-build-coordinator/client-communication.scm
index 46535e4..47a289d 100644
--- a/guix-build-coordinator/client-communication.scm
+++ b/guix-build-coordinator/client-communication.scm
@@ -32,6 +32,7 @@
#:use-module (json)
#:use-module (logging logger)
#:use-module (gcrypt random)
+ #:use-module (prometheus)
#:use-module (web uri)
#:use-module (web client)
#:use-module (web request)
@@ -65,7 +66,8 @@
(define (start-client-request-server secret-key-base
host
port
- build-coordinator)
+ build-coordinator
+ utility-thread-pool-channel)
(run-server/patched
(lambda (request body)
(log-msg (build-coordinator-logger build-coordinator)
@@ -80,7 +82,8 @@
(uri-path (request-uri request))))
body
secret-key-base
- build-coordinator)))
+ build-coordinator
+ utility-thread-pool-channel)))
#:host host
#:port port))
@@ -88,7 +91,8 @@
method-and-path-components
raw-body
secret-key-base
- build-coordinator)
+ build-coordinator
+ utility-thread-pool-channel)
(define datastore
(build-coordinator-datastore build-coordinator))
@@ -97,6 +101,14 @@
(json-string->scm (utf8->string raw-body))
'()))
+ (define read-drv-error-count-metric
+ (or (metrics-registry-fetch-metric
+ (build-coordinator-metrics-registry build-coordinator)
+ "read_derivation_error_total")
+ (make-counter-metric
+ (build-coordinator-metrics-registry build-coordinator)
+ "read_derivation_error_total")))
+
(define (controller-thunk)
(match method-and-path-components
(('GET "build" uuid)
@@ -292,8 +304,8 @@
(render-json
`((build_allocation_plan
. ,(list->vector
- (datastore-list-allocation-plan-builds
- datastore
+ (build-coordinator-list-allocation-plan-builds
+ build-coordinator
agent-id)))))))
(('POST "agent" agent-id "passwords")
(let ((password (new-agent-password
@@ -461,13 +473,33 @@
(simple-format #f "derivation must be a string: ~A\n"
derivation))))
+ (unless (derivation-path? derivation-file)
+ (raise-exception
+ (make-exception-with-message
+ "invalid derivation path")))
+
+ (string-for-each
+ (lambda (c)
+ (unless (or (char-alphabetic? c)
+ (char-numeric? c)
+ (member c '(#\+ #\- #\. #\_ #\? #\=)))
+ (raise-exception
+ (make-exception-with-message
+ (simple-format #f "invalid character in derivation name: ~A"
+ c)))))
+ (store-path-base derivation-file))
+
(define (read-drv/substitute derivation-file)
(with-store/non-blocking store
(unless (valid-path? store derivation-file)
(substitute-derivation store
derivation-file
#:substitute-urls substitute-urls)))
- (read-derivation-from-file* derivation-file))
+ ;; Read the derivation in a thread to avoid blocking fibers
+ (call-with-worker-thread
+ utility-thread-pool-channel
+ (lambda ()
+ (read-derivation-from-file* derivation-file))))
(let ((submit-build-result
(call-with-delay-logging
@@ -484,17 +516,18 @@
'WARN
"exception substituting derivation " derivation-file
": " exn)
-
- (if (null? (or substitute-urls '()))
- ;; Try again
- (read-drv/substitute derivation-file)
- (read-derivation-through-substitutes
- derivation-file
- substitute-urls)))
+ (raise-exception exn))
(lambda ()
(with-throw-handler #t
(lambda ()
- (read-drv/substitute derivation-file))
+ (retry-on-error
+ (lambda ()
+ (read-drv/substitute derivation-file))
+ #:times 2
+ #:delay 3
+ #:error-hook
+ (lambda _
+ (metric-increment read-drv-error-count-metric))))
(lambda args
(backtrace))))
#:unwind? #t))
@@ -512,6 +545,10 @@
body "ensure-all-related-derivation-outputs-have-builds")
'(#:ensure-all-related-derivation-outputs-have-builds? #t)
'())
+ ,@(if (assoc-ref
+ body "skip-updating-derived-priorities")
+ '(#:skip-updating-derived-priorities? #t)
+ '())
,@(if (assoc-ref body "tags")
`(#:tags
,(map
@@ -535,8 +572,7 @@
datastore
(lambda (_)
(let ((allocation-plan-counts
- (datastore-count-build-allocation-plan-entries
- datastore)))
+ (build-coordinator-allocation-plan-stats build-coordinator)))
`((state_id . ,(build-coordinator-get-state-id build-coordinator))
(agents . ,(list->vector
(map
@@ -753,7 +789,8 @@
ensure-all-related-derivation-outputs-have-builds?
tags
#:key
- defer-until)
+ defer-until
+ skip-updating-derived-priorities?)
(send-request coordinator-uri
'POST
"/builds"
@@ -771,6 +808,9 @@
,@(if ensure-all-related-derivation-outputs-have-builds?
'((ensure-all-related-derivation-outputs-have-builds . #t))
'())
+ ,@(if skip-updating-derived-priorities?
+ '((skip-updating-derived-priorities . #t))
+ '())
,@(if (null? tags)
'()
`((tags . ,(list->vector tags))))
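Note: the request handler now rejects a submitted derivation before touching the store: the path must look like a .drv store path, and its base name may only contain a conservative set of characters. A minimal standalone sketch of that check, assuming (guix store) exports store-path-base and (guix derivations) exports derivation-path? as used above:

    (use-modules (guix store)
                 (guix derivations)
                 (ice-9 exceptions))

    (define (validate-derivation-file derivation-file)
      (unless (derivation-path? derivation-file)
        (raise-exception
         (make-exception-with-message "invalid derivation path")))
      (string-for-each
       (lambda (c)
         (unless (or (char-alphabetic? c)
                     (char-numeric? c)
                     (member c '(#\+ #\- #\. #\_ #\? #\=)))
           (raise-exception
            (make-exception-with-message
             (simple-format #f "invalid character in derivation name: ~A" c)))))
       (store-path-base derivation-file))
      #t)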
diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm
index 34b05c9..a7fb664 100644
--- a/guix-build-coordinator/coordinator.scm
+++ b/guix-build-coordinator/coordinator.scm
@@ -25,6 +25,7 @@
#:use-module (srfi srfi-11)
#:use-module (srfi srfi-19)
#:use-module (srfi srfi-26)
+ #:use-module (srfi srfi-43)
#:use-module (srfi srfi-71)
#:use-module (ice-9 ftw)
#:use-module (ice-9 vlist)
@@ -98,6 +99,10 @@
build-coordinator-prompt-hook-processing-for-event
start-hook-processing-threads
+ build-coordinator-allocation-plan-stats
+ build-coordinator-trigger-build-allocation
+ build-coordinator-list-allocation-plan-builds
+
build-output-file-location
build-log-file-destination
build-log-file-location
@@ -117,7 +122,8 @@
(define-record-type <build-coordinator>
(make-build-coordinator-record datastore hooks metrics-registry
- allocation-strategy logger)
+ allocation-strategy allocator-channel
+ logger)
build-coordinator?
(datastore build-coordinator-datastore)
(hooks build-coordinator-hooks)
@@ -125,8 +131,10 @@
set-build-coordinator-hook-condvars!)
(metrics-registry build-coordinator-metrics-registry)
(allocation-strategy build-coordinator-allocation-strategy)
- (allocator-thread build-coordinator-allocator-thread
- set-build-coordinator-allocator-thread!)
+ (trigger-build-allocation
+ build-coordinator-trigger-build-allocation
+ set-build-coordinator-trigger-build-allocation!)
+ (allocator-channel build-coordinator-allocator-channel)
(logger build-coordinator-logger)
(events-channel build-coordinator-events-channel
set-build-coordinator-events-channel!)
@@ -364,7 +372,8 @@
(not (client-error? exn))))))
hooks
(allocation-strategy
- basic-build-allocation-strategy))
+ basic-build-allocation-strategy)
+ (timestamp-log-output? #t))
(and (or (list? hooks)
(begin
(simple-format
@@ -398,9 +407,11 @@
;; In guile-lib v0.2.8 onwards, the formatter is
;; called with more arguments
(lambda args ; lvl, time, str
- (format #f "~a (~5a): ~a~%"
- (strftime "%F %H:%M:%S" (localtime
- (second args)))
+ (format #f "~a(~5a): ~a~%"
+ (if timestamp-log-output?
+ (strftime "%F %H:%M:%S " (localtime
+ (second args)))
+ "")
(first args)
(third args)))))
(build-coordinator
@@ -408,6 +419,7 @@
hooks
metrics-registry
allocation-strategy
+ (make-channel)
lgr)))
(add-handler! lgr port-log)
@@ -424,11 +436,6 @@
;; The logger assumes this
(set-port-encoding! (current-output-port) "UTF-8")
- ;; Work around my broken with-store/non-blocking in Guix
- (let ((socket-file (%daemon-socket-uri)))
- (%daemon-socket-uri
- (string-append "file://" socket-file)))
-
(with-exception-handler
(lambda (exn)
(simple-format #t "failed enabling core dumps: ~A\n" exn))
@@ -458,7 +465,7 @@
(when update-datastore?
(datastore-update (build-coordinator-datastore build-coordinator)))
- (set-build-coordinator-allocator-thread!
+ (set-build-coordinator-trigger-build-allocation!
build-coordinator
(make-build-allocator-thread build-coordinator))
@@ -490,31 +497,29 @@
;; Create some worker thread channels, which need to be created prior
;; to run-fibers being called.
- (let ((chunked-request-channel
- ;; There are fibers issues when trying to read the chunked
- ;; requests, so do this in dedicated threads.
+ (let ((output-hash-channel
+ (make-output-hash-channel
+ build-coordinator))
+
+ (utility-thread-pool-channel
(make-worker-thread-channel
(const '())
- #:name "chunked request"
- #:parallelism 16
- #:log-exception?
- (lambda (exn)
- (not
- (chunked-input-ended-prematurely-error?
- exn)))
+ #:name "utility"
+ #:parallelism 10
#:delay-logger
- (lambda (seconds-delayed)
- (log-delay "chunked request channel"
- seconds-delayed)
- (when (> seconds-delayed 0.1)
- (format
- (current-error-port)
- "warning: chunked request channel delayed by ~1,2f seconds~%"
- seconds-delayed)))))
-
- (output-hash-channel
- (make-output-hash-channel
- build-coordinator)))
+ (let ((delay-metric
+ (make-histogram-metric
+ (build-coordinator-metrics-registry build-coordinator)
+ "utility_thread_pool_delay_seconds")))
+ (lambda (seconds-delayed)
+ (log-delay "utility thread channel"
+ seconds-delayed)
+ (metric-observe delay-metric seconds-delayed)
+ (when (> seconds-delayed 0.1)
+ (format
+ (current-error-port)
+ "warning: utility thread channel delayed by ~1,2f seconds~%"
+ seconds-delayed)))))))
(let ((finished? (make-condition)))
(call-with-sigint
@@ -550,6 +555,8 @@
(spawn-fiber-to-watch-for-deferred-builds build-coordinator)
+ (spawn-build-allocation-plan-management-fiber build-coordinator)
+
(set-build-coordinator-scheduler! build-coordinator
(current-scheduler))
@@ -575,7 +582,6 @@
host
secret-key-base
build-coordinator
- chunked-request-channel
output-hash-channel)
(log-msg 'INFO "listening on " host ":" port))))
@@ -584,7 +590,8 @@
secret-key-base
(uri-host client-communication-uri)
(uri-port client-communication-uri)
- build-coordinator)
+ build-coordinator
+ utility-thread-pool-channel)
;; Guile seems to just stop listening on ports, so try to
;; monitor that internally and just quit if it happens
@@ -605,6 +612,7 @@
(ignore-if-build-for-derivation-exists? #f)
(ignore-if-build-for-outputs-exists? #f)
(ensure-all-related-derivation-outputs-have-builds? #f)
+ skip-updating-derived-priorities?
(tags '())
defer-until
(read-drv read-derivation-from-file*))
@@ -617,45 +625,15 @@
#:include-canceled? #f)
0))
- (define (build-for-output-already-exists?)
- ;; Handle the derivation not existing in the database here, so that adding
- ;; it to the database isn't required for this code to work
- (let* ((system-from-database (datastore-find-derivation-system datastore
- derivation-file))
-
- (derivation-exists-in-database? (not (eq? #f system-from-database)))
-
- (derivation
- (if derivation-exists-in-database?
- #f ; unnecessary to fetch derivation
- ;; Bit of a hack, but offload reading the derivation to a
- ;; thread so that it doesn't block the fibers thread, since
- ;; local I/O doesn't cooperate with fibers
- (datastore-call-with-transaction
- datastore
- (lambda _
- (with-fibers-port-timeouts
- (lambda ()
- (call-with-delay-logging read-drv
- #:threshold 10
- #:args (list derivation-file)))
- #:timeout 240))
- #:readonly? #t)))
-
- (system
- (or system-from-database
- (derivation-system derivation)))
-
- (outputs
- (if derivation-exists-in-database?
- (datastore-find-derivation-outputs datastore
- derivation-file)
- (map
- (match-lambda
- ((name . output)
- `((name . ,name)
- (output . ,(derivation-output-path output)))))
- (derivation-outputs derivation)))))
+ (define (build-for-output-already-exists/with-derivation? derivation)
+ (let ((system (derivation-system derivation))
+ (outputs
+ (map
+ (match-lambda
+ ((name . output)
+ `((name . ,name)
+ (output . ,(derivation-output-path output)))))
+ (derivation-outputs derivation))))
(any
(lambda (output-details)
(let ((builds-for-output
@@ -667,13 +645,40 @@
(not (null? builds-for-output))))
outputs)))
- (define (check-whether-to-store-build)
+ (define (build-for-output-already-exists?)
+ (let ((system (datastore-find-derivation-system datastore
+ derivation-file)))
+ (if (eq? #f system) ; derivation does not exist in database?
+ (build-for-output-already-exists/with-derivation?
+ (with-fibers-port-timeouts
+ (lambda ()
+ (call-with-delay-logging read-drv
+ #:threshold 10
+ #:args (list derivation-file)))
+ #:timeout 240))
+ (any
+ (lambda (output-details)
+ (let ((builds-for-output
+ (datastore-list-builds-for-output-and-system
+ datastore
+ (assq-ref output-details 'output)
+ system
+ #:include-canceled? #f)))
+ (not (null? builds-for-output))))
+ (datastore-find-derivation-outputs datastore
+ derivation-file)))))
+
+ (define* (check-whether-to-store-build #:optional derivation)
(cond
((and ignore-if-build-for-derivation-exists?
(build-for-derivation-exists?))
'((no-build-submitted . build-already-exists-for-this-derivation)))
((and ignore-if-build-for-outputs-exists?
- (call-with-delay-logging build-for-output-already-exists?))
+ (if derivation
+ (call-with-delay-logging
+ build-for-output-already-exists/with-derivation?
+ #:args (list derivation))
+ (call-with-delay-logging build-for-output-already-exists?)))
'((no-build-submitted . build-already-exists-for-a-output)))
(else
'continue)))
@@ -703,18 +708,20 @@
(or requested-uuid
(random-v4-uuid)))
- (define (build-perform-datastore-changes derivations-lacking-builds)
+ (define (build-perform-datastore-changes derivation derivations-lacking-builds)
(lambda (_)
;; Check again now, since new builds could have been added since the
;; checks were made before the start of the transaction.
- (match (check-whether-to-store-build)
+ (match (check-whether-to-store-build derivation)
('continue
;; Actually create a build, do this first so the derived priorities
;; for the builds inserted below are informed by this build.
(store-build derivation-file
build-id
priority
- tags)
+ tags
+ #:skip-updating-other-build-derived-priorities
+ skip-updating-derived-priorities?)
(for-each
(match-lambda
@@ -752,66 +759,83 @@
(build-coordinator-metrics-registry build-coordinator)
"coordinator_submit_build_duration_seconds"
(lambda ()
+ (unless (derivation-path? derivation-file)
+ (raise-exception
+ (make-client-error 'invalid-derivation-file)))
+
+ (string-for-each
+ (lambda (c)
+ (unless (or (char-alphabetic? c)
+ (char-numeric? c)
+ (member c '(#\+ #\- #\. #\_ #\? #\=)))
+ (raise-exception
+ (make-client-error 'invalid-character-in-derivation-file))))
+ (store-path-base derivation-file))
+
(match (check-whether-to-store-build)
('continue
- ;; Store the derivation first, so that listing related derivations
- ;; with no builds works
- (unless (datastore-find-derivation datastore derivation-file)
- (datastore-store-derivation
- datastore
- (with-fibers-port-timeouts
- (lambda ()
- (call-with-delay-logging read-drv
- #:threshold 10
- #:args (list derivation-file)))
- #:timeout 30)))
+ (let ((drv
+          ;; If the derivation is missing from the database, read it and
+            ;; enter it into the database, so that listing related
+ ;; derivations with no builds works
+ (if (datastore-find-derivation datastore derivation-file)
+ #f
+ (with-fibers-port-timeouts
+ (lambda ()
+ (call-with-delay-logging read-drv
+ #:threshold 10
+ #:args (list derivation-file)))
+ #:timeout 30))))
+ (when drv
+ (datastore-store-derivation datastore drv))
- (let ((related-derivations-lacking-builds
- (if ensure-all-related-derivation-outputs-have-builds?
- (datastore-list-related-derivations-with-no-build-for-outputs
+ (let ((related-derivations-lacking-builds
+ (if ensure-all-related-derivation-outputs-have-builds?
+ (datastore-list-related-derivations-with-no-build-for-outputs
+ datastore
+ derivation-file)
+ '())))
+ (match (datastore-call-with-transaction
datastore
- derivation-file)
- '())))
- (match (datastore-call-with-transaction
- datastore
- (build-perform-datastore-changes
- ;; Do this here so it doesn't take time in the writer thread
- (map
- (lambda (drv)
- ;; Generate the UUID's outside the transaction to save
- ;; time too.
- (cons drv (random-v4-uuid)))
- related-derivations-lacking-builds))
- #:duration-metric-name
- "store_build")
- (#t ; build submitted
- (build-coordinator-prompt-hook-processing-for-event
- build-coordinator
- 'build-submitted)
-
- (build-coordinator-send-event
- build-coordinator
- 'build-submitted
- `((id . ,build-id)
- (derivation . ,derivation-file)
- (priority . ,priority)
- (tags
- . ,(list->vector
+ (build-perform-datastore-changes
+ drv
+ ;; Do this here so it doesn't take time in the writer thread
(map
- (match-lambda
- ((key . value)
- `((key . ,key)
- (value . ,value))))
- (if (vector? tags)
- (vector->list tags)
- tags))))
- (defer_until . ,defer-until)))
-
- (trigger-build-allocation build-coordinator)
-
- `((build-submitted . ,build-id)))
- (stop-condition
- stop-condition))))
+ (lambda (related-drv)
+ ;; Generate the UUID's outside the transaction to save
+ ;; time too.
+ (cons related-drv (random-v4-uuid)))
+ related-derivations-lacking-builds))
+ #:duration-metric-name
+ "store_build")
+ (#t ; build submitted
+ (build-coordinator-prompt-hook-processing-for-event
+ build-coordinator
+ 'build-submitted)
+
+ (build-coordinator-send-event
+ build-coordinator
+ 'build-submitted
+ `((id . ,build-id)
+ (derivation . ,derivation-file)
+ (priority . ,priority)
+ (tags
+ . ,(list->vector
+ (map
+ (match-lambda
+ ((key . value)
+ `((key . ,key)
+ (value . ,value))))
+ (if (vector? tags)
+ (vector->list tags)
+ tags))))
+ (defer_until . ,defer-until)))
+
+ (trigger-build-allocation build-coordinator)
+
+ `((build-submitted . ,build-id)))
+ (stop-condition
+ stop-condition)))))
(stop-condition
stop-condition)))))
@@ -841,7 +865,9 @@
(make-transaction-rollback-exception
'skipped-as-build-required-by-another)))
- (datastore-remove-build-from-allocation-plan datastore uuid)
+ (build-coordinator-remove-build-from-allocation-plan
+ build-coordinator
+ uuid)
(datastore-cancel-build datastore uuid)
(datastore-insert-unprocessed-hook-event datastore
"build-canceled"
@@ -979,7 +1005,7 @@
(processor_count . ,processor-count))))
(define (trigger-build-allocation build-coordinator)
- ((build-coordinator-allocator-thread build-coordinator)))
+ ((build-coordinator-trigger-build-allocation build-coordinator)))
(define (build-coordinator-prompt-hook-processing-for-event build-coordinator
event-name)
@@ -1000,16 +1026,20 @@
(allocator-proc datastore
#:metrics-registry (build-coordinator-metrics-registry
build-coordinator)))))
- (datastore-replace-build-allocation-plan datastore new-plan)
-
- (let ((build-count-per-agent
- (datastore-count-build-allocation-plan-entries
- datastore)))
- (build-coordinator-send-event
- build-coordinator
- "allocation-plan-update"
- `((allocation_plan_counts . ,build-count-per-agent)))))
- #t)
+ (build-coordinator-replace-build-allocation-plan
+ build-coordinator
+ new-plan)
+
+ (build-coordinator-send-event
+ build-coordinator
+ "allocation-plan-update"
+ `((allocation_plan_counts
+ . ,(map
+ (match-lambda
+ ((agent-id . builds)
+ (cons agent-id (length builds))))
+ new-plan))))
+ #t))
(define (make-build-allocator-thread build-coordinator)
(define mtx (make-mutex))
@@ -1081,31 +1111,213 @@
exn)
(exit 1))
(lambda ()
- (let ((build-allocation-plan-total
- (make-gauge-metric
- (build-coordinator-metrics-registry build-coordinator)
- "build_allocation_plan_total"
- #:labels '(agent_id))))
- (with-time-logging
- "allocator initialise metrics"
- (for-each (match-lambda
- ((agent-id . count)
- (metric-set build-allocation-plan-total
- count
- #:label-values
- `((agent_id . ,agent-id)))))
- (datastore-count-build-allocation-plan-entries
- (build-coordinator-datastore build-coordinator)))))
-
(update-build-allocation-plan-loop)))))
trigger-build-allocation)
+(define (spawn-build-allocation-plan-management-fiber coordinator)
+ (define allocation-plan '())
+
+ (define allocation-plan-metric
+ (make-gauge-metric
+ (build-coordinator-metrics-registry coordinator)
+ "build_allocation_plan_total"
+ #:labels '(agent_id)))
+
+ (define (update-build-allocation-plan-metrics!)
+ (for-each
+ (match-lambda
+ ((agent-id . builds)
+ (metric-set allocation-plan-metric
+ (length builds)
+ #:label-values
+ `((agent_id . ,agent-id)))))
+ allocation-plan)
+ #t)
+
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format (current-error-port)
+ "exception in allocation plan fiber: ~A\n"
+ exn))
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (match (get-message (build-coordinator-allocator-channel coordinator))
+ (('stats reply)
+ (put-message
+ reply
+ (map
+ (match-lambda
+ ((agent-id . builds)
+ (cons agent-id (length builds))))
+ allocation-plan)))
+ (('fetch-agent-plan agent-id reply)
+ (put-message
+ reply
+ (or (list-copy (assoc-ref allocation-plan agent-id))
+ '())))
+ (('fetch-plan reply)
+ (put-message reply
+ (map (match-lambda
+ ((agent-id . builds)
+ (cons agent-id
+ (list-copy builds))))
+ allocation-plan)))
+ (('remove-build uuid reply)
+ (set!
+ allocation-plan
+ (map
+ (match-lambda
+ ((agent-id . builds)
+ (cons agent-id
+ (remove
+ (lambda (build-uuid)
+ (string=? build-uuid uuid))
+ builds))))
+ allocation-plan))
+
+ (put-message reply #t))
+ (('replace new-plan reply)
+
+ (set! allocation-plan new-plan)
+
+ (update-build-allocation-plan-metrics!)
+
+ (put-message reply #t))))
+ (lambda _
+ (backtrace))))
+ #:unwind? #t)))))
+
+(define (build-coordinator-allocation-plan-stats coordinator)
+ (let ((reply (make-channel)))
+ (put-message (build-coordinator-allocator-channel coordinator)
+ (list 'stats reply))
+ (get-message reply)))
+
+(define (build-coordinator-fetch-agent-allocation-plan coordinator agent-id)
+ (let ((reply (make-channel)))
+ (put-message (build-coordinator-allocator-channel coordinator)
+ (list 'fetch-agent-plan agent-id reply))
+ (get-message reply)))
+
+(define (build-coordinator-allocation-plan coordinator)
+ (let ((reply (make-channel)))
+ (put-message (build-coordinator-allocator-channel coordinator)
+ (list 'fetch-plan reply))
+ (get-message reply)))
+
+(define (build-coordinator-build-in-allocation-plan? coordinator uuid)
+ (any
+ (match-lambda
+ ((agent-id . build-uuids)
+ (->bool (member uuid build-uuids string=?))))
+ (build-coordinator-allocation-plan coordinator)))
+
+(define (build-coordinator-remove-build-from-allocation-plan coordinator uuid)
+ (let ((reply (make-channel)))
+ (put-message (build-coordinator-allocator-channel coordinator)
+ (list 'remove-build uuid reply))
+ (get-message reply)))
+
+(define (build-coordinator-replace-build-allocation-plan coordinator plan)
+ (let ((reply (make-channel)))
+ (put-message (build-coordinator-allocator-channel coordinator)
+ (list 'replace plan reply))
+ (get-message reply)))
+
+(define (build-coordinator-fetch-build-to-allocate coordinator
+ agent-id)
+ (define datastore
+ (build-coordinator-datastore coordinator))
+
+ (let ((all-planned-builds
+ (build-coordinator-fetch-agent-allocation-plan coordinator
+ agent-id)))
+
+ (log-msg (build-coordinator-logger coordinator)
+ 'DEBUG
+ "fetching build to allocate to " agent-id ", "
+ (length all-planned-builds) " to consider")
+
+ (let loop ((planned-builds all-planned-builds))
+ (if (null? planned-builds)
+ #f
+ (match (datastore-fetch-build-to-allocate datastore
+ (first planned-builds))
+ (#f
+ (log-msg (build-coordinator-logger coordinator)
+ 'DEBUG
+ agent-id ": ignoring " (first planned-builds))
+ (loop (cdr planned-builds)))
+ (#(uuid derivation-id derivation-name derived_priority)
+
+ (if (datastore-check-if-derivation-conflicts?
+ datastore
+ agent-id
+ derivation-id)
+ (begin
+ (log-msg (build-coordinator-logger coordinator)
+ 'DEBUG
+ agent-id ": " uuid " conflicts with allocated build")
+ (loop (cdr planned-builds)))
+ (begin
+ (log-msg (build-coordinator-logger coordinator)
+ 'DEBUG
+ agent-id ": " uuid " selected")
+
+ `((uuid . ,uuid)
+ (derivation_name . ,derivation-name)
+ (derived_priority . ,derived_priority))))))))))
+
+(define* (build-coordinator-list-allocation-plan-builds coordinator
+ agent-id
+ #:key limit)
+ (define (take* lst i)
+ (if (< (length lst) i)
+ lst
+ (take lst i)))
+
+ (define datastore
+ (build-coordinator-datastore coordinator))
+
+ (let ((build-ids
+ (build-coordinator-fetch-agent-allocation-plan coordinator
+ agent-id)))
+ (filter-map
+ (lambda (build-id)
+ (match (datastore-fetch-build-to-allocate datastore build-id)
+ (#(uuid derivation_id derivation_name derived_priority)
+ (let ((build-details (datastore-find-build datastore uuid)))
+ `((uuid . ,uuid)
+ (derivation_name . ,derivation_name)
+ (system . ,(datastore-find-build-derivation-system
+ datastore
+ uuid))
+ (priority . ,(assq-ref build-details 'priority))
+ (derived_priority . ,derived_priority)
+ (tags . ,(vector-map
+ (lambda (_ tag)
+ (match tag
+ ((key . value)
+ `((key . ,key)
+ (value . ,value)))))
+ (datastore-fetch-build-tags
+ datastore
+ uuid))))))
+ (#f #f)))
+ (if limit
+ (take* build-ids limit)
+ build-ids))))
+
(define (spawn-fiber-to-watch-for-deferred-builds coordinator)
(spawn-fiber
(lambda ()
+ (sleep 10)
(while #t
- (sleep 60) ; 1 minute
(with-exception-handler
(lambda (exn)
(simple-format (current-error-port)
@@ -1113,14 +1325,21 @@
exn))
(lambda ()
(let ((first-deferred-build
- (datastore-find-first-unallocated-deferred-build
- (build-coordinator-datastore coordinator))))
- (when (and first-deferred-build
- (time<=? (date->time-utc
- (assq-ref first-deferred-build 'deferred-until))
- (current-time)))
- (simple-format #t "guix-build-coordinator: triggering build allocation for deferred build: ~A\n" (assq-ref first-deferred-build 'uuid))
- (trigger-build-allocation coordinator))))
+ (datastore-find-deferred-build
+ (build-coordinator-datastore coordinator)
+ (lambda (build-details)
+ (build-coordinator-build-in-allocation-plan?
+ coordinator
+ (assq-ref build-details 'uuid))))))
+ (if (and first-deferred-build
+ (time<=? (date->time-utc
+ (assq-ref first-deferred-build 'deferred-until))
+ (current-time)))
+ (begin
+ (simple-format #t "guix-build-coordinator: triggering build allocation for deferred build: ~A\n" (assq-ref first-deferred-build 'uuid))
+ (trigger-build-allocation coordinator)
+ (sleep 10))
+ (sleep 60))))
#:unwind? #t)))
#:parallel? #t))
@@ -1268,12 +1487,18 @@
(datastore-list-unprocessed-hook-events
datastore
event-name
- (+ 1 (length in-progress-ids)))))
- (find
- (match-lambda
- ((id rest ...)
- (not (member id in-progress-ids))))
- potential-jobs)))
+ (+ 1 (length in-progress-ids))))
+ (job
+ (find
+ (match-lambda
+ ((id rest ...)
+ (not (member id in-progress-ids))))
+ potential-jobs)))
+ (log-msg
+ (build-coordinator-logger build-coordinator)
+ 'DEBUG
+ event-name " work queue, got job " job)
+ job))
(lambda (id event arguments)
(process-event id event arguments handler))
#:name (symbol->string event-name))))
@@ -1299,11 +1524,13 @@
(build-coordinator-datastore build-coordinator))
(define (allocate-one-build agent-id)
- (let ((build-details (datastore-fetch-build-to-allocate datastore agent-id)))
+ (let ((build-details (build-coordinator-fetch-build-to-allocate
+ build-coordinator agent-id)))
(if build-details
(let ((build-id (assq-ref build-details 'uuid)))
(datastore-insert-to-allocated-builds datastore agent-id (list build-id))
- (datastore-remove-builds-from-plan datastore (list build-id))
+ (build-coordinator-remove-build-from-allocation-plan
+ build-coordinator build-id)
build-details)
#f)))
@@ -1335,20 +1562,6 @@
(let ((new-builds
(allocate-several-builds agent
(- target-count start-count))))
- (unless (null? new-builds)
- (let ((allocation-plan-metric
- (metrics-registry-fetch-metric
- (slot-ref datastore 'metrics-registry)
- "build_allocation_plan_total")))
- (for-each
- (match-lambda
- ((agent-id . count)
- (metric-set allocation-plan-metric
- count
- #:label-values
- `((agent_id . ,agent-id)))))
- (datastore-count-build-allocation-plan-entries datastore))))
-
;; Previously allocate builds just returned newly allocated
;; builds, but if max-builds is provided, return all the
;; builds. This means the agent can handle this in a idempotent
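Note: the allocation plan now lives in memory and is owned by a single fiber; every reader and writer goes through the allocator channel with a freshly made reply channel, which serialises access without locks. A cut-down sketch of that request/reply pattern with guile-fibers, showing only a 'stats and a 'replace message:

    (use-modules (fibers)
                 (fibers channels)
                 (ice-9 match))

    (run-fibers
     (lambda ()
       (define control (make-channel))

       ;; Owning fiber: holds the plan and answers one message at a time.
       (spawn-fiber
        (lambda ()
          (let loop ((plan '(("agent-a" . ("uuid-1" "uuid-2")))))
            (match (get-message control)
              (('stats reply)
               (put-message reply
                            (map (match-lambda
                                   ((agent-id . builds)
                                    (cons agent-id (length builds))))
                                 plan))
               (loop plan))
              (('replace new-plan reply)
               (put-message reply #t)
               (loop new-plan))))))

       ;; Caller side: send the request, then block on the reply channel.
       (let ((reply (make-channel)))
         (put-message control (list 'stats reply))
         (get-message reply))))
    ;; => (("agent-a" . 2))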
diff --git a/guix-build-coordinator/datastore.scm b/guix-build-coordinator/datastore.scm
index a29a993..dc4fec6 100644
--- a/guix-build-coordinator/datastore.scm
+++ b/guix-build-coordinator/datastore.scm
@@ -68,7 +68,7 @@
(re-export datastore-count-builds-for-derivation)
(re-export datastore-list-processed-builds)
(re-export datastore-list-unprocessed-builds)
-(re-export datastore-find-first-unallocated-deferred-build)
+(re-export datastore-find-deferred-build)
(re-export datastore-fetch-prioritised-unprocessed-builds)
(re-export datastore-insert-unprocessed-hook-event)
(re-export datastore-count-unprocessed-hook-events)
@@ -86,16 +86,12 @@
(re-export datastore-list-builds-for-output)
(re-export datastore-list-builds-for-output-and-system)
(re-export datastore-agent-for-build)
-(re-export datastore-count-build-allocation-plan-entries)
-(re-export datastore-replace-build-allocation-plan)
-(re-export datastore-remove-build-from-allocation-plan)
(re-export datastore-count-allocated-builds)
(re-export datastore-agent-requested-systems)
(re-export datastore-update-agent-requested-systems)
(re-export datastore-fetch-build-to-allocate)
+(re-export datastore-check-if-derivation-conflicts?)
(re-export datastore-insert-to-allocated-builds)
-(re-export datastore-remove-builds-from-plan)
-(re-export datastore-list-allocation-plan-builds)
(define* (database-uri->datastore database
#:key
diff --git a/guix-build-coordinator/datastore/abstract.scm b/guix-build-coordinator/datastore/abstract.scm
index 68fc654..799485e 100644
--- a/guix-build-coordinator/datastore/abstract.scm
+++ b/guix-build-coordinator/datastore/abstract.scm
@@ -9,8 +9,7 @@
datastore-new-agent
datastore-new-agent-password
datastore-list-agent-builds
- datastore-agent-password-exists?
- datastore-list-allocation-plan-builds))
+ datastore-agent-password-exists?))
(define-class <abstract-datastore> ())
@@ -24,5 +23,3 @@
(define-generic datastore-agent-password-exists?)
(define-generic datastore-agent-list-unprocessed-builds)
(define-generic datastore-list-agent-builds)
-(define-generic datastore-agent-replace-build-allocation-plan)
-(define-generic datastore-list-allocation-plan-builds)
diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm
index 96751a5..ac04362 100644
--- a/guix-build-coordinator/datastore/sqlite.scm
+++ b/guix-build-coordinator/datastore/sqlite.scm
@@ -87,7 +87,7 @@
datastore-replace-agent-tags
datastore-list-processed-builds
datastore-list-unprocessed-builds
- datastore-find-first-unallocated-deferred-build
+ datastore-find-deferred-build
datastore-fetch-prioritised-unprocessed-builds
datastore-insert-unprocessed-hook-event
datastore-count-unprocessed-hook-events
@@ -96,21 +96,20 @@
datastore-delete-unprocessed-hook-event
datastore-list-agent-builds
datastore-agent-for-build
- datastore-count-build-allocation-plan-entries
datastore-replace-build-allocation-plan
- datastore-remove-build-from-allocation-plan
datastore-count-allocated-builds
datastore-agent-requested-systems
datastore-update-agent-requested-systems
datastore-fetch-build-to-allocate
- datastore-insert-to-allocated-builds
- datastore-remove-builds-from-plan
- datastore-list-allocation-plan-builds))
+ datastore-check-if-derivation-conflicts?
+ datastore-insert-to-allocated-builds))
(define-class <sqlite-datastore> (<abstract-datastore>)
database-file
worker-reader-thread-channel
+ worker-reader-thread-proc-vector
worker-writer-thread-channel
+ worker-writer-thread-proc-vector
metrics-registry)
(define* (sqlite-datastore database-uri
@@ -140,138 +139,129 @@
(slot-set! datastore 'database-file database-file)
(slot-set! datastore 'metrics-registry metrics-registry)
- (slot-set!
- datastore
- 'worker-writer-thread-channel
- (make-worker-thread-channel
- (lambda ()
- (let ((db
- (db-open database-file)))
- (sqlite-exec db "PRAGMA busy_timeout = 5000;")
- (sqlite-exec db "PRAGMA synchronous = NORMAL;")
- (sqlite-exec db "PRAGMA temp_store = MEMORY;")
- (sqlite-exec db "PRAGMA foreign_keys = ON;")
-
- (sqlite-exec db "ATTACH DATABASE 'file:/mem?vfs=memdb' AS mem;")
-
- (sqlite-exec
- db
- "
-CREATE TABLE IF NOT EXISTS mem.build_allocation_plan (
- build_id INTEGER NOT NULL,
- agent_id TEXT NOT NULL,
- ordering INTEGER NOT NULL,
- PRIMARY KEY (agent_id, build_id)
-);")
-
- (list db)))
- #:name "ds write"
- #:destructor
- (let ((writer-thread-destructor-counter
- (make-gauge-metric metrics-registry
- "datastore_writer_thread_close_total")))
- (lambda (db)
- (db-optimize db
- database-file
- metrics-registry
- #:maybe-truncate-wal? #f)
-
- (metric-increment writer-thread-destructor-counter)
- (sqlite-close db)))
- #:lifetime 500
- #:expire-on-exception? #t
-
- ;; SQLite doesn't support parallel writes
- #:parallelism 1
- #:delay-logger (let ((delay-metric
- (make-histogram-metric
- metrics-registry
- "datastore_write_delay_seconds")))
- (lambda (seconds-delayed)
- (metric-observe delay-metric
- ;; TODO exact->inexact to work around
- ;; a bug in guile-prometheus where
- ;; the metric sum will output in the
- ;; exact form including the /q
- (exact->inexact seconds-delayed))
- (log-delay "datastore write" seconds-delayed)
- (when (> seconds-delayed 1)
- (format
- (current-error-port)
- "warning: database write delayed by ~1,2f seconds~%"
- seconds-delayed))))
- #:duration-logger
- (lambda (duration proc)
- (when (> duration 10)
- (format
- (current-error-port)
- "warning: database write took ~1,2f seconds (~a)~%"
- duration
- proc)))
- #:log-exception? worker-thread-log-exception?))
-
- ;; Make sure the worker thread has initialised, and created the in memory
- ;; tables
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
- (const #t))
-
- (slot-set!
- datastore
- 'worker-reader-thread-channel
- (make-worker-thread-channel
- (lambda ()
- (let ((db
- (db-open database-file #:write? #f)))
- (sqlite-exec db "PRAGMA temp_store = MEMORY;")
- (sqlite-exec db "PRAGMA busy_timeout = 4000;")
- (sqlite-exec db "PRAGMA cache_size = -16000;")
-
- (sqlite-exec db "ATTACH DATABASE 'file:/mem?vfs=memdb' AS mem;")
-
- (list db)))
- #:name "ds read"
- #:destructor
- (let ((reader-thread-destructor-counter
- (make-gauge-metric metrics-registry
- "datastore_reader_thread_close_total")))
- (lambda (db)
- (metric-increment reader-thread-destructor-counter)
- (sqlite-close db)))
- #:lifetime 50000
- #:expire-on-exception? #t
-
- ;; Use a minimum of 8 and a maximum of 16 threads
- #:parallelism
- (min (max (current-processor-count)
- 8)
- 16)
- #:delay-logger (let ((delay-metric
- (make-histogram-metric
+ (let ((channel
+ proc-vector
+ (make-worker-thread-channel
+ (lambda ()
+ (let ((db
+ (db-open database-file)))
+ (sqlite-exec db "PRAGMA busy_timeout = 0;")
+ (sqlite-exec db "PRAGMA synchronous = NORMAL;")
+ (sqlite-exec db "PRAGMA temp_store = MEMORY;")
+ (sqlite-exec db "PRAGMA foreign_keys = ON;")
+
+ (list db)))
+ #:name "ds write"
+ #:destructor
+ (let ((writer-thread-destructor-counter
+ (make-gauge-metric metrics-registry
+ "datastore_writer_thread_close_total")))
+ (lambda (db)
+ (db-optimize db
+ database-file
metrics-registry
- "datastore_read_delay_seconds")))
- (lambda (seconds-delayed)
- (metric-observe delay-metric
- ;; TODO exact->inexact to work around
- ;; a bug in guile-prometheus where
- ;; the metric sum will output in the
- ;; exact form including the /q
- (exact->inexact seconds-delayed))
- (log-delay "datastore read" seconds-delayed)
- (when (> seconds-delayed 1)
- (format
- (current-error-port)
- "warning: database read delayed by ~1,2f seconds~%"
- seconds-delayed))))
- #:duration-logger
- (lambda (duration proc)
- (when (> duration 30)
- (format
- (current-error-port)
- "warning: database read took ~1,2f seconds (~a)~%"
- duration
- proc)))
- #:log-exception? worker-thread-log-exception?))
+ #:maybe-truncate-wal? #f)
+
+ (metric-increment writer-thread-destructor-counter)
+ (sqlite-close db)))
+ #:lifetime 500
+ #:expire-on-exception? #t
+
+ ;; SQLite doesn't support parallel writes
+ #:parallelism 1
+ #:delay-logger (let ((delay-metric
+ (make-histogram-metric
+ metrics-registry
+ "datastore_write_delay_seconds")))
+ (lambda (seconds-delayed)
+ (metric-observe delay-metric
+ ;; TODO exact->inexact to work around
+ ;; a bug in guile-prometheus where
+ ;; the metric sum will output in the
+ ;; exact form including the /q
+ (exact->inexact seconds-delayed))
+ (log-delay "datastore write" seconds-delayed)
+ (when (> seconds-delayed 1)
+ (format
+ (current-error-port)
+ "warning: database write delayed by ~1,2f seconds~%"
+ seconds-delayed))))
+ #:duration-logger
+ (lambda (duration proc)
+ (when (> duration 10)
+ (format
+ (current-error-port)
+ "warning: database write took ~1,2f seconds (~a)~%"
+ duration
+ proc)))
+ #:log-exception? worker-thread-log-exception?)))
+
+ (slot-set! datastore
+ 'worker-writer-thread-channel
+ channel)
+ (slot-set! datastore
+ 'worker-writer-thread-proc-vector
+ proc-vector))
+
+ (let ((channel
+ proc-vector
+ (make-worker-thread-channel
+ (lambda ()
+ (let ((db
+ (db-open database-file #:write? #f)))
+ (sqlite-exec db "PRAGMA temp_store = MEMORY;")
+ (sqlite-exec db "PRAGMA busy_timeout = 0;")
+ (sqlite-exec db "PRAGMA cache_size = -16000;")
+
+ (list db)))
+ #:name "ds read"
+ #:destructor
+ (let ((reader-thread-destructor-counter
+ (make-gauge-metric metrics-registry
+ "datastore_reader_thread_close_total")))
+ (lambda (db)
+ (metric-increment reader-thread-destructor-counter)
+ (sqlite-close db)))
+ #:lifetime 50000
+ #:expire-on-exception? #t
+
+ ;; Use a minimum of 8 and a maximum of 16 threads
+ #:parallelism
+ (min (max (current-processor-count)
+ 8)
+ 16)
+ #:delay-logger (let ((delay-metric
+ (make-histogram-metric
+ metrics-registry
+ "datastore_read_delay_seconds")))
+ (lambda (seconds-delayed)
+ (metric-observe delay-metric
+ ;; TODO exact->inexact to work around
+ ;; a bug in guile-prometheus where
+ ;; the metric sum will output in the
+ ;; exact form including the /q
+ (exact->inexact seconds-delayed))
+ (log-delay "datastore read" seconds-delayed)
+ (when (> seconds-delayed 1)
+ (format
+ (current-error-port)
+ "warning: database read delayed by ~1,2f seconds~%"
+ seconds-delayed))))
+ #:duration-logger
+ (lambda (duration proc)
+ (when (> duration 30)
+ (format
+ (current-error-port)
+ "warning: database read took ~1,2f seconds (~a)~%"
+ duration
+ proc)))
+ #:log-exception? worker-thread-log-exception?)))
+ (slot-set! datastore
+ 'worker-reader-thread-channel
+ channel)
+ (slot-set! datastore
+ 'worker-reader-thread-proc-vector
+ proc-vector))
datastore))
@@ -305,11 +295,6 @@ CREATE TABLE IF NOT EXISTS mem.build_allocation_plan (
metrics-registry
checkpoint-duration-metric-name
(lambda ()
- (if (> (wal-size) extreme-wal-size-threshold)
- ;; Since the WAL is really getting too big, wait for much longer
- (sqlite-exec db "PRAGMA busy_timeout = 300000;")
- (sqlite-exec db "PRAGMA busy_timeout = 20;"))
-
(let* ((statement
(sqlite-prepare
db
@@ -331,8 +316,6 @@ CREATE TABLE IF NOT EXISTS mem.build_allocation_plan (
modified-page-count
pages-moved-to-db)
#t))))))
- (sqlite-exec db "PRAGMA busy_timeout = 5000;")
-
result)))
#t))
@@ -369,6 +352,17 @@ PRAGMA optimize;")
(spawn-fiber
(lambda ()
(while #t
+ (sleep 20)
+ (vector-for-each
+ (lambda (i proc)
+ (simple-format (current-error-port)
+ "reader thread ~A running: ~A\n"
+ i proc))
+ (slot-ref datastore 'worker-reader-thread-proc-vector)))))
+
+ (spawn-fiber
+ (lambda ()
+ (while #t
(sleep (* 60 10)) ; 10 minutes
(with-exception-handler
(lambda (exn)
@@ -1742,42 +1736,6 @@ WHERE build_id = :build_id"
#t)
rest))
-(define-method (datastore-remove-build-from-allocation-plan
- (datastore <sqlite-datastore>)
- uuid)
- (define (update-build-allocation-plan-metrics)
- (let ((allocation-plan-metric
- (metrics-registry-fetch-metric
- (slot-ref datastore 'metrics-registry)
- "build_allocation_plan_total")))
- (for-each
- (match-lambda
- ((agent-id . count)
- (metric-set allocation-plan-metric
- count
- #:label-values
- `((agent_id . ,agent-id)))))
- (datastore-count-build-allocation-plan-entries datastore))))
-
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-writer-thread-channel)
- (lambda (db)
- (let ((statement (sqlite-prepare
- db
- "
-DELETE FROM build_allocation_plan WHERE build_id = :build_id"
- #:cache? #t)))
-
- (sqlite-bind-arguments
- statement
- #:build_id (db-find-build-id db uuid))
-
- (sqlite-step-and-reset statement)
-
- (unless (= 0 (changes-count db))
- (update-build-allocation-plan-metrics)))))
- #t)
-
(define-method (datastore-count-build-results
(datastore <sqlite-datastore>))
(call-with-worker-thread
@@ -3024,8 +2982,9 @@ ORDER BY priority DESC"
builds)))))
-(define-method (datastore-find-first-unallocated-deferred-build
- (datastore <sqlite-datastore>))
+(define-method (datastore-find-deferred-build
+ (datastore <sqlite-datastore>)
+ select?)
(call-with-worker-thread
(slot-ref datastore 'worker-reader-thread-channel)
(lambda (db)
@@ -3040,25 +2999,29 @@ INNER JOIN derivations
WHERE processed = 0
AND canceled = 0
AND deferred_until IS NOT NULL
- AND builds.id NOT IN (SELECT build_id FROM build_allocation_plan)
-ORDER BY deferred_until ASC
-LIMIT 1"
+ORDER BY deferred_until ASC"
#:cache? #t)))
- (match (sqlite-step-and-reset statement)
- (#(uuid derivation_name priority created_at deferred_until)
- `((uuid . ,uuid)
- (derivation-name . ,derivation_name)
- (priority . ,priority)
- (created-at . ,(if (string? created_at)
- (string->date created_at
- "~Y-~m-~d ~H:~M:~S")
- #f))
- (deferred-until . ,(if (string? deferred_until)
- (string->date deferred_until
- "~Y-~m-~d ~H:~M:~S")
- #f))))
- (#f #f))))))
+ (let loop ((row (sqlite-step statement)))
+ (match row
+ (#(uuid derivation_name priority created_at deferred_until)
+ (let ((res
+ (select?
+ `((uuid . ,uuid)
+ (derivation-name . ,derivation_name)
+ (priority . ,priority)
+ (created-at . ,(if (string? created_at)
+ (string->date created_at
+ "~Y-~m-~d ~H:~M:~S")
+ #f))
+ (deferred-until . ,(if (string? deferred_until)
+ (string->date deferred_until
+ "~Y-~m-~d ~H:~M:~S")
+ #f))))))
+ (if res
+ res
+ (loop (sqlite-step statement)))))
+ (#f #f)))))))
(define-method (datastore-fetch-prioritised-unprocessed-builds
(datastore <sqlite-datastore>))
@@ -3230,103 +3193,6 @@ DELETE FROM unprocessed_hook_events WHERE id = :id"
(sqlite-step-and-reset statement))))
#t)
-(define-method (datastore-count-build-allocation-plan-entries
- (datastore <sqlite-datastore>))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
- (lambda (db)
- (let ((statement
- (sqlite-prepare
- db
- "
-SELECT agent_id, COUNT(*)
-FROM build_allocation_plan
-GROUP BY agent_id"
- #:cache? #t)))
-
- (let ((result
- (sqlite-map
- (match-lambda
- (#(agent_id count)
- (cons agent_id count)))
- statement)))
- (sqlite-reset statement)
-
- result)))))
-
-(define-method (datastore-replace-build-allocation-plan
- (datastore <sqlite-datastore>)
- planned-builds)
- (define (clear-current-plan db)
- (sqlite-exec
- db
- "DELETE FROM build_allocation_plan"))
-
- (define insert-sql
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
- (lambda (db)
- (string-append
- "
-INSERT INTO build_allocation_plan (build_id, agent_id, ordering) VALUES "
- (string-join
- (map (match-lambda
- ((build-uuid agent-id ordering)
- (simple-format
- #f
- "('~A', '~A', ~A)"
- (db-find-build-id db build-uuid)
- agent-id
- ordering)))
- planned-builds)
- ", ")
- ";"))))
-
- (define (insert-new-plan db planned-builds)
- (sqlite-exec
- db
- insert-sql))
-
- (datastore-call-with-transaction
- datastore
- (lambda (db)
- (clear-current-plan db)
- (unless (null? planned-builds)
- (insert-new-plan db planned-builds)))
- #:duration-metric-name "replace_build_allocation_plan")
-
- (let* ((agent-ids
- (map (lambda (agent-details)
- (assq-ref agent-details 'uuid))
- (datastore-list-agents datastore)))
- (counts-by-agent
- (make-vector (length agent-ids) 0)))
- (for-each
- (match-lambda
- ((_ agent-id _)
- (let ((index (list-index (lambda (list-agent-id)
- (string=? agent-id list-agent-id))
- agent-ids)))
- (vector-set! counts-by-agent
- index
- (+ (vector-ref counts-by-agent
- index)
- 1)))))
- planned-builds)
-
- (let ((metric
- (metrics-registry-fetch-metric (slot-ref datastore 'metrics-registry)
- "build_allocation_plan_total")))
- (for-each
- (lambda (index agent-id)
- (metric-set metric
- (vector-ref counts-by-agent index)
- #:label-values
- `((agent_id . ,agent-id))))
- (iota (length agent-ids))
- agent-ids)))
- #t)
-
(define-method (datastore-count-allocated-builds
(datastore <sqlite-datastore>))
(call-with-worker-thread/delay-logging
@@ -3426,34 +3292,40 @@ INSERT INTO build_allocation_agent_requested_systems (agent_id, system_id) VALUE
(define-method (datastore-fetch-build-to-allocate
(datastore <sqlite-datastore>)
- agent-id)
- (datastore-call-with-transaction
- datastore
+ build-id)
+ (call-with-worker-thread
+ (slot-ref datastore 'worker-reader-thread-channel)
(lambda (db)
(let ((statement
(sqlite-prepare
db
- ;; This needs to guard against the plan being out of date
"
SELECT builds.uuid, derivations.id, derivations.name,
unprocessed_builds_with_derived_priorities.derived_priority
FROM builds
-INNER JOIN build_allocation_plan
- ON builds.id = build_allocation_plan.build_id
INNER JOIN derivations
ON builds.derivation_id = derivations.id
-INNER JOIN build_allocation_agent_requested_systems
- ON build_allocation_agent_requested_systems.agent_id = :agent_id
- AND build_allocation_agent_requested_systems.system_id = derivations.system_id
LEFT JOIN unprocessed_builds_with_derived_priorities
ON unprocessed_builds_with_derived_priorities.build_id = builds.id
-WHERE build_allocation_plan.agent_id = :agent_id
+WHERE builds.uuid = :uuid
AND builds.processed = 0
AND builds.canceled = 0
- AND builds.id NOT IN (SELECT build_id FROM allocated_builds)
-ORDER BY build_allocation_plan.ordering ASC"
- #:cache? #t))
- (output-conflicts-statement
+ AND builds.id NOT IN (SELECT build_id FROM allocated_builds)"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:uuid build-id)
+
+ (sqlite-step-and-reset statement)))))
+
+(define-method (datastore-check-if-derivation-conflicts?
+ (datastore <sqlite-datastore>)
+ agent-id
+ derivation-id)
+ (call-with-worker-thread
+ (slot-ref datastore 'worker-reader-thread-channel)
+ (lambda (db)
+ (let ((statement
(sqlite-prepare
db
"
@@ -3471,34 +3343,11 @@ WHERE build_derivation_outputs.derivation_id = :derivation_id
allocated_builds_derivation_outputs.output_id"
#:cache? #t)))
- (define (get-build-to-allocate)
- (let loop ((build-details (sqlite-step statement)))
- (match build-details
- (#f #f)
- (#(uuid derivation-id derivation-name derived_priority)
-
- (sqlite-bind-arguments output-conflicts-statement
- #:agent_id agent-id
- #:derivation_id derivation-id)
-
- (if (eq? (sqlite-step-and-reset output-conflicts-statement)
- #f)
- `((uuid . ,uuid)
- (derivation_name . ,derivation-name)
- (derived_priority . ,derived_priority))
- (loop (sqlite-step statement)))))))
-
- (sqlite-bind-arguments
- statement
- #:agent_id agent-id)
-
- (let ((result (get-build-to-allocate)))
- (sqlite-reset statement)
-
- result)))
+ (sqlite-bind-arguments statement
+ #:agent_id agent-id
+ #:derivation_id derivation-id)
- #:readonly? #t
- #:duration-metric-name "fetch_builds_to_allocate"))
+ (->bool (sqlite-step-and-reset statement))))))
(define-method (datastore-insert-to-allocated-builds
(datastore <sqlite-datastore>)
@@ -3523,25 +3372,6 @@ INSERT INTO allocated_builds (build_id, agent_id) VALUES "
", ")
";")))))
-(define-method (datastore-remove-builds-from-plan
- (datastore <sqlite-datastore>)
- build-uuids)
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
- (lambda (db)
- (sqlite-exec
- db
- (string-append
- "
-DELETE FROM build_allocation_plan
-WHERE build_id IN ("
- (string-join
- (map (lambda (build-uuid)
- (number->string (db-find-build-id db build-uuid)))
- build-uuids)
- ", ")
- ")")))))
-
(define-method (datastore-list-allocation-plan-builds
(datastore <sqlite-datastore>)
.
diff --git a/guix-build-coordinator/hooks.scm b/guix-build-coordinator/hooks.scm
index bc055c2..030f310 100644
--- a/guix-build-coordinator/hooks.scm
+++ b/guix-build-coordinator/hooks.scm
@@ -32,8 +32,9 @@
#:use-module (guix config)
#:use-module (guix derivations)
#:use-module (guix serialization)
- #:use-module ((guix utils) #:select (default-keyword-arguments))
- #:use-module (guix build utils)
+ #:use-module ((guix utils) #:select (default-keyword-arguments
+ call-with-decompressed-port))
+ #:use-module ((guix build utils) #:select (dump-port mkdir-p))
#:use-module (guix-build-coordinator config)
#:use-module (guix-build-coordinator utils)
#:use-module (guix-build-coordinator datastore)
@@ -81,6 +82,20 @@
build-id agent-id)
(current-error-port))))
+(define* (default-nar-compressions #:key output-filename nar-size
+ source-compression source-size)
+ (define MiB (* (expt 2 20) 1.))
+
+ (if (eq? 'none source-compression)
+ ;; If the agent didn't compress the nar, don't change that here
+ (list 'none)
+ (let* ((compression-proportion
+ (/ source-size nar-size)))
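+        ;; When compression saves less than roughly 5%, or the nar is tiny
+        ;; (under 0.05 MiB), publish it uncompressed instead.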
+ (if (or (> compression-proportion 0.95)
+ (< nar-size (* 0.05 MiB)))
+ '(none)
+ (list source-compression)))))
+
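+;; A different procedure can be passed to build-success-publish-hook below as
+;; #:nar-compressions-proc.  It receives the same keyword arguments and should
+;; return a list of compressions, each either a symbol or a pair of a symbol
+;; plus extra arguments for the compressor.  A hypothetical example that
+;; always publishes both a zstd and an uncompressed nar:
+;;
+;;   (lambda* (#:key output-filename nar-size
+;;             source-compression source-size)
+;;     '(zstd none))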
(define* (build-success-publish-hook
publish-directory
#:key
@@ -94,7 +109,8 @@
post-publish-hook
combined-post-publish-hook
(publish-referenced-derivation-source-files? #t)
- derivation-substitute-urls)
+ derivation-substitute-urls
+ (nar-compressions-proc default-nar-compressions))
(mkdir-p (string-append publish-directory "/nar/lzip"))
(lambda (build-coordinator build-id)
@@ -218,8 +234,7 @@
nar-filename))
#:unwind? #t))
- (cons narinfo-filename
- nar-filename)))))
+ narinfo-filename))))
referenced-source-files))))))
(define (process-output drv-name output)
@@ -230,12 +245,6 @@
(nar-location
(build-output-file-location datastore build-id
output-name))
- (nar-filename
- (string-append "nar/lzip/"
- (basename output-filename)))
- (nar-destination
- (string-append publish-directory "/"
- nar-filename))
(narinfo-filename
(string-append (string-take (basename output-filename) 32)
".narinfo"))
@@ -246,8 +255,78 @@
(if (skip-publishing-proc narinfo-filename narinfo-directory)
#f
- (begin
- (copy-file nar-location nar-destination)
+ (let ((compressions
+ (nar-compressions-proc
+ #:output-filename output-filename
+ #:nar-size (assq-ref output 'size)
+ ;; TODO Don't hardcode this
+ #:source-compression 'lzip
+ #:source-size (stat:size (stat nar-location #f)))))
+
+ (for-each
+ (lambda (compression)
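+               ;; Each compression is either a bare symbol or a pair whose cdr
+               ;; holds extra arguments for the compressing procedure.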
+ (if (or (and (pair? compression)
+ (eq? (car compression) 'lzip))
+ (eq? compression 'lzip))
+ ;; TODO If the agents start uploading uncompressed files
+ ;; or files compressed differently, this might not be
+ ;; right
+ (let ((nar-destination
+ (string-append publish-directory "/"
+ "nar/lzip/"
+ (basename output-filename))))
+ (copy-file nar-location nar-destination))
+ (let* ((target-compression
+ (if (pair? compression)
+ (car compression)
+ compression))
+ ;; TODO This logic should sit elsewhere
+ (nar-destination
+ (string-append
+ publish-directory "/"
+ "nar/"
+                            (if (eq? target-compression 'none)
+ ""
+ (string-append
+ (symbol->string target-compression) "/"))
+ (basename output-filename)))
+ (temporary-destination
+ (string-append nar-destination ".tmp")))
+ (mkdir-p (dirname temporary-destination))
+ (call-with-input-file nar-location
+ (lambda (source-port)
+ (call-with-decompressed-port
+ ;; TODO Don't assume the source compression
+ 'lzip
+ source-port
+ (lambda (decompressed-source-port)
+ (let ((call-with-compressed-output-port*
+ (match target-compression
+ ('gzip
+ (@ (zlib) call-with-gzip-output-port))
+ ('lzip
+ (@ (lzlib) call-with-lzip-output-port))
+ ('zstd
+ (@ (zstd) call-with-zstd-output-port))
+ ('none
+ (lambda (port proc)
+ (proc port)
+ (close-port port))))))
+ (when (file-exists? temporary-destination)
+ (delete-file temporary-destination))
+ (apply
+ call-with-compressed-output-port*
+ (open-output-file temporary-destination
+ #:binary #t)
+ (lambda (compressed-port)
+ (dump-port decompressed-source-port
+ compressed-port))
+ (if (pair? compression)
+ (cdr compression)
+ '())))))))
+ (rename-file temporary-destination
+ nar-destination))))
+ compressions)
(call-with-output-file narinfo-location
(lambda (port)
@@ -257,7 +336,23 @@
(assq-ref output 'size)
(vector->list
(assq-ref output 'references))
- `((lzip ,(stat:size (stat nar-location #f))))
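+              ;; List each published compression along with the size of its
+              ;; nar file.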
+ (map
+ (lambda (compression-details)
+ (let* ((compression
+ (if (pair? compression-details)
+ (car compression-details)
+ compression-details))
+ ;; TODO This logic should sit elsewhere
+ (file (string-append
+ publish-directory "/"
+ "nar/"
+ (if (eq? compression 'none)
+ ""
+ (string-append
+ (symbol->string compression) "/"))
+ (basename output-filename))))
+ (list compression (stat:size (stat file #f)))))
+ compressions)
#:system (datastore-find-derivation-system
datastore
drv-name)
@@ -276,18 +371,16 @@
(raise-exception exn))
(lambda ()
(post-publish-hook publish-directory
- narinfo-filename
- nar-filename))
+ narinfo-filename))
#:unwind? #t))
- (cons narinfo-filename
- nar-filename)))))
+ narinfo-filename))))
(let* ((build-details
(datastore-find-build datastore build-id))
(drv-name
(assq-ref build-details 'derivation-name))
- (narinfos-and-nars
+ (narinfos
(append
(if publish-referenced-derivation-source-files?
(process-referenced-derivation-source-files drv-name)
@@ -297,23 +390,22 @@
(process-output drv-name output))
(datastore-list-build-outputs datastore build-id)))))
(when (and combined-post-publish-hook
- (not (null? narinfos-and-nars)))
+ (not (null? narinfos)))
(with-exception-handler
(lambda (exn)
           ;; Roll back narinfo creation, to make this more
           ;; transactional
(for-each
- (match-lambda
- ((narinfo-filename . _)
- (delete-file
- (string-append
- narinfo-directory "/" narinfo-filename))))
- narinfos-and-nars)
+          (lambda (narinfo-filename)
+ (delete-file
+ (string-append
+ narinfo-directory "/" narinfo-filename)))
+ narinfos)
(raise-exception exn))
(lambda ()
- (combined-post-publish-hook publish-directory
- narinfos-and-nars))
+ (combined-post-publish-hook publish-directory narinfos))
#:unwind? #t)))))
(define* (build-success-s3-publish-hook
@@ -544,8 +636,14 @@
output-log-file)
(delete-file source-log-file)))))
-(define (default-build-missing-inputs-hook build-coordinator
- build-id missing-inputs)
+(define* (default-build-missing-inputs-hook
+ build-coordinator
+ build-id missing-inputs
+ #:key (submit-build? (lambda* (missing-input
+ #:key successful-builds
+ pending-builds)
+ (and (null? successful-builds)
+ (null? pending-builds)))))
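+  ;; SUBMIT-BUILD? is called once per missing input; the default only submits
+  ;; a build when the output has no successful and no pending builds.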
(define datastore
(build-coordinator-datastore build-coordinator))
@@ -579,8 +677,9 @@
(not (assq-ref build-details 'processed))
(not (assq-ref build-details 'canceled))))
builds-for-output)))
- (if (and (null? successful-builds)
- (null? pending-builds))
+ (if (submit-build? missing-input
+ #:successful-builds successful-builds
+ #:pending-builds pending-builds)
(begin
(simple-format #t
"submitting build for ~A\n"
@@ -590,8 +689,7 @@
#:tags (datastore-fetch-build-tags
datastore
build-id)))
- (simple-format #t "~A builds exist for ~A, skipping\n"
- (length builds-for-output)
+ (simple-format #t "skipping submitting build for ~A\n"
missing-input)))
(begin
(simple-format (current-error-port)
diff --git a/guix-build-coordinator/utils.scm b/guix-build-coordinator/utils.scm
index 0acd62a..d747962 100644
--- a/guix-build-coordinator/utils.scm
+++ b/guix-build-coordinator/utils.scm
@@ -215,7 +215,7 @@
(define port-write-timeout-error?
(record-predicate &port-write-timeout))
-(define* (with-port-timeouts thunk #:key (timeout (* 120 1000)))
+(define* (with-port-timeouts thunk #:key (timeout 120))
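+  ;; TIMEOUT is a number of seconds.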
;; When the GC runs, it restarts the poll syscall, but the timeout remains
;; unchanged! When the timeout is longer than the time between the syscall
@@ -227,8 +227,7 @@
(define (wait port mode)
(let ((timeout-internal
(+ (get-internal-real-time)
- (* internal-time-units-per-second
- (/ timeout 1000)))))
+ (* internal-time-units-per-second timeout))))
(let loop ((poll-value
(port-poll port mode poll-timeout-ms)))
(if (= poll-value 0)
@@ -567,9 +566,9 @@
(define* (store-item->recutils compression file-size)
(let ((url (encode-and-join-uri-path
`(,@(split-and-decode-uri-path nar-path)
- ,@(if compression
- (list (symbol->string compression))
- '())
+ ,@(if (eq? compression 'none)
+ '()
+ (list (symbol->string compression)))
,(basename store-path)))))
(format #f "URL: ~a~%Compression: ~a~%~@[FileSize: ~a~%~]"
url
diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm
index 293429c..a064175 100644
--- a/guix-build-coordinator/utils/fibers.scm
+++ b/guix-build-coordinator/utils/fibers.scm
@@ -51,6 +51,8 @@
(name "unnamed"))
"Return a channel used to offload work to a dedicated thread. ARGS are the
arguments of the worker thread procedure."
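+  ;; One slot per worker thread, recording the procedure it is currently
+  ;; running (#f when idle); returned as a second value alongside the channel.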
+ (define thread-proc-vector
+ (make-vector parallelism #f))
(define (initializer/safe)
(let ((args
@@ -74,7 +76,7 @@ arguments of the worker thread procedure."
args
;; never give up, just keep retrying
(begin
- (sleep 5)
+ (sleep 1)
(initializer/safe)))))
(define (destructor/safe args)
@@ -100,10 +102,10 @@ arguments of the worker thread procedure."
(or success?
#t
(begin
- (sleep 5)
+ (sleep 1)
(destructor/safe args)))))
- (define (process channel args)
+ (define (process thread-index channel args)
(let loop ((current-lifetime lifetime))
(let ((exception?
(match (get-message channel)
@@ -124,6 +126,9 @@ arguments of the worker thread procedure."
internal-time-units-per-second)
exn))
(lambda ()
+ (vector-set! thread-proc-vector
+ thread-index
+ proc)
(with-throw-handler #t
(lambda ()
(call-with-values
@@ -149,6 +154,10 @@ arguments of the worker thread procedure."
(put-message reply
response)
+ (vector-set! thread-proc-vector
+ thread-index
+ #f)
+
(match response
(('worker-thread-error duration _)
(when duration-logger
@@ -188,7 +197,7 @@ arguments of the worker thread procedure."
"worker-thread-channel: exception: ~A\n" exn))
(lambda ()
(parameterize ((%worker-thread-args args))
- (process channel args)))
+ (process thread-index channel args)))
#:unwind? #t)
(when destructor
@@ -196,7 +205,9 @@ arguments of the worker thread procedure."
(init (initializer/safe))))))
(iota parallelism))
- channel))
+
+ (values channel
+ thread-proc-vector)))
(define &worker-thread-timeout
(make-exception-type '&worker-thread-timeout
@@ -447,20 +458,16 @@ If already in the worker thread, call PROC immediately."
(define (readable? port)
"Test if PORT is writable."
- (match (select (vector port) #() #() 0)
- ((#() #() #()) #f)
- ((#(_) #() #()) #t)))
+ (= 1 (port-poll port "r" 0)))
(define (writable? port)
"Test if PORT is writable."
- (match (select #() (vector port) #() 0)
- ((#() #() #()) #f)
- ((#() #(_) #()) #t)))
+ (= 1 (port-poll port "w" 0)))
(define (make-wait-operation ready? schedule-when-ready port port-ready-fd this-procedure)
(make-base-operation #f
(lambda _
- (and (ready? (port-ready-fd port)) values))
+ (and (ready? port) values))
(lambda (flag sched resume)
(define (commit)
(match (atomic-box-compare-and-swap! flag 'W 'S)