aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.dir-locals.el2
-rw-r--r--Makefile.am3
-rw-r--r--guix-build-coordinator/agent-messaging/http.scm21
-rw-r--r--guix-build-coordinator/agent-messaging/http/server.scm136
-rw-r--r--guix-build-coordinator/agent.scm198
-rw-r--r--guix-build-coordinator/build-allocator.scm166
-rw-r--r--guix-build-coordinator/client-communication.scm179
-rw-r--r--guix-build-coordinator/coordinator.scm1475
-rw-r--r--guix-build-coordinator/datastore.scm25
-rw-r--r--guix-build-coordinator/datastore/abstract.scm5
-rw-r--r--guix-build-coordinator/datastore/sqlite.scm1621
-rw-r--r--guix-build-coordinator/guix-data-service.scm4
-rw-r--r--guix-build-coordinator/hooks.scm191
-rw-r--r--guix-build-coordinator/utils.scm374
-rw-r--r--guix-build-coordinator/utils/fibers.scm595
-rw-r--r--guix-build-coordinator/utils/timeout.scm81
-rw-r--r--guix-dev.scm34
-rw-r--r--scripts/guix-build-coordinator-agent.in11
-rw-r--r--scripts/guix-build-coordinator.in55
-rw-r--r--sqitch/pg/deploy/allocated_builds_submit_outputs.sql7
-rw-r--r--sqitch/pg/deploy/background-jobs-queue.sql7
-rw-r--r--sqitch/pg/deploy/builds_replace_unprocessed_index.sql7
-rw-r--r--sqitch/pg/revert/allocated_builds_submit_outputs.sql7
-rw-r--r--sqitch/pg/revert/background-jobs-queue.sql7
-rw-r--r--sqitch/pg/revert/builds_replace_unprocessed_index.sql7
-rw-r--r--sqitch/pg/verify/allocated_builds_submit_outputs.sql7
-rw-r--r--sqitch/pg/verify/background-jobs-queue.sql7
-rw-r--r--sqitch/pg/verify/builds_replace_unprocessed_index.sql7
-rw-r--r--sqitch/sqitch.plan3
-rw-r--r--sqitch/sqlite/deploy/allocated_builds_submit_outputs.sql7
-rw-r--r--sqitch/sqlite/deploy/background-jobs-queue.sql11
-rw-r--r--sqitch/sqlite/deploy/builds_replace_unprocessed_index.sql9
-rw-r--r--sqitch/sqlite/revert/allocated_builds_submit_outputs.sql7
-rw-r--r--sqitch/sqlite/revert/background-jobs-queue.sql7
-rw-r--r--sqitch/sqlite/revert/builds_replace_unprocessed_index.sql7
-rw-r--r--sqitch/sqlite/verify/allocated_builds_submit_outputs.sql7
-rw-r--r--sqitch/sqlite/verify/background-jobs-queue.sql7
-rw-r--r--sqitch/sqlite/verify/builds_replace_unprocessed_index.sql7
38 files changed, 3001 insertions, 2310 deletions
diff --git a/.dir-locals.el b/.dir-locals.el
index 60601df..1c73c7a 100644
--- a/.dir-locals.el
+++ b/.dir-locals.el
@@ -10,6 +10,6 @@
(eval put 'with-db-worker-thread 'scheme-indent-function 1)
(eval put 'with-time-logging 'scheme-indent-function 1)
(eval put 'with-timeout 'scheme-indent-function 1)
- (eval put 'letpar& 'scheme-indent-function 1)
+ (eval put 'fibers-let 'scheme-indent-function 1)
(eval . (put 'call-with-lzip-output-port 'scheme-indent-function 1))
(eval . (put 'with-store 'scheme-indent-function 1))))
diff --git a/Makefile.am b/Makefile.am
index 5081695..8a9af41 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -10,7 +10,8 @@ MINIMALSOURCES = \
guix-build-coordinator/agent-messaging/http.scm \
guix-build-coordinator/agent.scm \
guix-build-coordinator/config.scm \
- guix-build-coordinator/utils.scm
+ guix-build-coordinator/utils.scm \
+ guix-build-coordinator/utils/timeout.scm
ALLSOURCES = \
$(MINIMALSOURCES) \
diff --git a/guix-build-coordinator/agent-messaging/http.scm b/guix-build-coordinator/agent-messaging/http.scm
index 0baa75b..33fb717 100644
--- a/guix-build-coordinator/agent-messaging/http.scm
+++ b/guix-build-coordinator/agent-messaging/http.scm
@@ -50,6 +50,7 @@
#:use-module (guix base64)
#:use-module (guix build utils)
#:use-module (guix-build-coordinator utils)
+ #:use-module (guix-build-coordinator utils timeout)
#:use-module (guix-build-coordinator agent-messaging abstract)
#:export (make-http-agent-interface
@@ -73,10 +74,10 @@
password)
(let* ((gnutls-ver (gnutls-version))
(guix-ver %guix-version))
- (simple-format (current-error-port)
- "(gnutls version: ~A, guix version: ~A)\n"
- gnutls-ver
- guix-ver))
+ (simple-format/safe (current-error-port)
+ "(gnutls version: ~A, guix version: ~A)\n"
+ gnutls-ver
+ guix-ver))
(make <http-agent-interface>
#:coordinator-uri coordinator-uri
@@ -204,7 +205,8 @@
response)))))
(retry-on-error (lambda ()
- (with-port-timeouts make-request))
+ (with-port-timeouts make-request
+ #:timeout 120))
#:times retry-times
#:delay 10
#:no-retry agent-error-from-coordinator?))
@@ -420,10 +422,11 @@
(lambda (uploaded-bytes)
(= uploaded-bytes file-size)))
(retry-on-error (lambda ()
- (with-throw-handler #t
- perform-upload
- (lambda _
- (backtrace))))
+ (with-exception-handler
+ (lambda (exn)
+ (backtrace)
+ (raise-exception exn))
+ perform-upload))
#:times 100
#:delay 60
#:error-hook
diff --git a/guix-build-coordinator/agent-messaging/http/server.scm b/guix-build-coordinator/agent-messaging/http/server.scm
index 85ba0d4..fa227da 100644
--- a/guix-build-coordinator/agent-messaging/http/server.scm
+++ b/guix-build-coordinator/agent-messaging/http/server.scm
@@ -46,13 +46,15 @@
#:use-module (fibers timers)
#:use-module (fibers channels)
#:use-module (fibers operations)
+ #:use-module (knots)
+ #:use-module (knots timeout)
+ #:use-module (knots web-server)
+ #:use-module (knots thread-pool)
#:use-module (prometheus)
#:use-module (guix base32)
#:use-module (guix base64)
#:use-module (guix progress)
#:use-module (guix build utils)
- #:use-module ((guix build syscalls)
- #:select (set-thread-name))
#:use-module (guix-build-coordinator utils)
#:use-module (guix-build-coordinator utils fibers)
#:use-module (guix-build-coordinator datastore)
@@ -135,7 +137,6 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
(define (http-agent-messaging-start-server port host secret-key-base
build-coordinator
- chunked-request-channel
output-hash-channel)
(define plain-metrics-registry
(make-metrics-registry))
@@ -146,18 +147,15 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
(define process-metrics-updater
(get-process-metrics-updater plain-metrics-registry))
- (define thread-metric
- (make-gauge-metric
- (build-coordinator-metrics-registry build-coordinator)
- "guile_threads_total"))
-
(define datastore-metrics-updater
(base-datastore-metrics-updater build-coordinator))
+ (define (build-coordinator-metrics-updater)
+ (build-coordinator-update-metrics build-coordinator))
+
(define (update-managed-metrics!)
+ (call-with-delay-logging build-coordinator-metrics-updater)
(call-with-delay-logging gc-metrics-updater)
- (metric-set thread-metric
- (length (all-threads)))
(call-with-delay-logging process-metrics-updater)
(call-with-delay-logging datastore-metrics-updater))
@@ -168,8 +166,8 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
"exception when starting: ~A\n" exn)
(primitive-exit 1))
(lambda ()
- (run-server/patched
- (lambda (request body)
+ (run-knots-web-server
+ (lambda (request)
(log-msg (build-coordinator-logger build-coordinator)
'INFO
(format #f "~4a ~a\n"
@@ -180,10 +178,8 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
(cons (request-method request)
(split-and-decode-uri-path
(uri-path (request-uri request))))
- body
secret-key-base
build-coordinator
- chunked-request-channel
output-hash-channel
update-managed-metrics!
plain-metrics-registry)))
@@ -611,10 +607,8 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
(define (controller request
method-and-path-components
- body
secret-key-base
build-coordinator
- chunked-request-channel
output-hash-channel
update-managed-metrics!
plain-metrics-registry)
@@ -642,6 +636,10 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
(define logger
(build-coordinator-logger build-coordinator))
+ ;; TODO Handle this in the controller
+ (define body
+ (read-request-body request))
+
(define (controller-thunk)
(match method-and-path-components
(('GET "agent" uuid)
@@ -710,14 +708,11 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
(('POST "agent" uuid "fetch-builds")
(if (authenticated? uuid request)
(let* ((json-body (json-string->scm (utf8->string body)))
- ;; count is deprecated, use target_count instead
- (count (assoc-ref json-body "count"))
(target-count (assoc-ref json-body "target_count"))
(systems (assoc-ref json-body "systems"))
(builds (fetch-builds build-coordinator uuid
(vector->list systems)
- target-count
- count)))
+ target-count)))
(render-json
`((builds . ,(list->vector builds)))))
(render-json
@@ -794,18 +789,9 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
body)))
(call-with-output-file tmp-output-file-name
(lambda (output-port)
- ;; Older agents may still attempt to use chunked encoding
- ;; for this request
- (if (member '(chunked) (request-transfer-encoding request))
- (call-with-worker-thread
- chunked-request-channel
- (lambda ()
- (dump-port body-port
- output-port
- (request-content-length request))))
- (dump-port body-port
- output-port
- (request-content-length request))))))
+ (dump-port body-port
+ output-port
+ (request-content-length request)))))
(rename-file tmp-output-file-name
output-file-name)
(no-content))
@@ -852,20 +838,12 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
"deleting " tmp-output-file-name)
(delete-file tmp-output-file-name))
- (if (member '(chunked) (request-transfer-encoding request))
- ;; Older agents may use chunked encoding for this request
- (call-with-worker-thread
- chunked-request-channel
- (lambda ()
- (receive-file body
- #f
- tmp-output-file-name)))
- (let ((content-length
- (request-content-length request)))
- (when (> content-length 0)
- (receive-file body
- content-length
- tmp-output-file-name))))
+ (let ((content-length
+ (request-content-length request)))
+ (when (> content-length 0)
+ (receive-file body
+ content-length
+ tmp-output-file-name)))
(if (file-exists? output-file-name)
(render-json
@@ -945,19 +923,10 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
"deleting " output-file-name)
(delete-file output-file-name))
- (if (member '(chunked) (request-transfer-encoding request))
- ;; Older agents may use chunked encoding for this request
- (call-with-worker-thread
- chunked-request-channel
- (lambda ()
- (receive-file body
- #f
- tmp-output-file-name
- #:append? #t)))
- (receive-file body
- (request-content-length request)
- tmp-output-file-name
- #:append? #t)))
+ (receive-file body
+ content-length
+ tmp-output-file-name
+ #:append? #t))
(if (file-exists? output-file-name)
(render-json
@@ -1015,12 +984,13 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
#:code 200
#:headers '((content-type . (text/plain))
(vary . (accept))))
- (lambda (port)
- (write-metrics (build-coordinator-metrics-registry
- build-coordinator)
- port)
- (write-metrics plain-metrics-registry
- port))))))
+ (call-with-output-string
+ (lambda (port)
+ (write-metrics (build-coordinator-metrics-registry
+ build-coordinator)
+ port)
+ (write-metrics plain-metrics-registry
+ port)))))))
(_
(render-json
"not-found"
@@ -1037,7 +1007,7 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
(render-json
`((error . chunked-input-ended-prematurely))
#:code 400))
- ((worker-thread-timeout-error? exn)
+ ((thread-pool-timeout-error? exn)
(render-json
`((error . ,(simple-format #f "~A" exn)))
#:code 503))
@@ -1046,30 +1016,20 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
`((error . ,(simple-format #f "~A" exn)))
#:code 500))))
(lambda ()
- (with-throw-handler #t
- controller-thunk
- (lambda (key . args)
- (unless (and (eq? '%exception key)
- (or (agent-error? (car args))
- (worker-thread-timeout-error? (car args))
- (chunked-input-ended-prematurely-error? (car args))))
+ (with-exception-handler
+ (lambda (exn)
+ (unless (or (agent-error? exn)
+ (thread-pool-timeout-error? exn)
+ (chunked-input-ended-prematurely-error? exn))
(match method-and-path-components
((method path-components ...)
(simple-format
(current-error-port)
- "error: when processing: /~A ~A\n ~A ~A\n"
- method (string-join path-components "/")
- key args)))
-
- (let* ((stack (make-stack #t 4))
- (backtrace
- (call-with-output-string
- (lambda (port)
- (display "\nBacktrace:\n" port)
- (display-backtrace stack port)
- (newline port)
- (newline port)))))
- (display
- backtrace
- (current-error-port)))))))
+ "error: when processing: /~A ~A\n"
+ method (string-join path-components "/"))))
+
+ (print-backtrace-and-exception/knots exn))
+
+ (raise-exception exn))
+ controller-thunk))
#:unwind? #t))
diff --git a/guix-build-coordinator/agent.scm b/guix-build-coordinator/agent.scm
index 8144947..47a7c5f 100644
--- a/guix-build-coordinator/agent.scm
+++ b/guix-build-coordinator/agent.scm
@@ -46,8 +46,9 @@
#:use-module (guix base32)
#:use-module (guix serialization)
#:use-module ((guix build syscalls)
- #:select (set-thread-name))
+ #:select (set-thread-name free-disk-space))
#:use-module (guix-build-coordinator utils)
+ #:use-module (guix-build-coordinator utils timeout)
#:use-module (guix-build-coordinator agent-messaging)
#:use-module (guix-build-coordinator agent-messaging abstract)
#:export (run-agent))
@@ -77,29 +78,35 @@
derivation-substitute-urls
non-derivation-substitute-urls
metrics-file
- max-1min-load-average)
+ max-1min-load-average
+ timestamp-log-output?)
(define lgr (make <logger>))
- (define port-log (make <port-log>
+ (define port-log (make <custom-port-log>
#:port (current-output-port)
#:formatter
- (lambda (lvl time str)
- (format #f "~a (~5a): ~a~%"
- (strftime "%F %H:%M:%S" (localtime time))
- lvl
- str))))
+ ;; In guile-lib v0.2.8 onwards, the formatter is
+ ;; called with more arguments
+ (lambda args ; lvl, time, str
+ (format #f "~a(~5a): ~a~%"
+ (if timestamp-log-output?
+ (strftime "%F %H:%M:%S " (localtime
+ (second args)))
+ "")
+ (first args)
+ (third args)))))
(define metrics-enabled?
(and (not (string-null? metrics-file))
(let ((directory (dirname metrics-file)))
(or (file-exists? directory)
(begin
- (simple-format (current-error-port)
- "skipping writing metrics as ~A does not exist\n"
- directory)
+ (simple-format/safe (current-error-port)
+ "skipping writing metrics as ~A does not exist\n"
+ directory)
#f)))
(with-exception-handler
(lambda (exn)
- (simple-format
+ (simple-format/safe
(current-error-port)
"skipping writing metrics, encountered exception ~A\n"
exn)
@@ -375,9 +382,16 @@
list-jobs
(create-work-queue current-max-builds
(lambda (build)
- (process-job build
- perform-post-build-actions
- uploads-updater))
+ ;; This poorly handles cases where the
+ ;; connection to the daemon fails, or other
+ ;; errors occur
+ (retry-on-error
+ (lambda ()
+ (process-job build
+ perform-post-build-actions
+ uploads-updater))
+ #:times 3
+ #:delay 10))
#:thread-start-delay
(make-time
time-duration
@@ -393,48 +407,47 @@
20)
#:name "job")))
(define (display-info)
- (display
- (simple-format
- #f "current threads: ~A current jobs: ~A\n~A\n"
- (count-threads)
- (+ (count-jobs) (count-post-build-jobs))
- (string-append
- (string-join
- (map (match-lambda
- ((build-details)
- (simple-format
- #f " - ~A (derived priority: ~A)
+ (simple-format/safe
+ (current-error-port)
+ "current threads: ~A current jobs: ~A\n~A\n"
+ (count-threads)
+ (+ (count-jobs) (count-post-build-jobs))
+ (string-append
+ (string-join
+ (map (match-lambda
+ ((build-details)
+ (simple-format
+ #f " - ~A (derived priority: ~A)
~A"
- (assoc-ref build-details "uuid")
- (assoc-ref build-details "derived_priority")
- (or
- (assoc-ref build-details "derivation_name")
- (assoc-ref build-details "derivation-name")))))
- (list-jobs))
- "\n")
- "\n"
- (string-join
- (map (match-lambda
- ((build-details upload-progress _)
- (simple-format
- #f " - ~A (derived priority: ~A)
+ (assoc-ref build-details "uuid")
+ (assoc-ref build-details "derived_priority")
+ (or
+ (assoc-ref build-details "derivation_name")
+ (assoc-ref build-details "derivation-name")))))
+ (list-jobs))
+ "\n")
+ "\n"
+ (string-join
+ (map (match-lambda
+ ((build-details upload-progress _)
+ (simple-format
+ #f " - ~A (derived priority: ~A)
~A~A"
- (assoc-ref build-details "uuid")
- (assoc-ref build-details "derived_priority")
- (or
- (assoc-ref build-details "derivation_name")
- (assoc-ref build-details "derivation-name"))
- (if (upload-progress-file upload-progress)
- (simple-format
- #f "
+ (assoc-ref build-details "uuid")
+ (assoc-ref build-details "derived_priority")
+ (or
+ (assoc-ref build-details "derivation_name")
+ (assoc-ref build-details "derivation-name"))
+ (if (upload-progress-file upload-progress)
+ (simple-format/safe
+ #f "
uploading ~A (~A/~A)"
- (upload-progress-file upload-progress)
- (upload-progress-bytes-sent upload-progress)
- (upload-progress-total-bytes upload-progress))
- ""))))
- (list-post-build-jobs))
- "\n")))
- (current-error-port)))
+ (upload-progress-file upload-progress)
+ (upload-progress-bytes-sent upload-progress)
+ (upload-progress-total-bytes upload-progress))
+ ""))))
+ (list-post-build-jobs))
+ "\n"))))
(let ((details (submit-status coordinator-interface
'idle
@@ -513,12 +526,21 @@
'())
(lambda ()
- (fetch-builds-for-agent
- coordinator-interface
- systems
- (+ (max current-threads 1)
- (count-post-build-jobs))
- #:log (build-log-procedure lgr)))
+ (let ((free-space (free-disk-space "/gnu/store")))
+ (if (< free-space (* 2 (expt 2 30))) ; 2G
+ (begin
+ (log-msg
+ lgr 'WARN
+ "low space on /gnu/store, "
+ "not fetching new builds")
+ (sleep 30)
+ '())
+ (fetch-builds-for-agent
+ coordinator-interface
+ systems
+ (+ (max current-threads 1)
+ (count-post-build-jobs))
+ #:log (build-log-procedure lgr)))))
#:unwind? #t))
(new-builds
(remove (lambda (build)
@@ -630,7 +652,7 @@ unable to query substitute servers without caching"))
(has-substiutes-no-cache?
non-derivation-substitute-urls
file))
- #:timeout (* 60 1000)))
+ #:timeout 60))
#:times 20
#:delay (random 15))
#f)))
@@ -684,7 +706,15 @@ but the guix-daemon claims it's unavailable"
#:timeout ,(* 60 60)))
(let ((log-port (open-output-string)))
- (with-throw-handler #t
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg lgr 'ERROR
+ "exception when fetching missing paths: "
+ exn)
+ (display/safe (get-output-string log-port))
+ (display/safe "\n")
+ (close-output-port log-port)
+ (raise-exception exn))
(lambda ()
(with-port-timeouts
(lambda ()
@@ -692,14 +722,7 @@ but the guix-daemon claims it's unavailable"
((current-build-output-port log-port))
(build-things fetch-substitute-store
missing-paths)))
- #:timeout (* 60 10 1000)))
- (lambda (key . args)
- (log-msg lgr 'ERROR
- "exception when fetching missing paths "
- key ": " args)
- (display (get-output-string log-port))
- (display (newline))
- (close-output-port log-port)))))
+ #:timeout (* 60 10))))))
(for-each (lambda (missing-path)
(add-temp-root store missing-path))
missing-paths))
@@ -761,7 +784,7 @@ but the guix-daemon claims it's unavailable"
(delete-paths store output-file-names)))
#t)
(lambda (key args)
- (display (get-output-string log-port))
+ (display/safe (get-output-string log-port))
(log-msg lgr 'ERROR
"delete-outputs: "
key args)
@@ -850,25 +873,30 @@ but the guix-daemon claims it's unavailable"
(raise-exception exn)))
#f)
(lambda ()
- (with-throw-handler #t
+ (with-exception-handler
+ (lambda (exn)
+ (unless (and (store-protocol-error? exn)
+ (let ((status (store-protocol-error-status exn)))
+ (or (= status 1)
+ (= status 100)
+ (= status 101))))
+ (simple-format/safe (current-error-port)
+ "exception when performing build: ~A\n"
+ exn)
+ (let* ((stack (make-stack #t))
+ (backtrace
+ (call-with-output-string
+ (lambda (port)
+ (display-backtrace stack port)
+ (newline port)))))
+ (display/safe backtrace)))
+ (raise-exception exn))
(lambda ()
(build-things store (list (derivation-file-name derivation)))
(for-each (lambda (output)
(add-temp-root store output))
(map derivation-output-path
- (map cdr (derivation-outputs derivation)))))
- (lambda (key . args)
- (unless (and (eq? key '%exception)
- (store-protocol-error? (car args))
- (let ((status (store-protocol-error-status
- (car args))))
- (or (= status 1)
- (= status 100)
- (= status 101))))
- (simple-format (current-error-port)
- "exception when performing build: ~A ~A\n"
- key args)
- (backtrace))))
+ (map cdr (derivation-outputs derivation))))))
#t)
#:unwind? #t)))
diff --git a/guix-build-coordinator/build-allocator.scm b/guix-build-coordinator/build-allocator.scm
index f73c457..8c08144 100644
--- a/guix-build-coordinator/build-allocator.scm
+++ b/guix-build-coordinator/build-allocator.scm
@@ -352,20 +352,13 @@
#:label-values `((system . ,system)))))
counts)))
- ;; Go through the setup failures and look specifically at the
- ;; missing_inputs ones. Eliminate any missing_inputs failures if all the
- ;; missing inputs appear to have been built successfully, and update the
+ ;; Go through the setup failures and update the
;; derived-build-priorities-hash to reflect the priorities of builds based
;; on the builds that would be "unblocked" if they were completed.
(for-each
(lambda (setup-failure-build-id)
(let ((setup-failures
- (filter
- (lambda (setup-failure)
- (string=? "missing_inputs"
- (assq-ref setup-failure 'failure-reason)))
- (hash-ref setup-failures-hash
- setup-failure-build-id)))
+ (hash-ref setup-failures-hash setup-failure-build-id))
(setup-failure-build-derived-priority
(hash-ref derived-build-priorities-hash
setup-failure-build-id)))
@@ -425,31 +418,28 @@
setup-failures-hash))
(let ((result
- (append-map
+ (map
(lambda (agent-id)
(log "considering builds for" agent-id)
(let ((builds-sorted-by-derived-priority
(sort-list (filter (filter-builds-for-agent agent-id)
builds)
(build-sorting-function-for-agent agent-id))))
- (if (null? builds-sorted-by-derived-priority)
- '()
- (let ((final-ordered-builds
- (concatenate
- (map sort-priority-sublist
- (limit-processed-sublists
- (break-builds-in-to-priority-sublists
- builds-sorted-by-derived-priority))))))
- (let ((builds-for-agent
- (limit-planned-builds final-ordered-builds)))
- (map (lambda (build-id ordering)
- (list build-id
- agent-id
- ordering))
- (map (lambda (build)
- (assq-ref build 'uuid))
- builds-for-agent)
- (iota (length builds-for-agent))))))))
+ (cons
+ agent-id
+ (if (null? builds-sorted-by-derived-priority)
+ '()
+ (let ((final-ordered-builds
+ (concatenate
+ (map sort-priority-sublist
+ (limit-processed-sublists
+ (break-builds-in-to-priority-sublists
+ builds-sorted-by-derived-priority))))))
+ (let ((builds-for-agent
+ (limit-planned-builds final-ordered-builds)))
+ (map (lambda (build)
+ (assq-ref build 'uuid))
+ builds-for-agent)))))))
(map (lambda (agent)
(assq-ref agent 'uuid))
agents))))
@@ -502,18 +492,6 @@
(let ((prioritised-builds
(datastore-fetch-prioritised-unprocessed-builds datastore)))
- (define systems-for-builds
- ;; TODO Should be one query
- (let ((table (make-hash-table)))
- (for-each (lambda (build-id)
- (hash-set! table
- build-id
- (datastore-find-build-derivation-system
- datastore
- build-id)))
- prioritised-builds)
- table))
-
(define tags-for-build
(let ((build-tags (make-hash-table)))
(lambda (build-id)
@@ -538,7 +516,10 @@
(assq-ref setup-failure 'failure-reason)))
(cond
((string=? failure-reason "missing_inputs")
- #f)
+ ;; This problem might go away, but just don't try the same agent
+ ;; again for now.
+ (string=? (assq-ref setup-failure 'agent-id)
+ agent-id))
((string=? failure-reason "could_not_delete_outputs")
;; This problem might go away, but just don't try the same agent
;; again for now.
@@ -552,43 +533,44 @@
(else
(error "Unknown setup failure " failure-reason)))))
- (lambda (build-id)
- (log "build:" build-id)
- (and
- (or (null? requested-systems)
- (let ((build-system (hash-ref systems-for-builds build-id)))
- (member build-system requested-systems)))
- (agent-tags-match-build-tags agent-tags tags-for-build
- agent-id build-id)
- (let* ((setup-failures-for-build
- (or (hash-ref setup-failures-hash build-id)
- '()))
- (relevant-setup-failures
- (filter relevant-setup-failure?
- setup-failures-for-build)))
- (log "relevant setup failures:" relevant-setup-failures)
- (if (null? relevant-setup-failures)
- #t
- #f)))))
-
- (when metrics-registry
- (let ((counts
- (hash-fold
- (lambda (_ system result)
- `(,@(alist-delete system result)
- (,system . ,(+ 1 (or (assoc-ref result system) 0)))))
- '()
- systems-for-builds)))
- (for-each
- (match-lambda
- ((system . count)
- (metric-set allocator-considered-builds-metric
- count
- #:label-values `((system . ,system)))))
- counts)))
+ (match-lambda
+ (#(build-id build-system)
+ (log "build:" build-id)
+ (and
+ (or (null? requested-systems)
+ (member build-system requested-systems))
+ (agent-tags-match-build-tags agent-tags tags-for-build
+ agent-id build-id)
+ (let* ((setup-failures-for-build
+ (or (hash-ref setup-failures-hash build-id)
+ '()))
+ (relevant-setup-failures
+ (filter relevant-setup-failure?
+ setup-failures-for-build)))
+ (log "relevant setup failures:" relevant-setup-failures)
+ (if (null? relevant-setup-failures)
+ #t
+ #f))))))
+
+ ;; TODO Restore this in a more performant way
+ ;; (when metrics-registry
+ ;; (let ((counts
+ ;; (hash-fold
+ ;; (lambda (_ system result)
+ ;; `(,@(alist-delete system result)
+ ;; (,system . ,(+ 1 (or (assoc-ref result system) 0)))))
+ ;; '()
+ ;; systems-for-builds)))
+ ;; (for-each
+ ;; (match-lambda
+ ;; ((system . count)
+ ;; (metric-set allocator-considered-builds-metric
+ ;; count
+ ;; #:label-values `((system . ,system)))))
+ ;; counts)))
(let ((result
- (append-map
+ (map
(lambda (agent-id)
(log "considering builds for" agent-id)
(let* ((filter-proc
@@ -596,32 +578,24 @@
(build-ids
(let loop ((count 0)
(build-ids '())
- (potential-build-ids prioritised-builds))
+ (potential-builds prioritised-builds))
(if (or (and planned-builds-for-agent-limit
(>= count planned-builds-for-agent-limit))
- (null? potential-build-ids))
- build-ids ;; highest priority last
- (let ((potential-build (first potential-build-ids)))
- (if (filter-proc potential-build)
+ (null? potential-builds))
+ (reverse build-ids) ;; highest priority last, so
+ ;; reverse
+ (let ((potential-build-details (first potential-builds)))
+ (if (filter-proc potential-build-details)
(loop (+ 1 count)
- (cons potential-build
+ (cons (vector-ref
+ potential-build-details
+ 0)
build-ids)
- (cdr potential-build-ids))
+ (cdr potential-builds))
(loop count
build-ids
- (cdr potential-build-ids))))))))
- (if (null? build-ids)
- '()
- (let ((build-ids-count
- (length build-ids)))
- (map (lambda (build-id ordering)
- (list build-id
- agent-id
- ordering))
- build-ids
- (iota build-ids-count
- build-ids-count
- -1))))))
+ (cdr potential-builds))))))))
+ (cons agent-id build-ids)))
(map (lambda (agent)
(assq-ref agent 'uuid))
agents))))
diff --git a/guix-build-coordinator/client-communication.scm b/guix-build-coordinator/client-communication.scm
index c0118aa..6ec6578 100644
--- a/guix-build-coordinator/client-communication.scm
+++ b/guix-build-coordinator/client-communication.scm
@@ -32,6 +32,11 @@
#:use-module (json)
#:use-module (logging logger)
#:use-module (gcrypt random)
+ #:use-module (knots)
+ #:use-module (knots timeout)
+ #:use-module (knots web-server)
+ #:use-module (knots thread-pool)
+ #:use-module (prometheus)
#:use-module (web uri)
#:use-module (web client)
#:use-module (web request)
@@ -65,9 +70,10 @@
(define (start-client-request-server secret-key-base
host
port
- build-coordinator)
- (run-server/patched
- (lambda (request body)
+ build-coordinator
+ utility-thread-pool)
+ (run-knots-web-server
+ (lambda (request)
(log-msg (build-coordinator-logger build-coordinator)
'INFO
(format #f "~4a ~a\n"
@@ -78,9 +84,10 @@
(cons (request-method request)
(split-and-decode-uri-path
(uri-path (request-uri request))))
- body
+ (read-request-body request)
secret-key-base
- build-coordinator)))
+ build-coordinator
+ utility-thread-pool)))
#:host host
#:port port))
@@ -88,7 +95,8 @@
method-and-path-components
raw-body
secret-key-base
- build-coordinator)
+ build-coordinator
+ utility-thread-pool)
(define datastore
(build-coordinator-datastore build-coordinator))
@@ -97,6 +105,14 @@
(json-string->scm (utf8->string raw-body))
'()))
+ (define read-drv-error-count-metric
+ (or (metrics-registry-fetch-metric
+ (build-coordinator-metrics-registry build-coordinator)
+ "read_derivation_error_total")
+ (make-counter-metric
+ (build-coordinator-metrics-registry build-coordinator)
+ "read_derivation_error_total")))
+
(define (controller-thunk)
(match method-and-path-components
(('GET "build" uuid)
@@ -151,7 +167,8 @@
(alist-delete
'end-time
build-details))
- ,@(if (assq-ref build-details 'processed)
+ ,@(if (or (assq-ref build-details 'processed)
+ (assq-ref build-details 'canceled))
'()
(datastore-find-unprocessed-build-entry datastore uuid))
(created-at . ,(or (and=>
@@ -291,8 +308,8 @@
(render-json
`((build_allocation_plan
. ,(list->vector
- (datastore-list-allocation-plan-builds
- datastore
+ (build-coordinator-list-allocation-plan-builds
+ build-coordinator
agent-id)))))))
(('POST "agent" agent-id "passwords")
(let ((password (new-agent-password
@@ -401,6 +418,14 @@
(or (and=> (assq-ref query-parameters 'priority_lt)
string->number)
'unset)
+ #:created-at->
+ (or (and=> (assq-ref query-parameters 'created_at_gt)
+ datastore-validate-datetime-string)
+ 'unset)
+ #:created-at-<
+ (or (and=> (assq-ref query-parameters 'created_at_lt)
+ datastore-validate-datetime-string)
+ 'unset)
#:relationship
(or (and=> (assq-ref query-parameters 'relationship)
string->symbol)
@@ -460,13 +485,42 @@
(simple-format #f "derivation must be a string: ~A\n"
derivation))))
+ (unless (derivation-path? derivation-file)
+ (raise-exception
+ (make-exception-with-message
+ "invalid derivation path")))
+
+ (string-for-each
+ (lambda (c)
+ (unless (or (char-alphabetic? c)
+ (char-numeric? c)
+ (member c '(#\+ #\- #\. #\_ #\? #\=)))
+ (raise-exception
+ (make-exception-with-message
+ (simple-format #f "invalid character in derivation name: ~A"
+ c)))))
+ (store-path-base derivation-file))
+
(define (read-drv/substitute derivation-file)
- (with-store/non-blocking store
- (unless (valid-path? store derivation-file)
- (substitute-derivation store
- derivation-file
- #:substitute-urls substitute-urls)))
- (read-derivation-from-file* derivation-file))
+ (call-with-delay-logging
+ (lambda ()
+ (with-store/non-blocking store
+ (unless (valid-path? store derivation-file)
+ (with-port-timeouts
+ (lambda ()
+ (substitute-derivation store
+ derivation-file
+ #:substitute-urls substitute-urls))
+ #:timeout 60)))))
+ ;; Read the derivation in a thread to avoid blocking fibers
+ (call-with-thread
+ utility-thread-pool
+ (lambda ()
+ (read-derivation-from-file* derivation-file))
+ #:duration-logger
+ (lambda (duration)
+ (log-delay read-derivation-from-file*
+ duration))))
(let ((submit-build-result
(call-with-delay-logging
@@ -475,24 +529,36 @@
`(,build-coordinator
,derivation-file
#:read-drv
- ,(lambda (derivation-file)
- (with-exception-handler
- (lambda (exn)
- (log-msg
- (build-coordinator-logger build-coordinator)
- 'WARN
- "exception substituting derivation " derivation-file
- ": " exn)
-
- (if (null? (or substitute-urls '()))
- ;; Try again
- (read-drv/substitute derivation-file)
- (read-derivation-through-substitutes
- derivation-file
- substitute-urls)))
- (lambda ()
- (read-drv/substitute derivation-file))
- #:unwind? #t))
+ ,(let ((memoized-drv #f))
+ (lambda (derivation-file)
+ (or
+ memoized-drv
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg
+ (build-coordinator-logger build-coordinator)
+ 'WARN
+ "exception substituting derivation " derivation-file
+ ": " exn)
+ (raise-exception exn))
+ (lambda ()
+ (with-exception-handler
+ (lambda (exn)
+ (print-backtrace-and-exception/knots exn)
+ (raise-exception exn))
+ (lambda ()
+ (let ((result
+ (retry-on-error
+ (lambda ()
+ (read-drv/substitute derivation-file))
+ #:times 2
+ #:delay 3
+ #:error-hook
+ (lambda _
+ (metric-increment read-drv-error-count-metric)))))
+ (set! memoized-drv result)
+ result))))
+ #:unwind? #t))))
,@(let ((priority (assoc-ref body "priority")))
(if priority
`(#:priority ,priority)
@@ -507,6 +573,10 @@
body "ensure-all-related-derivation-outputs-have-builds")
'(#:ensure-all-related-derivation-outputs-have-builds? #t)
'())
+ ,@(if (assoc-ref
+ body "skip-updating-derived-priorities")
+ '(#:skip-updating-derived-priorities? #t)
+ '())
,@(if (assoc-ref body "tags")
`(#:tags
,(map
@@ -530,8 +600,7 @@
datastore
(lambda (_)
(let ((allocation-plan-counts
- (datastore-count-build-allocation-plan-entries
- datastore)))
+ (build-coordinator-allocation-plan-stats build-coordinator)))
`((state_id . ,(build-coordinator-get-state-id build-coordinator))
(agents . ,(list->vector
(map
@@ -581,7 +650,9 @@
(datastore-list-agent-builds
datastore
(assq-ref agent-details 'uuid))))))))
- (datastore-list-agents datastore))))))))))
+ (datastore-list-agents datastore)))))))
+ #:priority 'high
+ #:duration-metric-name "get_state")))
(('GET "events")
(let ((headers (request-headers request)))
(list (build-response
@@ -626,7 +697,7 @@
(render-json
`((error . ,(client-error-details exn)))
#:code 400))
- ((worker-thread-timeout-error? exn)
+ ((thread-pool-timeout-error? exn)
(render-json
`((error . ,(simple-format #f "~A" exn)))
#:code 503))
@@ -635,20 +706,18 @@
`((error . 500))
#:code 500))))
(lambda ()
- (with-throw-handler #t
- controller-thunk
- (lambda (key . args)
- (unless (and (eq? '%exception key)
- (or
- (worker-thread-timeout-error? (car args))
- (client-error? (car args))))
+ (with-exception-handler
+ (lambda (exn)
+ (unless (or
+ (thread-pool-timeout-error? exn)
+ (client-error? exn))
(match method-and-path-components
((method path-components ...)
(simple-format
(current-error-port)
- "error: when processing client request: /~A ~A\n ~A ~A\n"
+ "error: when processing client request: /~A ~A\n ~A\n"
method (string-join path-components "/")
- key args)))
+ exn)))
(let* ((stack (make-stack #t 4))
(backtrace
@@ -660,7 +729,9 @@
(newline port)))))
(display
backtrace
- (current-error-port)))))))
+ (current-error-port))))
+ (raise-exception exn))
+ controller-thunk))
#:unwind? #t))
(define* (render-json json #:key (extra-headers '())
@@ -748,7 +819,8 @@
ensure-all-related-derivation-outputs-have-builds?
tags
#:key
- defer-until)
+ defer-until
+ skip-updating-derived-priorities?)
(send-request coordinator-uri
'POST
"/builds"
@@ -766,6 +838,9 @@
,@(if ensure-all-related-derivation-outputs-have-builds?
'((ensure-all-related-derivation-outputs-have-builds . #t))
'())
+ ,@(if skip-updating-derived-priorities?
+ '((skip-updating-derived-priorities . #t))
+ '())
,@(if (null? tags)
'()
`((tags . ,(list->vector tags))))
@@ -826,6 +901,8 @@
(canceled 'unset)
(priority-> 'unset)
(priority-< 'unset)
+ (created-at-> 'unset)
+ (created-at-< 'unset)
(relationship 'unset)
(after-id #f)
(limit #f))
@@ -874,6 +951,12 @@
,@(if (number? priority-<)
(list (simple-format #f "priority_lt=~A" priority-<))
'())
+ ,@(if (string? created-at->)
+ (list (simple-format #f "created_at_gt=~A" created-at->))
+ '())
+ ,@(if (string? created-at-<)
+ (list (simple-format #f "created_at_lt=~A" created-at-<))
+ '())
,@(if (and relationship (not (eq? 'unset relationship)))
(list (simple-format #f "relationship=~A" relationship))
'())
diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm
index 3830d88..adb7575 100644
--- a/guix-build-coordinator/coordinator.scm
+++ b/guix-build-coordinator/coordinator.scm
@@ -25,18 +25,23 @@
#:use-module (srfi srfi-11)
#:use-module (srfi srfi-19)
#:use-module (srfi srfi-26)
+ #:use-module (srfi srfi-43)
#:use-module (srfi srfi-71)
#:use-module (ice-9 ftw)
#:use-module (ice-9 vlist)
#:use-module (ice-9 match)
#:use-module (ice-9 rdelim)
#:use-module (ice-9 binary-ports)
+ #:use-module (ice-9 suspendable-ports)
#:use-module (ice-9 format)
#:use-module (ice-9 atomic)
#:use-module (ice-9 control)
#:use-module (ice-9 threads)
#:use-module (ice-9 exceptions)
#:use-module (rnrs bytevectors)
+ #:use-module (system repl server)
+ #:use-module (system repl command)
+ #:use-module (system repl debug)
#:use-module (web uri)
#:use-module (web http)
#:use-module (oop goops)
@@ -44,12 +49,18 @@
#:use-module (logging port-log)
#:use-module (gcrypt random)
#:use-module (fibers)
+ #:use-module (fibers timers)
#:use-module (fibers channels)
#:use-module (fibers scheduler)
#:use-module (fibers conditions)
+ #:use-module (fibers operations)
+ #:use-module (knots)
+ #:use-module (knots timeout)
+ #:use-module (knots parallelism)
+ #:use-module (knots thread-pool)
#:use-module (prometheus)
- #:use-module ((guix build syscalls)
- #:select (set-thread-name))
+ #:use-module (guix store)
+ #:use-module (guix progress)
#:use-module (guix derivations)
#:use-module (guix build utils)
#:use-module (guix-build-coordinator utils)
@@ -66,6 +77,8 @@
client-error?
client-error-details
+ %build-coordinator
+
make-build-coordinator
build-coordinator-datastore
build-coordinator-hooks
@@ -97,12 +110,20 @@
build-coordinator-prompt-hook-processing-for-event
start-hook-processing-threads
+ build-coordinator-update-metrics
+
+ build-coordinator-allocation-plan-stats
+ build-coordinator-trigger-build-allocation
+ build-coordinator-list-allocation-plan-builds
+
build-output-file-location
build-log-file-destination
build-log-file-location
handle-build-start-report
handle-build-result
- handle-setup-failure-report))
+ handle-setup-failure-report
+
+ build-coordinator-check-and-correct-unprocessed-builds-all-inputs-built))
(define-exception-type &agent-error &error
make-agent-error
@@ -114,48 +135,44 @@
client-error?
(details client-error-details))
+(define %build-coordinator
+ (make-parameter #f))
+
(define-record-type <build-coordinator>
(make-build-coordinator-record datastore hooks metrics-registry
- allocation-strategy logger)
+ allocation-strategy allocator-channel
+ logger)
build-coordinator?
(datastore build-coordinator-datastore)
(hooks build-coordinator-hooks)
(hook-condvars build-coordinator-hook-condvars
set-build-coordinator-hook-condvars!)
+ (background-job-conditions
+ build-coordinator-background-job-conditions
+ set-build-coordinator-background-job-conditions!)
(metrics-registry build-coordinator-metrics-registry)
(allocation-strategy build-coordinator-allocation-strategy)
- (allocator-thread build-coordinator-allocator-thread
- set-build-coordinator-allocator-thread!)
+ (trigger-build-allocation
+ build-coordinator-trigger-build-allocation
+ set-build-coordinator-trigger-build-allocation!)
+ (allocator-channel build-coordinator-allocator-channel)
(logger build-coordinator-logger)
(events-channel build-coordinator-events-channel
set-build-coordinator-events-channel!)
(get-state-id build-coordinator-get-state-id-proc
set-build-coordinator-get-state-id-proc!)
(scheduler build-coordinator-scheduler
- set-build-coordinator-scheduler!))
+ set-build-coordinator-scheduler!)
+ (utility-thread-pool build-coordinator-utility-thread-pool
+ set-build-coordinator-utility-thread-pool!))
(set-record-type-printer!
<build-coordinator>
(lambda (build-coordinator port)
(display "#<build-coordinator>" port)))
-(define-class <custom-port-log> (<log-handler>)
- (port #:init-value #f #:accessor port #:init-keyword #:port))
-
-(define-method (emit-log (self <custom-port-log>) str)
- (when (port self)
- (put-bytevector (port self)
- (string->utf8 str))
- ;; Even though the port is line buffered, writing to it with
- ;; put-bytevector doesn't cause the buffer to be flushed.
- (force-output (port self))))
-
-(define-method (flush-log (self <custom-port-log>))
- (and=> (port self) force-output))
-
-(define-method (close-log! (self <custom-port-log>))
- (and=> (port self) close-port)
- (set! (port self) #f))
+(define %command-duration-histogram-buckets
+ (list 0.1 0.25 0.5 1 2.5 5 10 15 30 45 60 120 240 (inf)))
(define %known-hooks
'(build-submitted
@@ -201,7 +218,15 @@
#f)
(lambda ()
- (with-throw-handler #t
+ (with-exception-handler
+ (lambda (exn)
+ (if (and
+ (exception-with-origin? exn)
+ (string=? (exception-origin exn)
+ "fport_write"))
+ #f
+ (print-backtrace-and-exception/knots exn))
+ (raise-exception exn))
(lambda ()
(match (atomic-box-ref
current-state-id-and-event-buffer-index-box)
@@ -225,16 +250,7 @@
(iota event-count-to-send
(+ 1 last-sent-state-id))))
- current-state-id)))
- (lambda (key . args)
- (if (and
- (eq? key 'system-error)
- (match args
- (("fport_write" "~A" ("Broken pipe") rest ...)
- #t)
- (_ #f)))
- #f
- (backtrace)))))
+ current-state-id)))))
#:unwind? #t)))
(unless (eq? #f new-state-id)
@@ -276,7 +292,8 @@
(if (> requested-after-state-id
current-state-id)
current-state-id
- requested-after-state-id)
+ (max 0
+ requested-after-state-id))
current-state-id)))))
(atomic-box-set!
listener-channels-box
@@ -346,6 +363,27 @@
(define (build-coordinator-get-state-id build-coordinator)
((build-coordinator-get-state-id-proc build-coordinator)))
+(define (build-coordinator-update-metrics build-coordinator)
+ (define metrics-registry
+ (build-coordinator-metrics-registry build-coordinator))
+
+ (let ((utility-thread-pool-used-thread-metric
+ (or (metrics-registry-fetch-metric
+ metrics-registry
+ "utility_thread_pool_used_thread_total")
+ (make-gauge-metric
+ metrics-registry
+ "utility_thread_pool_used_thread_total"))))
+
+ (and=> (build-coordinator-utility-thread-pool build-coordinator)
+ (lambda (utility-thread-pool)
+ (metric-set
+ utility-thread-pool-used-thread-metric
+ (vector-count
+ (lambda (_ proc)
+ (->bool proc))
+ (thread-pool-proc-vector utility-thread-pool)))))))
+
(define* (make-build-coordinator
#:key
database-uri-string
@@ -357,13 +395,14 @@
(database-uri->datastore
database-uri-string
#:metrics-registry metrics-registry
- #:worker-thread-log-exception?
+ #:thread-pool-log-exception?
(lambda (exn)
(and (not (agent-error? exn))
(not (client-error? exn))))))
hooks
(allocation-strategy
- basic-build-allocation-strategy))
+ basic-build-allocation-strategy)
+ (timestamp-log-output? #t))
(and (or (list? hooks)
(begin
(simple-format
@@ -394,16 +433,22 @@
(port-log (make <custom-port-log>
#:port (current-output-port)
#:formatter
- (lambda (lvl time str)
- (format #f "~a (~5a): ~a~%"
- (strftime "%F %H:%M:%S" (localtime time))
- lvl
- str))))
+ ;; In guile-lib v0.2.8 onwards, the formatter is
+ ;; called with more arguments
+ (lambda args ; lvl, time, str
+ (format #f "~a(~5a): ~a~%"
+ (if timestamp-log-output?
+ (strftime "%F %H:%M:%S " (localtime
+ (second args)))
+ "")
+ (first args)
+ (third args)))))
(build-coordinator
(make-build-coordinator-record datastore
hooks
metrics-registry
allocation-strategy
+ (make-channel)
lgr)))
(add-handler! lgr port-log)
@@ -427,6 +472,16 @@
(setrlimit 'core #f #f))
#:unwind? #t)
+ (let ((core-file
+ (string-append (getcwd) "/core"))
+ (metric
+ (make-gauge-metric (build-coordinator-metrics-registry
+ build-coordinator)
+ "core_dump_file_last_modified_seconds")))
+ (when (file-exists? core-file)
+ (metric-set metric
+ (stat:mtime (stat core-file)))))
+
(with-exception-handler
(lambda (exn)
(simple-format #t "failed increasing open file limit: ~A\n" exn))
@@ -441,29 +496,49 @@
(lambda (scheduler port)
(display "#<scheduler>" port)))
+ (call-with-new-thread
+ (lambda ()
+ (set-thread-name
+ (string-append "gc watcher"))
+
+ (add-hook!
+ after-gc-hook
+ (let ((last-gc-time-taken
+ (assq-ref (gc-stats) 'gc-time-taken)))
+ (lambda ()
+ (let* ((gc-time-taken
+ (assq-ref (gc-stats) 'gc-time-taken))
+ (time-since-last
+ (/ (- gc-time-taken
+ last-gc-time-taken)
+ internal-time-units-per-second)))
+ (when (> time-since-last 0.1)
+ (format (current-error-port)
+ "after gc (additional time taken: ~f)\n"
+ time-since-last))
+ (set! last-gc-time-taken
+ (assq-ref (gc-stats) 'gc-time-taken))))))
+ (while #t
+ (sleep 0.1))))
+
(when update-datastore?
(datastore-update (build-coordinator-datastore build-coordinator)))
- (when pid-file
- (call-with-output-file pid-file
- (lambda (port)
- (simple-format port "~A\n" (getpid)))))
-
- (set-build-coordinator-allocator-thread!
+ (set-build-coordinator-trigger-build-allocation!
build-coordinator
(make-build-allocator-thread build-coordinator))
- (set-build-coordinator-hook-condvars!
- build-coordinator
- (start-hook-processing-threads build-coordinator
- parallel-hooks))
-
(when trigger-build-allocation?
(trigger-build-allocation build-coordinator)))
(define %default-agent-uri (string->uri "http://0.0.0.0:8745"))
(define %default-client-uri (string->uri "http://127.0.0.1:8746"))
+(define %default-repl-server-port
+ ;; Default port to run REPL server on, if --listen-repl is provided
+ ;; but no port is mentioned
+ 37146)
+
(define* (run-coordinator-service build-coordinator
#:key
(update-datastore? #t)
@@ -471,7 +546,10 @@
(agent-communication-uri %default-agent-uri)
(client-communication-uri %default-client-uri)
secret-key-base
- (parallel-hooks '()))
+ (parallel-hooks '())
+ listen-repl)
+ (install-suspendable-ports!)
+
(with-fluids ((%file-port-name-canonicalization 'none))
(perform-coordinator-service-startup
build-coordinator
@@ -479,33 +557,65 @@
#:pid-file pid-file
#:parallel-hooks parallel-hooks)
+ (when listen-repl
+ (parameterize ((%build-coordinator build-coordinator))
+ (cond
+ ((or (eq? #t listen-repl)
+ (number? listen-repl))
+ (let ((listen-repl
+ (if (eq? #t listen-repl)
+ %default-repl-server-port
+ listen-repl)))
+ (format (current-error-port)
+ "REPL server listening on port ~a~%"
+ listen-repl)
+ (spawn-server (make-tcp-server-socket
+ #:port
+ (if (eq? #t listen-repl)
+ %default-repl-server-port
+ listen-repl)))))
+ (else
+ (format (current-error-port)
+ "REPL server listening on ~a~%"
+ listen-repl)
+ (spawn-server (make-unix-domain-server-socket #:path listen-repl))))))
+
;; Create some worker thread channels, which need to be created prior
;; to run-fibers being called.
- (let ((chunked-request-channel
- ;; There are fibers issues when trying to read the chunked
- ;; requests, so do this in dedicated threads.
- (make-worker-thread-channel
- (const '())
- #:name "chunked request"
- #:parallelism 16
- #:log-exception?
- (lambda (exn)
- (not
- (chunked-input-ended-prematurely-error?
- exn)))
- #:delay-logger
- (lambda (seconds-delayed)
- (log-delay "chunked request channel"
- seconds-delayed)
- (when (> seconds-delayed 0.1)
- (format
- (current-error-port)
- "warning: chunked request channel delayed by ~1,2f seconds~%"
- seconds-delayed)))))
-
- (output-hash-channel
+ (let ((output-hash-channel
(make-output-hash-channel
- build-coordinator)))
+ build-coordinator))
+
+ (utility-thread-pool
+ (make-thread-pool
+ 18
+ #:name "utility"
+ #:delay-logger
+ (let ((delay-metric
+ (make-histogram-metric
+ (build-coordinator-metrics-registry build-coordinator)
+ "utility_thread_pool_delay_seconds")))
+ (lambda (seconds-delayed proc)
+ (log-delay "utility thread channel"
+ seconds-delayed)
+ (metric-observe delay-metric seconds-delayed)
+ (when (> seconds-delayed 0.1)
+ (format
+ (current-error-port)
+ "warning: utility thread channel delayed by ~1,2f seconds~%"
+ seconds-delayed)))))))
+
+ (let ((metric
+ (make-gauge-metric
+ (build-coordinator-metrics-registry build-coordinator)
+ "utility_thread_pool_thread_total")))
+ (metric-set metric
+ (vector-length
+ (thread-pool-proc-vector utility-thread-pool))))
+
+ (set-build-coordinator-utility-thread-pool!
+ build-coordinator
+ utility-thread-pool)
(let ((finished? (make-condition)))
(call-with-sigint
@@ -528,6 +638,9 @@
(iota (length schedulers))
schedulers))
+ (set-build-coordinator-scheduler! build-coordinator
+ (current-scheduler))
+
(log-msg (build-coordinator-logger build-coordinator)
'INFO
"initialising metrics")
@@ -539,10 +652,18 @@
(datastore-spawn-fibers
(build-coordinator-datastore build-coordinator))
+ (set-build-coordinator-hook-condvars!
+ build-coordinator
+ (start-hook-processing-threads build-coordinator
+ parallel-hooks))
+
+ (set-build-coordinator-background-job-conditions!
+ build-coordinator
+ (start-background-job-processing-fibers build-coordinator))
+
(spawn-fiber-to-watch-for-deferred-builds build-coordinator)
- (set-build-coordinator-scheduler! build-coordinator
- (current-scheduler))
+ (spawn-build-allocation-plan-management-fiber build-coordinator)
(let ((events-channel
get-state-id
@@ -566,7 +687,6 @@
host
secret-key-base
build-coordinator
- chunked-request-channel
output-hash-channel)
(log-msg 'INFO "listening on " host ":" port))))
@@ -575,7 +695,13 @@
secret-key-base
(uri-host client-communication-uri)
(uri-port client-communication-uri)
- build-coordinator)
+ build-coordinator
+ utility-thread-pool)
+
+ (when pid-file
+ (call-with-output-file pid-file
+ (lambda (port)
+ (simple-format port "~A\n" (getpid)))))
;; Guile seems to just stop listening on ports, so try to
;; monitor that internally and just quit if it happens
@@ -585,8 +711,8 @@
finished?)
(wait finished?))
- #:hz 10
- #:parallelism 2))
+ #:hz 0
+ #:parallelism 1))
finished?)))))
(define* (submit-build build-coordinator derivation-file
@@ -596,6 +722,7 @@
(ignore-if-build-for-derivation-exists? #f)
(ignore-if-build-for-outputs-exists? #f)
(ensure-all-related-derivation-outputs-have-builds? #f)
+ skip-updating-derived-priorities?
(tags '())
defer-until
(read-drv read-derivation-from-file*))
@@ -608,35 +735,15 @@
#:include-canceled? #f)
0))
- (define (build-for-output-already-exists?)
- ;; Handle the derivation not existing in the database here, so that adding
- ;; it to the database isn't required for this code to work
- (let* ((system-from-database (datastore-find-derivation-system datastore
- derivation-file))
-
- (derivation-exists-in-database? (not (eq? #f system-from-database)))
-
- (derivation
- (if derivation-exists-in-database?
- #f ; unnecessary to fetch derivation
- (call-with-delay-logging read-drv
- #:threshold 10
- #:args (list derivation-file))))
-
- (system
- (or system-from-database
- (derivation-system derivation)))
-
- (outputs
- (if derivation-exists-in-database?
- (datastore-find-derivation-outputs datastore
- derivation-file)
- (map
- (match-lambda
- ((name . output)
- `((name . ,name)
- (output . ,(derivation-output-path output)))))
- (derivation-outputs derivation)))))
+ (define (build-for-output-already-exists/with-derivation? derivation)
+ (let ((system (derivation-system derivation))
+ (outputs
+ (map
+ (match-lambda
+ ((name . output)
+ `((name . ,name)
+ (output . ,(derivation-output-path output)))))
+ (derivation-outputs derivation))))
(any
(lambda (output-details)
(let ((builds-for-output
@@ -648,13 +755,37 @@
(not (null? builds-for-output))))
outputs)))
- (define (check-whether-to-store-build)
+ (define (build-for-output-already-exists?)
+ (let ((system (datastore-find-derivation-system datastore
+ derivation-file)))
+ (if (eq? #f system) ; derivation does not exist in database?
+ (build-for-output-already-exists/with-derivation?
+ (call-with-delay-logging read-drv
+ #:threshold 10
+ #:args (list derivation-file)))
+ (any
+ (lambda (output-details)
+ (let ((builds-for-output
+ (datastore-list-builds-for-output-and-system
+ datastore
+ (assq-ref output-details 'output)
+ system
+ #:include-canceled? #f)))
+ (not (null? builds-for-output))))
+ (datastore-find-derivation-outputs datastore
+ derivation-file)))))
+
+ (define* (check-whether-to-store-build #:optional derivation)
(cond
((and ignore-if-build-for-derivation-exists?
(build-for-derivation-exists?))
'((no-build-submitted . build-already-exists-for-this-derivation)))
((and ignore-if-build-for-outputs-exists?
- (call-with-delay-logging build-for-output-already-exists?))
+ (if derivation
+ (call-with-delay-logging
+ build-for-output-already-exists/with-derivation?
+ #:args (list derivation))
+ (call-with-delay-logging build-for-output-already-exists?)))
'((no-build-submitted . build-already-exists-for-a-output)))
(else
'continue)))
@@ -684,18 +815,20 @@
(or requested-uuid
(random-v4-uuid)))
- (define (build-perform-datastore-changes derivations-lacking-builds)
+ (define (build-perform-datastore-changes derivation derivations-lacking-builds)
(lambda (_)
;; Check again now, since new builds could have been added since the
;; checks were made before the start of the transaction.
- (match (check-whether-to-store-build)
+ (match (check-whether-to-store-build derivation)
('continue
;; Actually create a build, do this first so the derived priorities
;; for the builds inserted below are informed by this build.
(store-build derivation-file
build-id
priority
- tags)
+ tags
+ #:skip-updating-other-build-derived-priorities
+ skip-updating-derived-priorities?)
(for-each
(match-lambda
@@ -733,65 +866,84 @@
(build-coordinator-metrics-registry build-coordinator)
"coordinator_submit_build_duration_seconds"
(lambda ()
+ (unless (derivation-path? derivation-file)
+ (raise-exception
+ (make-client-error 'invalid-derivation-file)))
+
+ (string-for-each
+ (lambda (c)
+ (unless (or (char-alphabetic? c)
+ (char-numeric? c)
+ (member c '(#\+ #\- #\. #\_ #\? #\=)))
+ (raise-exception
+ (make-client-error 'invalid-character-in-derivation-file))))
+ (store-path-base derivation-file))
+
(match (check-whether-to-store-build)
('continue
- ;; Store the derivation first, so that listing related derivations
- ;; with no builds works
- (unless (datastore-find-derivation datastore derivation-file)
- (datastore-store-derivation
- datastore
- (call-with-delay-logging read-drv
- #:threshold 10
- #:args (list derivation-file))))
-
- (let ((related-derivations-lacking-builds
- (if ensure-all-related-derivation-outputs-have-builds?
- (datastore-list-related-derivations-with-no-build-for-outputs
+ (let ((drv
+ ;; If the derivation is missing from the database, read it and
+ ;; enter it in to the database, so that listing related
+ ;; derivations with no builds works
+ (if (datastore-find-derivation datastore derivation-file)
+ #f
+ (call-with-delay-logging read-drv
+ #:threshold 10
+ #:args (list derivation-file)))))
+ (when drv
+ (datastore-store-derivation datastore drv))
+
+ (let ((related-derivations-lacking-builds
+ (if ensure-all-related-derivation-outputs-have-builds?
+ (datastore-list-related-derivations-with-no-build-for-outputs
+ datastore
+ derivation-file)
+ '())))
+ (match (datastore-call-with-transaction
datastore
- derivation-file)
- '())))
- (match (datastore-call-with-transaction
- datastore
- (build-perform-datastore-changes
- ;; Do this here so it doesn't take time in the writer thread
- (map
- (lambda (drv)
- ;; Generate the UUID's outside the transaction to save
- ;; time too.
- (cons drv (random-v4-uuid)))
- related-derivations-lacking-builds))
- #:duration-metric-name
- "store_build")
- (#t ; build submitted
- (build-coordinator-prompt-hook-processing-for-event
- build-coordinator
- 'build-submitted)
-
- (build-coordinator-send-event
- build-coordinator
- 'build-submitted
- `((id . ,build-id)
- (derivation . ,derivation-file)
- (priority . ,priority)
- (tags
- . ,(list->vector
+ (build-perform-datastore-changes
+ drv
+ ;; Do this here so it doesn't take time in the writer thread
(map
- (match-lambda
- ((key . value)
- `((key . ,key)
- (value . ,value))))
- (if (vector? tags)
- (vector->list tags)
- tags))))
- (defer_until . ,defer-until)))
-
- (trigger-build-allocation build-coordinator)
-
- `((build-submitted . ,build-id)))
- (stop-condition
- stop-condition))))
+ (lambda (related-drv)
+ ;; Generate the UUID's outside the transaction to save
+ ;; time too.
+ (cons related-drv (random-v4-uuid)))
+ related-derivations-lacking-builds))
+ #:priority 'low
+ #:duration-metric-name "store_build"
+ #:duration-metric-buckets %command-duration-histogram-buckets)
+ (#t ; build submitted
+ (build-coordinator-prompt-hook-processing-for-event
+ build-coordinator
+ 'build-submitted)
+
+ (build-coordinator-send-event
+ build-coordinator
+ 'build-submitted
+ `((id . ,build-id)
+ (derivation . ,derivation-file)
+ (priority . ,priority)
+ (tags
+ . ,(list->vector
+ (map
+ (match-lambda
+ ((key . value)
+ `((key . ,key)
+ (value . ,value))))
+ (if (vector? tags)
+ (vector->list tags)
+ tags))))
+ (defer_until . ,defer-until)))
+
+ (trigger-build-allocation build-coordinator)
+
+ `((build-submitted . ,build-id)))
+ (stop-condition
+ stop-condition)))))
(stop-condition
- stop-condition)))))
+ stop-condition)))
+ #:buckets %command-duration-histogram-buckets))
(define* (cancel-build build-coordinator uuid
#:key (ignore-if-build-required-by-another? #t)
@@ -804,6 +956,10 @@
datastore
(lambda (db)
(let ((build-details (datastore-find-build datastore uuid)))
+ (unless build-details
+ (raise-exception
+ (make-client-error 'build-unknown)))
+
(when (assq-ref build-details 'canceled)
(raise-exception
(make-client-error 'build-already-canceled)))
@@ -819,12 +975,17 @@
(make-transaction-rollback-exception
'skipped-as-build-required-by-another)))
- (datastore-remove-build-from-allocation-plan datastore uuid)
+ (build-coordinator-remove-build-from-allocation-plan
+ build-coordinator
+ uuid)
(datastore-cancel-build datastore uuid)
(datastore-insert-unprocessed-hook-event datastore
"build-canceled"
(list uuid))
- 'build-canceled))))
+ 'build-canceled)
+ #:priority 'low
+ #:duration-metric-name "cancel_build"
+ #:duration-metric-buckets %command-duration-histogram-buckets)))
(when (eq? val 'build-canceled)
(unless skip-updating-derived-priorities?
@@ -845,6 +1006,10 @@
val))
+ (unless (datastore-find-build datastore uuid)
+ (raise-exception
+ (make-client-error 'build-unknown)))
+
(if ignore-if-build-required-by-another?
(let ((build-required
;; Do this check here outside the transaction to avoid having to
@@ -867,6 +1032,10 @@
datastore
(lambda (db)
(let ((build-details (datastore-find-build datastore uuid)))
+ (unless build-details
+ (raise-exception
+ (make-client-error 'build-unknown)))
+
(when (assq-ref build-details 'canceled)
(raise-exception
(make-client-error 'build-already-canceled)))
@@ -881,7 +1050,10 @@
new-priority
#:skip-updating-derived-priorities?
skip-updating-derived-priorities?
- #:override-derived-priority override-derived-priority)))
+ #:override-derived-priority override-derived-priority))
+ #:priority 'low
+ #:duration-metric-name "update_build_priority"
+ #:duration-metric-buckets %command-duration-histogram-buckets)
(trigger-build-allocation build-coordinator)
@@ -957,14 +1129,24 @@
(processor_count . ,processor-count))))
(define (trigger-build-allocation build-coordinator)
- ((build-coordinator-allocator-thread build-coordinator)))
+ ((build-coordinator-trigger-build-allocation build-coordinator)))
(define (build-coordinator-prompt-hook-processing-for-event build-coordinator
event-name)
(and=> (assoc-ref (build-coordinator-hook-condvars build-coordinator)
event-name)
(lambda (condvar)
- (signal-condition-variable condvar)
+ (cond
+ ((condition-variable? condvar)
+ (signal-condition-variable condvar))
+ ((reusable-condition? condvar)
+ (signal-reusable-condition!
+ condvar
+ (build-coordinator-scheduler build-coordinator)))
+ (else
+ (error
+ (simple-format #f "unrecognised condvar ~A"
+ condvar))))
#t)))
(define (update-build-allocation-plan build-coordinator)
@@ -978,16 +1160,20 @@
(allocator-proc datastore
#:metrics-registry (build-coordinator-metrics-registry
build-coordinator)))))
- (datastore-replace-build-allocation-plan datastore new-plan)
+ (build-coordinator-replace-build-allocation-plan
+ build-coordinator
+ new-plan)
- (let ((build-count-per-agent
- (datastore-count-build-allocation-plan-entries
- datastore)))
- (build-coordinator-send-event
- build-coordinator
- "allocation-plan-update"
- `((allocation_plan_counts . ,build-count-per-agent)))))
- #t)
+ (build-coordinator-send-event
+ build-coordinator
+ "allocation-plan-update"
+ `((allocation_plan_counts
+ . ,(map
+ (match-lambda
+ ((agent-id . builds)
+ (cons agent-id (length builds))))
+ new-plan))))
+ #t))
(define (make-build-allocator-thread build-coordinator)
(define mtx (make-mutex))
@@ -1022,24 +1208,19 @@
(lambda ()
(with-exception-handler
(lambda (exn)
- (simple-format
- (current-error-port)
- "build-allocator-thread: exception: ~A\n"
- exn)
(metric-increment failure-counter-metric)
(atomic-box-set! allocation-needed #t))
(lambda ()
- (with-throw-handler #t
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "error in build allocator thread\n")
+ (print-backtrace-and-exception/knots exn)
+ (raise-exception exn))
(lambda ()
(update-build-allocation-plan build-coordinator)
- (metric-increment success-counter-metric))
- (lambda (key . args)
- (simple-format
- (current-error-port)
- "error in build allocator thread: ~A ~A\n"
- key
- args)
- (backtrace))))
+ (metric-increment success-counter-metric))))
#:unwind? #t))
#:buckets ((@@ (prometheus) exponential-histogram-buckets) ; TODO
#:start 1
@@ -1059,31 +1240,239 @@
exn)
(exit 1))
(lambda ()
- (let ((build-allocation-plan-total
- (make-gauge-metric
- (build-coordinator-metrics-registry build-coordinator)
- "build_allocation_plan_total"
- #:labels '(agent_id))))
- (with-time-logging
- "allocator initialise metrics"
- (for-each (match-lambda
- ((agent-id . count)
- (metric-set build-allocation-plan-total
- count
- #:label-values
- `((agent_id . ,agent-id)))))
- (datastore-count-build-allocation-plan-entries
- (build-coordinator-datastore build-coordinator)))))
-
(update-build-allocation-plan-loop)))))
trigger-build-allocation)
+(define (spawn-build-allocation-plan-management-fiber coordinator)
+ (define allocation-plan '())
+
+ (define allocation-plan-metric
+ (make-gauge-metric
+ (build-coordinator-metrics-registry coordinator)
+ "build_allocation_plan_total"
+ #:labels '(agent_id)))
+
+ (define (update-build-allocation-plan-metrics!)
+ (for-each
+ (match-lambda
+ ((agent-id . builds)
+ (metric-set allocation-plan-metric
+ (length builds)
+ #:label-values
+ `((agent_id . ,agent-id)))))
+ allocation-plan)
+ #t)
+
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (with-exception-handler
+ (lambda _ #f)
+ (lambda ()
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format (current-error-port)
+ "exception in allocation plan fiber\n")
+ (print-backtrace-and-exception/knots exn)
+ (raise-exception exn))
+ (lambda ()
+ (match (get-message (build-coordinator-allocator-channel coordinator))
+ (('stats reply)
+ (put-message
+ reply
+ (map
+ (match-lambda
+ ((agent-id . builds)
+ (cons agent-id (length builds))))
+ allocation-plan)))
+ (('fetch-agent-plan agent-id reply)
+ (put-message
+ reply
+ (or (list-copy (assoc-ref allocation-plan agent-id))
+ '())))
+ (('fetch-plan reply)
+ (put-message reply
+ (map (match-lambda
+ ((agent-id . builds)
+ (cons agent-id
+ (list-copy builds))))
+ allocation-plan)))
+ (('remove-build uuid reply)
+ (set!
+ allocation-plan
+ (map
+ (match-lambda
+ ((agent-id . builds)
+ (cons agent-id
+ (remove
+ (lambda (build-uuid)
+ (string=? build-uuid uuid))
+ builds))))
+ allocation-plan))
+
+ (put-message reply #t))
+ (('replace new-plan reply)
+
+ (set! allocation-plan new-plan)
+
+ (update-build-allocation-plan-metrics!)
+
+ (put-message reply #t))))))
+ #:unwind? #t)))))
+
+(define (build-coordinator-allocation-plan-stats coordinator)
+ (let ((reply (make-channel)))
+ (put-message (build-coordinator-allocator-channel coordinator)
+ (list 'stats reply))
+ (get-message reply)))
+
+(define (build-coordinator-count-allocation-plan-builds coordinator agent-id)
+ (or (assoc-ref (build-coordinator-allocation-plan-stats coordinator)
+ agent-id)
+ 0))
+
+(define (build-coordinator-fetch-agent-allocation-plan coordinator agent-id)
+ (let ((reply (make-channel)))
+ (put-message (build-coordinator-allocator-channel coordinator)
+ (list 'fetch-agent-plan agent-id reply))
+ (get-message reply)))
+
+(define (build-coordinator-allocation-plan coordinator)
+ (let ((reply (make-channel)))
+ (put-message (build-coordinator-allocator-channel coordinator)
+ (list 'fetch-plan reply))
+ (get-message reply)))
+
+(define (build-coordinator-build-in-allocation-plan? coordinator uuid)
+ (any
+ (match-lambda
+ ((agent-id . build-uuids)
+ (->bool (member uuid build-uuids string=?))))
+ (build-coordinator-allocation-plan coordinator)))
+
+(define (build-coordinator-remove-build-from-allocation-plan coordinator uuid)
+ (let ((reply (make-channel)))
+ (put-message (build-coordinator-allocator-channel coordinator)
+ (list 'remove-build uuid reply))
+ (get-message reply)))
+
+(define (build-coordinator-replace-build-allocation-plan coordinator plan)
+ (let ((reply (make-channel)))
+ (put-message (build-coordinator-allocator-channel coordinator)
+ (list 'replace plan reply))
+ (get-message reply)))
+
+(define (build-coordinator-fetch-build-to-allocate coordinator
+ agent-id)
+ (define datastore
+ (build-coordinator-datastore coordinator))
+
+ (let ((all-planned-builds
+ (build-coordinator-fetch-agent-allocation-plan coordinator
+ agent-id)))
+
+ (log-msg (build-coordinator-logger coordinator)
+ 'DEBUG
+ "fetching build to allocate to " agent-id ", "
+ (length all-planned-builds) " to consider")
+
+ (let loop ((planned-builds all-planned-builds))
+ (if (null? planned-builds)
+ #f
+ (match (datastore-fetch-build-to-allocate datastore
+ (first planned-builds))
+ (#f
+ (log-msg (build-coordinator-logger coordinator)
+ 'DEBUG
+ agent-id ": ignoring " (first planned-builds))
+ (loop (cdr planned-builds)))
+ (#(uuid derivation-id derivation-name derived_priority)
+
+ (if (datastore-check-if-derivation-conflicts?
+ datastore
+ agent-id
+ derivation-id)
+ (begin
+ (log-msg (build-coordinator-logger coordinator)
+ 'DEBUG
+ agent-id ": " uuid " conflicts with allocated build")
+ (loop (cdr planned-builds)))
+ (begin
+ (log-msg (build-coordinator-logger coordinator)
+ 'DEBUG
+ agent-id ": " uuid " selected")
+
+ `((uuid . ,uuid)
+ (derivation_name . ,derivation-name)
+ (derived_priority . ,derived_priority))))))))))
+
+(define* (build-coordinator-list-allocation-plan-builds coordinator
+ agent-id
+ #:key limit
+ filter?)
+ (define (take* lst i)
+ (if (< (length lst) i)
+ lst
+ (take lst i)))
+
+ (define datastore
+ (build-coordinator-datastore coordinator))
+
+ (define (build-data uuid
+ derivation-name
+ derived-priority
+ build-details)
+ `((uuid . ,uuid)
+ (derivation_name . ,derivation-name)
+ (system . ,(datastore-find-build-derivation-system
+ datastore
+ uuid))
+ (priority . ,(assq-ref build-details 'priority))
+ (derived_priority . ,derived-priority)
+ (tags . ,(vector-map
+ (lambda (_ tag)
+ (match tag
+ ((key . value)
+ `((key . ,key)
+ (value . ,value)))))
+ (datastore-fetch-build-tags
+ datastore
+ uuid)))))
+
+ (let ((build-ids
+ (build-coordinator-fetch-agent-allocation-plan coordinator
+ agent-id)))
+ (filter-map
+ (lambda (build-id)
+ (if filter?
+ (match (datastore-fetch-build-to-allocate datastore build-id)
+ (#(uuid derivation_id derivation_name derived_priority)
+ (let ((build-details (datastore-find-build datastore uuid)))
+ (build-data uuid
+ derivation_name
+ derived_priority
+ build-details)))
+ (#f #f))
+ (let ((build-details (datastore-find-build datastore build-id))
+ (unprocessed-builds-entry
+ (datastore-find-unprocessed-build-entry
+ datastore
+ build-id)))
+ (build-data build-id
+ (assq-ref build-details 'derivation-name)
+ (assq-ref unprocessed-builds-entry
+ 'derived-priority)
+ build-details))))
+ (if limit
+ (take* build-ids limit)
+ build-ids))))
+
(define (spawn-fiber-to-watch-for-deferred-builds coordinator)
(spawn-fiber
(lambda ()
+ (sleep 10)
(while #t
- (sleep 60) ; 1 minute
(with-exception-handler
(lambda (exn)
(simple-format (current-error-port)
@@ -1091,14 +1480,21 @@
exn))
(lambda ()
(let ((first-deferred-build
- (datastore-find-first-unallocated-deferred-build
- (build-coordinator-datastore coordinator))))
- (when (and first-deferred-build
- (time<=? (date->time-utc
- (assq-ref first-deferred-build 'deferred-until))
- (current-time)))
- (simple-format #t "guix-build-coordinator: triggering build allocation for deferred build: ~A\n" (assq-ref first-deferred-build 'uuid))
- (trigger-build-allocation coordinator))))
+ (datastore-find-deferred-build
+ (build-coordinator-datastore coordinator)
+ (lambda (build-details)
+ (build-coordinator-build-in-allocation-plan?
+ coordinator
+ (assq-ref build-details 'uuid))))))
+ (if (and first-deferred-build
+ (time<=? (date->time-utc
+ (assq-ref first-deferred-build 'deferred-until))
+ (current-time)))
+ (begin
+ (simple-format #t "guix-build-coordinator: triggering build allocation for deferred build: ~A\n" (assq-ref first-deferred-build 'uuid))
+ (trigger-build-allocation coordinator)
+ (sleep 10))
+ (sleep 60))))
#:unwind? #t)))
#:parallel? #t))
@@ -1118,6 +1514,18 @@
"hook_failure_total"
#:labels '(event)))
+ (define process-events-thread-pool-thread-total-metric
+ (make-gauge-metric
+ (build-coordinator-metrics-registry build-coordinator)
+ "hook_thread_pool_thread_total"
+ #:labels '(event)))
+
+ (define process-events-thread-pool-used-thread-total-metric
+ (make-gauge-metric
+ (build-coordinator-metrics-registry build-coordinator)
+ "hook_thread_pool_used_thread_total"
+ #:labels '(event)))
+
(define (process-event id event arguments handler)
(log-msg (build-coordinator-logger build-coordinator)
'DEBUG
@@ -1126,10 +1534,6 @@
(and
(with-exception-handler
(lambda (exn)
- (log-msg (build-coordinator-logger build-coordinator)
- 'ERROR
- "error running " event " (" id ") hook: "
- exn)
(metric-increment failure-counter-metric
#:label-values
`((event . ,event)))
@@ -1140,25 +1544,34 @@
(build-coordinator-metrics-registry build-coordinator)
"hook_duration_seconds"
(lambda ()
- (with-throw-handler #t
+ (with-exception-handler
+ (lambda (exn)
+ (let* ((stack
+ (match (fluid-ref %stacks)
+ ((stack-tag . prompt-tag)
+ (make-stack #t
+ 0 prompt-tag
+ 0 (and prompt-tag 1)))))
+ (backtrace
+ (call-with-output-string
+ (lambda (port)
+ (print-frames (stack->vector stack)
+ port
+ #:count (stack-length stack))
+ (print-exception
+ port
+ (stack-ref stack 4)
+ '%exception
+ (list exn))))))
+ (log-msg (build-coordinator-logger build-coordinator)
+ 'ERROR
+ "error running " event " (" id ") hook\n"
+ backtrace))
+ (raise-exception exn))
(lambda ()
(start-stack
- 'hook
- (apply handler build-coordinator arguments)))
- (lambda (key . args)
- (log-msg (build-coordinator-logger build-coordinator)
- 'ERROR
- "error running " event " (" id ") hook: "
- key " " args)
- (let* ((stack (make-stack #t 3))
- (backtrace
- (call-with-output-string
- (lambda (port)
- (display-backtrace stack port)
- (newline port)))))
- (display
- backtrace
- (current-output-port))))))
+ #t
+ (apply handler build-coordinator arguments)))))
#:labels '(event)
#:label-values `((event . ,event)))
#t)
@@ -1184,78 +1597,116 @@
(define (single-thread-process-events event-name handler)
(let ((mtx (make-mutex))
(condvar (make-condition-variable)))
- (call-with-new-thread
+ (call-with-default-io-waiters
(lambda ()
- (catch 'system-error
- (lambda ()
- (set-thread-name
- (symbol->string event-name)))
- (const #t))
+ (call-with-new-thread
+ (lambda ()
+ (catch 'system-error
+ (lambda ()
+ (set-thread-name
+ (symbol->string event-name)))
+ (const #t))
+
+ (lock-mutex mtx)
+ (while #t
+ (with-exception-handler
+ (lambda (exn)
+ (sleep 10))
+ (lambda ()
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg
+ (build-coordinator-logger build-coordinator)
+ 'CRITICAL
+ "error in " event-name " hook processing thread")
+ (print-backtrace-and-exception/knots exn)
+ (raise-exception exn))
+ (lambda ()
+ (while #t
+ (match (datastore-list-unprocessed-hook-events datastore event-name 1)
+ (()
+ (wait-condition-variable condvar mtx))
+ (((id event arguments))
+ (process-event id event arguments handler)))))))
+ #:unwind? #t))))))
+ condvar))
- (lock-mutex mtx)
- (while #t
- (with-exception-handler
- (lambda (exn)
- (with-exception-handler
- (lambda _
- ;; Things are really going wrong if logging about
- ;; the hook processing thread crashing, also raises
- ;; an exception, so just try and sleep and hope
- ;; things go better next time
- (sleep 10))
- (lambda ()
- (log-msg (build-coordinator-logger build-coordinator)
- 'CRITICAL
- "hook processing thread " event-name
- " exception: " exn))
- #:unwind? #t)
- (sleep 10))
- (lambda ()
- (with-throw-handler #t
+ (define (thread-pool-process-events event-name handler thread-count)
+ (let ((thread-pool
+ (make-thread-pool
+ thread-count
+ #:name (simple-format #f "~A" event-name)))
+ (reusable-condition
+ (make-reusable-condition))
+ (coordination-channel
+ (make-channel)))
+
+ (metric-set process-events-thread-pool-thread-total-metric
+ thread-count
+ #:label-values `((event . ,event-name)))
+
+ (spawn-fiber
+ (lambda ()
+ (let loop ((running-ids '()))
+ (metric-set process-events-thread-pool-used-thread-total-metric
+ (vector-count
+ (lambda (_ proc)
+ (->bool proc))
+ (thread-pool-proc-vector thread-pool))
+ #:label-values `((event . ,event-name)))
+ (match (get-message coordination-channel)
+ (('process id event arguments)
+ (if (member id running-ids)
+ ;; Ignore already running jobs
+ (loop running-ids)
+ (begin
+ (spawn-fiber
+ (lambda ()
+ (call-with-thread
+ thread-pool
+ (lambda ()
+ (process-event id event arguments handler))
+ ;; TODO Make this the default through knots
+ #:timeout #f)
+ (put-message coordination-channel
+ (list 'finished id))))
+ (loop (cons id running-ids)))))
+ (('finished id)
+ (when (< (length running-ids)
+ (* 2 thread-count))
+ (signal-reusable-condition! reusable-condition))
+ (loop (delete id running-ids)))
+ (('count-running reply)
+ (let ((count (length running-ids)))
+ (spawn-fiber
(lambda ()
- (while #t
- (match (datastore-list-unprocessed-hook-events datastore event-name 1)
- (()
- (wait-condition-variable condvar mtx))
- (((id event arguments))
- (process-event id event arguments handler)))))
- (lambda (key . args)
- (log-msg
- (build-coordinator-logger build-coordinator)
- 'CRITICAL
- "error in " event-name " hook processing thread: " key " " args)
- (backtrace))))
- #:unwind? #t))))
- condvar))
+ (put-message reply count))))
+ (loop running-ids))))))
- (define (work-queue-process-events event-name handler thread-count)
- (let-values (((pool-mutex job-available count-threads list-jobs)
- (create-thread-pool
- (lambda ()
- (max
- 1
- (length
- (datastore-list-unprocessed-hook-events
- datastore
- event-name
- thread-count))))
- (lambda (running-jobs)
- (let* ((in-progress-ids
- (map car running-jobs))
- (potential-jobs
- (datastore-list-unprocessed-hook-events
- datastore
- event-name
- (+ 1 (length in-progress-ids)))))
- (find
- (match-lambda
- ((id rest ...)
- (not (member id in-progress-ids))))
- potential-jobs)))
- (lambda (id event arguments)
- (process-event id event arguments handler))
- #:name (symbol->string event-name))))
- job-available))
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (let ((count
+ (let ((channel (make-channel)))
+ (put-message coordination-channel
+ (list 'count-running channel))
+ (get-message channel))))
+ (when (< count (* 2 thread-count))
+ (log-msg (build-coordinator-logger build-coordinator)
+ 'DEBUG "submitting batch of " event-name " hook events")
+ (for-each
+ (match-lambda
+ ((id event arguments)
+ (put-message coordination-channel
+ (list 'process id event arguments))))
+ (datastore-list-unprocessed-hook-events
+ datastore
+ event-name
+ (* 20 thread-count)))))
+ (reusable-condition-wait reusable-condition
+ #:timeout 60))))
+
+ reusable-condition))
(map
(match-lambda
@@ -1264,25 +1715,30 @@
(or (and=>
(assq-ref parallel-hooks event-name)
(lambda (thread-count)
- (work-queue-process-events event-name
- handler
- thread-count)))
+ (thread-pool-process-events event-name
+ handler
+ thread-count)))
(single-thread-process-events event-name
handler)))))
(build-coordinator-hooks build-coordinator)))
-(define (fetch-builds build-coordinator agent systems
- max-builds deprecated-requested-count)
+(define (fetch-builds build-coordinator agent systems max-builds)
(define datastore
(build-coordinator-datastore build-coordinator))
(define (allocate-one-build agent-id)
- (let ((build-details (datastore-fetch-build-to-allocate datastore agent-id)))
+ (let ((build-details (build-coordinator-fetch-build-to-allocate
+ build-coordinator agent-id)))
(if build-details
(let ((build-id (assq-ref build-details 'uuid)))
- (datastore-insert-to-allocated-builds datastore agent-id (list build-id))
- (datastore-remove-builds-from-plan datastore (list build-id))
- build-details)
+ (datastore-insert-to-allocated-builds
+ datastore
+ agent-id
+ build-id)
+ (build-coordinator-remove-build-from-allocation-plan
+ build-coordinator build-id)
+ `(,@build-details
+ (submit_outputs . null)))
#f)))
(define (allocate-several-builds agent-id count)
@@ -1306,42 +1762,68 @@
(datastore-list-agent-builds datastore agent))
(start-count
(length initially-allocated-builds))
- (target-count (or max-builds
- (+ start-count
- deprecated-requested-count))))
+ (target-count max-builds))
(if (< start-count target-count)
(let ((new-builds
(allocate-several-builds agent
(- target-count start-count))))
- (unless (null? new-builds)
- (let ((allocation-plan-metric
- (metrics-registry-fetch-metric
- (slot-ref datastore 'metrics-registry)
- "build_allocation_plan_total")))
- (for-each
- (match-lambda
- ((agent-id . count)
- (metric-set allocation-plan-metric
- count
- #:label-values
- `((agent_id . ,agent-id)))))
- (datastore-count-build-allocation-plan-entries datastore))))
-
- ;; Previously allocate builds just returned newly allocated
- ;; builds, but if max-builds is provided, return all the
- ;; builds. This means the agent can handle this in a idempotent
- ;; manor.
- (if max-builds
- (append initially-allocated-builds
- new-builds)
- new-builds))
- ;; Previously allocate builds just returned newly allocated builds,
- ;; but if max-builds is provided, return all the builds. This means
- ;; the agent can handle this in a idempotent manor.
- (if max-builds
- initially-allocated-builds
- '()))))
- #:duration-metric-name "allocate_builds_to_agent"))
+ (if (null? new-builds)
+ (values initially-allocated-builds
+ #f)
+ (values (append initially-allocated-builds
+ new-builds)
+ #t)))
+ (values initially-allocated-builds
+ #f))))
+ #:duration-metric-name "allocate_builds_to_agent"
+ #:duration-metric-buckets %command-duration-histogram-buckets))
+
+ (define (send-agent-builds-allocated-event builds)
+ (build-coordinator-send-event
+ build-coordinator
+ "agent-builds-allocated"
+ `((agent_id . ,agent)
+ (builds . ,(list->vector
+ (map
+ (lambda (build)
+ `(,@build
+ (tags
+ . ,(list->vector
+ (map
+ (match-lambda
+ ((key . value)
+ `((key . ,key)
+ (value . ,value))))
+ (vector->list
+ (datastore-fetch-build-tags
+ datastore
+ (assq-ref build 'uuid))))))))
+ builds))))))
+
+ (define (submit-outputs? build)
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg (build-coordinator-logger build-coordinator)
+ 'CRITICAL
+ "build-submit-outputs hook raised exception")
+ (print-backtrace-and-exception/knots exn)
+ (raise-exception exn))
+ (lambda ()
+ (let ((hook-result
+ (call-with-delay-logging
+ (lambda ()
+ (build-submit-outputs-hook
+ build-coordinator
+ (assq-ref build 'uuid))))))
+ (if (boolean? hook-result)
+ hook-result
+ (begin
+ (log-msg
+ (build-coordinator-logger build-coordinator)
+ 'CRITICAL
+ "build-submit-outputs hook returned non boolean: "
+ hook-result)
+ #t))))))
(call-with-duration-metric
(build-coordinator-metrics-registry build-coordinator)
@@ -1357,111 +1839,42 @@
(trigger-build-allocation build-coordinator)))
(let ((builds
- (get-builds)))
-
- (build-coordinator-send-event
- build-coordinator
- "agent-builds-allocated"
- `((agent_id . ,agent)
- (builds . ,(list->vector
- (map
- (lambda (build)
- `(,@build
- (tags
- . ,(list->vector
- (map
- (match-lambda
- ((key . value)
- `((key . ,key)
- (value . ,value))))
- (vector->list
- (datastore-fetch-build-tags
- datastore
- (assq-ref build 'uuid))))))))
- builds)))))
-
- (map (lambda (build)
- (define submit-outputs?
- (with-exception-handler
- (lambda (exn)
- (log-msg (build-coordinator-logger build-coordinator)
- 'CRITICAL
- "build-submit-outputs hook raised exception: "
- exn))
- (lambda ()
- (with-throw-handler #t
- (lambda ()
- (let ((hook-result
- (call-with-delay-logging
- (lambda ()
- (build-submit-outputs-hook
- build-coordinator
- (assq-ref build 'uuid))))))
- (if (boolean? hook-result)
- hook-result
- (begin
- (log-msg
- (build-coordinator-logger build-coordinator)
- 'CRITICAL
- "build-submit-outputs hook returned non boolean: "
- hook-result)
- #t))))
- (lambda (key . args)
- (backtrace))))
- #:unwind? #t))
-
- `(,@build
- ;; TODO This needs reconsidering when things having been built in
- ;; the past doesn't necessarily mean they're still available.
- (submit_outputs . ,submit-outputs?)))
- builds)))))))
+ new-builds-allocated?
+ (if (= 0
+ (build-coordinator-count-allocation-plan-builds
+ build-coordinator
+ agent))
+ (values
+ (datastore-list-agent-builds datastore agent)
+ #f)
+ (get-builds))))
+
+ (when new-builds-allocated?
+ (send-agent-builds-allocated-event builds))
+
+ (map
+ (lambda (build)
+ (if (eq? 'null (assq-ref build 'submit_outputs))
+ (let ((submit-outputs? (submit-outputs? build)))
+ (datastore-update-allocated-build-submit-outputs
+ (build-coordinator-datastore build-coordinator)
+ (assq-ref build 'uuid)
+ submit-outputs?)
+
+ `(,@(alist-delete 'submit_outputs build)
+ (submit_outputs . ,submit-outputs?)))
+ build))
+ builds)))))))
(define (agent-details build-coordinator agent-id)
(define datastore
(build-coordinator-datastore build-coordinator))
- (define build-submit-outputs-hook
- (assq-ref (build-coordinator-hooks build-coordinator)
- 'build-submit-outputs))
-
- (define (submit-outputs? build)
- (with-exception-handler
- (lambda (exn)
- (log-msg (build-coordinator-logger build-coordinator)
- 'CRITICAL
- "build-submit-outputs hook raised exception: "
- exn))
- (lambda ()
- (with-throw-handler #t
- (lambda ()
- (let ((hook-result
- (call-with-delay-logging
- (lambda ()
- (build-submit-outputs-hook
- build-coordinator
- (assq-ref build 'uuid))))))
- (if (boolean? hook-result)
- hook-result
- (begin
- (log-msg
- (build-coordinator-logger build-coordinator)
- 'CRITICAL
- "build-submit-outputs hook returned non boolean: "
- hook-result)
- #t))))
- (lambda (key . args)
- (backtrace))))
- #:unwind? #t))
-
(let ((agent (datastore-find-agent datastore agent-id))
(allocated-builds (datastore-list-agent-builds datastore agent-id)))
`(,@agent ; description
- (builds . ,(list->vector
- (map (lambda (build)
- `(,@build
- (submit_outputs . ,(submit-outputs? build))))
- allocated-builds))))))
+ (builds . ,(list->vector allocated-builds)))))
(define (build-data-location build-id )
(string-append (%config 'builds-dir) "/"
@@ -1610,10 +2023,11 @@
(list build-id))
(when success?
- (datastore-delete-relevant-outputs-from-unbuilt-outputs
+ (datastore-insert-background-job
datastore
- build-id)
- (datastore-update-unprocessed-builds-for-build-success
+ 'build-success
+ (list build-id))
+ (datastore-delete-relevant-outputs-from-unbuilt-outputs
datastore
build-id)
(datastore-store-output-metadata
@@ -1623,7 +2037,8 @@
(assoc-ref result-json "outputs"))))
#f))))
- #:duration-metric-name "store_build_result")))
+ #:duration-metric-name "store_build_result"
+ #:duration-metric-buckets %command-duration-histogram-buckets)))
(when exception
;; Raise the exception here to avoid aborting the transaction
(raise-exception exception)))
@@ -1637,6 +2052,11 @@
'build-success
'build-failure))
+ (when success?
+ (build-coordinator-trigger-background-job-processing
+ build-coordinator
+ 'build-success))
+
(build-coordinator-send-event
build-coordinator
(if success?
@@ -1649,7 +2069,8 @@
;; could change the allocation
(trigger-build-allocation build-coordinator)
- #t))))
+ #t))
+ #:buckets %command-duration-histogram-buckets))
(define (handle-build-start-report build-coordinator
agent-id
@@ -1668,7 +2089,8 @@
build-coordinator
'build-started
`((build_id . ,build-id)
- (agent_id . ,agent-id))))))
+ (agent_id . ,agent-id))))
+ #:buckets %command-duration-histogram-buckets))
(define (handle-setup-failure-report build-coordinator
agent-id build-id report-json)
@@ -1704,3 +2126,142 @@
;; Trigger build allocation, so that the allocator can handle this setup
;; failure
(trigger-build-allocation build-coordinator))
+
+(define (build-coordinator-trigger-background-job-processing
+ build-coordinator
+ type)
+ (let ((condition
+ (assq-ref (build-coordinator-background-job-conditions
+ build-coordinator)
+ type)))
+ (unless condition
+ (error
+ (simple-format #f "unknown condition ~A" type)))
+ (signal-reusable-condition! condition)))
+
+(define (start-background-job-processing-fibers build-coordinator)
+ (define %background-job-duration-histogram-buckets
+ (list 0.1 0.25 0.5 1 2.5 5 10 15 30 45 60 120 240 (inf)))
+
+ (define* (start-job-fibers type proc #:key (parallelism 1))
+ (let ((coordination-channel
+ (make-channel))
+ (condition
+ (make-reusable-condition))
+ (process-in-fiber
+ (fiberize
+ (lambda args
+ (call-with-duration-metric
+ (build-coordinator-metrics-registry build-coordinator)
+ "coordinator_background_job_duration_seconds"
+ (lambda ()
+ (call-with-delay-logging proc #:args args))
+ #:labels '(name)
+ #:label-values `((name . ,type))
+ #:buckets %background-job-duration-histogram-buckets))
+ #:parallelism parallelism))
+ (job-exception-counter-metric
+ (make-counter-metric
+ (build-coordinator-metrics-registry build-coordinator)
+ "coordinator_background_job_failures_total"
+ #:labels '(name))))
+
+ (define (process id . args)
+ (spawn-fiber
+ (lambda ()
+ (let loop ((retry 0))
+ (log-msg (build-coordinator-logger build-coordinator)
+ 'DEBUG
+ "processing " type " background job (id: "
+ id ", args: " args ", retry: " retry ")")
+ (let ((success?
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg (build-coordinator-logger build-coordinator)
+ 'WARN
+ type " background job error (id: "
+ id "): " exn)
+ #f)
+ (lambda ()
+ (apply process-in-fiber args))
+ #:unwind? #t)))
+ (if success?
+ (begin
+ (datastore-delete-background-job
+ (build-coordinator-datastore build-coordinator)
+ id)
+ (put-message coordination-channel
+ (list 'job-finished id)))
+ (begin
+ (metric-increment job-exception-counter-metric
+ #:label-values `((name . ,type)))
+ (sleep 30)
+ (loop (+ 1 retry)))))))))
+
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (let ((job-details
+ (datastore-select-background-jobs
+ (build-coordinator-datastore build-coordinator)
+ type
+ #:limit (* 2 parallelism))))
+
+ (unless (null? job-details)
+ (put-message coordination-channel
+ (list 'process job-details)))
+
+ (reusable-condition-wait condition
+ #:timeout 30)))))
+
+ (spawn-fiber
+ (lambda ()
+ (let loop ((running-job-ids '()))
+ (match (get-message coordination-channel)
+ (('process jobs)
+ (let* ((job-ids (map (lambda (job)
+ (assq-ref job 'id))
+ jobs))
+ (new-ids
+ (lset-difference = job-ids running-job-ids))
+ (jobs-to-start
+ (take new-ids
+ (min
+ (- parallelism
+ (length running-job-ids))
+ (length new-ids)))))
+ (for-each (lambda (job)
+ (apply process
+ (assq-ref job 'id)
+ (assq-ref job 'args)))
+ (filter
+ (lambda (job-details)
+ (member (assq-ref job-details 'id)
+ jobs-to-start))
+ jobs))
+ (loop (append running-job-ids
+ jobs-to-start))))
+ (('job-finished id)
+ ;; Maybe not very efficient, but should work
+ (signal-reusable-condition! condition)
+ (log-msg (build-coordinator-logger build-coordinator)
+ 'DEBUG type
+ " background job " id
+ " finished successfully")
+ (loop (delete id running-job-ids)))))))
+
+ condition))
+
+ `((build-success . ,(start-job-fibers
+ 'build-success
+ (lambda (build-id)
+ (datastore-update-unprocessed-builds-for-build-success
+ (build-coordinator-datastore build-coordinator)
+ build-id))
+ #:parallelism 24))))
+
+(define (build-coordinator-check-and-correct-unprocessed-builds-all-inputs-built
+ build-coordinator)
+ (datastore-check-and-correct-unprocessed-builds-all-inputs-built
+ (build-coordinator-datastore build-coordinator)
+ #:progress-reporter progress-reporter/bar))
diff --git a/guix-build-coordinator/datastore.scm b/guix-build-coordinator/datastore.scm
index a29a993..5768630 100644
--- a/guix-build-coordinator/datastore.scm
+++ b/guix-build-coordinator/datastore.scm
@@ -6,7 +6,8 @@
#:use-module (guix-build-coordinator datastore postgresql)
#:duplicates (merge-generics)
#:export (database-uri->datastore
- datastore-find-build-output))
+ datastore-find-build-output
+ datastore-validate-datetime-string))
(re-export datastore-optimize)
(re-export datastore-spawn-fibers)
@@ -68,7 +69,7 @@
(re-export datastore-count-builds-for-derivation)
(re-export datastore-list-processed-builds)
(re-export datastore-list-unprocessed-builds)
-(re-export datastore-find-first-unallocated-deferred-build)
+(re-export datastore-find-deferred-build)
(re-export datastore-fetch-prioritised-unprocessed-builds)
(re-export datastore-insert-unprocessed-hook-event)
(re-export datastore-count-unprocessed-hook-events)
@@ -86,29 +87,30 @@
(re-export datastore-list-builds-for-output)
(re-export datastore-list-builds-for-output-and-system)
(re-export datastore-agent-for-build)
-(re-export datastore-count-build-allocation-plan-entries)
-(re-export datastore-replace-build-allocation-plan)
-(re-export datastore-remove-build-from-allocation-plan)
(re-export datastore-count-allocated-builds)
(re-export datastore-agent-requested-systems)
(re-export datastore-update-agent-requested-systems)
(re-export datastore-fetch-build-to-allocate)
+(re-export datastore-check-if-derivation-conflicts?)
(re-export datastore-insert-to-allocated-builds)
-(re-export datastore-remove-builds-from-plan)
-(re-export datastore-list-allocation-plan-builds)
+(re-export datastore-update-allocated-build-submit-outputs)
+(re-export datastore-insert-background-job)
+(re-export datastore-delete-background-job)
+(re-export datastore-select-background-jobs)
+(re-export datastore-check-and-correct-unprocessed-builds-all-inputs-built)
(define* (database-uri->datastore database
#:key
metrics-registry
- worker-thread-log-exception?)
+ thread-pool-log-exception?)
(cond
((string-prefix? "pg://" database)
(postgresql-datastore database))
((string-prefix? "sqlite://" database)
(sqlite-datastore database
#:metrics-registry metrics-registry
- #:worker-thread-log-exception?
- worker-thread-log-exception?))
+ #:thread-pool-log-exception?
+ thread-pool-log-exception?))
(else
(error
(simple-format #f "Unknown database ~A" database)))))
@@ -123,3 +125,6 @@
(assq-ref output 'output)
#f))
outputs)))
+
+(define (datastore-validate-datetime-string s)
+ (strftime "%F %T" (car (strptime "%F %T" s))))
diff --git a/guix-build-coordinator/datastore/abstract.scm b/guix-build-coordinator/datastore/abstract.scm
index 68fc654..799485e 100644
--- a/guix-build-coordinator/datastore/abstract.scm
+++ b/guix-build-coordinator/datastore/abstract.scm
@@ -9,8 +9,7 @@
datastore-new-agent
datastore-new-agent-password
datastore-list-agent-builds
- datastore-agent-password-exists?
- datastore-list-allocation-plan-builds))
+ datastore-agent-password-exists?))
(define-class <abstract-datastore> ())
@@ -24,5 +23,3 @@
(define-generic datastore-agent-password-exists?)
(define-generic datastore-agent-list-unprocessed-builds)
(define-generic datastore-list-agent-builds)
-(define-generic datastore-agent-replace-build-allocation-plan)
-(define-generic datastore-list-allocation-plan-builds)
diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm
index e67a940..bb1d8e8 100644
--- a/guix-build-coordinator/datastore/sqlite.scm
+++ b/guix-build-coordinator/datastore/sqlite.scm
@@ -10,8 +10,11 @@
#:use-module (ice-9 exceptions)
#:use-module (sqlite3)
#:use-module (fibers)
+ #:use-module (knots parallelism)
+ #:use-module (knots thread-pool)
#:use-module (prometheus)
#:use-module (guix base16)
+ #:use-module (guix progress)
#:use-module (guix derivations)
#:use-module (guix-build-coordinator utils)
#:use-module (guix-build-coordinator utils fibers)
@@ -87,7 +90,7 @@
datastore-replace-agent-tags
datastore-list-processed-builds
datastore-list-unprocessed-builds
- datastore-find-first-unallocated-deferred-build
+ datastore-find-deferred-build
datastore-fetch-prioritised-unprocessed-builds
datastore-insert-unprocessed-hook-event
datastore-count-unprocessed-hook-events
@@ -96,28 +99,37 @@
datastore-delete-unprocessed-hook-event
datastore-list-agent-builds
datastore-agent-for-build
- datastore-count-build-allocation-plan-entries
datastore-replace-build-allocation-plan
- datastore-remove-build-from-allocation-plan
datastore-count-allocated-builds
datastore-agent-requested-systems
datastore-update-agent-requested-systems
datastore-fetch-build-to-allocate
+ datastore-check-if-derivation-conflicts?
datastore-insert-to-allocated-builds
- datastore-remove-builds-from-plan
- datastore-list-allocation-plan-builds))
+ datastore-update-allocated-build-submit-outputs
+ datastore-insert-background-job
+ datastore-delete-background-job
+ datastore-select-background-jobs
+ datastore-check-and-correct-unprocessed-builds-all-inputs-built))
+
+(define %transaction-duration-histogram-buckets
+ (list 0.1 0.25 0.5 1 2.5 5 10 15 30 45 60 120 240 (inf)))
(define-class <sqlite-datastore> (<abstract-datastore>)
database-file
- worker-reader-thread-channel
- worker-writer-thread-channel
+ reader-thread-pool
+ writer-thread-pool
+ low-priority-writer-thread-channel
+ default-priority-writer-thread-channel
+ high-priority-writer-thread-channel
+ writer-thread-channel-queue-stats
metrics-registry)
(define* (sqlite-datastore database-uri
#:key
update-database?
metrics-registry
- worker-thread-log-exception?)
+ thread-pool-log-exception?)
(define database-file
(string-drop database-uri
(string-length "sqlite://")))
@@ -132,7 +144,8 @@
(let ((db (db-open database-file)))
(sqlite-exec db "PRAGMA journal_mode=WAL;")
(sqlite-exec db "PRAGMA optimize;")
- (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);")
+ (with-time-logging "truncating the WAL"
+ (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);"))
(sqlite-close db))
(let ((datastore (make <sqlite-datastore>)))
@@ -140,141 +153,164 @@
(slot-set! datastore 'database-file database-file)
(slot-set! datastore 'metrics-registry metrics-registry)
- (slot-set!
- datastore
- 'worker-writer-thread-channel
- (make-worker-thread-channel
- (lambda ()
- (let ((db
- (db-open database-file)))
- (sqlite-exec db "PRAGMA busy_timeout = 5000;")
- (sqlite-exec db "PRAGMA synchronous = NORMAL;")
- (sqlite-exec db "PRAGMA temp_store = MEMORY;")
- (sqlite-exec db "PRAGMA foreign_keys = ON;")
-
- (sqlite-exec db "ATTACH DATABASE 'file:/mem?vfs=memdb' AS mem;")
-
- (sqlite-exec
- db
- "
-CREATE TABLE IF NOT EXISTS mem.build_allocation_plan (
- build_id INTEGER NOT NULL,
- agent_id TEXT NOT NULL,
- ordering INTEGER NOT NULL,
- PRIMARY KEY (agent_id, build_id)
-);")
-
- (list db)))
- #:name "ds write"
- #:destructor
- (let ((writer-thread-destructor-counter
- (make-gauge-metric metrics-registry
- "datastore_writer_thread_close_total")))
- (lambda (db)
- (db-optimize db
- database-file
- metrics-registry
- #:maybe-truncate-wal? #f)
-
- (metric-increment writer-thread-destructor-counter)
- (sqlite-close db)))
- #:lifetime 500
- #:expire-on-exception? #t
-
- ;; SQLite doesn't support parallel writes
- #:parallelism 1
- #:delay-logger (let ((delay-metric
- (make-histogram-metric
- metrics-registry
- "datastore_write_delay_seconds")))
- (lambda (seconds-delayed)
- (metric-observe delay-metric
- ;; TODO exact->inexact to work around
- ;; a bug in guile-prometheus where
- ;; the metric sum will output in the
- ;; exact form including the /q
- (exact->inexact seconds-delayed))
- (log-delay "datastore write" seconds-delayed)
- (when (> seconds-delayed 1)
- (format
- (current-error-port)
- "warning: database write delayed by ~1,2f seconds~%"
- seconds-delayed))))
- #:duration-logger
- (lambda (duration proc)
- (when (> duration 10)
- (format
- (current-error-port)
- "warning: database write took ~1,2f seconds (~a)~%"
- duration
- proc)))
- #:log-exception? worker-thread-log-exception?))
-
- ;; Make sure the worker thread has initialised, and created the in memory
- ;; tables
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
- (const #t))
-
- (slot-set!
- datastore
- 'worker-reader-thread-channel
- (make-worker-thread-channel
- (lambda ()
- (let ((db
- (db-open database-file #:write? #f)))
- (sqlite-exec db "PRAGMA temp_store = MEMORY;")
- (sqlite-exec db "PRAGMA busy_timeout = 4000;")
- (sqlite-exec db "PRAGMA cache_size = -16000;")
-
- (sqlite-exec db "ATTACH DATABASE 'file:/mem?vfs=memdb' AS mem;")
-
- (list db)))
- #:name "ds read"
- #:destructor
- (let ((reader-thread-destructor-counter
- (make-gauge-metric metrics-registry
- "datastore_reader_thread_close_total")))
- (lambda (db)
- (metric-increment reader-thread-destructor-counter)
- (sqlite-close db)))
- #:lifetime 50000
- #:expire-on-exception? #t
-
- ;; Use a minimum of 8 and a maximum of 16 threads
- #:parallelism
- (min (max (current-processor-count)
- 8)
- 16)
- #:delay-logger (let ((delay-metric
- (make-histogram-metric
+ (let ((writer-thread-pool
+ (make-thread-pool
+ ;; SQLite doesn't support parallel writes
+ 1
+ #:thread-initializer
+ (lambda ()
+ (let ((db
+ (db-open database-file)))
+ (sqlite-exec db "PRAGMA busy_timeout = 0;")
+ (sqlite-exec db "PRAGMA synchronous = NORMAL;")
+ (sqlite-exec db "PRAGMA temp_store = MEMORY;")
+ (sqlite-exec db "PRAGMA foreign_keys = ON;")
+
+ (list db)))
+ #:name "ds write"
+ #:thread-destructor
+ (let ((writer-thread-destructor-counter
+ (make-gauge-metric metrics-registry
+ "datastore_writer_thread_close_total")))
+ (lambda (db)
+ (db-optimize db
+ database-file
metrics-registry
- "datastore_read_delay_seconds")))
- (lambda (seconds-delayed)
- (metric-observe delay-metric
- ;; TODO exact->inexact to work around
- ;; a bug in guile-prometheus where
- ;; the metric sum will output in the
- ;; exact form including the /q
- (exact->inexact seconds-delayed))
- (log-delay "datastore read" seconds-delayed)
- (when (> seconds-delayed 1)
- (format
- (current-error-port)
- "warning: database read delayed by ~1,2f seconds~%"
- seconds-delayed))))
- #:duration-logger
- (lambda (duration proc)
- (when (> duration 30)
- (format
- (current-error-port)
- "warning: database read took ~1,2f seconds (~a)~%"
- duration
- proc)))
- #:log-exception? worker-thread-log-exception?))
+ #:maybe-truncate-wal? #f)
+
+ (metric-increment writer-thread-destructor-counter)
+ (sqlite-close db)))
+ #:thread-lifetime 500
+ #:expire-on-exception? #t
+
+ #:delay-logger (let ((delay-metric
+ (make-histogram-metric
+ metrics-registry
+ "datastore_write_delay_seconds"
+ #:buckets
+ %transaction-duration-histogram-buckets)))
+ (lambda (seconds-delayed proc)
+ (metric-observe delay-metric
+ ;; TODO exact->inexact to work around
+ ;; a bug in guile-prometheus where
+ ;; the metric sum will output in the
+ ;; exact form including the /q
+ (exact->inexact seconds-delayed))
+ (log-delay "datastore write" seconds-delayed)
+ (when (> seconds-delayed 1)
+ (format/safe
+ (current-error-port)
+ "warning: database write delayed by ~1,2f seconds~%"
+ seconds-delayed))))
+ #:duration-logger
+ (lambda (duration proc)
+ (when (> duration 10)
+ (format/safe
+ (current-error-port)
+ "warning: database write took ~1,2f seconds (~a)~%"
+ duration
+ proc)))
+ #:log-exception? thread-pool-log-exception?)))
+
+ (slot-set! datastore
+ 'writer-thread-pool
+ writer-thread-pool)
+ ;; This is changed in datastore-spawn-fibers
+ (slot-set! datastore
+ 'low-priority-writer-thread-channel
+ (thread-pool-channel writer-thread-pool))
+ (slot-set! datastore
+ 'default-priority-writer-thread-channel
+ (thread-pool-channel writer-thread-pool))
+ (slot-set! datastore
+ 'high-priority-writer-thread-channel
+ (thread-pool-channel writer-thread-pool)))
+
+ (let ((reader-thread-pool
+ (make-thread-pool
+ ;; Use a minimum of 8 and a maximum of 12 threads
+ (min (max (current-processor-count)
+ 8)
+ 12)
+ #:thread-initializer
+ (lambda ()
+ (let ((db
+ (db-open database-file #:write? #f)))
+ (sqlite-exec db "PRAGMA temp_store = MEMORY;")
+ (sqlite-exec db "PRAGMA busy_timeout = 0;")
+ (sqlite-exec db "PRAGMA cache_size = -16000;")
+
+ (list db)))
+ #:thread-destructor
+ (let ((reader-thread-destructor-counter
+ (make-gauge-metric metrics-registry
+ "datastore_reader_thread_close_total")))
+ (lambda (db)
+ (metric-increment reader-thread-destructor-counter)
+ (sqlite-close db)))
+ #:name "ds read"
+ #:thread-lifetime 50000
+ #:expire-on-exception? #t
+
+ #:delay-logger (let ((delay-metric
+ (make-histogram-metric
+ metrics-registry
+ "datastore_read_delay_seconds"
+ #:buckets
+ %transaction-duration-histogram-buckets)))
+ (lambda (seconds-delayed proc)
+ (metric-observe delay-metric
+ ;; TODO exact->inexact to work around
+ ;; a bug in guile-prometheus where
+ ;; the metric sum will output in the
+ ;; exact form including the /q
+ (exact->inexact seconds-delayed))
+ (log-delay "datastore read" seconds-delayed)
+ (when (> seconds-delayed 1)
+ (format/safe
+ (current-error-port)
+ "warning: database read delayed by ~1,2f seconds~%"
+ seconds-delayed))))
+ #:duration-logger
+ (lambda (duration proc)
+ (when (> duration 30)
+ (format/safe
+ (current-error-port)
+ "warning: database read took ~1,2f seconds (~a)~%"
+ duration
+ proc)))
+ #:log-exception? thread-pool-log-exception?)))
+
+ (let ((metric (make-gauge-metric
+ metrics-registry
+ "datastore_reader_threads_total")))
+ (metric-set metric
+ (vector-length
+ (thread-pool-proc-vector reader-thread-pool))))
+
+
+ (slot-set! datastore
+ 'reader-thread-pool
+ reader-thread-pool))
datastore))
+(define* (call-with-writer-thread
+ datastore
+ proc
+ #:key priority duration-logger)
+ (call-with-thread
+ (slot-ref datastore 'writer-thread-pool)
+ proc
+ #:duration-logger duration-logger
+ #:channel
+ (slot-ref datastore
+ (assq-ref
+ '((high . high-priority-writer-thread-channel)
+ (default . default-priority-writer-thread-channel)
+ (low . low-priority-writer-thread-channel))
+ (or priority 'default)))))
+
(define (sqlite-step-and-reset statement)
(let ((val (sqlite-step statement)))
(sqlite-reset statement)
@@ -305,11 +341,6 @@ CREATE TABLE IF NOT EXISTS mem.build_allocation_plan (
metrics-registry
checkpoint-duration-metric-name
(lambda ()
- (if (> (wal-size) extreme-wal-size-threshold)
- ;; Since the WAL is really getting too big, wait for much longer
- (sqlite-exec db "PRAGMA busy_timeout = 300000;")
- (sqlite-exec db "PRAGMA busy_timeout = 20;"))
-
(let* ((statement
(sqlite-prepare
db
@@ -320,19 +351,17 @@ CREATE TABLE IF NOT EXISTS mem.build_allocation_plan (
(#(blocked? modified-page-count pages-moved-to-db)
(if (= blocked? 1)
(begin
- (simple-format
+ (simple-format/safe
(current-error-port)
"warning: wal checkpoint blocked\n")
#f)
(begin
- (simple-format
+ (simple-format/safe
(current-error-port)
"wal checkpoint completed (~A, ~A)\n"
modified-page-count
pages-moved-to-db)
#t))))))
- (sqlite-exec db "PRAGMA busy_timeout = 5000;")
-
result)))
#t))
@@ -348,8 +377,8 @@ PRAGMA optimize;")
(define-method (datastore-optimize
(datastore <sqlite-datastore>))
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread
+ datastore
(lambda (db)
(db-optimize
db
@@ -358,15 +387,49 @@ PRAGMA optimize;")
(define-method (datastore-spawn-fibers
(datastore <sqlite-datastore>))
+ ;; Queue messages to the writer thread, so that they're handled in a first
+ ;; come first served manner
+ (let ((queues
+ queue-stats
+ (make-discrete-priority-queueing-channels
+ (thread-pool-channel
+ (slot-ref datastore 'writer-thread-pool))
+ 3)))
+ (match queues
+ ((high default low)
+ (slot-set! datastore 'high-priority-writer-thread-channel high)
+ (slot-set! datastore 'default-priority-writer-thread-channel default)
+ (slot-set! datastore 'low-priority-writer-thread-channel low)))
+ (slot-set! datastore
+ 'writer-thread-channel-queue-stats
+ queue-stats))
+
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (sleep 20)
+ (let ((procs
+ (vector->list
+ (thread-pool-proc-vector
+ (slot-ref datastore 'reader-thread-pool)))))
+ (when (every procedure? procs)
+ (for-each
+ (lambda (i proc)
+ (simple-format/safe (current-error-port)
+ "reader thread ~A running: ~A\n"
+ i proc))
+ (iota (length procs))
+ procs))))))
+
(spawn-fiber
(lambda ()
(while #t
(sleep (* 60 10)) ; 10 minutes
(with-exception-handler
(lambda (exn)
- (simple-format (current-error-port)
- "exception when performing WAL checkpoint: ~A\n"
- exn))
+ (simple-format/safe (current-error-port)
+ "exception when performing WAL checkpoint: ~A\n"
+ exn))
(lambda ()
(with-time-logging
"performing regular database maintenance"
@@ -391,11 +454,18 @@ PRAGMA optimize;")
(let ((setup-failures-total
(make-gauge-metric registry
"setup_failures_total"
- #:labels '(agent_id reason))))
-
- (letpar& ((setup-failure-counts
- (with-time-logging "counting setup failures"
- (datastore-count-setup-failures datastore))))
+ #:labels '(agent_id reason)))
+ (background-jobs-inserted-total
+ (make-counter-metric registry
+ "coordinator_background_job_inserted_total"
+ #:labels '(name))))
+
+ (fibers-let ((setup-failure-counts
+ (with-time-logging "counting setup failures"
+ (datastore-count-setup-failures datastore)))
+ (background-job-counts
+ (with-time-logging "counting background jobs"
+ (datastore-count-background-jobs datastore))))
(for-each (match-lambda
(((agent-id reason) . count)
@@ -404,7 +474,14 @@ PRAGMA optimize;")
#:label-values
`((agent_id . ,agent-id)
(reason . ,reason)))))
- setup-failure-counts)))
+ setup-failure-counts)
+
+ (for-each (match-lambda
+ ((type . count)
+ (metric-increment background-jobs-inserted-total
+ #:by count
+ #:label-values `((name . ,type)))))
+ background-job-counts)))
#t)
(define-method (datastore-update-metrics!
@@ -437,12 +514,38 @@ PRAGMA optimize;")
"datastore_wal_bytes")
(make-gauge-metric
registry "datastore_wal_bytes"
- #:docstring "Size of the SQLite Write Ahead Log file"))))
+ #:docstring "Size of the SQLite Write Ahead Log file")))
+ (reader-threads-used-metric
+ (or (metrics-registry-fetch-metric registry
+ "datastore_reader_threads_used_total")
+ (make-gauge-metric
+ registry "datastore_reader_threads_used_total")))
+ (writer-queue-length-metric
+ (or (metrics-registry-fetch-metric registry
+ "datastore_writer_queue_total")
+ (make-gauge-metric
+ registry "datastore_writer_queue_total"
+ #:labels '(priority)))))
- (letpar& ((build-counts
- (datastore-count-builds datastore))
- (build-result-counts
- (datastore-count-build-results datastore)))
+ (metric-set reader-threads-used-metric
+ (vector-count
+ (lambda (_ proc)
+ (->bool proc))
+ (thread-pool-proc-vector
+ (slot-ref datastore 'reader-thread-pool))))
+
+ (for-each
+ (lambda (priority stats)
+ (metric-set writer-queue-length-metric
+ (assq-ref stats 'length)
+ #:label-values `((priority . ,priority))))
+ '(high default low)
+ ((slot-ref datastore 'writer-thread-channel-queue-stats)))
+
+ (fibers-let ((build-counts
+ (datastore-count-builds datastore))
+ (build-result-counts
+ (datastore-count-build-results datastore)))
(for-each (match-lambda
((system . count)
(metric-set builds-total
@@ -475,9 +578,10 @@ PRAGMA optimize;")
internal-time-units-per-second))
(apply values vals)))))
-(define (metric-observe-duration datastore
- thing
- duration-seconds)
+(define* (metric-observe-duration datastore
+ thing
+ duration-seconds
+ #:key (buckets %default-histogram-buckets))
(define registry (slot-ref datastore 'metrics-registry))
(define metric-name
(string-append "datastore_" thing "_duration_seconds"))
@@ -485,15 +589,25 @@ PRAGMA optimize;")
(let ((metric
(or (metrics-registry-fetch-metric registry metric-name)
(make-histogram-metric registry
- metric-name))))
+ metric-name
+ #:buckets buckets))))
(metric-observe metric duration-seconds)))
-(define (call-with-worker-thread/delay-logging channel proc)
- (call-with-worker-thread channel
+(define (call-with-thread/delay-logging thread-pool proc)
+ (call-with-thread thread-pool
+ proc
+ #:duration-logger
+ (lambda (duration)
+ (log-delay proc duration))))
+
+(define* (call-with-writer-thread/delay-logging datastore proc
+ #:key priority)
+ (call-with-writer-thread datastore
proc
#:duration-logger
(lambda (duration)
- (log-delay proc duration))))
+ (log-delay proc duration))
+ #:priority priority))
(define-exception-type &transaction-rollback-exception &exception
make-transaction-rollback-exception
@@ -507,21 +621,24 @@ PRAGMA optimize;")
#:key
readonly?
(immediate? (not readonly?))
- duration-metric-name)
+ priority
+ duration-metric-name
+ (duration-metric-buckets
+ %transaction-duration-histogram-buckets))
(define (run-proc-within-transaction db)
(define (attempt-begin)
(with-exception-handler
(lambda (exn)
(match (exception-args exn)
(('sqlite-exec 5 msg)
- (simple-format
+ (simple-format/safe
(current-error-port)
"warning: issue starting transaction (code: 5, proc: ~A): ~A\n"
proc msg)
#f)
(_
- (simple-format (current-error-port)
- "exception starting transaction\n")
+ (simple-format/safe (current-error-port)
+ "exception starting transaction\n")
(raise-exception exn))))
(lambda ()
(sqlite-exec db (if immediate?
@@ -535,14 +652,14 @@ PRAGMA optimize;")
(lambda (exn)
(match (exception-args exn)
(('sqlite-exec 5 msg)
- (simple-format
+ (simple-format/safe
(current-error-port)
"warning: attempt commit (code: 5, proc: ~A): ~A\n"
proc msg)
#f)
(_
- (simple-format (current-error-port)
- "exception committing transaction\n")
+ (simple-format/safe (current-error-port)
+ "exception committing transaction\n")
(raise-exception exn))))
(lambda ()
(sqlite-exec db "COMMIT TRANSACTION;")
@@ -550,63 +667,88 @@ PRAGMA optimize;")
#:unwind? #t))
(if (attempt-begin)
- (call-with-values
- (lambda ()
- (with-exception-handler
- (lambda (exn)
- (if (transaction-rollback-exception? exn)
- (begin
- (sqlite-exec db "ROLLBACK TRANSACTION;")
- (transaction-rollback-exception-return-value exn))
- (begin
- (simple-format (current-error-port)
- "error: sqlite rolling back transaction (~A)\n"
- exn)
- (sqlite-exec db "ROLLBACK TRANSACTION;")
- (raise-exception exn))))
+ (with-exception-handler
+ (lambda (exn)
+ (if (transaction-rollback-exception? exn)
+ (begin
+ (sqlite-exec db "ROLLBACK TRANSACTION;")
+ (transaction-rollback-exception-return-value exn))
+ (begin
+ (simple-format/safe
+ (current-error-port)
+ "error: sqlite rolling back transaction (~A)\n"
+ exn)
+ (sqlite-exec db "ROLLBACK TRANSACTION;")
+ (raise-exception exn))))
+ (lambda ()
+ (call-with-values
(lambda ()
- (parameterize ((%current-transaction-proc proc))
- (call-with-delay-logging proc #:args (list db))))
- #:unwind? #t))
- (lambda vals
- (let loop ((success? (attempt-commit)))
- (if success?
- (apply values vals)
- (loop (attempt-commit))))))
+ (with-exception-handler
+ (lambda (exn)
+ (unless (transaction-rollback-exception? exn)
+ (backtrace))
+ (raise-exception exn))
+ (lambda ()
+ (parameterize ((%current-transaction-proc proc)
+ ;; Set the arguments parameter for the
+ ;; reader thread pool so that any nested
+ ;; calls to call-with-thread for the
+ ;; reader thread pool just use the writer
+ ;; db connection and thus this
+ ;; transaction
+ ((thread-pool-arguments-parameter
+ (slot-ref datastore 'reader-thread-pool))
+ (list db)))
+ (call-with-delay-logging proc #:args (list db))))))
+ (lambda vals
+ (let loop ((success? (attempt-commit)))
+ (if success?
+ (apply values vals)
+ (loop (attempt-commit)))))))
+ #:unwind? #t)
;; Database is busy, so retry
(run-proc-within-transaction db)))
- (call-with-worker-thread
+ (call-with-thread
(slot-ref datastore (if readonly?
- 'worker-reader-thread-channel
- 'worker-writer-thread-channel))
+ 'reader-thread-pool
+ 'writer-thread-pool))
(lambda (db)
(if (%current-transaction-proc)
(call-with-delay-logging proc #:args (list db)) ; already in transaction
(run-proc-within-transaction db)))
+ #:channel
+ (if readonly?
+ (thread-pool-channel
+ (slot-ref datastore 'reader-thread-pool))
+ (slot-ref datastore
+ (assq-ref
+ '((high . high-priority-writer-thread-channel)
+ (default . default-priority-writer-thread-channel)
+ (low . low-priority-writer-thread-channel))
+ (or priority 'default))))
#:duration-logger
(lambda (duration-seconds)
(when (and (not readonly?)
(> duration-seconds 2))
- (display
- (format
- #f
- "warning: ~a:\n took ~4f seconds in transaction\n"
- proc
- duration-seconds)
- (current-error-port))
-
- (when duration-metric-name
- (metric-observe-duration datastore
- duration-metric-name
- duration-seconds))))))
+ (format/safe
+ (current-error-port)
+ "warning: ~a:\n took ~4f seconds in transaction\n"
+ proc
+ duration-seconds))
+
+ (when duration-metric-name
+ (metric-observe-duration datastore
+ duration-metric-name
+ duration-seconds
+ #:buckets duration-metric-buckets)))))
(define-method (datastore-find-agent
(datastore <sqlite-datastore>)
uuid)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -634,8 +776,8 @@ SELECT description FROM agents WHERE id = :id"
(define-method (datastore-find-agent-by-name
(datastore <sqlite-datastore>)
name)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -662,8 +804,8 @@ SELECT id FROM agents WHERE name = :name"
(define-method (datastore-insert-dynamic-auth-token
(datastore <sqlite-datastore>)
token)
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread
+ datastore
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -681,8 +823,8 @@ INSERT INTO dynamic_auth_tokens (token) VALUES (:token)"
(define-method (datastore-dynamic-auth-token-exists?
(datastore <sqlite-datastore>)
token)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -709,8 +851,8 @@ SELECT 1 FROM dynamic_auth_tokens WHERE token = :token"
(define-method (datastore-fetch-agent-tags
(datastore <sqlite-datastore>)
agent-id)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -746,16 +888,16 @@ WHERE agent_tags.agent_id = :agent_id"
uuid
name
description)
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread
+ datastore
(lambda (db)
(insert-agent db uuid name description)))
#t)
(define-method (datastore-list-agents
(datastore <sqlite-datastore>))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -783,8 +925,8 @@ SELECT id, name, description, active FROM agents ORDER BY id"
(unless (boolean? active?)
(error "datastore-set-agent-active called with non-boolean"))
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread
+ datastore
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -803,8 +945,8 @@ UPDATE agents SET active = :active WHERE id = :uuid"
(define-method (datastore-find-agent-status
(datastore <sqlite-datastore>)
agent-id)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -838,8 +980,8 @@ WHERE agent_id = :agent_id"
1min-load-average
system-uptime
processor-count)
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread
+ datastore
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -875,8 +1017,8 @@ INSERT INTO agent_status (agent_id, status, load_average_1min, system_uptime, pr
(datastore <sqlite-datastore>)
agent-uuid
password)
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread
+ datastore
(lambda (db)
(insert-agent-password db agent-uuid password)))
#t)
@@ -885,8 +1027,8 @@ INSERT INTO agent_status (agent_id, status, load_average_1min, system_uptime, pr
(datastore <sqlite-datastore>)
uuid
password)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -908,8 +1050,8 @@ WHERE agent_id = :agent_id AND password = :password"
(define-method (datastore-agent-list-passwords
(datastore <sqlite-datastore>)
uuid)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1006,6 +1148,7 @@ INSERT INTO agent_tags (agent_id, tag_id) VALUES (:agent_id, :tag_id)"
(lambda (db)
(insert-derivation-and-return-outputs db derivation)
(hash-clear! %derivation-outputs-cache))
+ #:priority 'low
#:duration-metric-name "store_derivation")
#t)
@@ -1013,8 +1156,8 @@ INSERT INTO agent_tags (agent_id, tag_id) VALUES (:agent_id, :tag_id)"
(define-method (datastore-build-exists-for-derivation-outputs?
(datastore <sqlite-datastore>)
derivation)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1046,8 +1189,8 @@ WHERE derivation_outputs.derivation_id = :derivation_id
(define-method (datastore-build-required-by-another?
(datastore <sqlite-datastore>)
uuid)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1141,8 +1284,8 @@ SELECT name FROM derivations WHERE id = :id"
(match (sqlite-step-and-reset statement)
(#(name) name))))
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let loop ((derivation-ids (list (db-find-derivation-id db derivation)))
(result '()))
@@ -1167,8 +1310,8 @@ SELECT name FROM derivations WHERE id = :id"
args)
(apply
(lambda* (system #:key include-cancelled?)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1256,8 +1399,8 @@ FROM (
(define-method (datastore-list-builds-for-derivation-recursive-inputs
(datastore <sqlite-datastore>)
derivation)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1294,8 +1437,8 @@ INNER JOIN related_derivations
(define-method (datastore-find-unprocessed-build-entry
(datastore <sqlite-datastore>)
uuid)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1324,8 +1467,8 @@ WHERE build_id = :build_id"
(datastore <sqlite-datastore>)
build-uuid
tags)
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread
+ datastore
(lambda (db)
(let ((insert-tag-statement
(sqlite-prepare
@@ -1525,8 +1668,8 @@ WHERE build_id = :build_id"
uuid
explicit-priority-lower-bound)
(define builds-to-consider
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
;; Recursively find builds for all missing outputs that this build
;; takes as inputs. The order is important here, since we want to
@@ -1676,8 +1819,8 @@ WHERE build_id = :build_id"
override-derived-priority)
(let ((build-id
old-priority
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((build-id
(db-find-build-id db uuid)))
@@ -1734,46 +1877,10 @@ WHERE build_id = :build_id"
#t)
rest))
-(define-method (datastore-remove-build-from-allocation-plan
- (datastore <sqlite-datastore>)
- uuid)
- (define (update-build-allocation-plan-metrics)
- (let ((allocation-plan-metric
- (metrics-registry-fetch-metric
- (slot-ref datastore 'metrics-registry)
- "build_allocation_plan_total")))
- (for-each
- (match-lambda
- ((agent-id . count)
- (metric-set allocation-plan-metric
- count
- #:label-values
- `((agent_id . ,agent-id)))))
- (datastore-count-build-allocation-plan-entries datastore))))
-
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-writer-thread-channel)
- (lambda (db)
- (let ((statement (sqlite-prepare
- db
- "
-DELETE FROM build_allocation_plan WHERE build_id = :build_id"
- #:cache? #t)))
-
- (sqlite-bind-arguments
- statement
- #:build_id (db-find-build-id db uuid))
-
- (sqlite-step-and-reset statement)
-
- (unless (= 0 (changes-count db))
- (update-build-allocation-plan-metrics)))))
- #t)
-
(define-method (datastore-count-build-results
(datastore <sqlite-datastore>))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1831,8 +1938,8 @@ VALUES (:agent_id, :result, 1)"
#t))))
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread/delay-logging
+ datastore
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1883,14 +1990,12 @@ LIMIT 1"
(#f #t)
(#(1) #f))))
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-writer-thread-channel)
- (lambda (db)
- (let ((builds-statement
- (sqlite-prepare
- db
- "
-SELECT DISTINCT unprocessed_builds.id
+ (define (all-build-ids db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT input_builds.id
FROM builds
INNER JOIN derivation_outputs
ON builds.derivation_id = derivation_outputs.derivation_id
@@ -1898,45 +2003,62 @@ INNER JOIN derivation_outputs AS all_derivation_outputs
ON all_derivation_outputs.output_id = derivation_outputs.output_id
INNER JOIN derivation_inputs
ON derivation_inputs.derivation_output_id = all_derivation_outputs.id
-INNER JOIN builds AS unprocessed_builds
- ON unprocessed_builds.processed = 0
- AND unprocessed_builds.derivation_id = derivation_inputs.derivation_id
-INNER JOIN unprocessed_builds_with_derived_priorities
- ON unprocessed_builds_with_derived_priorities.build_id = unprocessed_builds.id
- AND unprocessed_builds_with_derived_priorities.all_inputs_built = 0
+INNER JOIN builds AS input_builds
+ ON input_builds.processed = 0
+ AND input_builds.canceled = 0
+ AND input_builds.derivation_id = derivation_inputs.derivation_id
WHERE builds.id = :build_id"
- #:cache? #t))
+ #:cache? #t)))
- (update-statement
- (sqlite-prepare
- db
- "
+ (sqlite-bind-arguments
+ statement
+ #:build_id (db-find-build-id db build-uuid))
+
+ (let ((result
+ (sqlite-fold
+ (lambda (row result)
+ (match row
+ (#(build-id)
+ (if (all-inputs-built? db build-id)
+ (cons build-id result)
+ result))))
+ '()
+ statement)))
+ (sqlite-reset statement)
+
+ result)))
+
+ (let ((build-ids
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
+ (lambda (db)
+ (all-build-ids db)))))
+ (call-with-writer-thread/delay-logging
+ datastore
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
UPDATE unprocessed_builds_with_derived_priorities
SET all_inputs_built = 1
WHERE build_id = :build_id"
- #:cache? #t)))
+ #:cache? #t)))
- (sqlite-bind-arguments builds-statement
- #:build_id (db-find-build-id db build-uuid))
-
- (sqlite-fold
- (lambda (row result)
- (match row
- (#(build-id)
- (when (all-inputs-built? db build-id)
- (sqlite-bind-arguments update-statement
- #:build_id build-id)
- (sqlite-step-and-reset update-statement))))
- #f)
- #f
- builds-statement)
- (sqlite-reset builds-statement)))))
+ (for-each
+ (lambda (build-id)
+ (sqlite-bind-arguments statement
+ #:build_id build-id)
+ (sqlite-step-and-reset statement))
+ build-ids)
+
+ #t)))))
(define-method (datastore-remove-build-allocation
(datastore <sqlite-datastore>)
build-uuid agent-id)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread/delay-logging
+ datastore
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1958,8 +2080,8 @@ DELETE FROM allocated_builds
(define-method (datastore-mark-build-as-processed
(datastore <sqlite-datastore>)
build-uuid end-time)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread/delay-logging
+ datastore
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1995,8 +2117,8 @@ DELETE FROM unprocessed_builds_with_derived_priorities
(define-method (datastore-delete-relevant-outputs-from-unbuilt-outputs
(datastore <sqlite-datastore>)
build-uuid)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread/delay-logging
+ datastore
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2023,8 +2145,8 @@ WHERE output_id IN (
(datastore <sqlite-datastore>)
build-uuid
output-metadata)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread/delay-logging
+ datastore
(lambda (db)
(define (name->output-id name)
(let ((statement
@@ -2101,8 +2223,8 @@ INSERT INTO build_starts (
(define-method (datastore-find-build-starts
(datastore <sqlite-datastore>)
build-uuid)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2211,8 +2333,8 @@ INSERT INTO setup_failure_missing_inputs (
(define-method (datastore-list-setup-failure-missing-inputs
(datastore <sqlite-datastore>)
setup-failure-id)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2240,8 +2362,8 @@ WHERE setup_failure_id = :id"
build-uuid
agent-id
failure-reason)
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread
+ datastore
(lambda (db)
(insert-setup-failure-and-remove-allocation db
(db-find-build-id db build-uuid)
@@ -2258,8 +2380,8 @@ WHERE setup_failure_id = :id"
(define-method (datastore-count-builds
(datastore <sqlite-datastore>))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2282,8 +2404,8 @@ FROM builds_counts"
(define-method (datastore-for-each-build
(datastore <sqlite-datastore>)
proc)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2322,8 +2444,8 @@ SELECT id FROM builds WHERE uuid = :uuid"
(define-method (datastore-find-build
(datastore <sqlite-datastore>)
uuid)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2381,12 +2503,14 @@ WHERE uuid = :uuid"
(canceled 'unset)
(priority-> 'unset)
(priority-< 'unset)
+ (created-at-> 'unset)
+ (created-at-< 'unset)
(after-id #f)
(limit #f)
;; other-builds-dependent or no-dependent-builds
(relationship 'unset))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(define tag->expression
(let ((statement
@@ -2399,7 +2523,7 @@ SELECT id FROM tags WHERE key = :key AND value = :value"
(sqlite-prepare
db
"
-SELECT id FROM tags WHERE key = :key"
+SELECT 1 FROM tags WHERE key = :key LIMIT 1"
#:cache? #t)))
(lambda (tag not?)
(match tag
@@ -2420,28 +2544,27 @@ SELECT id FROM tags WHERE key = :key"
(sqlite-bind-arguments key-statement
#:key key)
- (let* ((tag-ids (sqlite-map
- (match-lambda
- (#(id) id))
- key-statement))
- (result
- (string-append
- "("
- (string-join
- (map
- (lambda (id)
+ (let ((tag-with-key-exists?
+ (->bool (sqlite-step-and-reset key-statement))))
+ (if tag-with-key-exists?
+ (let ((result
(string-append
+ "("
(if not? "NOT " "")
- "EXISTS (SELECT 1 FROM build_tags "
- "WHERE build_id = builds.id AND tag_id = "
- (number->string id)
- ")"))
- tag-ids)
- (if not? " AND " " OR "))
- ")")))
- (sqlite-reset key-statement)
-
- result))))))
+ "
+EXISTS (
+ SELECT 1
+ FROM build_tags
+ INNER JOIN tags ON build_tags.tag_id = tags.id
+ WHERE build_id = builds.id
+ AND tags.key = '"
+ key "'
+)"
+ ")")))
+ result)
+ (if not?
+ "TRUE"
+ "FALSE"))))))))
(let ((tag-expressions
(map (lambda (tag)
@@ -2463,6 +2586,8 @@ SELECT id FROM tags WHERE key = :key"
(not (null? not-systems))
(not (eq? priority-> 'unset))
(not (eq? priority-< 'unset))
+ (not (eq? created-at-> 'unset))
+ (not (eq? created-at-< 'unset))
(not (eq? processed 'unset))
(not (eq? canceled 'unset))
(not (eq? relationship 'unset))
@@ -2513,6 +2638,14 @@ INNER JOIN derivations
(list
(simple-format #f "priority < ~A" priority-<))
'())
+ (if (string? created-at->)
+ (list
+ (simple-format #f "created_at > '~A'" created-at->))
+ '())
+ (if (string? created-at-<)
+ (list
+ (simple-format #f "created_at < '~A'" created-at-<))
+ '())
(cond
((eq? processed #t) '("processed = 1"))
((eq? processed #f) '("processed = 0"))
@@ -2616,8 +2749,8 @@ WHERE derivation_outputs.derivation_id = builds.derivation_id)"))
(define-method (datastore-fetch-build-tags
(datastore <sqlite-datastore>)
build-uuid)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2651,8 +2784,8 @@ WHERE build_tags.build_id = :build_id"
(define-method (datastore-find-build-result
(datastore <sqlite-datastore>)
build-uuid)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2677,8 +2810,8 @@ WHERE build_id = :build_id"
(define-method (datastore-find-build-derivation-system
(datastore <sqlite-datastore>)
build-uuid)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2714,8 +2847,8 @@ WHERE builds.id = :build_id"
datastore
"list_builds_for_output"
(lambda ()
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2763,8 +2896,8 @@ WHERE derivation_outputs.output_id = :output_id"
rest)
(apply
(lambda* (output system #:key include-canceled?)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2808,8 +2941,8 @@ WHERE derivation_outputs.output_id = :output_id
rest)
(apply
(lambda* (derivation #:key (include-canceled? #t))
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2844,8 +2977,8 @@ WHERE derivations.name = :derivation"
(define-method (datastore-count-setup-failures
(datastore <sqlite-datastore>))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2869,8 +3002,8 @@ GROUP BY agent_id, failure_reason"
(define-method (datastore-list-setup-failures-for-build
(datastore <sqlite-datastore>)
build-uuid)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2902,8 +3035,8 @@ WHERE build_id = :build_id"
args)
(apply
(lambda* (#:key agent-id)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2954,8 +3087,8 @@ WHERE builds.processed = 0
(define-method (datastore-list-processed-builds
(datastore <sqlite-datastore>))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2981,8 +3114,8 @@ WHERE processed = 1"
(define-method (datastore-list-unprocessed-builds
(datastore <sqlite-datastore>))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3016,10 +3149,11 @@ ORDER BY priority DESC"
builds)))))
-(define-method (datastore-find-first-unallocated-deferred-build
- (datastore <sqlite-datastore>))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+(define-method (datastore-find-deferred-build
+ (datastore <sqlite-datastore>)
+ select?)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3032,25 +3166,29 @@ INNER JOIN derivations
WHERE processed = 0
AND canceled = 0
AND deferred_until IS NOT NULL
- AND builds.id NOT IN (SELECT build_id FROM build_allocation_plan)
-ORDER BY deferred_until ASC
-LIMIT 1"
+ORDER BY deferred_until ASC"
#:cache? #t)))
- (match (sqlite-step-and-reset statement)
- (#(uuid derivation_name priority created_at deferred_until)
- `((uuid . ,uuid)
- (derivation-name . ,derivation_name)
- (priority . ,priority)
- (created-at . ,(if (string? created_at)
- (string->date created_at
- "~Y-~m-~d ~H:~M:~S")
- #f))
- (deferred-until . ,(if (string? deferred_until)
- (string->date deferred_until
- "~Y-~m-~d ~H:~M:~S")
- #f))))
- (#f #f))))))
+ (let loop ((row (sqlite-step statement)))
+ (match row
+ (#(uuid derivation_name priority created_at deferred_until)
+ (let ((res
+ (select?
+ `((uuid . ,uuid)
+ (derivation-name . ,derivation_name)
+ (priority . ,priority)
+ (created-at . ,(if (string? created_at)
+ (string->date created_at
+ "~Y-~m-~d ~H:~M:~S")
+ #f))
+ (deferred-until . ,(if (string? deferred_until)
+ (string->date deferred_until
+ "~Y-~m-~d ~H:~M:~S")
+ #f))))))
+ (if res
+ res
+ (loop (sqlite-step statement)))))
+ (#f #f)))))))
(define-method (datastore-fetch-prioritised-unprocessed-builds
(datastore <sqlite-datastore>))
@@ -3059,9 +3197,11 @@ LIMIT 1"
(sqlite-prepare
db
"
-SELECT builds.uuid
+SELECT builds.uuid, systems.system
FROM unprocessed_builds_with_derived_priorities
INNER JOIN builds ON build_id = builds.id
+INNER JOIN derivations ON builds.derivation_id = derivations.id
+INNER JOIN systems ON derivations.system_id = systems.id
WHERE all_inputs_built = 1
AND NOT EXISTS (
SELECT 1
@@ -3071,26 +3211,21 @@ WHERE all_inputs_built = 1
) ORDER BY derived_priority ASC"
#:cache? #t)))
- (let ((result (sqlite-fold
- (lambda (row result)
- (cons (vector-ref row 0)
- result))
- '()
- statement)))
+ (let ((result (sqlite-fold cons '() statement)))
(sqlite-reset statement)
result)))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
fetch-prioritised-unprocessed-builds))
(define-method (datastore-insert-unprocessed-hook-event
(datastore <sqlite-datastore>)
event
arguments)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread/delay-logging
+ datastore
(lambda (db)
(insert-unprocessed-hook-event db
event
@@ -3120,8 +3255,8 @@ VALUES (:event, :arguments)"
(define-method (datastore-count-unprocessed-hook-events
(datastore <sqlite-datastore>))
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3144,8 +3279,8 @@ SELECT event, COUNT(*) FROM unprocessed_hook_events GROUP BY event"
(datastore <sqlite-datastore>)
event
limit)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3178,8 +3313,8 @@ LIMIT :limit"
(define-method (datastore-find-unprocessed-hook-event
(datastore <sqlite-datastore>)
id)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3205,8 +3340,8 @@ WHERE id = :id"
(define-method (datastore-delete-unprocessed-hook-event
(datastore <sqlite-datastore>)
id)
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread
+ datastore
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3219,110 +3354,14 @@ DELETE FROM unprocessed_hook_events WHERE id = :id"
statement
#:id id)
- (sqlite-step-and-reset statement))))
- #t)
-
-(define-method (datastore-count-build-allocation-plan-entries
- (datastore <sqlite-datastore>))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
- (lambda (db)
- (let ((statement
- (sqlite-prepare
- db
- "
-SELECT agent_id, COUNT(*)
-FROM build_allocation_plan
-GROUP BY agent_id"
- #:cache? #t)))
-
- (let ((result
- (sqlite-map
- (match-lambda
- (#(agent_id count)
- (cons agent_id count)))
- statement)))
- (sqlite-reset statement)
-
- result)))))
-
-(define-method (datastore-replace-build-allocation-plan
- (datastore <sqlite-datastore>)
- planned-builds)
- (define (clear-current-plan db)
- (sqlite-exec
- db
- "DELETE FROM build_allocation_plan"))
-
- (define insert-sql
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
- (lambda (db)
- (string-append
- "
-INSERT INTO build_allocation_plan (build_id, agent_id, ordering) VALUES "
- (string-join
- (map (match-lambda
- ((build-uuid agent-id ordering)
- (simple-format
- #f
- "('~A', '~A', ~A)"
- (db-find-build-id db build-uuid)
- agent-id
- ordering)))
- planned-builds)
- ", ")
- ";"))))
-
- (define (insert-new-plan db planned-builds)
- (sqlite-exec
- db
- insert-sql))
-
- (datastore-call-with-transaction
- datastore
- (lambda (db)
- (clear-current-plan db)
- (unless (null? planned-builds)
- (insert-new-plan db planned-builds)))
- #:duration-metric-name "replace_build_allocation_plan")
-
- (let* ((agent-ids
- (map (lambda (agent-details)
- (assq-ref agent-details 'uuid))
- (datastore-list-agents datastore)))
- (counts-by-agent
- (make-vector (length agent-ids) 0)))
- (for-each
- (match-lambda
- ((_ agent-id _)
- (let ((index (list-index (lambda (list-agent-id)
- (string=? agent-id list-agent-id))
- agent-ids)))
- (vector-set! counts-by-agent
- index
- (+ (vector-ref counts-by-agent
- index)
- 1)))))
- planned-builds)
-
- (let ((metric
- (metrics-registry-fetch-metric (slot-ref datastore 'metrics-registry)
- "build_allocation_plan_total")))
- (for-each
- (lambda (index agent-id)
- (metric-set metric
- (vector-ref counts-by-agent index)
- #:label-values
- `((agent_id . ,agent-id))))
- (iota (length agent-ids))
- agent-ids)))
+ (sqlite-step-and-reset statement)))
+ #:priority 'high)
#t)
(define-method (datastore-count-allocated-builds
(datastore <sqlite-datastore>))
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3344,8 +3383,8 @@ SELECT agent_id, COUNT(*) FROM allocated_builds GROUP BY agent_id"
(define-method (datastore-agent-requested-systems
(datastore <sqlite-datastore>)
agent-id)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3418,34 +3457,40 @@ INSERT INTO build_allocation_agent_requested_systems (agent_id, system_id) VALUE
(define-method (datastore-fetch-build-to-allocate
(datastore <sqlite-datastore>)
- agent-id)
- (datastore-call-with-transaction
- datastore
+ build-id)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
db
- ;; This needs to guard against the plan being out of date
"
SELECT builds.uuid, derivations.id, derivations.name,
unprocessed_builds_with_derived_priorities.derived_priority
FROM builds
-INNER JOIN build_allocation_plan
- ON builds.id = build_allocation_plan.build_id
INNER JOIN derivations
ON builds.derivation_id = derivations.id
-INNER JOIN build_allocation_agent_requested_systems
- ON build_allocation_agent_requested_systems.agent_id = :agent_id
- AND build_allocation_agent_requested_systems.system_id = derivations.system_id
LEFT JOIN unprocessed_builds_with_derived_priorities
ON unprocessed_builds_with_derived_priorities.build_id = builds.id
-WHERE build_allocation_plan.agent_id = :agent_id
+WHERE builds.uuid = :uuid
AND builds.processed = 0
AND builds.canceled = 0
- AND builds.id NOT IN (SELECT build_id FROM allocated_builds)
-ORDER BY build_allocation_plan.ordering ASC"
- #:cache? #t))
- (output-conflicts-statement
+ AND builds.id NOT IN (SELECT build_id FROM allocated_builds)"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:uuid build-id)
+
+ (sqlite-step-and-reset statement)))))
+
+(define-method (datastore-check-if-derivation-conflicts?
+ (datastore <sqlite-datastore>)
+ agent-id
+ derivation-id)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
+ (lambda (db)
+ (let ((statement
(sqlite-prepare
db
"
@@ -3463,149 +3508,63 @@ WHERE build_derivation_outputs.derivation_id = :derivation_id
allocated_builds_derivation_outputs.output_id"
#:cache? #t)))
- (define (get-build-to-allocate)
- (let loop ((build-details (sqlite-step statement)))
- (match build-details
- (#f #f)
- (#(uuid derivation-id derivation-name derived_priority)
+ (sqlite-bind-arguments statement
+ #:agent_id agent-id
+ #:derivation_id derivation-id)
- (sqlite-bind-arguments output-conflicts-statement
- #:agent_id agent-id
- #:derivation_id derivation-id)
+ (->bool (sqlite-step-and-reset statement))))))
- (if (eq? (sqlite-step-and-reset output-conflicts-statement)
- #f)
- `((uuid . ,uuid)
- (derivation_name . ,derivation-name)
- (derived_priority . ,derived_priority))
- (loop (sqlite-step statement)))))))
+(define-method (datastore-insert-to-allocated-builds
+ (datastore <sqlite-datastore>)
+ agent-id
+ build-uuid)
+ (call-with-writer-thread
+ datastore
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+INSERT INTO allocated_builds (build_id, agent_id)
+ VALUES (:build_id, :agent_id)"
+ #:cache? #t)))
(sqlite-bind-arguments
statement
+ #:build_id (db-find-build-id db build-uuid)
#:agent_id agent-id)
- (let ((result (get-build-to-allocate)))
- (sqlite-reset statement)
-
- result)))
-
- #:readonly? #t
- #:duration-metric-name "fetch_builds_to_allocate"))
-
-(define-method (datastore-insert-to-allocated-builds
- (datastore <sqlite-datastore>)
- agent-id
- build-uuids)
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
- (lambda (db)
- (sqlite-exec
- db
- (string-append
- "
-INSERT INTO allocated_builds (build_id, agent_id) VALUES "
- (string-join
- (map (lambda (build-uuid)
- (simple-format
- #f
- "(~A, '~A')"
- (db-find-build-id db build-uuid)
- agent-id))
- build-uuids)
- ", ")
- ";")))))
+ (sqlite-step-and-reset statement))))
+ #t)
-(define-method (datastore-remove-builds-from-plan
+(define-method (datastore-update-allocated-build-submit-outputs
(datastore <sqlite-datastore>)
- build-uuids)
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ build-uuid
+ submit-outputs?)
+ (call-with-writer-thread
+ datastore
(lambda (db)
- (sqlite-exec
- db
- (string-append
- "
-DELETE FROM build_allocation_plan
-WHERE build_id IN ("
- (string-join
- (map (lambda (build-uuid)
- (number->string (db-find-build-id db build-uuid)))
- build-uuids)
- ", ")
- ")")))))
-
-(define-method (datastore-list-allocation-plan-builds
- (datastore <sqlite-datastore>)
- .
- rest)
- (apply
- (lambda* (agent-id #:key limit)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
- (lambda (db)
- (let ((statement
- (sqlite-prepare
- db
- (string-append
- "
-SELECT builds.uuid, derivations.name, systems.system,
- builds.priority,
- unprocessed_builds_with_derived_priorities.derived_priority
-FROM builds
-INNER JOIN derivations
- ON builds.derivation_id = derivations.id
-INNER JOIN systems
- ON derivations.system_id = systems.id
-INNER JOIN build_allocation_plan
- ON builds.id = build_allocation_plan.build_id
-LEFT JOIN unprocessed_builds_with_derived_priorities
- ON builds.id = unprocessed_builds_with_derived_priorities.build_id
-WHERE build_allocation_plan.agent_id = :agent_id
- AND builds.processed = 0
- AND builds.id NOT IN (SELECT build_id FROM allocated_builds)
-ORDER BY build_allocation_plan.ordering ASC"
- (if limit
- "
-LIMIT :limit"
- ""))
- #:cache? #t)))
-
- (apply sqlite-bind-arguments
- statement
- #:agent_id agent-id
- (if limit
- (list #:limit limit)
- '()))
-
- (let ((builds (sqlite-map
- (match-lambda
- (#(uuid derivation_name system
- priority derived_priority)
- `((uuid . ,uuid)
- (derivation_name . ,derivation_name)
- (system . ,system)
- (priority . ,priority)
- (derived_priority . ,derived_priority)
- (tags . ,(vector-map
- (lambda (_ tag)
- (match tag
- ((key . value)
- `((key . ,key)
- (value . ,value)))))
- (datastore-fetch-build-tags
- datastore
- uuid))))))
- statement)))
- (sqlite-reset statement)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+UPDATE allocated_builds
+SET submit_outputs = :submit_outputs
+WHERE build_id = :build_id"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:build_id (db-find-build-id db build-uuid)
+ #:submit_outputs (if submit-outputs? 1 0))
- builds)))))
- rest))
+ (sqlite-step-and-reset statement))))
+ #t)
(define-method (datastore-list-agent-builds
(datastore <sqlite-datastore>)
agent-id)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3613,7 +3572,7 @@ LIMIT :limit"
"
SELECT builds.uuid, derivations.name,
unprocessed_builds_with_derived_priorities.derived_priority,
- builds.canceled
+ builds.canceled, allocated_builds.submit_outputs
FROM builds
INNER JOIN derivations
ON builds.derivation_id = derivations.id
@@ -3630,11 +3589,17 @@ WHERE allocated_builds.agent_id = :agent_id"
(let ((builds (sqlite-map
(match-lambda
- (#(uuid derivation_name derived_priority canceled)
+ (#(uuid derivation_name derived_priority canceled
+ submit_outputs)
`((uuid . ,uuid)
(derivation_name . ,derivation_name)
(derived_priority . ,derived_priority)
- (canceled . ,(= 1 canceled)))))
+ (canceled . ,(= 1 canceled))
+ (submit_outputs . ,(cond
+ ((not submit_outputs)
+ 'null)
+ (else
+ (= 1 submit_outputs)))))))
statement)))
(sqlite-reset statement)
@@ -3643,8 +3608,8 @@ WHERE allocated_builds.agent_id = :agent_id"
(define-method (datastore-agent-for-build
(datastore <sqlite-datastore>)
build-uuid)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3706,11 +3671,11 @@ WHERE build_results.build_id = :build_id"
"_sqitch_registry.db")
(string-append "db:sqlite:" database-file))))
- (simple-format #t "running command: ~A\n"
- (string-join command))
+ (simple-format/safe #t "running command: ~A\n"
+ (string-join command))
(let ((pid (spawn (%config 'sqitch) command)))
(unless (zero? (cdr (waitpid pid)))
- (simple-format
+ (simple-format/safe
(current-error-port)
"error: sqitch command failed\n")
(exit 1)))))
@@ -3800,16 +3765,16 @@ WHERE name = :name"
(define-method (datastore-find-derivation
(datastore <sqlite-datastore>)
name)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(db-find-derivation db name))))
(define-method (datastore-find-derivation-outputs
(datastore <sqlite-datastore>)
derivation-name)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3841,8 +3806,8 @@ WHERE derivation_id = :derivation_id"
(define-method (datastore-find-derivation-output-details
(datastore <sqlite-datastore>)
derivation-name)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3884,8 +3849,8 @@ WHERE derivation_id = :derivation_id"
(define-method (datastore-list-unbuilt-derivation-outputs
(datastore <sqlite-datastore>)
derivation-name)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3918,8 +3883,8 @@ WHERE derivation_id = :derivation_id"
(define-method (datastore-list-build-outputs
(datastore <sqlite-datastore>)
build-uuid)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3963,8 +3928,8 @@ WHERE builds.id = :build_id"
(define-method (datastore-find-derivation-system
(datastore <sqlite-datastore>)
derivation-name)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3987,8 +3952,8 @@ WHERE name = :name"
(define-method (datastore-find-derivation-inputs
(datastore <sqlite-datastore>)
derivation-name)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -4028,8 +3993,8 @@ WHERE derivations.id = :derivation_id"
(define-method (datastore-find-recursive-derivation-input-outputs
(datastore <sqlite-datastore>)
derivation-name)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -4070,8 +4035,8 @@ INNER JOIN outputs
(datastore <sqlite-datastore>)
start-derivation-name
output)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -4154,20 +4119,6 @@ SELECT system FROM systems WHERE id = :id"
(define derivation-name
(derivation-file-name derivation))
- (define (maybe-fix-fixed-output-field derivation-details)
- (let ((fixed-output? (fixed-output-derivation? derivation)))
- (unless (equal? (assq-ref derivation-details 'fixed-output?)
- fixed-output?)
- (let ((statement (sqlite-prepare
- db
- "
-UPDATE derivations SET fixed_output = :fixed_output WHERE name = :name"
- #:cache? #t)))
- (sqlite-bind-arguments statement
- #:name derivation-name
- #:fixed_output (if fixed-output? 1 0))
- (sqlite-step-and-reset statement)))))
-
(define (insert-derivation)
(let ((statement
(sqlite-prepare
@@ -4198,8 +4149,6 @@ INSERT INTO derivations (name, system_id, fixed_output)
(db-find-derivation db derivation-name)))
(if derivation-details
(begin
- (maybe-fix-fixed-output-field derivation-details)
-
(let ((derivation-outputs
(select-derivation-outputs db derivation-name)))
@@ -4508,15 +4457,15 @@ VALUES (:build_id, :derived_priority, :all_inputs_built)"
(lambda* (uuid drv-name priority defer-until
#:key skip-updating-other-build-derived-priorities)
(define system-id
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(db-system->system-id
db
(datastore-find-derivation-system datastore drv-name)))))
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread/delay-logging
+ datastore
(lambda (db)
(let* ((build-id (insert-build db drv-name uuid priority
defer-until))
@@ -4542,7 +4491,8 @@ VALUES (:build_id, :derived_priority, :all_inputs_built)"
#:args
(list db
build-id
- derived-priority)))))))
+ derived-priority)))))
+ #:priority 'low))
rest)
#t)
@@ -4578,3 +4528,212 @@ VALUES (:agent_id, :password)"
#:password password)
(sqlite-step-and-reset statement)))
+
+(define-method (datastore-insert-background-job
+ (datastore <sqlite-datastore>)
+ type
+ args)
+ (call-with-writer-thread
+ datastore
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+INSERT INTO background_jobs_queue (type, args)
+VALUES (:type, :args)
+RETURNING id"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:type (symbol->string type)
+ #:args (call-with-output-string
+ (lambda (port)
+ (write args port))))
+
+ (match (sqlite-step-and-reset statement)
+ (#(id)
+
+ (metric-increment
+ (metrics-registry-fetch-metric
+ (slot-ref datastore 'metrics-registry)
+ "coordinator_background_job_inserted_total")
+ #:label-values `((name . ,type)))
+
+ id))))))
+
+(define-method (datastore-delete-background-job
+ (datastore <sqlite-datastore>)
+ id)
+ (call-with-writer-thread
+ datastore
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+DELETE FROM background_jobs_queue WHERE id = :id"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:id id)
+
+ (sqlite-step-and-reset statement))
+ #t)
+ #:priority 'high))
+
+(define-method (datastore-select-background-jobs
+ (datastore <sqlite-datastore>)
+ .
+ args)
+ (apply
+ (lambda* (type #:key (limit 1))
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT id, args
+FROM background_jobs_queue
+WHERE type = :type
+ORDER BY id ASC
+LIMIT :limit"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:type (symbol->string type)
+ #:limit limit)
+
+ (let ((result
+ (sqlite-map
+ (match-lambda
+ (#(id args)
+ `((id . ,id)
+ (args . ,(call-with-input-string args read)))))
+ statement)))
+ (sqlite-reset statement)
+ result)))))
+ args))
+
+(define-method (datastore-count-background-jobs
+ (datastore <sqlite-datastore>))
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT type, COUNT(*)
+FROM background_jobs_queue
+GROUP BY type"
+ #:cache? #t)))
+
+ (let ((result
+ (sqlite-map
+ (match-lambda
+ (#(type count)
+ (cons type count)))
+ statement)))
+ (sqlite-reset statement)
+
+ result)))))
+
+(define-method (datastore-check-and-correct-unprocessed-builds-all-inputs-built
+ (datastore <sqlite-datastore>)
+ .
+ args)
+ (define entries-to-check
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT build_id, builds.derivation_id
+FROM unprocessed_builds_with_derived_priorities
+INNER JOIN builds ON builds.id = build_id
+WHERE all_inputs_built = 0
+ORDER BY build_id DESC"
+ #:cache? #t)))
+ (let ((result (sqlite-map identity statement)))
+ (sqlite-reset statement)
+ result)))))
+
+ (define (all-inputs-built? derivation-id)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT 1
+FROM derivation_inputs
+INNER JOIN derivation_outputs
+ ON derivation_inputs.derivation_output_id = derivation_outputs.id
+INNER JOIN unbuilt_outputs
+ ON unbuilt_outputs.output_id = derivation_outputs.output_id
+WHERE derivation_inputs.derivation_id = :derivation_id"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:derivation_id derivation-id)
+
+ (match (sqlite-step-and-reset statement)
+ (#(1) #f)
+ (#f #t))))))
+
+ (define (update build-id)
+ (call-with-writer-thread
+ datastore
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+UPDATE unprocessed_builds_with_derived_priorities
+SET all_inputs_built = 1
+WHERE build_id = :build_id AND all_inputs_built = 0
+RETURNING 1"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:build_id build-id)
+
+ ;; This is to cope with the check not being transactional, so we
+ ;; might go to update a record which has just been changed to have
+ ;; all_inputs_built = 1
+ (match (sqlite-step-and-reset statement)
+ (#(1) #t)
+ (#f #f))))))
+
+ (apply
+ (lambda* (#:key (progress-reporter (const progress-reporter/silent)))
+ (let ((reporter
+ (progress-reporter (length entries-to-check))))
+ (start-progress-reporter! reporter)
+ (let loop ((entries entries-to-check)
+ (updated-count 0))
+ (if (null? entries)
+ (begin
+ (stop-progress-reporter! reporter)
+ updated-count)
+ (match (car entries)
+ (#(build-id derivation-id)
+ (progress-reporter-report! reporter)
+ (if (all-inputs-built? derivation-id)
+ (loop (cdr entries)
+ (+ updated-count
+ (if (update build-id)
+ 1
+ 0)))
+ (loop (cdr entries)
+ updated-count))))))))
+ args))
diff --git a/guix-build-coordinator/guix-data-service.scm b/guix-build-coordinator/guix-data-service.scm
index e2ecbe7..584bbbd 100644
--- a/guix-build-coordinator/guix-data-service.scm
+++ b/guix-build-coordinator/guix-data-service.scm
@@ -27,6 +27,7 @@
#:use-module (json)
#:use-module (web client)
#:use-module (web response)
+ #:use-module (knots timeout)
#:use-module (guix-build-coordinator utils)
#:use-module (guix-build-coordinator datastore)
#:export (send-build-event-to-guix-data-service
@@ -51,7 +52,8 @@
#:body body
;; Guile doesn't treat JSON as text, so decode the
;; body manually
- #:decode-body? #f))))
+ #:decode-body? #f))
+ #:timeout 10))
(code
(response-code response)))
(unless (and (>= code 200)
diff --git a/guix-build-coordinator/hooks.scm b/guix-build-coordinator/hooks.scm
index bf4b576..3cd1c59 100644
--- a/guix-build-coordinator/hooks.scm
+++ b/guix-build-coordinator/hooks.scm
@@ -26,14 +26,16 @@
#:use-module (gcrypt pk-crypto)
#:use-module (zlib)
#:use-module (lzlib)
+ #:use-module (knots timeout)
#:use-module (guix pki)
#:use-module (guix store)
#:use-module (guix base32)
#:use-module (guix config)
#:use-module (guix derivations)
#:use-module (guix serialization)
- #:use-module ((guix utils) #:select (default-keyword-arguments))
- #:use-module (guix build utils)
+ #:use-module ((guix utils) #:select (default-keyword-arguments
+ call-with-decompressed-port))
+ #:use-module ((guix build utils) #:select (dump-port mkdir-p))
#:use-module (guix-build-coordinator config)
#:use-module (guix-build-coordinator utils)
#:use-module (guix-build-coordinator datastore)
@@ -81,6 +83,20 @@
build-id agent-id)
(current-error-port))))
+(define* (default-nar-compressions #:key output-filename nar-size
+ source-compression source-size)
+ (define MiB (* (expt 2 20) 1.))
+
+ (if (eq? 'none source-compression)
+ ;; If the agent didn't compress the nar, don't change that here
+ (list 'none)
+ (let* ((compression-proportion
+ (/ source-size nar-size)))
+ (if (or (> compression-proportion 0.95)
+ (< nar-size (* 0.05 MiB)))
+ '(none)
+ (list source-compression)))))
+
(define* (build-success-publish-hook
publish-directory
#:key
@@ -94,7 +110,8 @@
post-publish-hook
combined-post-publish-hook
(publish-referenced-derivation-source-files? #t)
- derivation-substitute-urls)
+ derivation-substitute-urls
+ (nar-compressions-proc default-nar-compressions))
(mkdir-p (string-append publish-directory "/nar/lzip"))
(lambda (build-coordinator build-id)
@@ -140,7 +157,8 @@
(substitute-derivation store
drv-name
#:substitute-urls
- derivation-substitute-urls)))
+ derivation-substitute-urls))
+ #:timeout 120)
(add-temp-root store drv-name))
(let* ((drv (read-derivation-from-file* drv-name))
@@ -218,8 +236,7 @@
nar-filename))
#:unwind? #t))
- (cons narinfo-filename
- nar-filename)))))
+ narinfo-filename))))
referenced-source-files))))))
(define (process-output drv-name output)
@@ -230,12 +247,6 @@
(nar-location
(build-output-file-location datastore build-id
output-name))
- (nar-filename
- (string-append "nar/lzip/"
- (basename output-filename)))
- (nar-destination
- (string-append publish-directory "/"
- nar-filename))
(narinfo-filename
(string-append (string-take (basename output-filename) 32)
".narinfo"))
@@ -246,8 +257,78 @@
(if (skip-publishing-proc narinfo-filename narinfo-directory)
#f
- (begin
- (copy-file nar-location nar-destination)
+ (let ((compressions
+ (nar-compressions-proc
+ #:output-filename output-filename
+ #:nar-size (assq-ref output 'size)
+ ;; TODO Don't hardcode this
+ #:source-compression 'lzip
+ #:source-size (stat:size (stat nar-location #f)))))
+
+ (for-each
+ (lambda (compression)
+ (if (or (and (pair? compression)
+ (eq? (car compression) 'lzip))
+ (eq? compression 'lzip))
+ ;; TODO If the agents start uploading uncompressed files
+ ;; or files compressed differently, this might not be
+ ;; right
+ (let ((nar-destination
+ (string-append publish-directory "/"
+ "nar/lzip/"
+ (basename output-filename))))
+ (copy-file nar-location nar-destination))
+ (let* ((target-compression
+ (if (pair? compression)
+ (car compression)
+ compression))
+ ;; TODO This logic should sit elsewhere
+ (nar-destination
+ (string-append
+ publish-directory "/"
+ "nar/"
+ (if (eq? compression 'none)
+ ""
+ (string-append
+ (symbol->string target-compression) "/"))
+ (basename output-filename)))
+ (temporary-destination
+ (string-append nar-destination ".tmp")))
+ (mkdir-p (dirname temporary-destination))
+ (call-with-input-file nar-location
+ (lambda (source-port)
+ (call-with-decompressed-port
+ ;; TODO Don't assume the source compression
+ 'lzip
+ source-port
+ (lambda (decompressed-source-port)
+ (let ((call-with-compressed-output-port*
+ (match target-compression
+ ('gzip
+ (@ (zlib) call-with-gzip-output-port))
+ ('lzip
+ (@ (lzlib) call-with-lzip-output-port))
+ ('zstd
+ (@ (zstd) call-with-zstd-output-port))
+ ('none
+ (lambda (port proc)
+ (proc port)
+ (close-port port))))))
+ (when (file-exists? temporary-destination)
+ (delete-file temporary-destination))
+ (apply
+ call-with-compressed-output-port*
+ (open-output-file temporary-destination
+ #:binary #t)
+ (lambda (compressed-port)
+ (dump-port decompressed-source-port
+ compressed-port))
+ (if (pair? compression)
+ (cdr compression)
+ '())))))))
+ (rename-file temporary-destination
+ nar-destination))))
+ compressions)
(call-with-output-file narinfo-location
(lambda (port)
@@ -257,7 +338,23 @@
(assq-ref output 'size)
(vector->list
(assq-ref output 'references))
- `((lzip ,(stat:size (stat nar-location #f))))
+ (map
+ (lambda (compression-details)
+ (let* ((compression
+ (if (pair? compression-details)
+ (car compression-details)
+ compression-details))
+ ;; TODO This logic should sit elsewhere
+ (file (string-append
+ publish-directory "/"
+ "nar/"
+ (if (eq? compression 'none)
+ ""
+ (string-append
+ (symbol->string compression) "/"))
+ (basename output-filename))))
+ (list compression (stat:size (stat file #f)))))
+ compressions)
#:system (datastore-find-derivation-system
datastore
drv-name)
@@ -276,18 +373,16 @@
(raise-exception exn))
(lambda ()
(post-publish-hook publish-directory
- narinfo-filename
- nar-filename))
+ narinfo-filename))
#:unwind? #t))
- (cons narinfo-filename
- nar-filename)))))
+ narinfo-filename))))
(let* ((build-details
(datastore-find-build datastore build-id))
(drv-name
(assq-ref build-details 'derivation-name))
- (narinfos-and-nars
+ (narinfos
(append
(if publish-referenced-derivation-source-files?
(process-referenced-derivation-source-files drv-name)
@@ -297,23 +392,22 @@
(process-output drv-name output))
(datastore-list-build-outputs datastore build-id)))))
(when (and combined-post-publish-hook
- (not (null? narinfos-and-nars)))
+ (not (null? narinfos)))
(with-exception-handler
(lambda (exn)
;; Rollback narinfo creation, to make this more
;; transactional
(for-each
- (match-lambda
- ((narinfo-filename . _)
- (delete-file
- (string-append
- narinfo-directory "/" narinfo-filename))))
- narinfos-and-nars)
+ (lambda
+ (narinfo-filename)
+ (delete-file
+ (string-append
+ narinfo-directory "/" narinfo-filename)))
+ narinfos)
(raise-exception exn))
(lambda ()
- (combined-post-publish-hook publish-directory
- narinfos-and-nars))
+ (combined-post-publish-hook publish-directory narinfos))
#:unwind? #t)))))
(define* (build-success-s3-publish-hook
@@ -528,24 +622,27 @@
(unless (eq? source-compression recompress-to)
(when (file-exists? tmp-output-log-file)
(delete-file tmp-output-log-file))
- (with-timeout timeout
- (raise-exception
- (make-exception-with-message "timeout recompressing log file"))
- (call-with-compressed-input-file
- source-log-file
- source-compression
- (lambda (input-port)
- (call-with-compressed-output-file
- tmp-output-log-file
- recompress-to
- (lambda (output-port)
- (dump-port input-port output-port))))))
+ (call-with-compressed-input-file
+ source-log-file
+ source-compression
+ (lambda (input-port)
+ (call-with-compressed-output-file
+ tmp-output-log-file
+ recompress-to
+ (lambda (output-port)
+ (dump-port input-port output-port)))))
(rename-file tmp-output-log-file
output-log-file)
(delete-file source-log-file)))))
-(define (default-build-missing-inputs-hook build-coordinator
- build-id missing-inputs)
+(define* (default-build-missing-inputs-hook
+ build-coordinator
+ build-id missing-inputs
+ #:key (submit-build? (lambda* (missing-input
+ #:key successful-builds
+ pending-builds)
+ (and (null? successful-builds)
+ (null? pending-builds)))))
(define datastore
(build-coordinator-datastore build-coordinator))
@@ -579,8 +676,9 @@
(not (assq-ref build-details 'processed))
(not (assq-ref build-details 'canceled))))
builds-for-output)))
- (if (and (null? successful-builds)
- (null? pending-builds))
+ (if (submit-build? missing-input
+ #:successful-builds successful-builds
+ #:pending-builds pending-builds)
(begin
(simple-format #t
"submitting build for ~A\n"
@@ -590,8 +688,7 @@
#:tags (datastore-fetch-build-tags
datastore
build-id)))
- (simple-format #t "~A builds exist for ~A, skipping\n"
- (length builds-for-output)
+ (simple-format #t "skipping submitting build for ~A\n"
missing-input)))
(begin
(simple-format (current-error-port)
diff --git a/guix-build-coordinator/utils.scm b/guix-build-coordinator/utils.scm
index a549f20..f4f15dc 100644
--- a/guix-build-coordinator/utils.scm
+++ b/guix-build-coordinator/utils.scm
@@ -14,10 +14,11 @@
#:use-module (ice-9 textual-ports)
#:use-module (ice-9 rdelim)
#:use-module (ice-9 binary-ports)
- #:use-module (ice-9 suspendable-ports)
- #:use-module ((ice-9 ports internal) #:select (port-poll))
#:use-module (ice-9 exceptions)
#:use-module (rnrs bytevectors)
+ #:use-module (oop goops)
+ #:use-module (logging logger)
+ #:use-module (logging port-log)
#:use-module (web uri)
#:use-module (web http)
#:use-module (web client)
@@ -39,24 +40,12 @@
#:use-module ((guix http-client)
#:select (http-fetch))
#:use-module (guix serialization)
- #:use-module ((guix build download)
- #:select ((open-connection-for-uri
- . guix:open-connection-for-uri)))
#:use-module ((guix build syscalls)
#:select (set-thread-name))
#:use-module (guix scripts substitute)
+ #:use-module (guix-build-coordinator utils timeout)
#:export (random-v4-uuid
- &port-timeout
- &port-read-timeout
- &port-write-timeout
-
- port-timeout-error?
- port-read-timeout-error?
- port-write-timeout-error?
-
- with-port-timeouts
-
request-query-parameters
call-with-streaming-http-request
@@ -66,7 +55,7 @@
read-derivation-from-file*
- with-store/non-blocking
+ non-blocking-port
substitute-derivation
read-derivation-through-substitutes
@@ -86,9 +75,6 @@
create-work-queue
create-thread-pool
- with-timeout
- reset-timeout
-
throttle
get-load-average
@@ -100,7 +86,13 @@
open-socket-for-uri*
- check-locale!))
+ check-locale!
+
+ display/safe
+ simple-format/safe
+ format/safe
+
+ <custom-port-log>))
(eval-when (eval load compile)
(begin
@@ -188,74 +180,6 @@
(parse-query-string query))
'())))
-(define &port-timeout
- (make-exception-type '&port-timeout
- &external-error
- '(port)))
-
-(define make-port-timeout-error
- (record-constructor &port-timeout))
-
-(define port-timeout-error?
- (record-predicate &port-timeout))
-
-(define &port-read-timeout
- (make-exception-type '&port-read-timeout
- &port-timeout
- '()))
-
-(define make-port-read-timeout-error
- (record-constructor &port-read-timeout))
-
-(define port-read-timeout-error?
- (record-predicate &port-read-timeout))
-
-(define &port-write-timeout
- (make-exception-type '&port-write-timeout
- &port-timeout
- '()))
-
-(define make-port-write-timeout-error
- (record-constructor &port-write-timeout))
-
-(define port-write-timeout-error?
- (record-predicate &port-write-timeout))
-
-(define* (with-port-timeouts thunk #:key (timeout (* 120 1000)))
-
- ;; When the GC runs, it restarts the poll syscall, but the timeout remains
- ;; unchanged! When the timeout is longer than the time between the syscall
- ;; restarting, I think this renders the timeout useless. Therefore, this
- ;; code uses a short timeout, and repeatedly calls poll while watching the
- ;; clock to see if it has timed out overall.
- (define poll-timeout-ms 200)
-
- (define (wait port mode)
- (let ((timeout-internal
- (+ (get-internal-real-time)
- (* internal-time-units-per-second
- (/ timeout 1000)))))
- (let loop ((poll-value
- (port-poll port mode poll-timeout-ms)))
- (if (= poll-value 0)
- (if (> (get-internal-real-time)
- timeout-internal)
- (raise-exception
- (if (string=? mode "r")
- (make-port-read-timeout-error port)
- (make-port-write-timeout-error port)))
- (loop (port-poll port mode poll-timeout-ms)))
- poll-value))))
-
- (parameterize
- ((current-read-waiter
- (lambda (port)
- (wait port "r")))
- (current-write-waiter
- (lambda (port)
- (wait port "w"))))
- (thunk)))
-
(define* (call-with-streaming-http-request uri
content-length
callback
@@ -282,8 +206,8 @@
(setvbuf port 'block (expt 2 13))
(with-exception-handler
(lambda (exp)
- (simple-format #t "error: ~A ~A: ~A\n"
- method (uri-path uri) exp)
+ (simple-format/safe #t "error: ~A ~A: ~A\n"
+ method (uri-path uri) exp)
(close-port port)
(raise-exception exp))
(lambda ()
@@ -298,7 +222,8 @@
(let ((body (read-response-body response)))
(close-port port)
(values response
- body)))))))))))
+ body)))))))))
+ #:timeout 120))
(define (find-missing-substitutes-for-output store substitute-urls output)
(if (valid-path? store output)
@@ -356,7 +281,7 @@
(when (file-exists? cache-file)
(with-exception-handler
(lambda (exn)
- (simple-format
+ (simple-format/safe
(current-error-port)
"error: when deleting substitute cache file: ~A\n"
exn))
@@ -368,7 +293,18 @@
(let ((substitute-urls
(append-map (lambda (substitute-url)
(let ((log-port (open-output-string)))
- (with-throw-handler #t
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format/safe
+ (current-error-port)
+ "exception in has-substiutes-no-cache? (~A): ~A\n"
+ substitute-url exn)
+ (display/safe (string-append
+ (get-output-string log-port)
+ "\n")
+ (current-error-port))
+ (close-output-port log-port)
+ (raise-exception exn))
(lambda ()
(if (null?
;; I doubt the caching is thread safe, so
@@ -378,17 +314,7 @@
(lookup-narinfos substitute-url
(list file)))))
'()
- (list substitute-url)))
- (lambda (key . args)
- (simple-format
- (current-error-port)
- "exception in has-substiutes-no-cache? (~A) ~A: ~A\n"
- substitute-url key args)
- (display (string-append
- (get-output-string log-port)
- "\n")
- (current-error-port))
- (close-output-port log-port)))))
+ (list substitute-url))))))
substitute-urls)))
substitute-urls))
@@ -399,23 +325,6 @@
(fcntl port F_SETFL (logior O_NONBLOCK flags)))
port))
-(define (ensure-non-blocking-store-connection store)
- "Mark the file descriptor that backs STORE, a <store-connection>, as
-O_NONBLOCK."
- (match (store-connection-socket store)
- ((? file-port? port)
- (non-blocking-port port))
- (_ #f)))
-
-(define-syntax-rule (with-store/non-blocking store exp ...)
- "Like 'with-store', bind STORE to a connection to the store, but ensure that
-said connection is non-blocking (O_NONBLOCK). Evaluate EXP... in that
-context."
- (with-store store
- (ensure-non-blocking-store-connection store)
- (let ()
- exp ...)))
-
(define* (substitute-derivation store
derivation-name
#:key substitute-urls)
@@ -439,7 +348,7 @@ context."
(take-right lines 10)
lines)))
(close-output-port log-port)
- (simple-format
+ (simple-format/safe
(current-error-port)
"exception when substituting derivation: ~A:\n ~A\n"
exn (string-join last-n-lines "\n"))
@@ -449,27 +358,23 @@ context."
(ensure-path store derivation-name)))
#:unwind? #t)))
-(define read-derivation-from-file*
- (let ((%derivation-cache
- (@@ (guix derivations) %derivation-cache)))
- (lambda (file)
- (or (and file (hash-ref %derivation-cache file))
- (let ((drv
- ;; read-derivation can call read-derivation-from-file, so to
- ;; avoid having many open files when reading a derivation with
- ;; inputs, read it in to a string first.
- (call-with-input-string
- ;; Avoid calling scm_i_relativize_path in
- ;; fport_canonicalize_filename since this leads to lots
- ;; of readlink calls
- (with-fluids ((%file-port-name-canonicalization 'none))
- (call-with-input-file file
- get-string-all))
- (lambda (port)
- (set-port-filename! port file)
- (read-derivation port read-derivation-from-file*)))))
- (hash-set! %derivation-cache file drv)
- drv)))))
+(define* (read-derivation-from-file* file #:optional (drv-hash (make-hash-table)))
+ (or (and file (hash-ref drv-hash file))
+ (let ((drv
+ ;; read-derivation can call read-derivation-from-file, so to
+ ;; avoid having many open files when reading a derivation with
+ ;; inputs, read it in to a string first.
+ (call-with-input-string
+ (call-with-input-file file
+ get-string-all)
+ (lambda (port)
+ (set-port-filename! port file)
+ (read-derivation port (lambda (file)
+ (read-derivation-from-file*
+ file
+ drv-hash)))))))
+ (hash-set! drv-hash file drv)
+ drv)))
(define (read-derivation-through-substitutes derivation-name
substitute-urls)
@@ -487,10 +392,9 @@ context."
(match (assoc-ref cache key)
(#f
(let ((socket
- (guix:open-connection-for-uri
+ (open-socket-for-uri*
uri
- #:verify-certificate? verify-certificate?
- #:timeout timeout)))
+ #:verify-certificate? verify-certificate?)))
(set! cache (alist-cons key socket cache))
socket))
(socket
@@ -591,9 +495,9 @@ context."
(define* (store-item->recutils compression file-size)
(let ((url (encode-and-join-uri-path
`(,@(split-and-decode-uri-path nar-path)
- ,@(if compression
- (list (symbol->string compression))
- '())
+ ,@(if (eq? compression 'none)
+ '()
+ (list (symbol->string compression)))
,(basename store-path)))))
(format #f "URL: ~a~%Compression: ~a~%~@[FileSize: ~a~%~]"
url
@@ -663,7 +567,7 @@ References: ~a~%"
#:unwind? #t)
((#t . return-values)
(when (> attempt 1)
- (simple-format
+ (simple-format/safe
(current-error-port)
"retry success: ~A\n on attempt ~A of ~A\n"
f
@@ -674,7 +578,7 @@ References: ~a~%"
(if (>= attempt
(- times 1))
(begin
- (simple-format
+ (simple-format/safe
(current-error-port)
"error: ~A:\n ~A,\n attempt ~A of ~A, last retry in ~A\n"
f
@@ -685,14 +589,14 @@ References: ~a~%"
(when error-hook
(error-hook attempt exn))
(sleep-impl delay)
- (simple-format
+ (simple-format/safe
(current-error-port)
"running last retry of ~A after ~A failed attempts\n"
f
attempt)
(f))
(begin
- (simple-format
+ (simple-format/safe
(current-error-port)
"error: ~A:\n ~A,\n attempt ~A of ~A, retrying in ~A\n"
f
@@ -920,27 +824,29 @@ References: ~a~%"
(define (thread-process-job job-args)
(with-exception-handler
(lambda (exn)
- (simple-format (current-error-port)
- "~A work queue, job raised exception ~A: ~A\n"
- name job-args exn))
+ (simple-format/safe
+ (current-error-port)
+ "~A work queue, job raised exception ~A: ~A\n"
+ name job-args exn))
(lambda ()
- (with-throw-handler #t
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format/safe
+ (current-error-port)
+ "~A work queue, exception when handling job: ~A\n"
+ name exn)
+ (let* ((stack (make-stack #t 3))
+ (backtrace
+ (call-with-output-string
+ (lambda (port)
+ (display-backtrace stack port)
+ (newline port)))))
+ (display/safe
+ backtrace
+ (current-error-port)))
+ (raise-exception exn))
(lambda ()
- (apply proc job-args))
- (lambda (key . args)
- (simple-format
- (current-error-port)
- "~A work queue, exception when handling job: ~A ~A\n"
- name key args)
- (let* ((stack (make-stack #t 3))
- (backtrace
- (call-with-output-string
- (lambda (port)
- (display-backtrace stack port)
- (newline port)))))
- (display
- backtrace
- (current-error-port))))))
+ (apply proc job-args))))
#:unwind? #t))
(define (start-thread thread-index)
@@ -1110,36 +1016,29 @@ References: ~a~%"
(define (thread-process-job job-args)
(with-exception-handler
(lambda (exn)
- (with-exception-handler
- (lambda _
- #f)
- (lambda ()
- ;; Logging may raise an exception, so try and just keep going.
- (display
- (simple-format
- #f
- "~A thread pool, job raised exception ~A: ~A\n"
- name job-args exn)
- (current-error-port)))
- #:unwind? #t))
+ (simple-format/safe
+ (current-error-port)
+ "~A thread pool, job raised exception ~A: ~A\n"
+ name job-args exn))
(lambda ()
- (with-throw-handler #t
- (lambda ()
- (apply proc job-args))
- (lambda (key . args)
- (simple-format
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format/safe
(current-error-port)
- "~A thread pool, exception when handling job: ~A ~A\n"
- name key args)
+ "~A thread pool, exception when handling job: ~A\n"
+ name exn)
(let* ((stack (make-stack #t 3))
(backtrace
(call-with-output-string
(lambda (port)
(display-backtrace stack port)
(newline port)))))
- (display
+ (display/safe
backtrace
- (current-error-port))))))
+ (current-error-port)))
+ (raise-exception exn))
+ (lambda ()
+ (apply proc job-args))))
#:unwind? #t))
(define (start-thread thread-index)
@@ -1268,27 +1167,6 @@ References: ~a~%"
(values pool-mutex job-available count-threads list-jobs)))
-;; copied from (guix scripts substitute)
-(define-syntax-rule (with-timeout duration handler body ...)
- "Run BODY; when DURATION seconds have expired, call HANDLER, and run BODY
-again."
- (begin
- (sigaction SIGALRM
- (lambda (signum)
- (sigaction SIGALRM SIG_DFL)
- handler))
- (alarm duration)
- (call-with-values
- (lambda ()
- body ...)
- (lambda result
- (alarm 0)
- (sigaction SIGALRM SIG_DFL)
- (apply values result)))))
-
-(define (reset-timeout duration)
- (alarm duration))
-
(define (throttle min-duration thunk)
(let ((next-min-runtime 0))
(lambda ()
@@ -1400,22 +1278,18 @@ again."
(define (check-locale!)
(with-exception-handler
(lambda (exn)
- (display
- (simple-format
- #f
- "exception when calling setlocale: ~A
+ (simple-format/safe
+ (current-error-port)
+ "exception when calling setlocale: ~A
falling back to en_US.utf8\n"
- exn)
- (current-error-port))
+ exn)
(with-exception-handler
(lambda (exn)
- (display
- (simple-format
- #f
- "exception when calling setlocale with en_US.utf8: ~A\n"
- exn)
- (current-error-port))
+ (simple-format/safe
+ (current-error-port)
+ "exception when calling setlocale with en_US.utf8: ~A\n"
+ exn)
(exit 1))
(lambda _
@@ -1424,3 +1298,51 @@ falling back to en_US.utf8\n"
(lambda _
(setlocale LC_ALL ""))
#:unwind? #t))
+
+(define* (display/safe obj #:optional (port (current-output-port)))
+ ;; Try to avoid the dreaded conversion to port encoding failed error #62590
+ (put-bytevector
+ port
+ (string->utf8
+ (call-with-output-string
+ (lambda (port)
+ (display obj port)))))
+ (force-output port))
+
+(define (simple-format/safe port s . args)
+ (let ((str (apply simple-format #f s args)))
+ (if (eq? #f port)
+ str
+ (display/safe
+ str
+ (if (eq? #t port)
+ (current-output-port)
+ port)))))
+
+(define (format/safe port s . args)
+ (let ((str (apply format #f s args)))
+ (if (eq? #f port)
+ str
+ (display/safe
+ str
+ (if (eq? #t port)
+ (current-output-port)
+ port)))))
+
+(define-class <custom-port-log> (<log-handler>)
+ (port #:init-value #f #:accessor port #:init-keyword #:port))
+
+(define-method (emit-log (self <custom-port-log>) str)
+ (when (port self)
+ (put-bytevector (port self)
+ (string->utf8 str))
+ ;; Even though the port is line buffered, writing to it with
+ ;; put-bytevector doesn't cause the buffer to be flushed.
+ (force-output (port self))))
+
+(define-method (flush-log (self <custom-port-log>))
+ (and=> (port self) force-output))
+
+(define-method (close-log! (self <custom-port-log>))
+ (and=> (port self) close-port)
+ (set! (port self) #f))
diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm
index 5362b18..d836ceb 100644
--- a/guix-build-coordinator/utils/fibers.scm
+++ b/guix-build-coordinator/utils/fibers.scm
@@ -1,4 +1,6 @@
(define-module (guix-build-coordinator utils fibers)
+ #:use-module (srfi srfi-9)
+ #:use-module (ice-9 q)
#:use-module (ice-9 match)
#:use-module (ice-9 atomic)
#:use-module (ice-9 threads)
@@ -11,284 +13,20 @@
#:use-module (fibers scheduler)
#:use-module (fibers operations)
#:use-module (fibers conditions)
+ #:use-module (knots timeout)
#:use-module ((guix build syscalls)
#:select (set-thread-name))
#:use-module (guix-build-coordinator utils)
- #:export (make-worker-thread-channel
- call-with-worker-thread
- worker-thread-timeout-error?
+ #:export (spawn-port-monitoring-fiber
- call-with-sigint
+ make-discrete-priority-queueing-channels
- run-server/patched
-
- spawn-port-monitoring-fiber
-
- letpar&
-
- with-fibers-timeout
- with-fibers-port-timeouts)
+ make-reusable-condition
+ reusable-condition?
+ signal-reusable-condition!
+ reusable-condition-wait)
#:replace (retry-on-error))
-(define %worker-thread-args
- (make-parameter #f))
-
-(define* (make-worker-thread-channel initializer
- #:key (parallelism 1)
- (delay-logger (lambda _ #f))
- (duration-logger (const #f))
- destructor
- lifetime
- (log-exception? (const #t))
- (expire-on-exception? #f)
- (name "unnamed"))
- "Return a channel used to offload work to a dedicated thread. ARGS are the
-arguments of the worker thread procedure."
-
- (define (initializer/safe)
- (let ((args
- (with-exception-handler
- (lambda (exn)
- (simple-format
- (current-error-port)
- "exception running initializer in worker thread (~A): ~A:\n ~A\n"
- name
- initializer
- exn)
- #f)
- (lambda ()
- (with-throw-handler #t
- initializer
- (lambda args
- (backtrace))))
- #:unwind? #t)))
-
- (if args
- args
- ;; never give up, just keep retrying
- (begin
- (sleep 5)
- (initializer/safe)))))
-
- (define (destructor/safe args)
- (let ((success?
- (with-exception-handler
- (lambda (exn)
- (simple-format
- (current-error-port)
- "exception running destructor in worker thread (~A): ~A:\n ~A\n"
- name
- destructor
- exn)
- #f)
- (lambda ()
- (with-throw-handler #t
- (lambda ()
- (apply destructor args)
- #t)
- (lambda _
- (backtrace))))
- #:unwind? #t)))
-
- (or success?
- #t
- (begin
- (sleep 5)
- (destructor/safe args)))))
-
- (define (process channel args)
- (let loop ((current-lifetime lifetime))
- (let ((exception?
- (match (get-message channel)
- (((? channel? reply) sent-time (? procedure? proc))
- (let ((time-delay
- (- (get-internal-real-time)
- sent-time)))
- (delay-logger (/ time-delay
- internal-time-units-per-second))
-
- (let* ((start-time (get-internal-real-time))
- (response
- (with-exception-handler
- (lambda (exn)
- (list 'worker-thread-error
- (/ (- (get-internal-real-time)
- start-time)
- internal-time-units-per-second)
- exn))
- (lambda ()
- (with-throw-handler #t
- (lambda ()
- (call-with-values
- (lambda ()
- (start-stack
- 'worker-thread
- (apply proc args)))
- (lambda vals
- (cons (/ (- (get-internal-real-time)
- start-time)
- internal-time-units-per-second)
- vals))))
- (lambda args
- (when (match args
- (('%exception exn)
- (log-exception? exn))
- (_ #t))
- (simple-format
- (current-error-port)
- "worker-thread: exception: ~A\n" args)
- (backtrace)))))
- #:unwind? #t)))
- (put-message reply
- response)
-
- (match response
- (('worker-thread-error duration _)
- (when duration-logger
- (duration-logger duration proc))
- #t)
- ((duration . _)
- (when duration-logger
- (duration-logger duration proc))
- #f))))))))
- (unless (and expire-on-exception?
- exception?)
- (if (number? current-lifetime)
- (unless (< current-lifetime 0)
- (loop (if current-lifetime
- (- current-lifetime 1)
- #f)))
- (loop #f))))))
-
- (let ((channel (make-channel)))
- (for-each
- (lambda (thread-index)
- (call-with-new-thread
- (lambda ()
- (catch 'system-error
- (lambda ()
- (set-thread-name
- (string-append
- name " w t "
- (number->string thread-index))))
- (const #t))
-
- (let init ((args (initializer/safe)))
- (with-exception-handler
- (lambda (exn)
- (simple-format
- (current-error-port)
- "worker-thread-channel: exception: ~A\n" exn))
- (lambda ()
- (parameterize ((%worker-thread-args args))
- (process channel args)))
- #:unwind? #t)
-
- (when destructor
- (destructor/safe args))
-
- (init (initializer/safe))))))
- (iota parallelism))
- channel))
-
-(define &worker-thread-timeout
- (make-exception-type '&worker-thread-timeout
- &error
- '()))
-
-(define make-worker-thread-timeout-error
- (record-constructor &worker-thread-timeout))
-
-(define worker-thread-timeout-error?
- (record-predicate &worker-thread-timeout))
-
-(define* (call-with-worker-thread channel proc #:key duration-logger
- (timeout 30))
- "Send PROC to the worker thread through CHANNEL. Return the result of PROC.
-If already in the worker thread, call PROC immediately."
- (let ((args (%worker-thread-args)))
- (if args
- (call-with-delay-logging proc #:args args)
- (let* ((reply (make-channel))
- (operation-success?
- (perform-operation
- (let ((put
- (wrap-operation
- (put-operation channel
- (list reply
- (get-internal-real-time)
- proc))
- (const #t))))
-
- (if timeout
- (choice-operation
- put
- (wrap-operation (sleep-operation timeout)
- (const #f)))
- put)))))
-
- (unless operation-success?
- (raise-exception
- (make-worker-thread-timeout-error)))
-
- (match (get-message reply)
- (('worker-thread-error duration exn)
- (when duration-logger
- (duration-logger duration))
- (raise-exception exn))
- ((duration . result)
- (when duration-logger
- (duration-logger duration))
- (apply values result)))))))
-
-;; Copied from (fibers web server)
-(define (call-with-sigint thunk cvar)
- (let ((handler #f))
- (dynamic-wind
- (lambda ()
- (set! handler
- (sigaction SIGINT (lambda (sig) (signal-condition! cvar)))))
- thunk
- (lambda ()
- (if handler
- ;; restore Scheme handler, SIG_IGN or SIG_DFL.
- (sigaction SIGINT (car handler) (cdr handler))
- ;; restore original C handler.
- (sigaction SIGINT #f))))))
-
-;; This variant of run-server from the fibers library supports running
-;; multiple servers within one process.
-(define run-server/patched
- (let ((fibers-web-server-module
- (resolve-module '(fibers web server))))
-
- (define set-nonblocking!
- (module-ref fibers-web-server-module 'set-nonblocking!))
-
- (define make-default-socket
- (module-ref fibers-web-server-module 'make-default-socket))
-
- (define socket-loop
- (module-ref fibers-web-server-module 'socket-loop))
-
- (lambda* (handler
- #:key
- (host #f)
- (family AF_INET)
- (addr (if host
- (inet-pton family host)
- INADDR_LOOPBACK))
- (port 8080)
- (socket (make-default-socket family addr port)))
- ;; We use a large backlog by default. If the server is suddenly hit
- ;; with a number of connections on a small backlog, clients won't
- ;; receive confirmation for their SYN, leading them to retry --
- ;; probably successfully, but with a large latency.
- (listen socket 1024)
- (set-nonblocking! socket)
- (sigaction SIGPIPE SIG_IGN)
- (spawn-fiber (lambda () (socket-loop socket handler))))))
-
(define (spawn-port-monitoring-fiber port error-condition)
(spawn-fiber
(lambda ()
@@ -304,232 +42,17 @@ If already in the worker thread, call PROC immediately."
"port monitoring fiber error-condition unresponsive")
(primitive-exit 1))
(lambda ()
- (with-fibers-port-timeouts
+ (with-port-timeouts
(lambda ()
- (let ((sock (socket PF_INET SOCK_STREAM 0)))
+ (let ((sock
+ (non-blocking-port
+ (socket PF_INET SOCK_STREAM 0))))
(connect sock AF_INET INADDR_LOOPBACK port)
(close-port sock)))
#:timeout 20))
#:unwind? #t)
(sleep 20)))))
-(define (defer-to-fiber thunk)
- (let ((reply (make-channel)))
- (spawn-fiber
- (lambda ()
- (put-message
- reply
- (with-exception-handler
- (lambda (exn)
- (cons 'worker-fiber-error exn))
- (lambda ()
- (with-exception-handler
- (lambda (exn)
- (simple-format
- (current-error-port)
- "worker fiber: exception: ~A\n"
- exn)
- (backtrace)
- (raise-exception exn))
- (lambda ()
- (call-with-values
- thunk
- (lambda vals
- vals)))))
- #:unwind? #t)))
- #:parallel? #t)
- reply))
-
-(define (fetch-result-of-defered-thunks . reply-channels)
- (let ((responses (map get-message reply-channels)))
- (map
- (match-lambda
- (('worker-thread-error . exn)
- (raise-exception exn))
- (result
- (apply values result)))
- responses)))
-
-(define-syntax parallel-via-fibers
- (lambda (x)
- (syntax-case x ()
- ((_ e0 ...)
- (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
- #'(let ((tmp0 (defer-to-fiber
- (lambda ()
- e0)))
- ...)
- (apply values (fetch-result-of-defered-thunks tmp0 ...))))))))
-
-(define-syntax-rule (letpar& ((v e) ...) b0 b1 ...)
- (call-with-values
- (lambda () (parallel-via-fibers e ...))
- (lambda (v ...)
- b0 b1 ...)))
-
-(define* (with-fibers-timeout thunk #:key timeout on-timeout)
- (let ((channel (make-channel)))
- (spawn-fiber
- (lambda ()
- (with-exception-handler
- (lambda (exn)
- (perform-operation
- (choice-operation
- (put-operation channel (cons 'exception exn))
- (sleep-operation timeout))))
- (lambda ()
- (call-with-values thunk
- (lambda vals
- (perform-operation
- (choice-operation
- (put-operation channel vals)
- (sleep-operation timeout))))))
- #:unwind? #t)))
-
- (match (perform-operation
- (choice-operation
- (get-operation channel)
- (wrap-operation (sleep-operation timeout)
- (const 'timeout))))
- ('timeout
- (on-timeout))
- (('exception . exn)
- (raise-exception exn))
- (vals
- (apply values vals)))))
-
-(define &port-timeout
- (make-exception-type '&port-timeout
- &external-error
- '(port)))
-
-(define make-port-timeout-error
- (record-constructor &port-timeout))
-
-(define port-timeout-error?
- (record-predicate &port-timeout))
-
-(define &port-read-timeout
- (make-exception-type '&port-read-timeout
- &port-timeout
- '()))
-
-(define make-port-read-timeout-error
- (record-constructor &port-read-timeout))
-
-(define port-read-timeout-error?
- (record-predicate &port-read-timeout))
-
-(define &port-write-timeout
- (make-exception-type '&port-write-timeout
- &port-timeout
- '()))
-
-(define make-port-write-timeout-error
- (record-constructor &port-write-timeout))
-
-(define port-write-timeout-error?
- (record-predicate &port-write-timeout))
-
-;; These procedure are subject to spurious wakeups.
-
-(define (readable? port)
- "Test if PORT is writable."
- (match (select (vector port) #() #() 0)
- ((#() #() #()) #f)
- ((#(_) #() #()) #t)))
-
-(define (writable? port)
- "Test if PORT is writable."
- (match (select #() (vector port) #() 0)
- ((#() #() #()) #f)
- ((#() #(_) #()) #t)))
-
-(define (make-wait-operation ready? schedule-when-ready port port-ready-fd this-procedure)
- (make-base-operation #f
- (lambda _
- (and (ready? (port-ready-fd port)) values))
- (lambda (flag sched resume)
- (define (commit)
- (match (atomic-box-compare-and-swap! flag 'W 'S)
- ('W (resume values))
- ('C (commit))
- ('S #f)))
- (schedule-when-ready
- sched (port-ready-fd port) commit))))
-
-(define (wait-until-port-readable-operation port)
- "Make an operation that will succeed when PORT is readable."
- (unless (input-port? port)
- (error "refusing to wait forever for input on non-input port"))
- (make-wait-operation readable? schedule-task-when-fd-readable port
- port-read-wait-fd
- wait-until-port-readable-operation))
-
-(define (wait-until-port-writable-operation port)
- "Make an operation that will succeed when PORT is writable."
- (unless (output-port? port)
- (error "refusing to wait forever for output on non-output port"))
- (make-wait-operation writable? schedule-task-when-fd-writable port
- port-write-wait-fd
- wait-until-port-writable-operation))
-
-(define* (with-fibers-port-timeouts thunk
- #:key timeout
- (read-timeout timeout)
- (write-timeout timeout))
- (define (no-fibers-wait port mode timeout)
- (define poll-timeout-ms 200)
-
- ;; When the GC runs, it restarts the poll syscall, but the timeout
- ;; remains unchanged! When the timeout is longer than the time
- ;; between the syscall restarting, I think this renders the
- ;; timeout useless. Therefore, this code uses a short timeout, and
- ;; repeatedly calls poll while watching the clock to see if it has
- ;; timed out overall.
- (let ((timeout-internal
- (+ (get-internal-real-time)
- (* internal-time-units-per-second
- (/ timeout 1000)))))
- (let loop ((poll-value
- (port-poll port mode poll-timeout-ms)))
- (if (= poll-value 0)
- (if (> (get-internal-real-time)
- timeout-internal)
- (raise-exception
- (if (string=? mode "r")
- (make-port-read-timeout-error port)
- (make-port-write-timeout-error port)))
- (loop (port-poll port mode poll-timeout-ms)))
- poll-value))))
-
- (parameterize
- ((current-read-waiter
- (lambda (port)
- (if (current-scheduler)
- (perform-operation
- (choice-operation
- (wait-until-port-readable-operation port)
- (wrap-operation
- (sleep-operation read-timeout)
- (lambda ()
- (raise-exception
- (make-port-read-timeout-error thunk port))))))
- (no-fibers-wait port "r" read-timeout))))
- (current-write-waiter
- (lambda (port)
- (if (current-scheduler)
- (perform-operation
- (choice-operation
- (wait-until-port-writable-operation port)
- (wrap-operation
- (sleep-operation write-timeout)
- (lambda ()
- (raise-exception
- (make-port-write-timeout-error thunk port))))))
- (no-fibers-wait port "w" write-timeout)))))
- (thunk)))
-
;; Use the fibers sleep
(define (retry-on-error . args)
(apply
@@ -537,3 +60,95 @@ If already in the worker thread, call PROC immediately."
(append
args
(list #:sleep-impl sleep))))
+
+(define (make-discrete-priority-queueing-channels channel num-priorities)
+ (define all-queues
+ (map (lambda _ (make-q))
+ (iota num-priorities)))
+
+ (define queue-channels
+ (map (lambda _ (make-channel))
+ (iota num-priorities)))
+
+ (define (stats)
+ (map (lambda (queue)
+ `((length . ,(q-length queue))))
+ all-queues))
+
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (let loop ((queues all-queues))
+ (let ((queue (car queues)))
+ (if (q-empty? queue)
+ (let ((next (cdr queues)))
+ (if (null? next)
+ (perform-operation
+ (apply
+ choice-operation
+ (map (lambda (queue queue-channel)
+ (wrap-operation (get-operation queue-channel)
+ (lambda (val)
+ (enq! queue val))))
+ all-queues
+ queue-channels)))
+ (loop next)))
+ (let ((front (q-front queue)))
+ (perform-operation
+ (apply
+ choice-operation
+ (cons
+ (wrap-operation (put-operation channel front)
+ (lambda _
+ (q-pop! queue)))
+ (map (lambda (queue queue-channel)
+ (wrap-operation (get-operation queue-channel)
+ (lambda (val)
+ (enq! queue val))))
+ all-queues
+ queue-channels)))))))))))
+ (values (list-copy queue-channels)
+ stats))
+
+(define-record-type <reusable-condition>
+ (%make-reusable-condition atomic-box channel)
+ reusable-condition?
+ (atomic-box reusable-condition-atomic-box)
+ (channel reusable-condition-channel))
+
+(define (make-reusable-condition)
+ (%make-reusable-condition (make-atomic-box #f)
+ (make-channel)))
+
+(define* (signal-reusable-condition! reusable-condition
+ #:optional (scheduler (current-scheduler)))
+ (match (atomic-box-compare-and-swap!
+ (reusable-condition-atomic-box reusable-condition)
+ #f
+ #t)
+ (#f
+ (spawn-fiber
+ (lambda ()
+ (put-message (reusable-condition-channel reusable-condition)
+ #t))
+ scheduler)
+ #t)
+ (#t #f)))
+
+(define* (reusable-condition-wait reusable-condition
+ #:key (timeout #f))
+ (let ((val
+ (if (atomic-box-ref (reusable-condition-atomic-box reusable-condition))
+ #t
+ ;; Not great as this is subject to race conditions, but it should
+ ;; roughly work
+ (if timeout
+ (perform-operation
+ (choice-operation
+ (get-operation (reusable-condition-channel reusable-condition))
+ (wrap-operation (sleep-operation timeout)
+ (const #f))))
+ (get-message (reusable-condition-channel reusable-condition))))))
+ (atomic-box-set! (reusable-condition-atomic-box reusable-condition)
+ #f)
+ val))
diff --git a/guix-build-coordinator/utils/timeout.scm b/guix-build-coordinator/utils/timeout.scm
new file mode 100644
index 0000000..bb133d7
--- /dev/null
+++ b/guix-build-coordinator/utils/timeout.scm
@@ -0,0 +1,81 @@
+(define-module (guix-build-coordinator utils timeout)
+ #:use-module (ice-9 exceptions)
+ #:use-module (ice-9 suspendable-ports)
+ #:use-module ((ice-9 ports internal) #:select (port-poll))
+ #:export (&port-timeout
+ &port-read-timeout
+ &port-write-timeout
+
+ port-timeout-error?
+ port-read-timeout-error?
+ port-write-timeout-error?
+
+ with-port-timeouts))
+
+(define &port-timeout
+ (make-exception-type '&port-timeout
+ &external-error
+ '(port)))
+
+(define make-port-timeout-error
+ (record-constructor &port-timeout))
+
+(define port-timeout-error?
+ (record-predicate &port-timeout))
+
+(define &port-read-timeout
+ (make-exception-type '&port-read-timeout
+ &port-timeout
+ '()))
+
+(define make-port-read-timeout-error
+ (record-constructor &port-read-timeout))
+
+(define port-read-timeout-error?
+ (record-predicate &port-read-timeout))
+
+(define &port-write-timeout
+ (make-exception-type '&port-write-timeout
+ &port-timeout
+ '()))
+
+(define make-port-write-timeout-error
+ (record-constructor &port-write-timeout))
+
+(define port-write-timeout-error?
+ (record-predicate &port-write-timeout))
+
+(define* (with-port-timeouts thunk #:key timeout)
+
+ ;; When the GC runs, it restarts the poll syscall, but the timeout remains
+ ;; unchanged. If the timeout is longer than the time between restarts,
+ ;; the poll may never time out at all. Therefore, this code uses a short
+ ;; poll timeout and repeatedly calls poll while watching the clock to see
+ ;; whether the overall timeout has expired.
+ (define poll-timeout-ms 200)
+
+ (define (wait port mode)
+ (let ((timeout-internal
+ (+ (get-internal-real-time)
+ (* internal-time-units-per-second timeout))))
+ (let loop ((poll-value
+ (port-poll port mode poll-timeout-ms)))
+ (if (= poll-value 0)
+ (if (> (get-internal-real-time)
+ timeout-internal)
+ (raise-exception
+ (if (string=? mode "r")
+ (make-port-read-timeout-error port)
+ (make-port-write-timeout-error port)))
+ (loop (port-poll port mode poll-timeout-ms)))
+ poll-value))))
+
+ (parameterize
+ ((current-read-waiter
+ (lambda (port)
+ (wait port "r")))
+ (current-write-waiter
+ (lambda (port)
+ (wait port "w"))))
+ (thunk)))
+
diff --git a/guix-dev.scm b/guix-dev.scm
index 35869dc..011d9e0 100644
--- a/guix-dev.scm
+++ b/guix-dev.scm
@@ -44,6 +44,39 @@
(gnu packages texinfo)
(srfi srfi-1))
+(define guile-knots
+ (let ((commit "d572f591a3c136bfc7b23160e16381c92588f8d9")
+ (revision "1"))
+ (package
+ (name "guile-knots")
+ (version (git-version "0" revision commit))
+ (source (origin
+ (method git-fetch)
+ (uri (git-reference
+ (url "https://git.cbaines.net/git/guile/knots")
+ (commit commit)))
+ (sha256
+ (base32
+ "0g85frfniblxb2cl81fg558ic3cxvla7fvml08scjgbbxn8151gv"))
+ (file-name (string-append name "-" version "-checkout"))))
+ (build-system gnu-build-system)
+ (native-inputs
+ (list pkg-config
+ autoconf
+ automake
+ guile-3.0
+ guile-lib
+ guile-fibers))
+ (inputs
+ (list guile-3.0))
+ (propagated-inputs
+ (list guile-fibers))
+ (home-page "https://git.cbaines.net/guile/knots")
+ (synopsis "Patterns and functionality to use with Guile Fibers")
+ (description
+ "Knots is a library of patterns and functionality for programs using Guile Fibers.")
+ (license license:gpl3+))))
+
(package
(name "guix-build-coordinator")
(version "0.0.0")
@@ -53,6 +86,7 @@
(list guix
guile-json-4
guile-fibers-1.3
+ guile-knots
guile-gcrypt
guile-readline
guile-lib
diff --git a/scripts/guix-build-coordinator-agent.in b/scripts/guix-build-coordinator-agent.in
index 2f9e774..69ecf8a 100644
--- a/scripts/guix-build-coordinator-agent.in
+++ b/scripts/guix-build-coordinator-agent.in
@@ -126,6 +126,11 @@
(lambda (opt name arg result)
(alist-cons 'metrics-file
arg
+ result)))
+ (option '("timestamp-log-output") #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'timestamp-log-output?
+ (string=? arg "true")
result)))))
(define %option-defaults
@@ -140,7 +145,8 @@
(* 3 (/ (total-processor-count) 4)))
1))
(dynamic-auth-token
- . ,(getenv "GUIX_BUILD_COORDINATOR_DYNAMIC_AUTH_TOKEN"))))
+ . ,(getenv "GUIX_BUILD_COORDINATOR_DYNAMIC_AUTH_TOKEN"))
+ (timestamp-log-output? . #t)))
(define (parse-options options defaults args)
(args-fold
@@ -207,4 +213,5 @@
(or (assq-ref opts 'non-derivation-substitute-urls)
(assq-ref opts 'substitute-urls))
(assq-ref opts 'metrics-file)
- (assq-ref opts 'max-1min-load-average)))))
+ (assq-ref opts 'max-1min-load-average)
+ (assq-ref opts 'timestamp-log-output?)))))
diff --git a/scripts/guix-build-coordinator.in b/scripts/guix-build-coordinator.in
index 4756bea..0c06579 100644
--- a/scripts/guix-build-coordinator.in
+++ b/scripts/guix-build-coordinator.in
@@ -58,15 +58,6 @@
(guix-build-coordinator build-allocator)
(guix-build-coordinator client-communication))
-(install-suspendable-ports!)
-
-;; TODO Work around this causing problems with backtraces
-;; https://github.com/wingo/fibers/issues/76
-(set-record-type-printer!
- (@@ (fibers scheduler) <scheduler>)
- (lambda (scheduler port)
- (display "#<scheduler>" port)))
-
(define %base-options
;; Specifications of the command-line options
(list (option '("secret-key-base-file") #t #f
@@ -83,6 +74,13 @@
(option '("update-database") #f #f
(lambda (opt name _ result)
(alist-cons 'update-database #t result)))
+ (option '("listen-repl") #f #t
+ (lambda (opt name arg result)
+ (alist-cons 'listen-repl
+ (if arg
+ (string->number arg)
+ #t)
+ (alist-delete 'listen-repl result))))
(option '("show-error-details") #f #f
(lambda (opt name _ result)
(alist-cons 'show-error-details #t result)))))
@@ -295,6 +293,16 @@
(or (assq-ref result 'not-systems)
'()))
(alist-delete 'not-systems result))))
+ (option '("created-at-gt") #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'created-at->
+ (datastore-validate-datetime-string arg)
+ result)))
+ (option '("created-at-lt") #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'created-at-<
+ (datastore-validate-datetime-string arg)
+ result)))
(option '("skip-updating-derived-priorities") #f #f
(lambda (opt name _ result)
(alist-cons 'skip-updating-derived-priorities
@@ -414,7 +422,12 @@
"error: ~A is not a known allocation strategy\n"
arg)
(exit 1)))
- result))))
+ result)))
+ (option '("timestamp-log-output") #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'timestamp-log-output?
+ (string=? arg "true")
+ result))))
(append-map
(lambda (hook)
(list
@@ -502,6 +515,7 @@ canceled?: ~A
(vector->list
(assoc-ref build-details "tags")))
"\n"))
+ (newline)
(let ((derivation-inputs
(vector->list
@@ -659,6 +673,8 @@ tags:
#:not-tags (assq-ref opts 'not-tags)
#:systems (assq-ref opts 'systems)
#:not-systems (assq-ref opts 'not-systems)
+ #:created-at-< (assq-ref opts 'created-at-<)
+ #:created-at-> (assq-ref opts 'created-at->)
#:processed #f
#:canceled #f
#:relationship (assq-ref opts 'relationship)))
@@ -695,7 +711,14 @@ tags:
(assq-ref
opts 'ignore-if-build-required-by-another)))
#:times 6
- #:delay 5)))
+ #:delay 5
+ #:ignore
+ (lambda (exn)
+ (and (exception-with-message? exn)
+ (string=?
+ "build-already-canceled"
+ (assoc-ref (exception-message exn)
+ "error")))))))
(unless (string=? (assoc-ref result "result")
"build-canceled")
(simple-format #t "~A\n"
@@ -794,7 +817,8 @@ tags:
(let ((response
(send-submit-build-request
(assq-ref opts 'coordinator)
- derivation-file
+ ;; Whitespace like \r can creep in here, so strip it
+ (string-trim-both derivation-file)
(assq-ref opts 'derivation-substitute-urls)
#f ; TODO requested uuid
(assq-ref opts 'priority)
@@ -1083,7 +1107,9 @@ tags:
#:database-uri-string (assq-ref opts 'database)
#:hooks hooks-with-defaults
#:allocation-strategy
- (assq-ref opts 'allocation-strategy))))
+ (assq-ref opts 'allocation-strategy)
+ #:timestamp-log-output?
+ (assq-ref opts 'timestamp-log-output?))))
(parameterize ((%show-error-details
(assoc-ref opts 'show-error-details)))
@@ -1094,4 +1120,5 @@ tags:
#:pid-file (assq-ref opts 'pid-file)
#:agent-communication-uri (assq-ref opts 'agent-communication)
#:client-communication-uri (assq-ref opts 'client-communication)
- #:parallel-hooks (assq-ref opts 'parallel-hooks)))))))
+ #:parallel-hooks (assq-ref opts 'parallel-hooks)
+ #:listen-repl (assoc-ref opts 'listen-repl)))))))
diff --git a/sqitch/pg/deploy/allocated_builds_submit_outputs.sql b/sqitch/pg/deploy/allocated_builds_submit_outputs.sql
new file mode 100644
index 0000000..a2ebe1e
--- /dev/null
+++ b/sqitch/pg/deploy/allocated_builds_submit_outputs.sql
@@ -0,0 +1,7 @@
+-- Deploy guix-build-coordinator:allocated_builds_submit_outputs to pg
+
+BEGIN;
+
+-- XXX Add DDLs here.
+
+COMMIT;
diff --git a/sqitch/pg/deploy/background-jobs-queue.sql b/sqitch/pg/deploy/background-jobs-queue.sql
new file mode 100644
index 0000000..63dd8d4
--- /dev/null
+++ b/sqitch/pg/deploy/background-jobs-queue.sql
@@ -0,0 +1,7 @@
+-- Deploy guix-build-coordinator:background-jobs-queue to pg
+
+BEGIN;
+
+-- XXX Add DDLs here.
+
+COMMIT;
diff --git a/sqitch/pg/deploy/builds_replace_unprocessed_index.sql b/sqitch/pg/deploy/builds_replace_unprocessed_index.sql
new file mode 100644
index 0000000..782cf6b
--- /dev/null
+++ b/sqitch/pg/deploy/builds_replace_unprocessed_index.sql
@@ -0,0 +1,7 @@
+-- Deploy guix-build-coordinator:builds_replace_unprocessed_index to pg
+
+BEGIN;
+
+-- XXX Add DDLs here.
+
+COMMIT;
diff --git a/sqitch/pg/revert/allocated_builds_submit_outputs.sql b/sqitch/pg/revert/allocated_builds_submit_outputs.sql
new file mode 100644
index 0000000..255efb1
--- /dev/null
+++ b/sqitch/pg/revert/allocated_builds_submit_outputs.sql
@@ -0,0 +1,7 @@
+-- Revert guix-build-coordinator:allocated_builds_submit_outputs from pg
+
+BEGIN;
+
+-- XXX Add DDLs here.
+
+COMMIT;
diff --git a/sqitch/pg/revert/background-jobs-queue.sql b/sqitch/pg/revert/background-jobs-queue.sql
new file mode 100644
index 0000000..46b0761
--- /dev/null
+++ b/sqitch/pg/revert/background-jobs-queue.sql
@@ -0,0 +1,7 @@
+-- Revert guix-build-coordinator:background-jobs-queue from pg
+
+BEGIN;
+
+-- XXX Add DDLs here.
+
+COMMIT;
diff --git a/sqitch/pg/revert/builds_replace_unprocessed_index.sql b/sqitch/pg/revert/builds_replace_unprocessed_index.sql
new file mode 100644
index 0000000..4395f90
--- /dev/null
+++ b/sqitch/pg/revert/builds_replace_unprocessed_index.sql
@@ -0,0 +1,7 @@
+-- Revert guix-build-coordinator:builds_replace_unprocessed_index from pg
+
+BEGIN;
+
+-- XXX Add DDLs here.
+
+COMMIT;
diff --git a/sqitch/pg/verify/allocated_builds_submit_outputs.sql b/sqitch/pg/verify/allocated_builds_submit_outputs.sql
new file mode 100644
index 0000000..e95a717
--- /dev/null
+++ b/sqitch/pg/verify/allocated_builds_submit_outputs.sql
@@ -0,0 +1,7 @@
+-- Verify guix-build-coordinator:allocated_builds_submit_outputs on pg
+
+BEGIN;
+
+-- XXX Add verifications here.
+
+ROLLBACK;
diff --git a/sqitch/pg/verify/background-jobs-queue.sql b/sqitch/pg/verify/background-jobs-queue.sql
new file mode 100644
index 0000000..a36097e
--- /dev/null
+++ b/sqitch/pg/verify/background-jobs-queue.sql
@@ -0,0 +1,7 @@
+-- Verify guix-build-coordinator:background-jobs-queue on pg
+
+BEGIN;
+
+-- XXX Add verifications here.
+
+ROLLBACK;
diff --git a/sqitch/pg/verify/builds_replace_unprocessed_index.sql b/sqitch/pg/verify/builds_replace_unprocessed_index.sql
new file mode 100644
index 0000000..050ef75
--- /dev/null
+++ b/sqitch/pg/verify/builds_replace_unprocessed_index.sql
@@ -0,0 +1,7 @@
+-- Verify guix-build-coordinator:builds_replace_unprocessed_index on pg
+
+BEGIN;
+
+-- XXX Add verifications here.
+
+ROLLBACK;
diff --git a/sqitch/sqitch.plan b/sqitch/sqitch.plan
index cfb5b9d..f4f538b 100644
--- a/sqitch/sqitch.plan
+++ b/sqitch/sqitch.plan
@@ -46,3 +46,6 @@ agent_status_add_processor_count 2023-03-24T09:28:47Z Chris <chris@felis> # Add
remove_build_allocation_plan 2023-04-23T19:50:23Z Chris <chris@felis> # Remove build_allocation_plan
system_uptime 2023-05-05T18:18:35Z Chris <chris@felis> # Add system uptime
build_starts_index 2023-11-24T16:30:13Z Chris <chris@felis> # build_starts index
+background-jobs-queue 2025-02-06T10:49:08Z Chris <chris@fang> # Add background_jobs_queue
+builds_replace_unprocessed_index 2025-02-19T11:19:42Z Chris <chris@fang> # Replace builds_unprocessed
+allocated_builds_submit_outputs 2025-03-02T08:22:48Z Chris <chris@fang> # Add allocated_builds.submit_outputs
diff --git a/sqitch/sqlite/deploy/allocated_builds_submit_outputs.sql b/sqitch/sqlite/deploy/allocated_builds_submit_outputs.sql
new file mode 100644
index 0000000..66d6b45
--- /dev/null
+++ b/sqitch/sqlite/deploy/allocated_builds_submit_outputs.sql
@@ -0,0 +1,7 @@
+-- Deploy guix-build-coordinator:allocated_builds_submit_outputs to sqlite
+
+BEGIN;
+
+ALTER TABLE allocated_builds ADD COLUMN submit_outputs BOOLEAN DEFAULT NULL;
+
+COMMIT;
diff --git a/sqitch/sqlite/deploy/background-jobs-queue.sql b/sqitch/sqlite/deploy/background-jobs-queue.sql
new file mode 100644
index 0000000..1cb35f0
--- /dev/null
+++ b/sqitch/sqlite/deploy/background-jobs-queue.sql
@@ -0,0 +1,11 @@
+-- Deploy guix-build-coordinator:background-jobs-queue to sqlite
+
+BEGIN;
+
+CREATE TABLE background_jobs_queue (
+ id INTEGER PRIMARY KEY,
+ type TEXT NOT NULL,
+ args TEXT NOT NULL
+);
+
+COMMIT;
diff --git a/sqitch/sqlite/deploy/builds_replace_unprocessed_index.sql b/sqitch/sqlite/deploy/builds_replace_unprocessed_index.sql
new file mode 100644
index 0000000..a404f31
--- /dev/null
+++ b/sqitch/sqlite/deploy/builds_replace_unprocessed_index.sql
@@ -0,0 +1,9 @@
+-- Deploy guix-build-coordinator:builds_replace_unprocessed_index to sqlite
+
+BEGIN;
+
+DROP INDEX builds_unprocessed;
+
+CREATE INDEX builds_live ON builds (id) WHERE processed = 0 AND canceled = 0;
+
+COMMIT;
diff --git a/sqitch/sqlite/revert/allocated_builds_submit_outputs.sql b/sqitch/sqlite/revert/allocated_builds_submit_outputs.sql
new file mode 100644
index 0000000..240de22
--- /dev/null
+++ b/sqitch/sqlite/revert/allocated_builds_submit_outputs.sql
@@ -0,0 +1,7 @@
+-- Revert guix-build-coordinator:allocated_builds_submit_outputs from sqlite
+
+BEGIN;
+
+-- XXX Add DDLs here.
+
+COMMIT;
diff --git a/sqitch/sqlite/revert/background-jobs-queue.sql b/sqitch/sqlite/revert/background-jobs-queue.sql
new file mode 100644
index 0000000..10ef1ff
--- /dev/null
+++ b/sqitch/sqlite/revert/background-jobs-queue.sql
@@ -0,0 +1,7 @@
+-- Revert guix-build-coordinator:background-jobs-queue from sqlite
+
+BEGIN;
+
+-- XXX Add DDLs here.
+
+COMMIT;
diff --git a/sqitch/sqlite/revert/builds_replace_unprocessed_index.sql b/sqitch/sqlite/revert/builds_replace_unprocessed_index.sql
new file mode 100644
index 0000000..b02dd3c
--- /dev/null
+++ b/sqitch/sqlite/revert/builds_replace_unprocessed_index.sql
@@ -0,0 +1,7 @@
+-- Revert guix-build-coordinator:builds_replace_unprocessed_index from sqlite
+
+BEGIN;
+
+-- XXX Add DDLs here.
+
+COMMIT;
diff --git a/sqitch/sqlite/verify/allocated_builds_submit_outputs.sql b/sqitch/sqlite/verify/allocated_builds_submit_outputs.sql
new file mode 100644
index 0000000..0b1331e
--- /dev/null
+++ b/sqitch/sqlite/verify/allocated_builds_submit_outputs.sql
@@ -0,0 +1,7 @@
+-- Verify guix-build-coordinator:allocated_builds_submit_outputs on sqlite
+
+BEGIN;
+
+-- XXX Add verifications here.
+
+ROLLBACK;
diff --git a/sqitch/sqlite/verify/background-jobs-queue.sql b/sqitch/sqlite/verify/background-jobs-queue.sql
new file mode 100644
index 0000000..1cf9965
--- /dev/null
+++ b/sqitch/sqlite/verify/background-jobs-queue.sql
@@ -0,0 +1,7 @@
+-- Verify guix-build-coordinator:background-jobs-queue on sqlite
+
+BEGIN;
+
+-- XXX Add verifications here.
+
+ROLLBACK;
diff --git a/sqitch/sqlite/verify/builds_replace_unprocessed_index.sql b/sqitch/sqlite/verify/builds_replace_unprocessed_index.sql
new file mode 100644
index 0000000..d0dd086
--- /dev/null
+++ b/sqitch/sqlite/verify/builds_replace_unprocessed_index.sql
@@ -0,0 +1,7 @@
+-- Verify guix-build-coordinator:builds_replace_unprocessed_index on sqlite
+
+BEGIN;
+
+-- XXX Add verifications here.
+
+ROLLBACK;