-rw-r--r--  .dir-locals.el                                            |    2
-rw-r--r--  Makefile.am                                               |    3
-rw-r--r--  guix-build-coordinator/agent-messaging/http.scm           |   21
-rw-r--r--  guix-build-coordinator/agent-messaging/http/server.scm    |   68
-rw-r--r--  guix-build-coordinator/agent.scm                          |  155
-rw-r--r--  guix-build-coordinator/build-allocator.scm                |   97
-rw-r--r--  guix-build-coordinator/client-communication.scm           |  134
-rw-r--r--  guix-build-coordinator/coordinator.scm                    |  996
-rw-r--r--  guix-build-coordinator/datastore.scm                      |   17
-rw-r--r--  guix-build-coordinator/datastore/sqlite.scm               | 1187
-rw-r--r--  guix-build-coordinator/guix-data-service.scm              |    4
-rw-r--r--  guix-build-coordinator/hooks.scm                          |   25
-rw-r--r--  guix-build-coordinator/utils.scm                          |  316
-rw-r--r--  guix-build-coordinator/utils/fibers.scm                   |  587
-rw-r--r--  guix-build-coordinator/utils/timeout.scm                  |   81
-rw-r--r--  guix-dev.scm                                              |   34
-rw-r--r--  scripts/guix-build-coordinator.in                         |   31
-rw-r--r--  sqitch/pg/deploy/allocated_builds_submit_outputs.sql      |    7
-rw-r--r--  sqitch/pg/deploy/background-jobs-queue.sql                |    7
-rw-r--r--  sqitch/pg/deploy/builds_replace_unprocessed_index.sql     |    7
-rw-r--r--  sqitch/pg/revert/allocated_builds_submit_outputs.sql      |    7
-rw-r--r--  sqitch/pg/revert/background-jobs-queue.sql                |    7
-rw-r--r--  sqitch/pg/revert/builds_replace_unprocessed_index.sql     |    7
-rw-r--r--  sqitch/pg/verify/allocated_builds_submit_outputs.sql      |    7
-rw-r--r--  sqitch/pg/verify/background-jobs-queue.sql                |    7
-rw-r--r--  sqitch/pg/verify/builds_replace_unprocessed_index.sql     |    7
-rw-r--r--  sqitch/sqitch.plan                                        |    3
-rw-r--r--  sqitch/sqlite/deploy/allocated_builds_submit_outputs.sql  |    7
-rw-r--r--  sqitch/sqlite/deploy/background-jobs-queue.sql            |   11
-rw-r--r--  sqitch/sqlite/deploy/builds_replace_unprocessed_index.sql |    9
-rw-r--r--  sqitch/sqlite/revert/allocated_builds_submit_outputs.sql  |    7
-rw-r--r--  sqitch/sqlite/revert/background-jobs-queue.sql            |    7
-rw-r--r--  sqitch/sqlite/revert/builds_replace_unprocessed_index.sql |    7
-rw-r--r--  sqitch/sqlite/verify/allocated_builds_submit_outputs.sql  |    7
-rw-r--r--  sqitch/sqlite/verify/background-jobs-queue.sql            |    7
-rw-r--r--  sqitch/sqlite/verify/builds_replace_unprocessed_index.sql |    7

36 files changed, 2172 insertions(+), 1721 deletions(-)
diff --git a/.dir-locals.el b/.dir-locals.el
index 60601df..1c73c7a 100644
--- a/.dir-locals.el
+++ b/.dir-locals.el
@@ -10,6 +10,6 @@
(eval put 'with-db-worker-thread 'scheme-indent-function 1)
(eval put 'with-time-logging 'scheme-indent-function 1)
(eval put 'with-timeout 'scheme-indent-function 1)
- (eval put 'letpar& 'scheme-indent-function 1)
+ (eval put 'fibers-let 'scheme-indent-function 1)
(eval . (put 'call-with-lzip-output-port 'scheme-indent-function 1))
(eval . (put 'with-store 'scheme-indent-function 1))))
diff --git a/Makefile.am b/Makefile.am
index 5081695..8a9af41 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -10,7 +10,8 @@ MINIMALSOURCES = \
guix-build-coordinator/agent-messaging/http.scm \
guix-build-coordinator/agent.scm \
guix-build-coordinator/config.scm \
- guix-build-coordinator/utils.scm
+ guix-build-coordinator/utils.scm \
+ guix-build-coordinator/utils/timeout.scm
ALLSOURCES = \
$(MINIMALSOURCES) \
diff --git a/guix-build-coordinator/agent-messaging/http.scm b/guix-build-coordinator/agent-messaging/http.scm
index 0baa75b..33fb717 100644
--- a/guix-build-coordinator/agent-messaging/http.scm
+++ b/guix-build-coordinator/agent-messaging/http.scm
@@ -50,6 +50,7 @@
#:use-module (guix base64)
#:use-module (guix build utils)
#:use-module (guix-build-coordinator utils)
+ #:use-module (guix-build-coordinator utils timeout)
#:use-module (guix-build-coordinator agent-messaging abstract)
#:export (make-http-agent-interface
@@ -73,10 +74,10 @@
password)
(let* ((gnutls-ver (gnutls-version))
(guix-ver %guix-version))
- (simple-format (current-error-port)
- "(gnutls version: ~A, guix version: ~A)\n"
- gnutls-ver
- guix-ver))
+ (simple-format/safe (current-error-port)
+ "(gnutls version: ~A, guix version: ~A)\n"
+ gnutls-ver
+ guix-ver))
(make <http-agent-interface>
#:coordinator-uri coordinator-uri
@@ -204,7 +205,8 @@
response)))))
(retry-on-error (lambda ()
- (with-port-timeouts make-request))
+ (with-port-timeouts make-request
+ #:timeout 120))
#:times retry-times
#:delay 10
#:no-retry agent-error-from-coordinator?))
@@ -420,10 +422,11 @@
(lambda (uploaded-bytes)
(= uploaded-bytes file-size)))
(retry-on-error (lambda ()
- (with-throw-handler #t
- perform-upload
- (lambda _
- (backtrace))))
+ (with-exception-handler
+ (lambda (exn)
+ (backtrace)
+ (raise-exception exn))
+ perform-upload))
#:times 100
#:delay 60
#:error-hook
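
A recurring change in this patch is replacing with-throw-handler call sites with with-exception-handler handlers that log a backtrace and then re-raise, while giving port operations an explicit #:timeout. A minimal sketch of that idiom using only core Guile; the failing make-request thunk below is a stand-in, not the real request code:

(define (make-request)
  ;; Placeholder for the real HTTP request to the coordinator.
  (error "network request failed"))

(define (request-logging-backtrace)
  (with-exception-handler
   (lambda (exn)
     (backtrace)              ; record where the failure happened...
     (raise-exception exn))   ; ...then re-raise so retry-on-error still sees it
   make-request))
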
diff --git a/guix-build-coordinator/agent-messaging/http/server.scm b/guix-build-coordinator/agent-messaging/http/server.scm
index 2b325d7..fa227da 100644
--- a/guix-build-coordinator/agent-messaging/http/server.scm
+++ b/guix-build-coordinator/agent-messaging/http/server.scm
@@ -46,13 +46,15 @@
#:use-module (fibers timers)
#:use-module (fibers channels)
#:use-module (fibers operations)
+ #:use-module (knots)
+ #:use-module (knots timeout)
+ #:use-module (knots web-server)
+ #:use-module (knots thread-pool)
#:use-module (prometheus)
#:use-module (guix base32)
#:use-module (guix base64)
#:use-module (guix progress)
#:use-module (guix build utils)
- #:use-module ((guix build syscalls)
- #:select (set-thread-name))
#:use-module (guix-build-coordinator utils)
#:use-module (guix-build-coordinator utils fibers)
#:use-module (guix-build-coordinator datastore)
@@ -145,18 +147,15 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
(define process-metrics-updater
(get-process-metrics-updater plain-metrics-registry))
- (define thread-metric
- (make-gauge-metric
- (build-coordinator-metrics-registry build-coordinator)
- "guile_threads_total"))
-
(define datastore-metrics-updater
(base-datastore-metrics-updater build-coordinator))
+ (define (build-coordinator-metrics-updater)
+ (build-coordinator-update-metrics build-coordinator))
+
(define (update-managed-metrics!)
+ (call-with-delay-logging build-coordinator-metrics-updater)
(call-with-delay-logging gc-metrics-updater)
- (metric-set thread-metric
- (length (all-threads)))
(call-with-delay-logging process-metrics-updater)
(call-with-delay-logging datastore-metrics-updater))
@@ -167,8 +166,8 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
"exception when starting: ~A\n" exn)
(primitive-exit 1))
(lambda ()
- (run-server/patched
- (lambda (request body)
+ (run-knots-web-server
+ (lambda (request)
(log-msg (build-coordinator-logger build-coordinator)
'INFO
(format #f "~4a ~a\n"
@@ -179,7 +178,6 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
(cons (request-method request)
(split-and-decode-uri-path
(uri-path (request-uri request))))
- body
secret-key-base
build-coordinator
output-hash-channel
@@ -609,7 +607,6 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
(define (controller request
method-and-path-components
- body
secret-key-base
build-coordinator
output-hash-channel
@@ -639,6 +636,10 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
(define logger
(build-coordinator-logger build-coordinator))
+ ;; TODO Handle this in the controller
+ (define body
+ (read-request-body request))
+
(define (controller-thunk)
(match method-and-path-components
(('GET "agent" uuid)
@@ -707,14 +708,11 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
(('POST "agent" uuid "fetch-builds")
(if (authenticated? uuid request)
(let* ((json-body (json-string->scm (utf8->string body)))
- ;; count is deprecated, use target_count instead
- (count (assoc-ref json-body "count"))
(target-count (assoc-ref json-body "target_count"))
(systems (assoc-ref json-body "systems"))
(builds (fetch-builds build-coordinator uuid
(vector->list systems)
- target-count
- count)))
+ target-count)))
(render-json
`((builds . ,(list->vector builds)))))
(render-json
@@ -1009,7 +1007,7 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
(render-json
`((error . chunked-input-ended-prematurely))
#:code 400))
- ((worker-thread-timeout-error? exn)
+ ((thread-pool-timeout-error? exn)
(render-json
`((error . ,(simple-format #f "~A" exn)))
#:code 503))
@@ -1018,30 +1016,20 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
`((error . ,(simple-format #f "~A" exn)))
#:code 500))))
(lambda ()
- (with-throw-handler #t
- controller-thunk
- (lambda (key . args)
- (unless (and (eq? '%exception key)
- (or (agent-error? (car args))
- (worker-thread-timeout-error? (car args))
- (chunked-input-ended-prematurely-error? (car args))))
+ (with-exception-handler
+ (lambda (exn)
+ (unless (or (agent-error? exn)
+ (thread-pool-timeout-error? exn)
+ (chunked-input-ended-prematurely-error? exn))
(match method-and-path-components
((method path-components ...)
(simple-format
(current-error-port)
- "error: when processing: /~A ~A\n ~A ~A\n"
- method (string-join path-components "/")
- key args)))
-
- (let* ((stack (make-stack #t 4))
- (backtrace
- (call-with-output-string
- (lambda (port)
- (display "\nBacktrace:\n" port)
- (display-backtrace stack port)
- (newline port)
- (newline port)))))
- (display
- backtrace
- (current-error-port)))))))
+ "error: when processing: /~A ~A\n"
+ method (string-join path-components "/"))))
+
+ (print-backtrace-and-exception/knots exn))
+
+ (raise-exception exn))
+ controller-thunk))
#:unwind? #t))
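
The server now runs on run-knots-web-server from the knots library, whose handlers take only the request; the body is read on demand with read-request-body inside the controller. A rough sketch of that handler shape, inferred from how this patch calls it (the return convention shown, response plus body values as with Guile's built-in web server, is an assumption):

(use-modules (web response)
             (knots web-server))

(define (handler request)
  ;; The request body is no longer passed as a second argument; read it
  ;; only when the route actually needs it.
  (let ((body (read-request-body request)))
    (values (build-response
             #:code 200
             #:headers '((content-type . (text/plain))))
            "ok\n")))

;; (run-knots-web-server handler #:host "127.0.0.1" #:port 8746)
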
diff --git a/guix-build-coordinator/agent.scm b/guix-build-coordinator/agent.scm
index 7afd549..47a7c5f 100644
--- a/guix-build-coordinator/agent.scm
+++ b/guix-build-coordinator/agent.scm
@@ -48,6 +48,7 @@
#:use-module ((guix build syscalls)
#:select (set-thread-name free-disk-space))
#:use-module (guix-build-coordinator utils)
+ #:use-module (guix-build-coordinator utils timeout)
#:use-module (guix-build-coordinator agent-messaging)
#:use-module (guix-build-coordinator agent-messaging abstract)
#:export (run-agent))
@@ -80,7 +81,7 @@
max-1min-load-average
timestamp-log-output?)
(define lgr (make <logger>))
- (define port-log (make <port-log>
+ (define port-log (make <custom-port-log>
#:port (current-output-port)
#:formatter
;; In guile-lib v0.2.8 onwards, the formatter is
@@ -99,13 +100,13 @@
(let ((directory (dirname metrics-file)))
(or (file-exists? directory)
(begin
- (simple-format (current-error-port)
- "skipping writing metrics as ~A does not exist\n"
- directory)
+ (simple-format/safe (current-error-port)
+ "skipping writing metrics as ~A does not exist\n"
+ directory)
#f)))
(with-exception-handler
(lambda (exn)
- (simple-format
+ (simple-format/safe
(current-error-port)
"skipping writing metrics, encountered exception ~A\n"
exn)
@@ -381,9 +382,16 @@
list-jobs
(create-work-queue current-max-builds
(lambda (build)
- (process-job build
- perform-post-build-actions
- uploads-updater))
+ ;; This poorly handles cases where the
+ ;; connection to the daemon fails, or other
+ ;; errors occur
+ (retry-on-error
+ (lambda ()
+ (process-job build
+ perform-post-build-actions
+ uploads-updater))
+ #:times 3
+ #:delay 10))
#:thread-start-delay
(make-time
time-duration
@@ -399,48 +407,47 @@
20)
#:name "job")))
(define (display-info)
- (display
- (simple-format
- #f "current threads: ~A current jobs: ~A\n~A\n"
- (count-threads)
- (+ (count-jobs) (count-post-build-jobs))
- (string-append
- (string-join
- (map (match-lambda
- ((build-details)
- (simple-format
- #f " - ~A (derived priority: ~A)
+ (simple-format/safe
+ (current-error-port)
+ "current threads: ~A current jobs: ~A\n~A\n"
+ (count-threads)
+ (+ (count-jobs) (count-post-build-jobs))
+ (string-append
+ (string-join
+ (map (match-lambda
+ ((build-details)
+ (simple-format
+ #f " - ~A (derived priority: ~A)
~A"
- (assoc-ref build-details "uuid")
- (assoc-ref build-details "derived_priority")
- (or
- (assoc-ref build-details "derivation_name")
- (assoc-ref build-details "derivation-name")))))
- (list-jobs))
- "\n")
- "\n"
- (string-join
- (map (match-lambda
- ((build-details upload-progress _)
- (simple-format
- #f " - ~A (derived priority: ~A)
+ (assoc-ref build-details "uuid")
+ (assoc-ref build-details "derived_priority")
+ (or
+ (assoc-ref build-details "derivation_name")
+ (assoc-ref build-details "derivation-name")))))
+ (list-jobs))
+ "\n")
+ "\n"
+ (string-join
+ (map (match-lambda
+ ((build-details upload-progress _)
+ (simple-format
+ #f " - ~A (derived priority: ~A)
~A~A"
- (assoc-ref build-details "uuid")
- (assoc-ref build-details "derived_priority")
- (or
- (assoc-ref build-details "derivation_name")
- (assoc-ref build-details "derivation-name"))
- (if (upload-progress-file upload-progress)
- (simple-format
- #f "
+ (assoc-ref build-details "uuid")
+ (assoc-ref build-details "derived_priority")
+ (or
+ (assoc-ref build-details "derivation_name")
+ (assoc-ref build-details "derivation-name"))
+ (if (upload-progress-file upload-progress)
+ (simple-format/safe
+ #f "
uploading ~A (~A/~A)"
- (upload-progress-file upload-progress)
- (upload-progress-bytes-sent upload-progress)
- (upload-progress-total-bytes upload-progress))
- ""))))
- (list-post-build-jobs))
- "\n")))
- (current-error-port)))
+ (upload-progress-file upload-progress)
+ (upload-progress-bytes-sent upload-progress)
+ (upload-progress-total-bytes upload-progress))
+ ""))))
+ (list-post-build-jobs))
+ "\n"))))
(let ((details (submit-status coordinator-interface
'idle
@@ -699,7 +706,15 @@ but the guix-daemon claims it's unavailable"
#:timeout ,(* 60 60)))
(let ((log-port (open-output-string)))
- (with-throw-handler #t
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg lgr 'ERROR
+ "exception when fetching missing paths: "
+ exn)
+ (display/safe (get-output-string log-port))
+ (display/safe "\n")
+ (close-output-port log-port)
+ (raise-exception exn))
(lambda ()
(with-port-timeouts
(lambda ()
@@ -707,14 +722,7 @@ but the guix-daemon claims it's unavailable"
((current-build-output-port log-port))
(build-things fetch-substitute-store
missing-paths)))
- #:timeout (* 60 10)))
- (lambda (key . args)
- (log-msg lgr 'ERROR
- "exception when fetching missing paths "
- key ": " args)
- (display (get-output-string log-port))
- (display (newline))
- (close-output-port log-port)))))
+ #:timeout (* 60 10))))))
(for-each (lambda (missing-path)
(add-temp-root store missing-path))
missing-paths))
@@ -776,7 +784,7 @@ but the guix-daemon claims it's unavailable"
(delete-paths store output-file-names)))
#t)
(lambda (key args)
- (display (get-output-string log-port))
+ (display/safe (get-output-string log-port))
(log-msg lgr 'ERROR
"delete-outputs: "
key args)
@@ -865,25 +873,30 @@ but the guix-daemon claims it's unavailable"
(raise-exception exn)))
#f)
(lambda ()
- (with-throw-handler #t
+ (with-exception-handler
+ (lambda (exn)
+ (unless (and (store-protocol-error? exn)
+ (let ((status (store-protocol-error-status exn)))
+ (or (= status 1)
+ (= status 100)
+ (= status 101))))
+ (simple-format/safe (current-error-port)
+ "exception when performing build: ~A\n"
+ exn)
+ (let* ((stack (make-stack #t))
+ (backtrace
+ (call-with-output-string
+ (lambda (port)
+ (display-backtrace stack port)
+ (newline port)))))
+ (display/safe backtrace)))
+ (raise-exception exn))
(lambda ()
(build-things store (list (derivation-file-name derivation)))
(for-each (lambda (output)
(add-temp-root store output))
(map derivation-output-path
- (map cdr (derivation-outputs derivation)))))
- (lambda (key . args)
- (unless (and (eq? key '%exception)
- (store-protocol-error? (car args))
- (let ((status (store-protocol-error-status
- (car args))))
- (or (= status 1)
- (= status 100)
- (= status 101))))
- (simple-format (current-error-port)
- "exception when performing build: ~A ~A\n"
- key args)
- (backtrace))))
+ (map cdr (derivation-outputs derivation))))))
#t)
#:unwind? #t)))
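
In agent.scm the new exception handler only logs a backtrace for unexpected daemon errors: store protocol errors with status 1, 100 or 101 are treated as ordinary build failures and passed through quietly. A small sketch of that check, using the (guix store) predicates the patch relies on; the helper names are illustrative:

(use-modules (guix store))

(define (expected-build-failure? exn)
  ;; Statuses this patch treats as normal build failures rather than bugs.
  (and (store-protocol-error? exn)
       (memv (store-protocol-error-status exn) '(1 100 101))))

(define (maybe-log-build-error exn)
  (unless (expected-build-failure? exn)
    (format (current-error-port)
            "exception when performing build: ~a\n" exn)))
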
diff --git a/guix-build-coordinator/build-allocator.scm b/guix-build-coordinator/build-allocator.scm
index 588f213..8c08144 100644
--- a/guix-build-coordinator/build-allocator.scm
+++ b/guix-build-coordinator/build-allocator.scm
@@ -492,18 +492,6 @@
(let ((prioritised-builds
(datastore-fetch-prioritised-unprocessed-builds datastore)))
- (define systems-for-builds
- ;; TODO Should be one query
- (let ((table (make-hash-table)))
- (for-each (lambda (build-id)
- (hash-set! table
- build-id
- (datastore-find-build-derivation-system
- datastore
- build-id)))
- prioritised-builds)
- table))
-
(define tags-for-build
(let ((build-tags (make-hash-table)))
(lambda (build-id)
@@ -545,40 +533,41 @@
(else
(error "Unknown setup failure " failure-reason)))))
- (lambda (build-id)
- (log "build:" build-id)
- (and
- (or (null? requested-systems)
- (let ((build-system (hash-ref systems-for-builds build-id)))
- (member build-system requested-systems)))
- (agent-tags-match-build-tags agent-tags tags-for-build
- agent-id build-id)
- (let* ((setup-failures-for-build
- (or (hash-ref setup-failures-hash build-id)
- '()))
- (relevant-setup-failures
- (filter relevant-setup-failure?
- setup-failures-for-build)))
- (log "relevant setup failures:" relevant-setup-failures)
- (if (null? relevant-setup-failures)
- #t
- #f)))))
-
- (when metrics-registry
- (let ((counts
- (hash-fold
- (lambda (_ system result)
- `(,@(alist-delete system result)
- (,system . ,(+ 1 (or (assoc-ref result system) 0)))))
- '()
- systems-for-builds)))
- (for-each
- (match-lambda
- ((system . count)
- (metric-set allocator-considered-builds-metric
- count
- #:label-values `((system . ,system)))))
- counts)))
+ (match-lambda
+ (#(build-id build-system)
+ (log "build:" build-id)
+ (and
+ (or (null? requested-systems)
+ (member build-system requested-systems))
+ (agent-tags-match-build-tags agent-tags tags-for-build
+ agent-id build-id)
+ (let* ((setup-failures-for-build
+ (or (hash-ref setup-failures-hash build-id)
+ '()))
+ (relevant-setup-failures
+ (filter relevant-setup-failure?
+ setup-failures-for-build)))
+ (log "relevant setup failures:" relevant-setup-failures)
+ (if (null? relevant-setup-failures)
+ #t
+ #f))))))
+
+ ;; TODO Restore this in a more performant way
+ ;; (when metrics-registry
+ ;; (let ((counts
+ ;; (hash-fold
+ ;; (lambda (_ system result)
+ ;; `(,@(alist-delete system result)
+ ;; (,system . ,(+ 1 (or (assoc-ref result system) 0)))))
+ ;; '()
+ ;; systems-for-builds)))
+ ;; (for-each
+ ;; (match-lambda
+ ;; ((system . count)
+ ;; (metric-set allocator-considered-builds-metric
+ ;; count
+ ;; #:label-values `((system . ,system)))))
+ ;; counts)))
(let ((result
(map
@@ -589,21 +578,23 @@
(build-ids
(let loop ((count 0)
(build-ids '())
- (potential-build-ids prioritised-builds))
+ (potential-builds prioritised-builds))
(if (or (and planned-builds-for-agent-limit
(>= count planned-builds-for-agent-limit))
- (null? potential-build-ids))
+ (null? potential-builds))
(reverse build-ids) ;; highest priority last, so
;; reverse
- (let ((potential-build (first potential-build-ids)))
- (if (filter-proc potential-build)
+ (let ((potential-build-details (first potential-builds)))
+ (if (filter-proc potential-build-details)
(loop (+ 1 count)
- (cons potential-build
+ (cons (vector-ref
+ potential-build-details
+ 0)
build-ids)
- (cdr potential-build-ids))
+ (cdr potential-builds))
(loop count
build-ids
- (cdr potential-build-ids))))))))
+ (cdr potential-builds))))))))
(cons agent-id build-ids)))
(map (lambda (agent)
(assq-ref agent 'uuid))
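
The allocator's filter now receives #(build-id system) vectors straight from the prioritised-builds query instead of looking each build's system up separately. A tiny illustration of the vector match pattern involved (the example ids and systems are made up):

(use-modules (ice-9 match))

(define requested-systems '("x86_64-linux" "aarch64-linux"))

(define filter-proc
  (match-lambda
    (#(build-id build-system)
     ;; Keep the build if the agent asked for its system.
     (and (member build-system requested-systems)
          build-id))))

(filter-proc #(101 "x86_64-linux"))  ; => 101
(filter-proc #(102 "i586-gnu"))      ; => #f
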
diff --git a/guix-build-coordinator/client-communication.scm b/guix-build-coordinator/client-communication.scm
index 47a289d..6ec6578 100644
--- a/guix-build-coordinator/client-communication.scm
+++ b/guix-build-coordinator/client-communication.scm
@@ -32,6 +32,10 @@
#:use-module (json)
#:use-module (logging logger)
#:use-module (gcrypt random)
+ #:use-module (knots)
+ #:use-module (knots timeout)
+ #:use-module (knots web-server)
+ #:use-module (knots thread-pool)
#:use-module (prometheus)
#:use-module (web uri)
#:use-module (web client)
@@ -67,9 +71,9 @@
host
port
build-coordinator
- utility-thread-pool-channel)
- (run-server/patched
- (lambda (request body)
+ utility-thread-pool)
+ (run-knots-web-server
+ (lambda (request)
(log-msg (build-coordinator-logger build-coordinator)
'INFO
(format #f "~4a ~a\n"
@@ -80,10 +84,10 @@
(cons (request-method request)
(split-and-decode-uri-path
(uri-path (request-uri request))))
- body
+ (read-request-body request)
secret-key-base
build-coordinator
- utility-thread-pool-channel)))
+ utility-thread-pool)))
#:host host
#:port port))
@@ -92,7 +96,7 @@
raw-body
secret-key-base
build-coordinator
- utility-thread-pool-channel)
+ utility-thread-pool)
(define datastore
(build-coordinator-datastore build-coordinator))
@@ -414,6 +418,14 @@
(or (and=> (assq-ref query-parameters 'priority_lt)
string->number)
'unset)
+ #:created-at->
+ (or (and=> (assq-ref query-parameters 'created_at_gt)
+ datastore-validate-datetime-string)
+ 'unset)
+ #:created-at-<
+ (or (and=> (assq-ref query-parameters 'created_at_lt)
+ datastore-validate-datetime-string)
+ 'unset)
#:relationship
(or (and=> (assq-ref query-parameters 'relationship)
string->symbol)
@@ -490,16 +502,25 @@
(store-path-base derivation-file))
(define (read-drv/substitute derivation-file)
- (with-store/non-blocking store
- (unless (valid-path? store derivation-file)
- (substitute-derivation store
- derivation-file
- #:substitute-urls substitute-urls)))
+ (call-with-delay-logging
+ (lambda ()
+ (with-store/non-blocking store
+ (unless (valid-path? store derivation-file)
+ (with-port-timeouts
+ (lambda ()
+ (substitute-derivation store
+ derivation-file
+ #:substitute-urls substitute-urls))
+ #:timeout 60)))))
;; Read the derivation in a thread to avoid blocking fibers
- (call-with-worker-thread
- utility-thread-pool-channel
+ (call-with-thread
+ utility-thread-pool
(lambda ()
- (read-derivation-from-file* derivation-file))))
+ (read-derivation-from-file* derivation-file))
+ #:duration-logger
+ (lambda (duration)
+ (log-delay read-derivation-from-file*
+ duration))))
(let ((submit-build-result
(call-with-delay-logging
@@ -508,29 +529,36 @@
`(,build-coordinator
,derivation-file
#:read-drv
- ,(lambda (derivation-file)
- (with-exception-handler
- (lambda (exn)
- (log-msg
- (build-coordinator-logger build-coordinator)
- 'WARN
- "exception substituting derivation " derivation-file
- ": " exn)
- (raise-exception exn))
- (lambda ()
- (with-throw-handler #t
- (lambda ()
- (retry-on-error
+ ,(let ((memoized-drv #f))
+ (lambda (derivation-file)
+ (or
+ memoized-drv
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg
+ (build-coordinator-logger build-coordinator)
+ 'WARN
+ "exception substituting derivation " derivation-file
+ ": " exn)
+ (raise-exception exn))
+ (lambda ()
+ (with-exception-handler
+ (lambda (exn)
+ (print-backtrace-and-exception/knots exn)
+ (raise-exception exn))
(lambda ()
- (read-drv/substitute derivation-file))
- #:times 2
- #:delay 3
- #:error-hook
- (lambda _
- (metric-increment read-drv-error-count-metric))))
- (lambda args
- (backtrace))))
- #:unwind? #t))
+ (let ((result
+ (retry-on-error
+ (lambda ()
+ (read-drv/substitute derivation-file))
+ #:times 2
+ #:delay 3
+ #:error-hook
+ (lambda _
+ (metric-increment read-drv-error-count-metric)))))
+ (set! memoized-drv result)
+ result))))
+ #:unwind? #t))))
,@(let ((priority (assoc-ref body "priority")))
(if priority
`(#:priority ,priority)
@@ -622,7 +650,9 @@
(datastore-list-agent-builds
datastore
(assq-ref agent-details 'uuid))))))))
- (datastore-list-agents datastore))))))))))
+ (datastore-list-agents datastore)))))))
+ #:priority 'high
+ #:duration-metric-name "get_state")))
(('GET "events")
(let ((headers (request-headers request)))
(list (build-response
@@ -667,7 +697,7 @@
(render-json
`((error . ,(client-error-details exn)))
#:code 400))
- ((worker-thread-timeout-error? exn)
+ ((thread-pool-timeout-error? exn)
(render-json
`((error . ,(simple-format #f "~A" exn)))
#:code 503))
@@ -676,20 +706,18 @@
`((error . 500))
#:code 500))))
(lambda ()
- (with-throw-handler #t
- controller-thunk
- (lambda (key . args)
- (unless (and (eq? '%exception key)
- (or
- (worker-thread-timeout-error? (car args))
- (client-error? (car args))))
+ (with-exception-handler
+ (lambda (exn)
+ (unless (or
+ (thread-pool-timeout-error? exn)
+ (client-error? exn))
(match method-and-path-components
((method path-components ...)
(simple-format
(current-error-port)
- "error: when processing client request: /~A ~A\n ~A ~A\n"
+ "error: when processing client request: /~A ~A\n ~A\n"
method (string-join path-components "/")
- key args)))
+ exn)))
(let* ((stack (make-stack #t 4))
(backtrace
@@ -701,7 +729,9 @@
(newline port)))))
(display
backtrace
- (current-error-port)))))))
+ (current-error-port))))
+ (raise-exception exn))
+ controller-thunk))
#:unwind? #t))
(define* (render-json json #:key (extra-headers '())
@@ -871,6 +901,8 @@
(canceled 'unset)
(priority-> 'unset)
(priority-< 'unset)
+ (created-at-> 'unset)
+ (created-at-< 'unset)
(relationship 'unset)
(after-id #f)
(limit #f))
@@ -919,6 +951,12 @@
,@(if (number? priority-<)
(list (simple-format #f "priority_lt=~A" priority-<))
'())
+ ,@(if (string? created-at->)
+ (list (simple-format #f "created_at_gt=~A" created-at->))
+ '())
+ ,@(if (string? created-at-<)
+ (list (simple-format #f "created_at_lt=~A" created-at-<))
+ '())
,@(if (and relationship (not (eq? 'unset relationship)))
(list (simple-format #f "relationship=~A" relationship))
'())
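
The #:read-drv procedure handed to the build submission code is now memoised in a closure, so a derivation is substituted and parsed at most once even if it is consulted several times. The same idiom in isolation; read-drv here stands in for the real substitute-and-read procedure:

(define (memoise-single-value read-drv)
  (let ((cached #f))
    (lambda (derivation-file)
      (or cached
          (let ((result (read-drv derivation-file)))
            (set! cached result)
            result)))))
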
diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm
index a7fb664..adb7575 100644
--- a/guix-build-coordinator/coordinator.scm
+++ b/guix-build-coordinator/coordinator.scm
@@ -32,12 +32,16 @@
#:use-module (ice-9 match)
#:use-module (ice-9 rdelim)
#:use-module (ice-9 binary-ports)
+ #:use-module (ice-9 suspendable-ports)
#:use-module (ice-9 format)
#:use-module (ice-9 atomic)
#:use-module (ice-9 control)
#:use-module (ice-9 threads)
#:use-module (ice-9 exceptions)
#:use-module (rnrs bytevectors)
+ #:use-module (system repl server)
+ #:use-module (system repl command)
+ #:use-module (system repl debug)
#:use-module (web uri)
#:use-module (web http)
#:use-module (oop goops)
@@ -45,13 +49,18 @@
#:use-module (logging port-log)
#:use-module (gcrypt random)
#:use-module (fibers)
+ #:use-module (fibers timers)
#:use-module (fibers channels)
#:use-module (fibers scheduler)
#:use-module (fibers conditions)
+ #:use-module (fibers operations)
+ #:use-module (knots)
+ #:use-module (knots timeout)
+ #:use-module (knots parallelism)
+ #:use-module (knots thread-pool)
#:use-module (prometheus)
- #:use-module ((guix build syscalls)
- #:select (set-thread-name))
#:use-module (guix store)
+ #:use-module (guix progress)
#:use-module (guix derivations)
#:use-module (guix build utils)
#:use-module (guix-build-coordinator utils)
@@ -68,6 +77,8 @@
client-error?
client-error-details
+ %build-coordinator
+
make-build-coordinator
build-coordinator-datastore
build-coordinator-hooks
@@ -99,6 +110,8 @@
build-coordinator-prompt-hook-processing-for-event
start-hook-processing-threads
+ build-coordinator-update-metrics
+
build-coordinator-allocation-plan-stats
build-coordinator-trigger-build-allocation
build-coordinator-list-allocation-plan-builds
@@ -108,7 +121,9 @@
build-log-file-location
handle-build-start-report
handle-build-result
- handle-setup-failure-report))
+ handle-setup-failure-report
+
+ build-coordinator-check-and-correct-unprocessed-builds-all-inputs-built))
(define-exception-type &agent-error &error
make-agent-error
@@ -120,6 +135,9 @@
client-error?
(details client-error-details))
+(define %build-coordinator
+ (make-parameter #f))
+
(define-record-type <build-coordinator>
(make-build-coordinator-record datastore hooks metrics-registry
allocation-strategy allocator-channel
@@ -129,6 +147,9 @@
(hooks build-coordinator-hooks)
(hook-condvars build-coordinator-hook-condvars
set-build-coordinator-hook-condvars!)
+ (background-job-conditions
+ build-coordinator-background-job-conditions
+ set-build-coordinator-background-job-conditions!)
(metrics-registry build-coordinator-metrics-registry)
(allocation-strategy build-coordinator-allocation-strategy)
(trigger-build-allocation
@@ -141,30 +162,17 @@
(get-state-id build-coordinator-get-state-id-proc
set-build-coordinator-get-state-id-proc!)
(scheduler build-coordinator-scheduler
- set-build-coordinator-scheduler!))
+ set-build-coordinator-scheduler!)
+ (utility-thread-pool build-coordinator-utility-thread-pool
+ set-build-coordinator-utility-thread-pool!))
(set-record-type-printer!
<build-coordinator>
(lambda (build-coordinator port)
(display "#<build-coordinator>" port)))
-(define-class <custom-port-log> (<log-handler>)
- (port #:init-value #f #:accessor port #:init-keyword #:port))
-
-(define-method (emit-log (self <custom-port-log>) str)
- (when (port self)
- (put-bytevector (port self)
- (string->utf8 str))
- ;; Even though the port is line buffered, writing to it with
- ;; put-bytevector doesn't cause the buffer to be flushed.
- (force-output (port self))))
-
-(define-method (flush-log (self <custom-port-log>))
- (and=> (port self) force-output))
-
-(define-method (close-log! (self <custom-port-log>))
- (and=> (port self) close-port)
- (set! (port self) #f))
+(define %command-duration-histogram-buckets
+ (list 0.1 0.25 0.5 1 2.5 5 10 15 30 45 60 120 240 (inf)))
(define %known-hooks
'(build-submitted
@@ -210,7 +218,15 @@
#f)
(lambda ()
- (with-throw-handler #t
+ (with-exception-handler
+ (lambda (exn)
+ (if (and
+ (exception-with-origin? exn)
+ (string=? (exception-origin exn)
+ "fport_write"))
+ #f
+ (print-backtrace-and-exception/knots exn))
+ (raise-exception exn))
(lambda ()
(match (atomic-box-ref
current-state-id-and-event-buffer-index-box)
@@ -234,16 +250,7 @@
(iota event-count-to-send
(+ 1 last-sent-state-id))))
- current-state-id)))
- (lambda (key . args)
- (if (and
- (eq? key 'system-error)
- (match args
- (("fport_write" "~A" ("Broken pipe") rest ...)
- #t)
- (_ #f)))
- #f
- (backtrace)))))
+ current-state-id)))))
#:unwind? #t)))
(unless (eq? #f new-state-id)
@@ -285,7 +292,8 @@
(if (> requested-after-state-id
current-state-id)
current-state-id
- requested-after-state-id)
+ (max 0
+ requested-after-state-id))
current-state-id)))))
(atomic-box-set!
listener-channels-box
@@ -355,6 +363,27 @@
(define (build-coordinator-get-state-id build-coordinator)
((build-coordinator-get-state-id-proc build-coordinator)))
+(define (build-coordinator-update-metrics build-coordinator)
+ (define metrics-registry
+ (build-coordinator-metrics-registry build-coordinator))
+
+ (let ((utility-thread-pool-used-thread-metric
+ (or (metrics-registry-fetch-metric
+ metrics-registry
+ "utility_thread_pool_used_thread_total")
+ (make-gauge-metric
+ metrics-registry
+ "utility_thread_pool_used_thread_total"))))
+
+ (and=> (build-coordinator-utility-thread-pool build-coordinator)
+ (lambda (utility-thread-pool)
+ (metric-set
+ utility-thread-pool-used-thread-metric
+ (vector-count
+ (lambda (_ proc)
+ (->bool proc))
+ (thread-pool-proc-vector utility-thread-pool)))))))
+
(define* (make-build-coordinator
#:key
database-uri-string
@@ -366,7 +395,7 @@
(database-uri->datastore
database-uri-string
#:metrics-registry metrics-registry
- #:worker-thread-log-exception?
+ #:thread-pool-log-exception?
(lambda (exn)
(and (not (agent-error? exn))
(not (client-error? exn))))))
@@ -443,6 +472,16 @@
(setrlimit 'core #f #f))
#:unwind? #t)
+ (let ((core-file
+ (string-append (getcwd) "/core"))
+ (metric
+ (make-gauge-metric (build-coordinator-metrics-registry
+ build-coordinator)
+ "core_dump_file_last_modified_seconds")))
+ (when (file-exists? core-file)
+ (metric-set metric
+ (stat:mtime (stat core-file)))))
+
(with-exception-handler
(lambda (exn)
(simple-format #t "failed increasing open file limit: ~A\n" exn))
@@ -457,10 +496,30 @@
(lambda (scheduler port)
(display "#<scheduler>" port)))
- (when pid-file
- (call-with-output-file pid-file
- (lambda (port)
- (simple-format port "~A\n" (getpid)))))
+ (call-with-new-thread
+ (lambda ()
+ (set-thread-name
+ (string-append "gc watcher"))
+
+ (add-hook!
+ after-gc-hook
+ (let ((last-gc-time-taken
+ (assq-ref (gc-stats) 'gc-time-taken)))
+ (lambda ()
+ (let* ((gc-time-taken
+ (assq-ref (gc-stats) 'gc-time-taken))
+ (time-since-last
+ (/ (- gc-time-taken
+ last-gc-time-taken)
+ internal-time-units-per-second)))
+ (when (> time-since-last 0.1)
+ (format (current-error-port)
+ "after gc (additional time taken: ~f)\n"
+ time-since-last))
+ (set! last-gc-time-taken
+ (assq-ref (gc-stats) 'gc-time-taken))))))
+ (while #t
+ (sleep 0.1))))
(when update-datastore?
(datastore-update (build-coordinator-datastore build-coordinator)))
@@ -469,17 +528,17 @@
build-coordinator
(make-build-allocator-thread build-coordinator))
- (set-build-coordinator-hook-condvars!
- build-coordinator
- (start-hook-processing-threads build-coordinator
- parallel-hooks))
-
(when trigger-build-allocation?
(trigger-build-allocation build-coordinator)))
(define %default-agent-uri (string->uri "http://0.0.0.0:8745"))
(define %default-client-uri (string->uri "http://127.0.0.1:8746"))
+(define %default-repl-server-port
+ ;; Default port to run REPL server on, if --listen-repl is provided
+ ;; but no port is mentioned
+ 37146)
+
(define* (run-coordinator-service build-coordinator
#:key
(update-datastore? #t)
@@ -487,7 +546,10 @@
(agent-communication-uri %default-agent-uri)
(client-communication-uri %default-client-uri)
secret-key-base
- (parallel-hooks '()))
+ (parallel-hooks '())
+ listen-repl)
+ (install-suspendable-ports!)
+
(with-fluids ((%file-port-name-canonicalization 'none))
(perform-coordinator-service-startup
build-coordinator
@@ -495,23 +557,45 @@
#:pid-file pid-file
#:parallel-hooks parallel-hooks)
+ (when listen-repl
+ (parameterize ((%build-coordinator build-coordinator))
+ (cond
+ ((or (eq? #t listen-repl)
+ (number? listen-repl))
+ (let ((listen-repl
+ (if (eq? #t listen-repl)
+ %default-repl-server-port
+ listen-repl)))
+ (format (current-error-port)
+ "REPL server listening on port ~a~%"
+ listen-repl)
+ (spawn-server (make-tcp-server-socket
+ #:port
+ (if (eq? #t listen-repl)
+ %default-repl-server-port
+ listen-repl)))))
+ (else
+ (format (current-error-port)
+ "REPL server listening on ~a~%"
+ listen-repl)
+ (spawn-server (make-unix-domain-server-socket #:path listen-repl))))))
+
;; Create some worker thread channels, which need to be created prior
;; to run-fibers being called.
(let ((output-hash-channel
(make-output-hash-channel
build-coordinator))
- (utility-thread-pool-channel
- (make-worker-thread-channel
- (const '())
+ (utility-thread-pool
+ (make-thread-pool
+ 18
#:name "utility"
- #:parallelism 10
#:delay-logger
(let ((delay-metric
(make-histogram-metric
(build-coordinator-metrics-registry build-coordinator)
"utility_thread_pool_delay_seconds")))
- (lambda (seconds-delayed)
+ (lambda (seconds-delayed proc)
(log-delay "utility thread channel"
seconds-delayed)
(metric-observe delay-metric seconds-delayed)
@@ -521,6 +605,18 @@
"warning: utility thread channel delayed by ~1,2f seconds~%"
seconds-delayed)))))))
+ (let ((metric
+ (make-gauge-metric
+ (build-coordinator-metrics-registry build-coordinator)
+ "utility_thread_pool_thread_total")))
+ (metric-set metric
+ (vector-length
+ (thread-pool-proc-vector utility-thread-pool))))
+
+ (set-build-coordinator-utility-thread-pool!
+ build-coordinator
+ utility-thread-pool)
+
(let ((finished? (make-condition)))
(call-with-sigint
(lambda ()
@@ -542,6 +638,9 @@
(iota (length schedulers))
schedulers))
+ (set-build-coordinator-scheduler! build-coordinator
+ (current-scheduler))
+
(log-msg (build-coordinator-logger build-coordinator)
'INFO
"initialising metrics")
@@ -553,13 +652,19 @@
(datastore-spawn-fibers
(build-coordinator-datastore build-coordinator))
+ (set-build-coordinator-hook-condvars!
+ build-coordinator
+ (start-hook-processing-threads build-coordinator
+ parallel-hooks))
+
+ (set-build-coordinator-background-job-conditions!
+ build-coordinator
+ (start-background-job-processing-fibers build-coordinator))
+
(spawn-fiber-to-watch-for-deferred-builds build-coordinator)
(spawn-build-allocation-plan-management-fiber build-coordinator)
- (set-build-coordinator-scheduler! build-coordinator
- (current-scheduler))
-
(let ((events-channel
get-state-id
(make-events-channel
@@ -591,7 +696,12 @@
(uri-host client-communication-uri)
(uri-port client-communication-uri)
build-coordinator
- utility-thread-pool-channel)
+ utility-thread-pool)
+
+ (when pid-file
+ (call-with-output-file pid-file
+ (lambda (port)
+ (simple-format port "~A\n" (getpid)))))
;; Guile seems to just stop listening on ports, so try to
;; monitor that internally and just quit if it happens
@@ -650,12 +760,9 @@
derivation-file)))
(if (eq? #f system) ; derivation does not exist in database?
(build-for-output-already-exists/with-derivation?
- (with-fibers-port-timeouts
- (lambda ()
- (call-with-delay-logging read-drv
- #:threshold 10
- #:args (list derivation-file)))
- #:timeout 240))
+ (call-with-delay-logging read-drv
+ #:threshold 10
+ #:args (list derivation-file)))
(any
(lambda (output-details)
(let ((builds-for-output
@@ -780,12 +887,9 @@
;; derivations with no builds works
(if (datastore-find-derivation datastore derivation-file)
#f
- (with-fibers-port-timeouts
- (lambda ()
- (call-with-delay-logging read-drv
- #:threshold 10
- #:args (list derivation-file)))
- #:timeout 30))))
+ (call-with-delay-logging read-drv
+ #:threshold 10
+ #:args (list derivation-file)))))
(when drv
(datastore-store-derivation datastore drv))
@@ -806,8 +910,9 @@
;; time too.
(cons related-drv (random-v4-uuid)))
related-derivations-lacking-builds))
- #:duration-metric-name
- "store_build")
+ #:priority 'low
+ #:duration-metric-name "store_build"
+ #:duration-metric-buckets %command-duration-histogram-buckets)
(#t ; build submitted
(build-coordinator-prompt-hook-processing-for-event
build-coordinator
@@ -837,7 +942,8 @@
(stop-condition
stop-condition)))))
(stop-condition
- stop-condition)))))
+ stop-condition)))
+ #:buckets %command-duration-histogram-buckets))
(define* (cancel-build build-coordinator uuid
#:key (ignore-if-build-required-by-another? #t)
@@ -850,6 +956,10 @@
datastore
(lambda (db)
(let ((build-details (datastore-find-build datastore uuid)))
+ (unless build-details
+ (raise-exception
+ (make-client-error 'build-unknown)))
+
(when (assq-ref build-details 'canceled)
(raise-exception
(make-client-error 'build-already-canceled)))
@@ -872,7 +982,10 @@
(datastore-insert-unprocessed-hook-event datastore
"build-canceled"
(list uuid))
- 'build-canceled))))
+ 'build-canceled)
+ #:priority 'low
+ #:duration-metric-name "cancel_build"
+ #:duration-metric-buckets %command-duration-histogram-buckets)))
(when (eq? val 'build-canceled)
(unless skip-updating-derived-priorities?
@@ -893,6 +1006,10 @@
val))
+ (unless (datastore-find-build datastore uuid)
+ (raise-exception
+ (make-client-error 'build-unknown)))
+
(if ignore-if-build-required-by-another?
(let ((build-required
;; Do this check here outside the transaction to avoid having to
@@ -915,6 +1032,10 @@
datastore
(lambda (db)
(let ((build-details (datastore-find-build datastore uuid)))
+ (unless build-details
+ (raise-exception
+ (make-client-error 'build-unknown)))
+
(when (assq-ref build-details 'canceled)
(raise-exception
(make-client-error 'build-already-canceled)))
@@ -929,7 +1050,10 @@
new-priority
#:skip-updating-derived-priorities?
skip-updating-derived-priorities?
- #:override-derived-priority override-derived-priority)))
+ #:override-derived-priority override-derived-priority))
+ #:priority 'low
+ #:duration-metric-name "update_build_priority"
+ #:duration-metric-buckets %command-duration-histogram-buckets)
(trigger-build-allocation build-coordinator)
@@ -1012,7 +1136,17 @@
(and=> (assoc-ref (build-coordinator-hook-condvars build-coordinator)
event-name)
(lambda (condvar)
- (signal-condition-variable condvar)
+ (cond
+ ((condition-variable? condvar)
+ (signal-condition-variable condvar))
+ ((reusable-condition? condvar)
+ (signal-reusable-condition!
+ condvar
+ (build-coordinator-scheduler build-coordinator)))
+ (else
+ (error
+ (simple-format #f "unrecognised condvar ~A"
+ condvar))))
#t)))
(define (update-build-allocation-plan build-coordinator)
@@ -1074,24 +1208,19 @@
(lambda ()
(with-exception-handler
(lambda (exn)
- (simple-format
- (current-error-port)
- "build-allocator-thread: exception: ~A\n"
- exn)
(metric-increment failure-counter-metric)
(atomic-box-set! allocation-needed #t))
(lambda ()
- (with-throw-handler #t
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "error in build allocator thread\n")
+ (print-backtrace-and-exception/knots exn)
+ (raise-exception exn))
(lambda ()
(update-build-allocation-plan build-coordinator)
- (metric-increment success-counter-metric))
- (lambda (key . args)
- (simple-format
- (current-error-port)
- "error in build allocator thread: ~A ~A\n"
- key
- args)
- (backtrace))))
+ (metric-increment success-counter-metric))))
#:unwind? #t))
#:buckets ((@@ (prometheus) exponential-histogram-buckets) ; TODO
#:start 1
@@ -1139,12 +1268,14 @@
(lambda ()
(while #t
(with-exception-handler
- (lambda (exn)
- (simple-format (current-error-port)
- "exception in allocation plan fiber: ~A\n"
- exn))
+ (lambda _ #f)
(lambda ()
- (with-throw-handler #t
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format (current-error-port)
+ "exception in allocation plan fiber\n")
+ (print-backtrace-and-exception/knots exn)
+ (raise-exception exn))
(lambda ()
(match (get-message (build-coordinator-allocator-channel coordinator))
(('stats reply)
@@ -1187,9 +1318,7 @@
(update-build-allocation-plan-metrics!)
- (put-message reply #t))))
- (lambda _
- (backtrace))))
+ (put-message reply #t))))))
#:unwind? #t)))))
(define (build-coordinator-allocation-plan-stats coordinator)
@@ -1198,6 +1327,11 @@
(list 'stats reply))
(get-message reply)))
+(define (build-coordinator-count-allocation-plan-builds coordinator agent-id)
+ (or (assoc-ref (build-coordinator-allocation-plan-stats coordinator)
+ agent-id)
+ 0))
+
(define (build-coordinator-fetch-agent-allocation-plan coordinator agent-id)
(let ((reply (make-channel)))
(put-message (build-coordinator-allocator-channel coordinator)
@@ -1275,7 +1409,8 @@
(define* (build-coordinator-list-allocation-plan-builds coordinator
agent-id
- #:key limit)
+ #:key limit
+ filter?)
(define (take* lst i)
(if (< (length lst) i)
lst
@@ -1284,31 +1419,51 @@
(define datastore
(build-coordinator-datastore coordinator))
+ (define (build-data uuid
+ derivation-name
+ derived-priority
+ build-details)
+ `((uuid . ,uuid)
+ (derivation_name . ,derivation-name)
+ (system . ,(datastore-find-build-derivation-system
+ datastore
+ uuid))
+ (priority . ,(assq-ref build-details 'priority))
+ (derived_priority . ,derived-priority)
+ (tags . ,(vector-map
+ (lambda (_ tag)
+ (match tag
+ ((key . value)
+ `((key . ,key)
+ (value . ,value)))))
+ (datastore-fetch-build-tags
+ datastore
+ uuid)))))
+
(let ((build-ids
(build-coordinator-fetch-agent-allocation-plan coordinator
agent-id)))
(filter-map
(lambda (build-id)
- (match (datastore-fetch-build-to-allocate datastore build-id)
- (#(uuid derivation_id derivation_name derived_priority)
- (let ((build-details (datastore-find-build datastore uuid)))
- `((uuid . ,uuid)
- (derivation_name . ,derivation_name)
- (system . ,(datastore-find-build-derivation-system
- datastore
- uuid))
- (priority . ,(assq-ref build-details 'priority))
- (derived_priority . ,derived_priority)
- (tags . ,(vector-map
- (lambda (_ tag)
- (match tag
- ((key . value)
- `((key . ,key)
- (value . ,value)))))
- (datastore-fetch-build-tags
- datastore
- uuid))))))
- (#f #f)))
+ (if filter?
+ (match (datastore-fetch-build-to-allocate datastore build-id)
+ (#(uuid derivation_id derivation_name derived_priority)
+ (let ((build-details (datastore-find-build datastore uuid)))
+ (build-data uuid
+ derivation_name
+ derived_priority
+ build-details)))
+ (#f #f))
+ (let ((build-details (datastore-find-build datastore build-id))
+ (unprocessed-builds-entry
+ (datastore-find-unprocessed-build-entry
+ datastore
+ build-id)))
+ (build-data build-id
+ (assq-ref build-details 'derivation-name)
+ (assq-ref unprocessed-builds-entry
+ 'derived-priority)
+ build-details))))
(if limit
(take* build-ids limit)
build-ids))))
@@ -1359,6 +1514,18 @@
"hook_failure_total"
#:labels '(event)))
+ (define process-events-thread-pool-thread-total-metric
+ (make-gauge-metric
+ (build-coordinator-metrics-registry build-coordinator)
+ "hook_thread_pool_thread_total"
+ #:labels '(event)))
+
+ (define process-events-thread-pool-used-thread-total-metric
+ (make-gauge-metric
+ (build-coordinator-metrics-registry build-coordinator)
+ "hook_thread_pool_used_thread_total"
+ #:labels '(event)))
+
(define (process-event id event arguments handler)
(log-msg (build-coordinator-logger build-coordinator)
'DEBUG
@@ -1367,10 +1534,6 @@
(and
(with-exception-handler
(lambda (exn)
- (log-msg (build-coordinator-logger build-coordinator)
- 'ERROR
- "error running " event " (" id ") hook: "
- exn)
(metric-increment failure-counter-metric
#:label-values
`((event . ,event)))
@@ -1381,25 +1544,34 @@
(build-coordinator-metrics-registry build-coordinator)
"hook_duration_seconds"
(lambda ()
- (with-throw-handler #t
+ (with-exception-handler
+ (lambda (exn)
+ (let* ((stack
+ (match (fluid-ref %stacks)
+ ((stack-tag . prompt-tag)
+ (make-stack #t
+ 0 prompt-tag
+ 0 (and prompt-tag 1)))))
+ (backtrace
+ (call-with-output-string
+ (lambda (port)
+ (print-frames (stack->vector stack)
+ port
+ #:count (stack-length stack))
+ (print-exception
+ port
+ (stack-ref stack 4)
+ '%exception
+ (list exn))))))
+ (log-msg (build-coordinator-logger build-coordinator)
+ 'ERROR
+ "error running " event " (" id ") hook\n"
+ backtrace))
+ (raise-exception exn))
(lambda ()
(start-stack
- 'hook
- (apply handler build-coordinator arguments)))
- (lambda (key . args)
- (log-msg (build-coordinator-logger build-coordinator)
- 'ERROR
- "error running " event " (" id ") hook: "
- key " " args)
- (let* ((stack (make-stack #t 3))
- (backtrace
- (call-with-output-string
- (lambda (port)
- (display-backtrace stack port)
- (newline port)))))
- (display
- backtrace
- (current-output-port))))))
+ #t
+ (apply handler build-coordinator arguments)))))
#:labels '(event)
#:label-values `((event . ,event)))
#t)
@@ -1425,84 +1597,116 @@
(define (single-thread-process-events event-name handler)
(let ((mtx (make-mutex))
(condvar (make-condition-variable)))
- (call-with-new-thread
+ (call-with-default-io-waiters
(lambda ()
- (catch 'system-error
- (lambda ()
- (set-thread-name
- (symbol->string event-name)))
- (const #t))
+ (call-with-new-thread
+ (lambda ()
+ (catch 'system-error
+ (lambda ()
+ (set-thread-name
+ (symbol->string event-name)))
+ (const #t))
+
+ (lock-mutex mtx)
+ (while #t
+ (with-exception-handler
+ (lambda (exn)
+ (sleep 10))
+ (lambda ()
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg
+ (build-coordinator-logger build-coordinator)
+ 'CRITICAL
+ "error in " event-name " hook processing thread")
+ (print-backtrace-and-exception/knots exn)
+ (raise-exception exn))
+ (lambda ()
+ (while #t
+ (match (datastore-list-unprocessed-hook-events datastore event-name 1)
+ (()
+ (wait-condition-variable condvar mtx))
+ (((id event arguments))
+ (process-event id event arguments handler)))))))
+ #:unwind? #t))))))
+ condvar))
- (lock-mutex mtx)
- (while #t
- (with-exception-handler
- (lambda (exn)
- (with-exception-handler
- (lambda _
- ;; Things are really going wrong if logging about
- ;; the hook processing thread crashing, also raises
- ;; an exception, so just try and sleep and hope
- ;; things go better next time
- (sleep 10))
- (lambda ()
- (log-msg (build-coordinator-logger build-coordinator)
- 'CRITICAL
- "hook processing thread " event-name
- " exception: " exn))
- #:unwind? #t)
- (sleep 10))
- (lambda ()
- (with-throw-handler #t
+ (define (thread-pool-process-events event-name handler thread-count)
+ (let ((thread-pool
+ (make-thread-pool
+ thread-count
+ #:name (simple-format #f "~A" event-name)))
+ (reusable-condition
+ (make-reusable-condition))
+ (coordination-channel
+ (make-channel)))
+
+ (metric-set process-events-thread-pool-thread-total-metric
+ thread-count
+ #:label-values `((event . ,event-name)))
+
+ (spawn-fiber
+ (lambda ()
+ (let loop ((running-ids '()))
+ (metric-set process-events-thread-pool-used-thread-total-metric
+ (vector-count
+ (lambda (_ proc)
+ (->bool proc))
+ (thread-pool-proc-vector thread-pool))
+ #:label-values `((event . ,event-name)))
+ (match (get-message coordination-channel)
+ (('process id event arguments)
+ (if (member id running-ids)
+ ;; Ignore already running jobs
+ (loop running-ids)
+ (begin
+ (spawn-fiber
+ (lambda ()
+ (call-with-thread
+ thread-pool
+ (lambda ()
+ (process-event id event arguments handler))
+ ;; TODO Make this the default through knots
+ #:timeout #f)
+ (put-message coordination-channel
+ (list 'finished id))))
+ (loop (cons id running-ids)))))
+ (('finished id)
+ (when (< (length running-ids)
+ (* 2 thread-count))
+ (signal-reusable-condition! reusable-condition))
+ (loop (delete id running-ids)))
+ (('count-running reply)
+ (let ((count (length running-ids)))
+ (spawn-fiber
(lambda ()
- (while #t
- (match (datastore-list-unprocessed-hook-events datastore event-name 1)
- (()
- (wait-condition-variable condvar mtx))
- (((id event arguments))
- (process-event id event arguments handler)))))
- (lambda (key . args)
- (log-msg
- (build-coordinator-logger build-coordinator)
- 'CRITICAL
- "error in " event-name " hook processing thread: " key " " args)
- (backtrace))))
- #:unwind? #t))))
- condvar))
+ (put-message reply count))))
+ (loop running-ids))))))
- (define (work-queue-process-events event-name handler thread-count)
- (let-values (((pool-mutex job-available count-threads list-jobs)
- (create-thread-pool
- (lambda ()
- (max
- 1
- (length
- (datastore-list-unprocessed-hook-events
- datastore
- event-name
- thread-count))))
- (lambda (running-jobs)
- (let* ((in-progress-ids
- (map car running-jobs))
- (potential-jobs
- (datastore-list-unprocessed-hook-events
- datastore
- event-name
- (+ 1 (length in-progress-ids))))
- (job
- (find
- (match-lambda
- ((id rest ...)
- (not (member id in-progress-ids))))
- potential-jobs)))
- (log-msg
- (build-coordinator-logger build-coordinator)
- 'DEBUG
- event-name " work queue, got job " job)
- job))
- (lambda (id event arguments)
- (process-event id event arguments handler))
- #:name (symbol->string event-name))))
- job-available))
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (let ((count
+ (let ((channel (make-channel)))
+ (put-message coordination-channel
+ (list 'count-running channel))
+ (get-message channel))))
+ (when (< count (* 2 thread-count))
+ (log-msg (build-coordinator-logger build-coordinator)
+ 'DEBUG "submitting batch of " event-name " hook events")
+ (for-each
+ (match-lambda
+ ((id event arguments)
+ (put-message coordination-channel
+ (list 'process id event arguments))))
+ (datastore-list-unprocessed-hook-events
+ datastore
+ event-name
+ (* 20 thread-count)))))
+ (reusable-condition-wait reusable-condition
+ #:timeout 60))))
+
+ reusable-condition))
(map
(match-lambda
@@ -1511,15 +1715,14 @@
(or (and=>
(assq-ref parallel-hooks event-name)
(lambda (thread-count)
- (work-queue-process-events event-name
- handler
- thread-count)))
+ (thread-pool-process-events event-name
+ handler
+ thread-count)))
(single-thread-process-events event-name
handler)))))
(build-coordinator-hooks build-coordinator)))
-(define (fetch-builds build-coordinator agent systems
- max-builds deprecated-requested-count)
+(define (fetch-builds build-coordinator agent systems max-builds)
(define datastore
(build-coordinator-datastore build-coordinator))
@@ -1528,10 +1731,14 @@
build-coordinator agent-id)))
(if build-details
(let ((build-id (assq-ref build-details 'uuid)))
- (datastore-insert-to-allocated-builds datastore agent-id (list build-id))
+ (datastore-insert-to-allocated-builds
+ datastore
+ agent-id
+ build-id)
(build-coordinator-remove-build-from-allocation-plan
build-coordinator build-id)
- build-details)
+ `(,@build-details
+ (submit_outputs . null)))
#f)))
(define (allocate-several-builds agent-id count)
@@ -1555,28 +1762,68 @@
(datastore-list-agent-builds datastore agent))
(start-count
(length initially-allocated-builds))
- (target-count (or max-builds
- (+ start-count
- deprecated-requested-count))))
+ (target-count max-builds))
(if (< start-count target-count)
(let ((new-builds
(allocate-several-builds agent
(- target-count start-count))))
- ;; Previously allocate builds just returned newly allocated
- ;; builds, but if max-builds is provided, return all the
- ;; builds. This means the agent can handle this in a idempotent
- ;; manor.
- (if max-builds
- (append initially-allocated-builds
- new-builds)
- new-builds))
- ;; Previously allocate builds just returned newly allocated builds,
- ;; but if max-builds is provided, return all the builds. This means
- ;; the agent can handle this in a idempotent manor.
- (if max-builds
- initially-allocated-builds
- '()))))
- #:duration-metric-name "allocate_builds_to_agent"))
+ (if (null? new-builds)
+ (values initially-allocated-builds
+ #f)
+ (values (append initially-allocated-builds
+ new-builds)
+ #t)))
+ (values initially-allocated-builds
+ #f))))
+ #:duration-metric-name "allocate_builds_to_agent"
+ #:duration-metric-buckets %command-duration-histogram-buckets))
+
+ (define (send-agent-builds-allocated-event builds)
+ (build-coordinator-send-event
+ build-coordinator
+ "agent-builds-allocated"
+ `((agent_id . ,agent)
+ (builds . ,(list->vector
+ (map
+ (lambda (build)
+ `(,@build
+ (tags
+ . ,(list->vector
+ (map
+ (match-lambda
+ ((key . value)
+ `((key . ,key)
+ (value . ,value))))
+ (vector->list
+ (datastore-fetch-build-tags
+ datastore
+ (assq-ref build 'uuid))))))))
+ builds))))))
+
+ (define (submit-outputs? build)
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg (build-coordinator-logger build-coordinator)
+ 'CRITICAL
+ "build-submit-outputs hook raised exception")
+ (print-backtrace-and-exception/knots exn)
+ (raise-exception exn))
+ (lambda ()
+ (let ((hook-result
+ (call-with-delay-logging
+ (lambda ()
+ (build-submit-outputs-hook
+ build-coordinator
+ (assq-ref build 'uuid))))))
+ (if (boolean? hook-result)
+ hook-result
+ (begin
+ (log-msg
+ (build-coordinator-logger build-coordinator)
+ 'CRITICAL
+ "build-submit-outputs hook returned non boolean: "
+ hook-result)
+ #t))))))
(call-with-duration-metric
(build-coordinator-metrics-registry build-coordinator)
@@ -1592,111 +1839,42 @@
(trigger-build-allocation build-coordinator)))
(let ((builds
- (get-builds)))
-
- (build-coordinator-send-event
- build-coordinator
- "agent-builds-allocated"
- `((agent_id . ,agent)
- (builds . ,(list->vector
- (map
- (lambda (build)
- `(,@build
- (tags
- . ,(list->vector
- (map
- (match-lambda
- ((key . value)
- `((key . ,key)
- (value . ,value))))
- (vector->list
- (datastore-fetch-build-tags
- datastore
- (assq-ref build 'uuid))))))))
- builds)))))
-
- (map (lambda (build)
- (define submit-outputs?
- (with-exception-handler
- (lambda (exn)
- (log-msg (build-coordinator-logger build-coordinator)
- 'CRITICAL
- "build-submit-outputs hook raised exception: "
- exn))
- (lambda ()
- (with-throw-handler #t
- (lambda ()
- (let ((hook-result
- (call-with-delay-logging
- (lambda ()
- (build-submit-outputs-hook
- build-coordinator
- (assq-ref build 'uuid))))))
- (if (boolean? hook-result)
- hook-result
- (begin
- (log-msg
- (build-coordinator-logger build-coordinator)
- 'CRITICAL
- "build-submit-outputs hook returned non boolean: "
- hook-result)
- #t))))
- (lambda (key . args)
- (backtrace))))
- #:unwind? #t))
-
- `(,@build
- ;; TODO This needs reconsidering when things having been built in
- ;; the past doesn't necessarily mean they're still available.
- (submit_outputs . ,submit-outputs?)))
- builds)))))))
+ new-builds-allocated?
+ (if (= 0
+ (build-coordinator-count-allocation-plan-builds
+ build-coordinator
+ agent))
+ (values
+ (datastore-list-agent-builds datastore agent)
+ #f)
+ (get-builds))))
+
+ (when new-builds-allocated?
+ (send-agent-builds-allocated-event builds))
+
+ (map
+ (lambda (build)
+ (if (eq? 'null (assq-ref build 'submit_outputs))
+ (let ((submit-outputs? (submit-outputs? build)))
+ (datastore-update-allocated-build-submit-outputs
+ (build-coordinator-datastore build-coordinator)
+ (assq-ref build 'uuid)
+ submit-outputs?)
+
+ `(,@(alist-delete 'submit_outputs build)
+ (submit_outputs . ,submit-outputs?)))
+ build))
+ builds)))))))
(define (agent-details build-coordinator agent-id)
(define datastore
(build-coordinator-datastore build-coordinator))
- (define build-submit-outputs-hook
- (assq-ref (build-coordinator-hooks build-coordinator)
- 'build-submit-outputs))
-
- (define (submit-outputs? build)
- (with-exception-handler
- (lambda (exn)
- (log-msg (build-coordinator-logger build-coordinator)
- 'CRITICAL
- "build-submit-outputs hook raised exception: "
- exn))
- (lambda ()
- (with-throw-handler #t
- (lambda ()
- (let ((hook-result
- (call-with-delay-logging
- (lambda ()
- (build-submit-outputs-hook
- build-coordinator
- (assq-ref build 'uuid))))))
- (if (boolean? hook-result)
- hook-result
- (begin
- (log-msg
- (build-coordinator-logger build-coordinator)
- 'CRITICAL
- "build-submit-outputs hook returned non boolean: "
- hook-result)
- #t))))
- (lambda (key . args)
- (backtrace))))
- #:unwind? #t))
-
(let ((agent (datastore-find-agent datastore agent-id))
(allocated-builds (datastore-list-agent-builds datastore agent-id)))
`(,@agent ; description
- (builds . ,(list->vector
- (map (lambda (build)
- `(,@build
- (submit_outputs . ,(submit-outputs? build))))
- allocated-builds))))))
+ (builds . ,(list->vector allocated-builds)))))
(define (build-data-location build-id )
(string-append (%config 'builds-dir) "/"
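The fetch-builds path above now computes submit_outputs lazily: a NULL allocated_builds.submit_outputs column surfaces as the 'null sentinel, the hook runs once, and the result is written back so later requests (and agent-details) reuse it. The same pattern in isolation, as a sketch with a hash table standing in for the allocated_builds table:

(use-modules (srfi srfi-1))   ; alist-delete

(define %submit-outputs-cache (make-hash-table))

(define (ensure-submit-outputs build compute)
  ;; Return BUILD with a boolean submit_outputs entry, computing and
  ;; caching it only when the stored value is the 'null sentinel.
  (if (eq? 'null (assq-ref build 'submit_outputs))
      (let ((value (compute build)))
        (hash-set! %submit-outputs-cache (assq-ref build 'uuid) value)
        `(,@(alist-delete 'submit_outputs build)
          (submit_outputs . ,value)))
      build))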
@@ -1845,10 +2023,11 @@
(list build-id))
(when success?
- (datastore-delete-relevant-outputs-from-unbuilt-outputs
+ (datastore-insert-background-job
datastore
- build-id)
- (datastore-update-unprocessed-builds-for-build-success
+ 'build-success
+ (list build-id))
+ (datastore-delete-relevant-outputs-from-unbuilt-outputs
datastore
build-id)
(datastore-store-output-metadata
@@ -1858,7 +2037,8 @@
(assoc-ref result-json "outputs"))))
#f))))
- #:duration-metric-name "store_build_result")))
+ #:duration-metric-name "store_build_result"
+ #:duration-metric-buckets %command-duration-histogram-buckets)))
(when exception
;; Raise the exception here to avoid aborting the transaction
(raise-exception exception)))
@@ -1872,6 +2052,11 @@
'build-success
'build-failure))
+ (when success?
+ (build-coordinator-trigger-background-job-processing
+ build-coordinator
+ 'build-success))
+
(build-coordinator-send-event
build-coordinator
(if success?
@@ -1884,7 +2069,8 @@
;; could change the allocation
(trigger-build-allocation build-coordinator)
- #t))))
+ #t))
+ #:buckets %command-duration-histogram-buckets))
(define (handle-build-start-report build-coordinator
agent-id
@@ -1903,7 +2089,8 @@
build-coordinator
'build-started
`((build_id . ,build-id)
- (agent_id . ,agent-id))))))
+ (agent_id . ,agent-id))))
+ #:buckets %command-duration-histogram-buckets))
(define (handle-setup-failure-report build-coordinator
agent-id build-id report-json)
@@ -1939,3 +2126,142 @@
;; Trigger build allocation, so that the allocator can handle this setup
;; failure
(trigger-build-allocation build-coordinator))
+
+(define (build-coordinator-trigger-background-job-processing
+ build-coordinator
+ type)
+ (let ((condition
+ (assq-ref (build-coordinator-background-job-conditions
+ build-coordinator)
+ type)))
+ (unless condition
+ (error
+ (simple-format #f "unknown condition ~A" type)))
+ (signal-reusable-condition! condition)))
+
+(define (start-background-job-processing-fibers build-coordinator)
+ (define %background-job-duration-histogram-buckets
+ (list 0.1 0.25 0.5 1 2.5 5 10 15 30 45 60 120 240 (inf)))
+
+ (define* (start-job-fibers type proc #:key (parallelism 1))
+ (let ((coordination-channel
+ (make-channel))
+ (condition
+ (make-reusable-condition))
+ (process-in-fiber
+ (fiberize
+ (lambda args
+ (call-with-duration-metric
+ (build-coordinator-metrics-registry build-coordinator)
+ "coordinator_background_job_duration_seconds"
+ (lambda ()
+ (call-with-delay-logging proc #:args args))
+ #:labels '(name)
+ #:label-values `((name . ,type))
+ #:buckets %background-job-duration-histogram-buckets))
+ #:parallelism parallelism))
+ (job-exception-counter-metric
+ (make-counter-metric
+ (build-coordinator-metrics-registry build-coordinator)
+ "coordinator_background_job_failures_total"
+ #:labels '(name))))
+
+ (define (process id . args)
+ (spawn-fiber
+ (lambda ()
+ (let loop ((retry 0))
+ (log-msg (build-coordinator-logger build-coordinator)
+ 'DEBUG
+ "processing " type " background job (id: "
+ id ", args: " args ", retry: " retry ")")
+ (let ((success?
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg (build-coordinator-logger build-coordinator)
+ 'WARN
+ type " background job error (id: "
+ id "): " exn)
+ #f)
+ (lambda ()
+ (apply process-in-fiber args))
+ #:unwind? #t)))
+ (if success?
+ (begin
+ (datastore-delete-background-job
+ (build-coordinator-datastore build-coordinator)
+ id)
+ (put-message coordination-channel
+ (list 'job-finished id)))
+ (begin
+ (metric-increment job-exception-counter-metric
+ #:label-values `((name . ,type)))
+ (sleep 30)
+ (loop (+ 1 retry)))))))))
+
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (let ((job-details
+ (datastore-select-background-jobs
+ (build-coordinator-datastore build-coordinator)
+ type
+ #:limit (* 2 parallelism))))
+
+ (unless (null? job-details)
+ (put-message coordination-channel
+ (list 'process job-details)))
+
+ (reusable-condition-wait condition
+ #:timeout 30)))))
+
+ (spawn-fiber
+ (lambda ()
+ (let loop ((running-job-ids '()))
+ (match (get-message coordination-channel)
+ (('process jobs)
+ (let* ((job-ids (map (lambda (job)
+ (assq-ref job 'id))
+ jobs))
+ (new-ids
+ (lset-difference = job-ids running-job-ids))
+ (jobs-to-start
+ (take new-ids
+ (min
+ (- parallelism
+ (length running-job-ids))
+ (length new-ids)))))
+ (for-each (lambda (job)
+ (apply process
+ (assq-ref job 'id)
+ (assq-ref job 'args)))
+ (filter
+ (lambda (job-details)
+ (member (assq-ref job-details 'id)
+ jobs-to-start))
+ jobs))
+ (loop (append running-job-ids
+ jobs-to-start))))
+ (('job-finished id)
+ ;; Maybe not very efficient, but should work
+ (signal-reusable-condition! condition)
+ (log-msg (build-coordinator-logger build-coordinator)
+ 'DEBUG type
+ " background job " id
+ " finished successfully")
+ (loop (delete id running-job-ids)))))))
+
+ condition))
+
+ `((build-success . ,(start-job-fibers
+ 'build-success
+ (lambda (build-id)
+ (datastore-update-unprocessed-builds-for-build-success
+ (build-coordinator-datastore build-coordinator)
+ build-id))
+ #:parallelism 24))))
+
+(define (build-coordinator-check-and-correct-unprocessed-builds-all-inputs-built
+ build-coordinator)
+ (datastore-check-and-correct-unprocessed-builds-all-inputs-built
+ (build-coordinator-datastore build-coordinator)
+ #:progress-reporter progress-reporter/bar))
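The background job machinery above combines three fibers: a poller woken by a reusable condition (or a 30 second timeout), a dispatcher that deduplicates job ids and caps concurrency at the configured parallelism, and per-job fibers that retry with a delay until the job succeeds and its row can be deleted. A condensed, self-contained sketch of that shape using only guile-fibers; the real code additionally persists jobs in background_jobs_queue, records metrics, and backs off for 30 seconds between retries:

(use-modules (fibers)
             (fibers channels)
             (ice-9 match))

;; Run PROC over JOBS, an alist of id -> argument, with at most
;; PARALLELISM jobs in flight; a job that raises is retried after a
;; short sleep until it succeeds.
(define (process-jobs jobs proc parallelism)
  (run-fibers
   (lambda ()
     (define channel (make-channel))
     (define (start-job id arg)
       (spawn-fiber
        (lambda ()
          (let retry ()
            (if (with-exception-handler
                    (lambda (exn) #f)      ; failure: fall through to retry
                  (lambda () (proc arg) #t)
                  #:unwind? #t)
                (put-message channel id)   ; like the 'job-finished message
                (begin (sleep 1) (retry)))))))
     (let loop ((pending jobs) (running 0))
       (cond
        ;; Top up to PARALLELISM running jobs.
        ((and (< running parallelism) (pair? pending))
         (match (car pending)
           ((id . arg) (start-job id arg)))
         (loop (cdr pending) (+ running 1)))
        ;; Nothing pending and nothing running: all done.
        ((and (null? pending) (zero? running)) 'done)
        ;; Otherwise wait for a completion before starting more.
        (else
         (get-message channel)
         (loop pending (- running 1))))))))

;; (process-jobs '((1 . "a") (2 . "b") (3 . "c"))
;;               (lambda (arg) (display arg) (newline))
;;               2)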
diff --git a/guix-build-coordinator/datastore.scm b/guix-build-coordinator/datastore.scm
index dc4fec6..5768630 100644
--- a/guix-build-coordinator/datastore.scm
+++ b/guix-build-coordinator/datastore.scm
@@ -6,7 +6,8 @@
#:use-module (guix-build-coordinator datastore postgresql)
#:duplicates (merge-generics)
#:export (database-uri->datastore
- datastore-find-build-output))
+ datastore-find-build-output
+ datastore-validate-datetime-string))
(re-export datastore-optimize)
(re-export datastore-spawn-fibers)
@@ -92,19 +93,24 @@
(re-export datastore-fetch-build-to-allocate)
(re-export datastore-check-if-derivation-conflicts?)
(re-export datastore-insert-to-allocated-builds)
+(re-export datastore-update-allocated-build-submit-outputs)
+(re-export datastore-insert-background-job)
+(re-export datastore-delete-background-job)
+(re-export datastore-select-background-jobs)
+(re-export datastore-check-and-correct-unprocessed-builds-all-inputs-built)
(define* (database-uri->datastore database
#:key
metrics-registry
- worker-thread-log-exception?)
+ thread-pool-log-exception?)
(cond
((string-prefix? "pg://" database)
(postgresql-datastore database))
((string-prefix? "sqlite://" database)
(sqlite-datastore database
#:metrics-registry metrics-registry
- #:worker-thread-log-exception?
- worker-thread-log-exception?))
+ #:thread-pool-log-exception?
+ thread-pool-log-exception?))
(else
(error
(simple-format #f "Unknown database ~A" database)))))
@@ -119,3 +125,6 @@
(assq-ref output 'output)
#f))
outputs)))
+
+(define (datastore-validate-datetime-string s)
+ (strftime "%F %T" (car (strptime "%F %T" s))))
diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm
index ac04362..bb1d8e8 100644
--- a/guix-build-coordinator/datastore/sqlite.scm
+++ b/guix-build-coordinator/datastore/sqlite.scm
@@ -10,8 +10,11 @@
#:use-module (ice-9 exceptions)
#:use-module (sqlite3)
#:use-module (fibers)
+ #:use-module (knots parallelism)
+ #:use-module (knots thread-pool)
#:use-module (prometheus)
#:use-module (guix base16)
+ #:use-module (guix progress)
#:use-module (guix derivations)
#:use-module (guix-build-coordinator utils)
#:use-module (guix-build-coordinator utils fibers)
@@ -102,21 +105,31 @@
datastore-update-agent-requested-systems
datastore-fetch-build-to-allocate
datastore-check-if-derivation-conflicts?
- datastore-insert-to-allocated-builds))
+ datastore-insert-to-allocated-builds
+ datastore-update-allocated-build-submit-outputs
+ datastore-insert-background-job
+ datastore-delete-background-job
+ datastore-select-background-jobs
+ datastore-check-and-correct-unprocessed-builds-all-inputs-built))
+
+(define %transaction-duration-histogram-buckets
+ (list 0.1 0.25 0.5 1 2.5 5 10 15 30 45 60 120 240 (inf)))
(define-class <sqlite-datastore> (<abstract-datastore>)
database-file
- worker-reader-thread-channel
- worker-reader-thread-proc-vector
- worker-writer-thread-channel
- worker-writer-thread-proc-vector
+ reader-thread-pool
+ writer-thread-pool
+ low-priority-writer-thread-channel
+ default-priority-writer-thread-channel
+ high-priority-writer-thread-channel
+ writer-thread-channel-queue-stats
metrics-registry)
(define* (sqlite-datastore database-uri
#:key
update-database?
metrics-registry
- worker-thread-log-exception?)
+ thread-pool-log-exception?)
(define database-file
(string-drop database-uri
(string-length "sqlite://")))
@@ -131,7 +144,8 @@
(let ((db (db-open database-file)))
(sqlite-exec db "PRAGMA journal_mode=WAL;")
(sqlite-exec db "PRAGMA optimize;")
- (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);")
+ (with-time-logging "truncating the WAL"
+ (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);"))
(sqlite-close db))
(let ((datastore (make <sqlite-datastore>)))
@@ -139,9 +153,11 @@
(slot-set! datastore 'database-file database-file)
(slot-set! datastore 'metrics-registry metrics-registry)
- (let ((channel
- proc-vector
- (make-worker-thread-channel
+ (let ((writer-thread-pool
+ (make-thread-pool
+ ;; SQLite doesn't support parallel writes
+ 1
+ #:thread-initializer
(lambda ()
(let ((db
(db-open database-file)))
@@ -152,7 +168,7 @@
(list db)))
#:name "ds write"
- #:destructor
+ #:thread-destructor
(let ((writer-thread-destructor-counter
(make-gauge-metric metrics-registry
"datastore_writer_thread_close_total")))
@@ -164,16 +180,16 @@
(metric-increment writer-thread-destructor-counter)
(sqlite-close db)))
- #:lifetime 500
+ #:thread-lifetime 500
#:expire-on-exception? #t
- ;; SQLite doesn't support parallel writes
- #:parallelism 1
#:delay-logger (let ((delay-metric
(make-histogram-metric
metrics-registry
- "datastore_write_delay_seconds")))
- (lambda (seconds-delayed)
+ "datastore_write_delay_seconds"
+ #:buckets
+ %transaction-duration-histogram-buckets)))
+ (lambda (seconds-delayed proc)
(metric-observe delay-metric
;; TODO exact->inexact to work around
;; a bug in guile-prometheus where
@@ -182,30 +198,41 @@
(exact->inexact seconds-delayed))
(log-delay "datastore write" seconds-delayed)
(when (> seconds-delayed 1)
- (format
+ (format/safe
(current-error-port)
"warning: database write delayed by ~1,2f seconds~%"
seconds-delayed))))
#:duration-logger
(lambda (duration proc)
(when (> duration 10)
- (format
+ (format/safe
(current-error-port)
"warning: database write took ~1,2f seconds (~a)~%"
duration
proc)))
- #:log-exception? worker-thread-log-exception?)))
+ #:log-exception? thread-pool-log-exception?)))
(slot-set! datastore
- 'worker-writer-thread-channel
- channel)
+ 'writer-thread-pool
+ writer-thread-pool)
+ ;; This is changed in datastore-spawn-fibers
+ (slot-set! datastore
+ 'low-priority-writer-thread-channel
+ (thread-pool-channel writer-thread-pool))
+ (slot-set! datastore
+ 'default-priority-writer-thread-channel
+ (thread-pool-channel writer-thread-pool))
(slot-set! datastore
- 'worker-writer-thread-proc-vector
- proc-vector))
+ 'high-priority-writer-thread-channel
+ (thread-pool-channel writer-thread-pool)))
- (let ((channel
- proc-vector
- (make-worker-thread-channel
+ (let ((reader-thread-pool
+ (make-thread-pool
+ ;; Use a minimum of 8 and a maximum of 12 threads
+ (min (max (current-processor-count)
+ 8)
+ 12)
+ #:thread-initializer
(lambda ()
(let ((db
(db-open database-file #:write? #f)))
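Distilling the knots thread-pool usage above: the parallelism is the first positional argument, #:thread-initializer returns the per-thread argument list (here the SQLite handle), and #:thread-destructor receives those same arguments when a thread is retired. A stripped-down sketch of the same shape, reusing database-file, db-open and sqlite-close from this file and omitting the lifetime, logging and metrics keywords:

(define example-reader-pool
  (make-thread-pool
   4                                              ; number of threads
   #:thread-initializer
   (lambda ()
     (list (db-open database-file #:write? #f)))  ; per-thread arguments
   #:thread-destructor
   (lambda (db)
     (sqlite-close db))
   #:name "example reader"))

;; Each call borrows one thread (and its db handle) from the pool:
(call-with-thread example-reader-pool
                  (lambda (db)
                    (sqlite-exec db "PRAGMA user_version;")))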
@@ -214,27 +241,24 @@
(sqlite-exec db "PRAGMA cache_size = -16000;")
(list db)))
- #:name "ds read"
- #:destructor
+ #:thread-destructor
(let ((reader-thread-destructor-counter
(make-gauge-metric metrics-registry
"datastore_reader_thread_close_total")))
(lambda (db)
(metric-increment reader-thread-destructor-counter)
(sqlite-close db)))
- #:lifetime 50000
+ #:name "ds read"
+ #:thread-lifetime 50000
#:expire-on-exception? #t
- ;; Use a minimum of 8 and a maximum of 16 threads
- #:parallelism
- (min (max (current-processor-count)
- 8)
- 16)
#:delay-logger (let ((delay-metric
(make-histogram-metric
metrics-registry
- "datastore_read_delay_seconds")))
- (lambda (seconds-delayed)
+ "datastore_read_delay_seconds"
+ #:buckets
+ %transaction-duration-histogram-buckets)))
+ (lambda (seconds-delayed proc)
(metric-observe delay-metric
;; TODO exact->inexact to work around
;; a bug in guile-prometheus where
@@ -243,28 +267,50 @@
(exact->inexact seconds-delayed))
(log-delay "datastore read" seconds-delayed)
(when (> seconds-delayed 1)
- (format
+ (format/safe
(current-error-port)
"warning: database read delayed by ~1,2f seconds~%"
seconds-delayed))))
#:duration-logger
(lambda (duration proc)
(when (> duration 30)
- (format
+ (format/safe
(current-error-port)
"warning: database read took ~1,2f seconds (~a)~%"
duration
proc)))
- #:log-exception? worker-thread-log-exception?)))
- (slot-set! datastore
- 'worker-reader-thread-channel
- channel)
+ #:log-exception? thread-pool-log-exception?)))
+
+ (let ((metric (make-gauge-metric
+ metrics-registry
+ "datastore_reader_threads_total")))
+ (metric-set metric
+ (vector-length
+ (thread-pool-proc-vector reader-thread-pool))))
+
+
(slot-set! datastore
- 'worker-reader-thread-proc-vector
- proc-vector))
+ 'reader-thread-pool
+ reader-thread-pool))
datastore))
+(define* (call-with-writer-thread
+ datastore
+ proc
+ #:key priority duration-logger)
+ (call-with-thread
+ (slot-ref datastore 'writer-thread-pool)
+ proc
+ #:duration-logger duration-logger
+ #:channel
+ (slot-ref datastore
+ (assq-ref
+ '((high . high-priority-writer-thread-channel)
+ (default . default-priority-writer-thread-channel)
+ (low . low-priority-writer-thread-channel))
+ (or priority 'default)))))
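Callers pick a writer queue by symbol, defaulting to 'default; a usage sketch mirroring the #:priority 'low and 'high call sites later in this file:

(call-with-writer-thread datastore
                         (lambda (db)
                           (sqlite-exec db "ANALYZE;"))
                         #:priority 'low)

Until datastore-spawn-fibers has run, all three channel slots still point at the pool's own channel (see above), so the priority argument only takes effect once the discrete priority queues are installed.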
+
(define (sqlite-step-and-reset statement)
(let ((val (sqlite-step statement)))
(sqlite-reset statement)
@@ -305,12 +351,12 @@
(#(blocked? modified-page-count pages-moved-to-db)
(if (= blocked? 1)
(begin
- (simple-format
+ (simple-format/safe
(current-error-port)
"warning: wal checkpoint blocked\n")
#f)
(begin
- (simple-format
+ (simple-format/safe
(current-error-port)
"wal checkpoint completed (~A, ~A)\n"
modified-page-count
@@ -331,8 +377,8 @@ PRAGMA optimize;")
(define-method (datastore-optimize
(datastore <sqlite-datastore>))
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread
+ datastore
(lambda (db)
(db-optimize
db
@@ -343,22 +389,37 @@ PRAGMA optimize;")
(datastore <sqlite-datastore>))
;; Queue messages to the writer thread, so that they're handled in a first
;; come first served manner
- (slot-set!
- datastore
- 'worker-writer-thread-channel
- (make-queueing-channel
- (slot-ref datastore 'worker-writer-thread-channel)))
+ (let ((queues
+ queue-stats
+ (make-discrete-priority-queueing-channels
+ (thread-pool-channel
+ (slot-ref datastore 'writer-thread-pool))
+ 3)))
+ (match queues
+ ((high default low)
+ (slot-set! datastore 'high-priority-writer-thread-channel high)
+ (slot-set! datastore 'default-priority-writer-thread-channel default)
+ (slot-set! datastore 'low-priority-writer-thread-channel low)))
+ (slot-set! datastore
+ 'writer-thread-channel-queue-stats
+ queue-stats))
(spawn-fiber
(lambda ()
(while #t
(sleep 20)
- (vector-for-each
- (lambda (i proc)
- (simple-format (current-error-port)
- "reader thread ~A running: ~A\n"
- i proc))
- (slot-ref datastore 'worker-reader-thread-proc-vector)))))
+ (let ((procs
+ (vector->list
+ (thread-pool-proc-vector
+ (slot-ref datastore 'reader-thread-pool)))))
+ (when (every procedure? procs)
+ (for-each
+ (lambda (i proc)
+ (simple-format/safe (current-error-port)
+ "reader thread ~A running: ~A\n"
+ i proc))
+ (iota (length procs))
+ procs))))))
(spawn-fiber
(lambda ()
@@ -366,9 +427,9 @@ PRAGMA optimize;")
(sleep (* 60 10)) ; 10 minutes
(with-exception-handler
(lambda (exn)
- (simple-format (current-error-port)
- "exception when performing WAL checkpoint: ~A\n"
- exn))
+ (simple-format/safe (current-error-port)
+ "exception when performing WAL checkpoint: ~A\n"
+ exn))
(lambda ()
(with-time-logging
"performing regular database maintenance"
@@ -393,11 +454,18 @@ PRAGMA optimize;")
(let ((setup-failures-total
(make-gauge-metric registry
"setup_failures_total"
- #:labels '(agent_id reason))))
-
- (letpar& ((setup-failure-counts
- (with-time-logging "counting setup failures"
- (datastore-count-setup-failures datastore))))
+ #:labels '(agent_id reason)))
+ (background-jobs-inserted-total
+ (make-counter-metric registry
+ "coordinator_background_job_inserted_total"
+ #:labels '(name))))
+
+ (fibers-let ((setup-failure-counts
+ (with-time-logging "counting setup failures"
+ (datastore-count-setup-failures datastore)))
+ (background-job-counts
+ (with-time-logging "counting background jobs"
+ (datastore-count-background-jobs datastore))))
(for-each (match-lambda
(((agent-id reason) . count)
@@ -406,7 +474,14 @@ PRAGMA optimize;")
#:label-values
`((agent_id . ,agent-id)
(reason . ,reason)))))
- setup-failure-counts)))
+ setup-failure-counts)
+
+ (for-each (match-lambda
+ ((type . count)
+ (metric-increment background-jobs-inserted-total
+ #:by count
+ #:label-values `((name . ,type)))))
+ background-job-counts)))
#t)
(define-method (datastore-update-metrics!
@@ -439,12 +514,38 @@ PRAGMA optimize;")
"datastore_wal_bytes")
(make-gauge-metric
registry "datastore_wal_bytes"
- #:docstring "Size of the SQLite Write Ahead Log file"))))
-
- (letpar& ((build-counts
- (datastore-count-builds datastore))
- (build-result-counts
- (datastore-count-build-results datastore)))
+ #:docstring "Size of the SQLite Write Ahead Log file")))
+ (reader-threads-used-metric
+ (or (metrics-registry-fetch-metric registry
+ "datastore_reader_threads_used_total")
+ (make-gauge-metric
+ registry "datastore_reader_threads_used_total")))
+ (writer-queue-length-metric
+ (or (metrics-registry-fetch-metric registry
+ "datastore_writer_queue_total")
+ (make-gauge-metric
+ registry "datastore_writer_queue_total"
+ #:labels '(priority)))))
+
+ (metric-set reader-threads-used-metric
+ (vector-count
+ (lambda (_ proc)
+ (->bool proc))
+ (thread-pool-proc-vector
+ (slot-ref datastore 'reader-thread-pool))))
+
+ (for-each
+ (lambda (priority stats)
+ (metric-set writer-queue-length-metric
+ (assq-ref stats 'length)
+ #:label-values `((priority . ,priority))))
+ '(high default low)
+ ((slot-ref datastore 'writer-thread-channel-queue-stats)))
+
+ (fibers-let ((build-counts
+ (datastore-count-builds datastore))
+ (build-result-counts
+ (datastore-count-build-results datastore)))
(for-each (match-lambda
((system . count)
(metric-set builds-total
@@ -477,9 +578,10 @@ PRAGMA optimize;")
internal-time-units-per-second))
(apply values vals)))))
-(define (metric-observe-duration datastore
- thing
- duration-seconds)
+(define* (metric-observe-duration datastore
+ thing
+ duration-seconds
+ #:key (buckets %default-histogram-buckets))
(define registry (slot-ref datastore 'metrics-registry))
(define metric-name
(string-append "datastore_" thing "_duration_seconds"))
@@ -487,15 +589,25 @@ PRAGMA optimize;")
(let ((metric
(or (metrics-registry-fetch-metric registry metric-name)
(make-histogram-metric registry
- metric-name))))
+ metric-name
+ #:buckets buckets))))
(metric-observe metric duration-seconds)))
-(define (call-with-worker-thread/delay-logging channel proc)
- (call-with-worker-thread channel
+(define (call-with-thread/delay-logging thread-pool proc)
+ (call-with-thread thread-pool
+ proc
+ #:duration-logger
+ (lambda (duration)
+ (log-delay proc duration))))
+
+(define* (call-with-writer-thread/delay-logging datastore proc
+ #:key priority)
+ (call-with-writer-thread datastore
proc
#:duration-logger
(lambda (duration)
- (log-delay proc duration))))
+ (log-delay proc duration))
+ #:priority priority))
(define-exception-type &transaction-rollback-exception &exception
make-transaction-rollback-exception
@@ -509,21 +621,24 @@ PRAGMA optimize;")
#:key
readonly?
(immediate? (not readonly?))
- duration-metric-name)
+ priority
+ duration-metric-name
+ (duration-metric-buckets
+ %transaction-duration-histogram-buckets))
(define (run-proc-within-transaction db)
(define (attempt-begin)
(with-exception-handler
(lambda (exn)
(match (exception-args exn)
(('sqlite-exec 5 msg)
- (simple-format
+ (simple-format/safe
(current-error-port)
"warning: issue starting transaction (code: 5, proc: ~A): ~A\n"
proc msg)
#f)
(_
- (simple-format (current-error-port)
- "exception starting transaction\n")
+ (simple-format/safe (current-error-port)
+ "exception starting transaction\n")
(raise-exception exn))))
(lambda ()
(sqlite-exec db (if immediate?
@@ -537,14 +652,14 @@ PRAGMA optimize;")
(lambda (exn)
(match (exception-args exn)
(('sqlite-exec 5 msg)
- (simple-format
+ (simple-format/safe
(current-error-port)
"warning: attempt commit (code: 5, proc: ~A): ~A\n"
proc msg)
#f)
(_
- (simple-format (current-error-port)
- "exception committing transaction\n")
+ (simple-format/safe (current-error-port)
+ "exception committing transaction\n")
(raise-exception exn))))
(lambda ()
(sqlite-exec db "COMMIT TRANSACTION;")
@@ -552,63 +667,88 @@ PRAGMA optimize;")
#:unwind? #t))
(if (attempt-begin)
- (call-with-values
- (lambda ()
- (with-exception-handler
- (lambda (exn)
- (if (transaction-rollback-exception? exn)
- (begin
- (sqlite-exec db "ROLLBACK TRANSACTION;")
- (transaction-rollback-exception-return-value exn))
- (begin
- (simple-format (current-error-port)
- "error: sqlite rolling back transaction (~A)\n"
- exn)
- (sqlite-exec db "ROLLBACK TRANSACTION;")
- (raise-exception exn))))
+ (with-exception-handler
+ (lambda (exn)
+ (if (transaction-rollback-exception? exn)
+ (begin
+ (sqlite-exec db "ROLLBACK TRANSACTION;")
+ (transaction-rollback-exception-return-value exn))
+ (begin
+ (simple-format/safe
+ (current-error-port)
+ "error: sqlite rolling back transaction (~A)\n"
+ exn)
+ (sqlite-exec db "ROLLBACK TRANSACTION;")
+ (raise-exception exn))))
+ (lambda ()
+ (call-with-values
(lambda ()
- (parameterize ((%current-transaction-proc proc))
- (call-with-delay-logging proc #:args (list db))))
- #:unwind? #t))
- (lambda vals
- (let loop ((success? (attempt-commit)))
- (if success?
- (apply values vals)
- (loop (attempt-commit))))))
+ (with-exception-handler
+ (lambda (exn)
+ (unless (transaction-rollback-exception? exn)
+ (backtrace))
+ (raise-exception exn))
+ (lambda ()
+ (parameterize ((%current-transaction-proc proc)
+ ;; Bind the reader thread pool's arguments parameter so
+ ;; that nested reader-pool calls reuse this writer db
+ ;; connection and therefore run inside this transaction
+ ((thread-pool-arguments-parameter
+ (slot-ref datastore 'reader-thread-pool))
+ (list db)))
+ (call-with-delay-logging proc #:args (list db))))))
+ (lambda vals
+ (let loop ((success? (attempt-commit)))
+ (if success?
+ (apply values vals)
+ (loop (attempt-commit)))))))
+ #:unwind? #t)
;; Database is busy, so retry
(run-proc-within-transaction db)))
- (call-with-worker-thread
+ (call-with-thread
(slot-ref datastore (if readonly?
- 'worker-reader-thread-channel
- 'worker-writer-thread-channel))
+ 'reader-thread-pool
+ 'writer-thread-pool))
(lambda (db)
(if (%current-transaction-proc)
(call-with-delay-logging proc #:args (list db)) ; already in transaction
(run-proc-within-transaction db)))
+ #:channel
+ (if readonly?
+ (thread-pool-channel
+ (slot-ref datastore 'reader-thread-pool))
+ (slot-ref datastore
+ (assq-ref
+ '((high . high-priority-writer-thread-channel)
+ (default . default-priority-writer-thread-channel)
+ (low . low-priority-writer-thread-channel))
+ (or priority 'default))))
#:duration-logger
(lambda (duration-seconds)
(when (and (not readonly?)
(> duration-seconds 2))
- (display
- (format
- #f
- "warning: ~a:\n took ~4f seconds in transaction\n"
- proc
- duration-seconds)
- (current-error-port))
-
- (when duration-metric-name
- (metric-observe-duration datastore
- duration-metric-name
- duration-seconds))))))
+ (format/safe
+ (current-error-port)
+ "warning: ~a:\n took ~4f seconds in transaction\n"
+ proc
+ duration-seconds))
+
+ (when duration-metric-name
+ (metric-observe-duration datastore
+ duration-metric-name
+ duration-seconds
+ #:buckets duration-metric-buckets)))))
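call-with-transaction now unwinds in two layers: an inner handler prints a backtrace for unexpected errors, the outer one rolls back (or returns the value carried by a transaction-rollback exception), and both BEGIN and COMMIT are retried while SQLite reports code 5 (SQLITE_BUSY). A reduced sketch of that begin/commit/rollback skeleton, without the logging, metrics, rollback-exception shortcut and thread-pool plumbing:

(use-modules (sqlite3)
             (ice-9 match)
             (ice-9 exceptions))

(define (sqlite-busy-guard thunk)
  ;; #t on success, #f when SQLite reported SQLITE_BUSY (code 5).
  (with-exception-handler
      (lambda (exn)
        (match (exception-args exn)
          (('sqlite-exec 5 _) #f)
          (_ (raise-exception exn))))
    (lambda () (thunk) #t)
    #:unwind? #t))

(define (call-with-immediate-transaction db proc)
  (if (sqlite-busy-guard
       (lambda () (sqlite-exec db "BEGIN IMMEDIATE TRANSACTION;")))
      (call-with-values
          (lambda ()
            (with-exception-handler
                (lambda (exn)
                  (sqlite-exec db "ROLLBACK TRANSACTION;")
                  (raise-exception exn))
              (lambda () (proc db))
              #:unwind? #t))
        (lambda vals
          (let retry-commit ()
            (if (sqlite-busy-guard
                 (lambda () (sqlite-exec db "COMMIT TRANSACTION;")))
                (apply values vals)
                (retry-commit)))))
      ;; Busy before the transaction even started: try again.
      (call-with-immediate-transaction db proc)))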
(define-method (datastore-find-agent
(datastore <sqlite-datastore>)
uuid)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -636,8 +776,8 @@ SELECT description FROM agents WHERE id = :id"
(define-method (datastore-find-agent-by-name
(datastore <sqlite-datastore>)
name)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -664,8 +804,8 @@ SELECT id FROM agents WHERE name = :name"
(define-method (datastore-insert-dynamic-auth-token
(datastore <sqlite-datastore>)
token)
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread
+ datastore
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -683,8 +823,8 @@ INSERT INTO dynamic_auth_tokens (token) VALUES (:token)"
(define-method (datastore-dynamic-auth-token-exists?
(datastore <sqlite-datastore>)
token)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -711,8 +851,8 @@ SELECT 1 FROM dynamic_auth_tokens WHERE token = :token"
(define-method (datastore-fetch-agent-tags
(datastore <sqlite-datastore>)
agent-id)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -748,16 +888,16 @@ WHERE agent_tags.agent_id = :agent_id"
uuid
name
description)
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread
+ datastore
(lambda (db)
(insert-agent db uuid name description)))
#t)
(define-method (datastore-list-agents
(datastore <sqlite-datastore>))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -785,8 +925,8 @@ SELECT id, name, description, active FROM agents ORDER BY id"
(unless (boolean? active?)
(error "datastore-set-agent-active called with non-boolean"))
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread
+ datastore
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -805,8 +945,8 @@ UPDATE agents SET active = :active WHERE id = :uuid"
(define-method (datastore-find-agent-status
(datastore <sqlite-datastore>)
agent-id)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -840,8 +980,8 @@ WHERE agent_id = :agent_id"
1min-load-average
system-uptime
processor-count)
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread
+ datastore
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -877,8 +1017,8 @@ INSERT INTO agent_status (agent_id, status, load_average_1min, system_uptime, pr
(datastore <sqlite-datastore>)
agent-uuid
password)
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread
+ datastore
(lambda (db)
(insert-agent-password db agent-uuid password)))
#t)
@@ -887,8 +1027,8 @@ INSERT INTO agent_status (agent_id, status, load_average_1min, system_uptime, pr
(datastore <sqlite-datastore>)
uuid
password)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -910,8 +1050,8 @@ WHERE agent_id = :agent_id AND password = :password"
(define-method (datastore-agent-list-passwords
(datastore <sqlite-datastore>)
uuid)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1008,6 +1148,7 @@ INSERT INTO agent_tags (agent_id, tag_id) VALUES (:agent_id, :tag_id)"
(lambda (db)
(insert-derivation-and-return-outputs db derivation)
(hash-clear! %derivation-outputs-cache))
+ #:priority 'low
#:duration-metric-name "store_derivation")
#t)
@@ -1015,8 +1156,8 @@ INSERT INTO agent_tags (agent_id, tag_id) VALUES (:agent_id, :tag_id)"
(define-method (datastore-build-exists-for-derivation-outputs?
(datastore <sqlite-datastore>)
derivation)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1048,8 +1189,8 @@ WHERE derivation_outputs.derivation_id = :derivation_id
(define-method (datastore-build-required-by-another?
(datastore <sqlite-datastore>)
uuid)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1143,8 +1284,8 @@ SELECT name FROM derivations WHERE id = :id"
(match (sqlite-step-and-reset statement)
(#(name) name))))
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let loop ((derivation-ids (list (db-find-derivation-id db derivation)))
(result '()))
@@ -1169,8 +1310,8 @@ SELECT name FROM derivations WHERE id = :id"
args)
(apply
(lambda* (system #:key include-cancelled?)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1258,8 +1399,8 @@ FROM (
(define-method (datastore-list-builds-for-derivation-recursive-inputs
(datastore <sqlite-datastore>)
derivation)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1296,8 +1437,8 @@ INNER JOIN related_derivations
(define-method (datastore-find-unprocessed-build-entry
(datastore <sqlite-datastore>)
uuid)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1326,8 +1467,8 @@ WHERE build_id = :build_id"
(datastore <sqlite-datastore>)
build-uuid
tags)
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread
+ datastore
(lambda (db)
(let ((insert-tag-statement
(sqlite-prepare
@@ -1527,8 +1668,8 @@ WHERE build_id = :build_id"
uuid
explicit-priority-lower-bound)
(define builds-to-consider
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
;; Recursively find builds for all missing outputs that this build
;; takes as inputs. The order is important here, since we want to
@@ -1678,8 +1819,8 @@ WHERE build_id = :build_id"
override-derived-priority)
(let ((build-id
old-priority
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((build-id
(db-find-build-id db uuid)))
@@ -1738,8 +1879,8 @@ WHERE build_id = :build_id"
(define-method (datastore-count-build-results
(datastore <sqlite-datastore>))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1797,8 +1938,8 @@ VALUES (:agent_id, :result, 1)"
#t))))
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread/delay-logging
+ datastore
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1849,14 +1990,12 @@ LIMIT 1"
(#f #t)
(#(1) #f))))
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-writer-thread-channel)
- (lambda (db)
- (let ((builds-statement
- (sqlite-prepare
- db
- "
-SELECT DISTINCT unprocessed_builds.id
+ (define (all-build-ids db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT input_builds.id
FROM builds
INNER JOIN derivation_outputs
ON builds.derivation_id = derivation_outputs.derivation_id
@@ -1864,45 +2003,62 @@ INNER JOIN derivation_outputs AS all_derivation_outputs
ON all_derivation_outputs.output_id = derivation_outputs.output_id
INNER JOIN derivation_inputs
ON derivation_inputs.derivation_output_id = all_derivation_outputs.id
-INNER JOIN builds AS unprocessed_builds
- ON unprocessed_builds.processed = 0
- AND unprocessed_builds.derivation_id = derivation_inputs.derivation_id
-INNER JOIN unprocessed_builds_with_derived_priorities
- ON unprocessed_builds_with_derived_priorities.build_id = unprocessed_builds.id
- AND unprocessed_builds_with_derived_priorities.all_inputs_built = 0
+INNER JOIN builds AS input_builds
+ ON input_builds.processed = 0
+ AND input_builds.canceled = 0
+ AND input_builds.derivation_id = derivation_inputs.derivation_id
WHERE builds.id = :build_id"
- #:cache? #t))
+ #:cache? #t)))
- (update-statement
- (sqlite-prepare
- db
- "
+ (sqlite-bind-arguments
+ statement
+ #:build_id (db-find-build-id db build-uuid))
+
+ (let ((result
+ (sqlite-fold
+ (lambda (row result)
+ (match row
+ (#(build-id)
+ (if (all-inputs-built? db build-id)
+ (cons build-id result)
+ result))))
+ '()
+ statement)))
+ (sqlite-reset statement)
+
+ result)))
+
+ (let ((build-ids
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
+ (lambda (db)
+ (all-build-ids db)))))
+ (call-with-writer-thread/delay-logging
+ datastore
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
UPDATE unprocessed_builds_with_derived_priorities
SET all_inputs_built = 1
WHERE build_id = :build_id"
- #:cache? #t)))
+ #:cache? #t)))
+
+ (for-each
+ (lambda (build-id)
+ (sqlite-bind-arguments statement
+ #:build_id build-id)
+ (sqlite-step-and-reset statement))
+ build-ids)
- (sqlite-bind-arguments builds-statement
- #:build_id (db-find-build-id db build-uuid))
-
- (sqlite-fold
- (lambda (row result)
- (match row
- (#(build-id)
- (when (all-inputs-built? db build-id)
- (sqlite-bind-arguments update-statement
- #:build_id build-id)
- (sqlite-step-and-reset update-statement))))
- #f)
- #f
- builds-statement)
- (sqlite-reset builds-statement)))))
+ #t)))))
(define-method (datastore-remove-build-allocation
(datastore <sqlite-datastore>)
build-uuid agent-id)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread/delay-logging
+ datastore
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1924,8 +2080,8 @@ DELETE FROM allocated_builds
(define-method (datastore-mark-build-as-processed
(datastore <sqlite-datastore>)
build-uuid end-time)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread/delay-logging
+ datastore
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1961,8 +2117,8 @@ DELETE FROM unprocessed_builds_with_derived_priorities
(define-method (datastore-delete-relevant-outputs-from-unbuilt-outputs
(datastore <sqlite-datastore>)
build-uuid)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread/delay-logging
+ datastore
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1989,8 +2145,8 @@ WHERE output_id IN (
(datastore <sqlite-datastore>)
build-uuid
output-metadata)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread/delay-logging
+ datastore
(lambda (db)
(define (name->output-id name)
(let ((statement
@@ -2067,8 +2223,8 @@ INSERT INTO build_starts (
(define-method (datastore-find-build-starts
(datastore <sqlite-datastore>)
build-uuid)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2177,8 +2333,8 @@ INSERT INTO setup_failure_missing_inputs (
(define-method (datastore-list-setup-failure-missing-inputs
(datastore <sqlite-datastore>)
setup-failure-id)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2206,8 +2362,8 @@ WHERE setup_failure_id = :id"
build-uuid
agent-id
failure-reason)
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread
+ datastore
(lambda (db)
(insert-setup-failure-and-remove-allocation db
(db-find-build-id db build-uuid)
@@ -2224,8 +2380,8 @@ WHERE setup_failure_id = :id"
(define-method (datastore-count-builds
(datastore <sqlite-datastore>))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2248,8 +2404,8 @@ FROM builds_counts"
(define-method (datastore-for-each-build
(datastore <sqlite-datastore>)
proc)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2288,8 +2444,8 @@ SELECT id FROM builds WHERE uuid = :uuid"
(define-method (datastore-find-build
(datastore <sqlite-datastore>)
uuid)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2347,12 +2503,14 @@ WHERE uuid = :uuid"
(canceled 'unset)
(priority-> 'unset)
(priority-< 'unset)
+ (created-at-> 'unset)
+ (created-at-< 'unset)
(after-id #f)
(limit #f)
;; other-builds-dependent or no-dependent-builds
(relationship 'unset))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(define tag->expression
(let ((statement
@@ -2365,7 +2523,7 @@ SELECT id FROM tags WHERE key = :key AND value = :value"
(sqlite-prepare
db
"
-SELECT id FROM tags WHERE key = :key"
+SELECT 1 FROM tags WHERE key = :key LIMIT 1"
#:cache? #t)))
(lambda (tag not?)
(match tag
@@ -2386,28 +2544,27 @@ SELECT id FROM tags WHERE key = :key"
(sqlite-bind-arguments key-statement
#:key key)
- (let* ((tag-ids (sqlite-map
- (match-lambda
- (#(id) id))
- key-statement))
- (result
- (string-append
- "("
- (string-join
- (map
- (lambda (id)
+ (let ((tag-with-key-exists?
+ (->bool (sqlite-step-and-reset key-statement))))
+ (if tag-with-key-exists?
+ (let ((result
(string-append
+ "("
(if not? "NOT " "")
- "EXISTS (SELECT 1 FROM build_tags "
- "WHERE build_id = builds.id AND tag_id = "
- (number->string id)
- ")"))
- tag-ids)
- (if not? " AND " " OR "))
- ")")))
- (sqlite-reset key-statement)
-
- result))))))
+ "
+EXISTS (
+ SELECT 1
+ FROM build_tags
+ INNER JOIN tags ON build_tags.tag_id = tags.id
+ WHERE build_id = builds.id
+ AND tags.key = '"
+ key "'
+)"
+ ")")))
+ result)
+ (if not?
+ "TRUE"
+ "FALSE"))))))))
(let ((tag-expressions
(map (lambda (tag)
@@ -2429,6 +2586,8 @@ SELECT id FROM tags WHERE key = :key"
(not (null? not-systems))
(not (eq? priority-> 'unset))
(not (eq? priority-< 'unset))
+ (not (eq? created-at-> 'unset))
+ (not (eq? created-at-< 'unset))
(not (eq? processed 'unset))
(not (eq? canceled 'unset))
(not (eq? relationship 'unset))
@@ -2479,6 +2638,14 @@ INNER JOIN derivations
(list
(simple-format #f "priority < ~A" priority-<))
'())
+ (if (string? created-at->)
+ (list
+ (simple-format #f "created_at > '~A'" created-at->))
+ '())
+ (if (string? created-at-<)
+ (list
+ (simple-format #f "created_at < '~A'" created-at-<))
+ '())
(cond
((eq? processed #t) '("processed = 1"))
((eq? processed #f) '("processed = 0"))
@@ -2582,8 +2749,8 @@ WHERE derivation_outputs.derivation_id = builds.derivation_id)"))
(define-method (datastore-fetch-build-tags
(datastore <sqlite-datastore>)
build-uuid)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2617,8 +2784,8 @@ WHERE build_tags.build_id = :build_id"
(define-method (datastore-find-build-result
(datastore <sqlite-datastore>)
build-uuid)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2643,8 +2810,8 @@ WHERE build_id = :build_id"
(define-method (datastore-find-build-derivation-system
(datastore <sqlite-datastore>)
build-uuid)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2680,8 +2847,8 @@ WHERE builds.id = :build_id"
datastore
"list_builds_for_output"
(lambda ()
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2729,8 +2896,8 @@ WHERE derivation_outputs.output_id = :output_id"
rest)
(apply
(lambda* (output system #:key include-canceled?)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2774,8 +2941,8 @@ WHERE derivation_outputs.output_id = :output_id
rest)
(apply
(lambda* (derivation #:key (include-canceled? #t))
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2810,8 +2977,8 @@ WHERE derivations.name = :derivation"
(define-method (datastore-count-setup-failures
(datastore <sqlite-datastore>))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2835,8 +3002,8 @@ GROUP BY agent_id, failure_reason"
(define-method (datastore-list-setup-failures-for-build
(datastore <sqlite-datastore>)
build-uuid)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2868,8 +3035,8 @@ WHERE build_id = :build_id"
args)
(apply
(lambda* (#:key agent-id)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2920,8 +3087,8 @@ WHERE builds.processed = 0
(define-method (datastore-list-processed-builds
(datastore <sqlite-datastore>))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2947,8 +3114,8 @@ WHERE processed = 1"
(define-method (datastore-list-unprocessed-builds
(datastore <sqlite-datastore>))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -2985,8 +3152,8 @@ ORDER BY priority DESC"
(define-method (datastore-find-deferred-build
(datastore <sqlite-datastore>)
select?)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3030,9 +3197,11 @@ ORDER BY deferred_until ASC"
(sqlite-prepare
db
"
-SELECT builds.uuid
+SELECT builds.uuid, systems.system
FROM unprocessed_builds_with_derived_priorities
INNER JOIN builds ON build_id = builds.id
+INNER JOIN derivations ON builds.derivation_id = derivations.id
+INNER JOIN systems ON derivations.system_id = systems.id
WHERE all_inputs_built = 1
AND NOT EXISTS (
SELECT 1
@@ -3042,26 +3211,21 @@ WHERE all_inputs_built = 1
) ORDER BY derived_priority ASC"
#:cache? #t)))
- (let ((result (sqlite-fold
- (lambda (row result)
- (cons (vector-ref row 0)
- result))
- '()
- statement)))
+ (let ((result (sqlite-fold cons '() statement)))
(sqlite-reset statement)
result)))
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
fetch-prioritised-unprocessed-builds))
(define-method (datastore-insert-unprocessed-hook-event
(datastore <sqlite-datastore>)
event
arguments)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread/delay-logging
+ datastore
(lambda (db)
(insert-unprocessed-hook-event db
event
@@ -3091,8 +3255,8 @@ VALUES (:event, :arguments)"
(define-method (datastore-count-unprocessed-hook-events
(datastore <sqlite-datastore>))
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3115,8 +3279,8 @@ SELECT event, COUNT(*) FROM unprocessed_hook_events GROUP BY event"
(datastore <sqlite-datastore>)
event
limit)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3149,8 +3313,8 @@ LIMIT :limit"
(define-method (datastore-find-unprocessed-hook-event
(datastore <sqlite-datastore>)
id)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3176,8 +3340,8 @@ WHERE id = :id"
(define-method (datastore-delete-unprocessed-hook-event
(datastore <sqlite-datastore>)
id)
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread
+ datastore
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3190,13 +3354,14 @@ DELETE FROM unprocessed_hook_events WHERE id = :id"
statement
#:id id)
- (sqlite-step-and-reset statement))))
+ (sqlite-step-and-reset statement)))
+ #:priority 'high)
#t)
(define-method (datastore-count-allocated-builds
(datastore <sqlite-datastore>))
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3218,8 +3383,8 @@ SELECT agent_id, COUNT(*) FROM allocated_builds GROUP BY agent_id"
(define-method (datastore-agent-requested-systems
(datastore <sqlite-datastore>)
agent-id)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3293,8 +3458,8 @@ INSERT INTO build_allocation_agent_requested_systems (agent_id, system_id) VALUE
(define-method (datastore-fetch-build-to-allocate
(datastore <sqlite-datastore>)
build-id)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3322,8 +3487,8 @@ WHERE builds.uuid = :uuid
(datastore <sqlite-datastore>)
agent-id
derivation-id)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3352,98 +3517,54 @@ WHERE build_derivation_outputs.derivation_id = :derivation_id
(define-method (datastore-insert-to-allocated-builds
(datastore <sqlite-datastore>)
agent-id
- build-uuids)
- (call-with-worker-thread
- (slot-ref datastore 'worker-writer-thread-channel)
+ build-uuid)
+ (call-with-writer-thread
+ datastore
(lambda (db)
- (sqlite-exec
- db
- (string-append
- "
-INSERT INTO allocated_builds (build_id, agent_id) VALUES "
- (string-join
- (map (lambda (build-uuid)
- (simple-format
- #f
- "(~A, '~A')"
- (db-find-build-id db build-uuid)
- agent-id))
- build-uuids)
- ", ")
- ";")))))
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+INSERT INTO allocated_builds (build_id, agent_id)
+ VALUES (:build_id, :agent_id)"
+ #:cache? #t)))
-(define-method (datastore-list-allocation-plan-builds
- (datastore <sqlite-datastore>)
- .
- rest)
- (apply
- (lambda* (agent-id #:key limit)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
- (lambda (db)
- (let ((statement
- (sqlite-prepare
- db
- (string-append
- "
-SELECT builds.uuid, derivations.name, systems.system,
- builds.priority,
- unprocessed_builds_with_derived_priorities.derived_priority
-FROM builds
-INNER JOIN derivations
- ON builds.derivation_id = derivations.id
-INNER JOIN systems
- ON derivations.system_id = systems.id
-INNER JOIN build_allocation_plan
- ON builds.id = build_allocation_plan.build_id
-LEFT JOIN unprocessed_builds_with_derived_priorities
- ON builds.id = unprocessed_builds_with_derived_priorities.build_id
-WHERE build_allocation_plan.agent_id = :agent_id
- AND builds.processed = 0
- AND builds.id NOT IN (SELECT build_id FROM allocated_builds)
-ORDER BY build_allocation_plan.ordering ASC"
- (if limit
- "
-LIMIT :limit"
- ""))
- #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:build_id (db-find-build-id db build-uuid)
+ #:agent_id agent-id)
- (apply sqlite-bind-arguments
- statement
- #:agent_id agent-id
- (if limit
- (list #:limit limit)
- '()))
+ (sqlite-step-and-reset statement))))
+ #t)
- (let ((builds (sqlite-map
- (match-lambda
- (#(uuid derivation_name system
- priority derived_priority)
- `((uuid . ,uuid)
- (derivation_name . ,derivation_name)
- (system . ,system)
- (priority . ,priority)
- (derived_priority . ,derived_priority)
- (tags . ,(vector-map
- (lambda (_ tag)
- (match tag
- ((key . value)
- `((key . ,key)
- (value . ,value)))))
- (datastore-fetch-build-tags
- datastore
- uuid))))))
- statement)))
- (sqlite-reset statement)
+(define-method (datastore-update-allocated-build-submit-outputs
+ (datastore <sqlite-datastore>)
+ build-uuid
+ submit-outputs?)
+ (call-with-writer-thread
+ datastore
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+UPDATE allocated_builds
+SET submit_outputs = :submit_outputs
+WHERE build_id = :build_id"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:build_id (db-find-build-id db build-uuid)
+ #:submit_outputs (if submit-outputs? 1 0))
- builds)))))
- rest))
+ (sqlite-step-and-reset statement))))
+ #t)
(define-method (datastore-list-agent-builds
(datastore <sqlite-datastore>)
agent-id)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3451,7 +3572,7 @@ LIMIT :limit"
"
SELECT builds.uuid, derivations.name,
unprocessed_builds_with_derived_priorities.derived_priority,
- builds.canceled
+ builds.canceled, allocated_builds.submit_outputs
FROM builds
INNER JOIN derivations
ON builds.derivation_id = derivations.id
@@ -3468,11 +3589,17 @@ WHERE allocated_builds.agent_id = :agent_id"
(let ((builds (sqlite-map
(match-lambda
- (#(uuid derivation_name derived_priority canceled)
+ (#(uuid derivation_name derived_priority canceled
+ submit_outputs)
`((uuid . ,uuid)
(derivation_name . ,derivation_name)
(derived_priority . ,derived_priority)
- (canceled . ,(= 1 canceled)))))
+ (canceled . ,(= 1 canceled))
+ (submit_outputs . ,(cond
+ ((not submit_outputs)
+ 'null)
+ (else
+ (= 1 submit_outputs)))))))
statement)))
(sqlite-reset statement)
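The submit_outputs column stays NULL until the build-submit-outputs hook has run in coordinator.scm; as the (not submit_outputs) test above implies, guile-sqlite3 surfaces that NULL as #f, so the decoding amounts to:

(define (column-value->submit-outputs value)
  (cond
   ((not value) 'null)        ; NULL: hook result not cached yet
   (else (= 1 value))))       ; 0/1 -> #f/#t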
@@ -3481,8 +3608,8 @@ WHERE allocated_builds.agent_id = :agent_id"
(define-method (datastore-agent-for-build
(datastore <sqlite-datastore>)
build-uuid)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3544,11 +3671,11 @@ WHERE build_results.build_id = :build_id"
"_sqitch_registry.db")
(string-append "db:sqlite:" database-file))))
- (simple-format #t "running command: ~A\n"
- (string-join command))
+ (simple-format/safe #t "running command: ~A\n"
+ (string-join command))
(let ((pid (spawn (%config 'sqitch) command)))
(unless (zero? (cdr (waitpid pid)))
- (simple-format
+ (simple-format/safe
(current-error-port)
"error: sqitch command failed\n")
(exit 1)))))
@@ -3638,16 +3765,16 @@ WHERE name = :name"
(define-method (datastore-find-derivation
(datastore <sqlite-datastore>)
name)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(db-find-derivation db name))))
(define-method (datastore-find-derivation-outputs
(datastore <sqlite-datastore>)
derivation-name)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3679,8 +3806,8 @@ WHERE derivation_id = :derivation_id"
(define-method (datastore-find-derivation-output-details
(datastore <sqlite-datastore>)
derivation-name)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3722,8 +3849,8 @@ WHERE derivation_id = :derivation_id"
(define-method (datastore-list-unbuilt-derivation-outputs
(datastore <sqlite-datastore>)
derivation-name)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3756,8 +3883,8 @@ WHERE derivation_id = :derivation_id"
(define-method (datastore-list-build-outputs
(datastore <sqlite-datastore>)
build-uuid)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3801,8 +3928,8 @@ WHERE builds.id = :build_id"
(define-method (datastore-find-derivation-system
(datastore <sqlite-datastore>)
derivation-name)
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread/delay-logging
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3825,8 +3952,8 @@ WHERE name = :name"
(define-method (datastore-find-derivation-inputs
(datastore <sqlite-datastore>)
derivation-name)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3866,8 +3993,8 @@ WHERE derivations.id = :derivation_id"
(define-method (datastore-find-recursive-derivation-input-outputs
(datastore <sqlite-datastore>)
derivation-name)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3908,8 +4035,8 @@ INNER JOIN outputs
(datastore <sqlite-datastore>)
start-derivation-name
output)
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -3992,20 +4119,6 @@ SELECT system FROM systems WHERE id = :id"
(define derivation-name
(derivation-file-name derivation))
- (define (maybe-fix-fixed-output-field derivation-details)
- (let ((fixed-output? (fixed-output-derivation? derivation)))
- (unless (equal? (assq-ref derivation-details 'fixed-output?)
- fixed-output?)
- (let ((statement (sqlite-prepare
- db
- "
-UPDATE derivations SET fixed_output = :fixed_output WHERE name = :name"
- #:cache? #t)))
- (sqlite-bind-arguments statement
- #:name derivation-name
- #:fixed_output (if fixed-output? 1 0))
- (sqlite-step-and-reset statement)))))
-
(define (insert-derivation)
(let ((statement
(sqlite-prepare
@@ -4036,8 +4149,6 @@ INSERT INTO derivations (name, system_id, fixed_output)
(db-find-derivation db derivation-name)))
(if derivation-details
(begin
- (maybe-fix-fixed-output-field derivation-details)
-
(let ((derivation-outputs
(select-derivation-outputs db derivation-name)))
@@ -4346,15 +4457,15 @@ VALUES (:build_id, :derived_priority, :all_inputs_built)"
(lambda* (uuid drv-name priority defer-until
#:key skip-updating-other-build-derived-priorities)
(define system-id
- (call-with-worker-thread
- (slot-ref datastore 'worker-reader-thread-channel)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
(lambda (db)
(db-system->system-id
db
(datastore-find-derivation-system datastore drv-name)))))
- (call-with-worker-thread/delay-logging
- (slot-ref datastore 'worker-writer-thread-channel)
+ (call-with-writer-thread/delay-logging
+ datastore
(lambda (db)
(let* ((build-id (insert-build db drv-name uuid priority
defer-until))
@@ -4380,7 +4491,8 @@ VALUES (:build_id, :derived_priority, :all_inputs_built)"
#:args
(list db
build-id
- derived-priority)))))))
+ derived-priority)))))
+ #:priority 'low))
rest)
#t)
@@ -4416,3 +4528,212 @@ VALUES (:agent_id, :password)"
#:password password)
(sqlite-step-and-reset statement)))
+
+(define-method (datastore-insert-background-job
+ (datastore <sqlite-datastore>)
+ type
+ args)
+ (call-with-writer-thread
+ datastore
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+INSERT INTO background_jobs_queue (type, args)
+VALUES (:type, :args)
+RETURNING id"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:type (symbol->string type)
+ #:args (call-with-output-string
+ (lambda (port)
+ (write args port))))
+
+ (match (sqlite-step-and-reset statement)
+ (#(id)
+
+ (metric-increment
+ (metrics-registry-fetch-metric
+ (slot-ref datastore 'metrics-registry)
+ "coordinator_background_job_inserted_total")
+ #:label-values `((name . ,type)))
+
+ id))))))
+
+(define-method (datastore-delete-background-job
+ (datastore <sqlite-datastore>)
+ id)
+ (call-with-writer-thread
+ datastore
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+DELETE FROM background_jobs_queue WHERE id = :id"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:id id)
+
+ (sqlite-step-and-reset statement))
+ #t)
+ #:priority 'high))
+
+(define-method (datastore-select-background-jobs
+ (datastore <sqlite-datastore>)
+ .
+ args)
+ (apply
+ (lambda* (type #:key (limit 1))
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT id, args
+FROM background_jobs_queue
+WHERE type = :type
+ORDER BY id ASC
+LIMIT :limit"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:type (symbol->string type)
+ #:limit limit)
+
+ (let ((result
+ (sqlite-map
+ (match-lambda
+ (#(id args)
+ `((id . ,id)
+ (args . ,(call-with-input-string args read)))))
+ statement)))
+ (sqlite-reset statement)
+ result)))))
+ args))
+
+(define-method (datastore-count-background-jobs
+ (datastore <sqlite-datastore>))
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT type, COUNT(*)
+FROM background_jobs_queue
+GROUP BY type"
+ #:cache? #t)))
+
+ (let ((result
+ (sqlite-map
+ (match-lambda
+ (#(type count)
+ (cons type count)))
+ statement)))
+ (sqlite-reset statement)
+
+ result)))))
+
+(define-method (datastore-check-and-correct-unprocessed-builds-all-inputs-built
+ (datastore <sqlite-datastore>)
+ .
+ args)
+ (define entries-to-check
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT build_id, builds.derivation_id
+FROM unprocessed_builds_with_derived_priorities
+INNER JOIN builds ON builds.id = build_id
+WHERE all_inputs_built = 0
+ORDER BY build_id DESC"
+ #:cache? #t)))
+ (let ((result (sqlite-map identity statement)))
+ (sqlite-reset statement)
+ result)))))
+
+ (define (all-inputs-built? derivation-id)
+ (call-with-thread
+ (slot-ref datastore 'reader-thread-pool)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT 1
+FROM derivation_inputs
+INNER JOIN derivation_outputs
+ ON derivation_inputs.derivation_output_id = derivation_outputs.id
+INNER JOIN unbuilt_outputs
+ ON unbuilt_outputs.output_id = derivation_outputs.output_id
+WHERE derivation_inputs.derivation_id = :derivation_id"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:derivation_id derivation-id)
+
+ (match (sqlite-step-and-reset statement)
+ (#(1) #f)
+ (#f #t))))))
+
+ (define (update build-id)
+ (call-with-writer-thread
+ datastore
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+UPDATE unprocessed_builds_with_derived_priorities
+SET all_inputs_built = 1
+WHERE build_id = :build_id AND all_inputs_built = 0
+RETURNING 1"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:build_id build-id)
+
+ ;; This is to cope with the check not being transactional, so we
+ ;; might go to update a record which has just been changed to have
+ ;; all_inputs_built = 1
+ (match (sqlite-step-and-reset statement)
+ (#(1) #t)
+ (#f #f))))))
+
+ (apply
+ (lambda* (#:key (progress-reporter (const progress-reporter/silent)))
+ (let ((reporter
+ (progress-reporter (length entries-to-check))))
+ (start-progress-reporter! reporter)
+ (let loop ((entries entries-to-check)
+ (updated-count 0))
+ (if (null? entries)
+ (begin
+ (stop-progress-reporter! reporter)
+ updated-count)
+ (match (car entries)
+ (#(build-id derivation-id)
+ (progress-reporter-report! reporter)
+ (if (all-inputs-built? derivation-id)
+ (loop (cdr entries)
+ (+ updated-count
+ (if (update build-id)
+ 1
+ 0)))
+ (loop (cdr entries)
+ updated-count))))))))
+ args))
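A minimal usage sketch of the new background job queue procedures defined above (DATASTORE is assumed to be an existing <sqlite-datastore> instance, 'example-job is a made-up job type, and the generics are assumed to be exported alongside the other datastore operations):

(datastore-insert-background-job datastore
                                 'example-job
                                 '((derivation . "example.drv")))

;; Later, a worker picks up pending jobs of this type and removes each
;; one from the queue once it has been handled.
(for-each
 (lambda (job)
   (let ((id   (assq-ref job 'id))
         (args (assq-ref job 'args)))
     ;; ... act on ARGS ...
     (datastore-delete-background-job datastore id)))
 (datastore-select-background-jobs datastore 'example-job #:limit 10))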
diff --git a/guix-build-coordinator/guix-data-service.scm b/guix-build-coordinator/guix-data-service.scm
index e2ecbe7..584bbbd 100644
--- a/guix-build-coordinator/guix-data-service.scm
+++ b/guix-build-coordinator/guix-data-service.scm
@@ -27,6 +27,7 @@
#:use-module (json)
#:use-module (web client)
#:use-module (web response)
+ #:use-module (knots timeout)
#:use-module (guix-build-coordinator utils)
#:use-module (guix-build-coordinator datastore)
#:export (send-build-event-to-guix-data-service
@@ -51,7 +52,8 @@
#:body body
;; Guile doesn't treat JSON as text, so decode the
;; body manually
- #:decode-body? #f))))
+ #:decode-body? #f))
+ #:timeout 10))
(code
(response-code response)))
(unless (and (>= code 200)
diff --git a/guix-build-coordinator/hooks.scm b/guix-build-coordinator/hooks.scm
index 030f310..3cd1c59 100644
--- a/guix-build-coordinator/hooks.scm
+++ b/guix-build-coordinator/hooks.scm
@@ -26,6 +26,7 @@
#:use-module (gcrypt pk-crypto)
#:use-module (zlib)
#:use-module (lzlib)
+ #:use-module (knots timeout)
#:use-module (guix pki)
#:use-module (guix store)
#:use-module (guix base32)
@@ -156,7 +157,8 @@
(substitute-derivation store
drv-name
#:substitute-urls
- derivation-substitute-urls)))
+ derivation-substitute-urls))
+ #:timeout 120)
(add-temp-root store drv-name))
(let* ((drv (read-derivation-from-file* drv-name))
@@ -620,18 +622,15 @@
(unless (eq? source-compression recompress-to)
(when (file-exists? tmp-output-log-file)
(delete-file tmp-output-log-file))
- (with-port-timeouts
- (lambda ()
- (call-with-compressed-input-file
- source-log-file
- source-compression
- (lambda (input-port)
- (call-with-compressed-output-file
- tmp-output-log-file
- recompress-to
- (lambda (output-port)
- (dump-port input-port output-port))))))
- #:timeout timeout)
+ (call-with-compressed-input-file
+ source-log-file
+ source-compression
+ (lambda (input-port)
+ (call-with-compressed-output-file
+ tmp-output-log-file
+ recompress-to
+ (lambda (output-port)
+ (dump-port input-port output-port)))))
(rename-file tmp-output-log-file
output-log-file)
(delete-file source-log-file)))))
diff --git a/guix-build-coordinator/utils.scm b/guix-build-coordinator/utils.scm
index d747962..f4f15dc 100644
--- a/guix-build-coordinator/utils.scm
+++ b/guix-build-coordinator/utils.scm
@@ -14,10 +14,11 @@
#:use-module (ice-9 textual-ports)
#:use-module (ice-9 rdelim)
#:use-module (ice-9 binary-ports)
- #:use-module (ice-9 suspendable-ports)
- #:use-module ((ice-9 ports internal) #:select (port-poll))
#:use-module (ice-9 exceptions)
#:use-module (rnrs bytevectors)
+ #:use-module (oop goops)
+ #:use-module (logging logger)
+ #:use-module (logging port-log)
#:use-module (web uri)
#:use-module (web http)
#:use-module (web client)
@@ -42,18 +43,9 @@
#:use-module ((guix build syscalls)
#:select (set-thread-name))
#:use-module (guix scripts substitute)
+ #:use-module (guix-build-coordinator utils timeout)
#:export (random-v4-uuid
- &port-timeout
- &port-read-timeout
- &port-write-timeout
-
- port-timeout-error?
- port-read-timeout-error?
- port-write-timeout-error?
-
- with-port-timeouts
-
request-query-parameters
call-with-streaming-http-request
@@ -94,7 +86,13 @@
open-socket-for-uri*
- check-locale!))
+ check-locale!
+
+ display/safe
+ simple-format/safe
+ format/safe
+
+ <custom-port-log>))
(eval-when (eval load compile)
(begin
@@ -182,73 +180,6 @@
(parse-query-string query))
'())))
-(define &port-timeout
- (make-exception-type '&port-timeout
- &external-error
- '(port)))
-
-(define make-port-timeout-error
- (record-constructor &port-timeout))
-
-(define port-timeout-error?
- (record-predicate &port-timeout))
-
-(define &port-read-timeout
- (make-exception-type '&port-read-timeout
- &port-timeout
- '()))
-
-(define make-port-read-timeout-error
- (record-constructor &port-read-timeout))
-
-(define port-read-timeout-error?
- (record-predicate &port-read-timeout))
-
-(define &port-write-timeout
- (make-exception-type '&port-write-timeout
- &port-timeout
- '()))
-
-(define make-port-write-timeout-error
- (record-constructor &port-write-timeout))
-
-(define port-write-timeout-error?
- (record-predicate &port-write-timeout))
-
-(define* (with-port-timeouts thunk #:key (timeout 120))
-
- ;; When the GC runs, it restarts the poll syscall, but the timeout remains
- ;; unchanged! When the timeout is longer than the time between the syscall
- ;; restarting, I think this renders the timeout useless. Therefore, this
- ;; code uses a short timeout, and repeatedly calls poll while watching the
- ;; clock to see if it has timed out overall.
- (define poll-timeout-ms 200)
-
- (define (wait port mode)
- (let ((timeout-internal
- (+ (get-internal-real-time)
- (* internal-time-units-per-second timeout))))
- (let loop ((poll-value
- (port-poll port mode poll-timeout-ms)))
- (if (= poll-value 0)
- (if (> (get-internal-real-time)
- timeout-internal)
- (raise-exception
- (if (string=? mode "r")
- (make-port-read-timeout-error port)
- (make-port-write-timeout-error port)))
- (loop (port-poll port mode poll-timeout-ms)))
- poll-value))))
-
- (parameterize
- ((current-read-waiter
- (lambda (port)
- (wait port "r")))
- (current-write-waiter
- (lambda (port)
- (wait port "w"))))
- (thunk)))
-
(define* (call-with-streaming-http-request uri
content-length
callback
@@ -275,8 +206,8 @@
(setvbuf port 'block (expt 2 13))
(with-exception-handler
(lambda (exp)
- (simple-format #t "error: ~A ~A: ~A\n"
- method (uri-path uri) exp)
+ (simple-format/safe #t "error: ~A ~A: ~A\n"
+ method (uri-path uri) exp)
(close-port port)
(raise-exception exp))
(lambda ()
@@ -291,7 +222,8 @@
(let ((body (read-response-body response)))
(close-port port)
(values response
- body)))))))))))
+ body)))))))))
+ #:timeout 120))
(define (find-missing-substitutes-for-output store substitute-urls output)
(if (valid-path? store output)
@@ -349,7 +281,7 @@
(when (file-exists? cache-file)
(with-exception-handler
(lambda (exn)
- (simple-format
+ (simple-format/safe
(current-error-port)
"error: when deleting substitute cache file: ~A\n"
exn))
@@ -361,7 +293,18 @@
(let ((substitute-urls
(append-map (lambda (substitute-url)
(let ((log-port (open-output-string)))
- (with-throw-handler #t
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format/safe
+ (current-error-port)
+ "exception in has-substiutes-no-cache? (~A): ~A\n"
+ substitute-url exn)
+ (display/safe (string-append
+ (get-output-string log-port)
+ "\n")
+ (current-error-port))
+ (close-output-port log-port)
+ (raise-exception exn))
(lambda ()
(if (null?
;; I doubt the caching is thread safe, so
@@ -371,17 +314,7 @@
(lookup-narinfos substitute-url
(list file)))))
'()
- (list substitute-url)))
- (lambda (key . args)
- (simple-format
- (current-error-port)
- "exception in has-substiutes-no-cache? (~A) ~A: ~A\n"
- substitute-url key args)
- (display (string-append
- (get-output-string log-port)
- "\n")
- (current-error-port))
- (close-output-port log-port)))))
+ (list substitute-url))))))
substitute-urls)))
substitute-urls))
@@ -415,7 +348,7 @@
(take-right lines 10)
lines)))
(close-output-port log-port)
- (simple-format
+ (simple-format/safe
(current-error-port)
"exception when substituting derivation: ~A:\n ~A\n"
exn (string-join last-n-lines "\n"))
@@ -425,27 +358,23 @@
(ensure-path store derivation-name)))
#:unwind? #t)))
-(define read-derivation-from-file*
- (let ((%derivation-cache
- (@@ (guix derivations) %derivation-cache)))
- (lambda (file)
- (or (and file (hash-ref %derivation-cache file))
- (let ((drv
- ;; read-derivation can call read-derivation-from-file, so to
- ;; avoid having many open files when reading a derivation with
- ;; inputs, read it in to a string first.
- (call-with-input-string
- ;; Avoid calling scm_i_relativize_path in
- ;; fport_canonicalize_filename since this leads to lots
- ;; of readlink calls
- (with-fluids ((%file-port-name-canonicalization 'none))
- (call-with-input-file file
- get-string-all))
- (lambda (port)
- (set-port-filename! port file)
- (read-derivation port read-derivation-from-file*)))))
- (hash-set! %derivation-cache file drv)
- drv)))))
+(define* (read-derivation-from-file* file #:optional (drv-hash (make-hash-table)))
+ (or (and file (hash-ref drv-hash file))
+ (let ((drv
+ ;; read-derivation can call read-derivation-from-file, so to
+ ;; avoid having many open files when reading a derivation with
+ ;; inputs, read it in to a string first.
+ (call-with-input-string
+ (call-with-input-file file
+ get-string-all)
+ (lambda (port)
+ (set-port-filename! port file)
+ (read-derivation port (lambda (file)
+ (read-derivation-from-file*
+ file
+ drv-hash)))))))
+ (hash-set! drv-hash file drv)
+ drv)))
(define (read-derivation-through-substitutes derivation-name
substitute-urls)
@@ -638,7 +567,7 @@ References: ~a~%"
#:unwind? #t)
((#t . return-values)
(when (> attempt 1)
- (simple-format
+ (simple-format/safe
(current-error-port)
"retry success: ~A\n on attempt ~A of ~A\n"
f
@@ -649,7 +578,7 @@ References: ~a~%"
(if (>= attempt
(- times 1))
(begin
- (simple-format
+ (simple-format/safe
(current-error-port)
"error: ~A:\n ~A,\n attempt ~A of ~A, last retry in ~A\n"
f
@@ -660,14 +589,14 @@ References: ~a~%"
(when error-hook
(error-hook attempt exn))
(sleep-impl delay)
- (simple-format
+ (simple-format/safe
(current-error-port)
"running last retry of ~A after ~A failed attempts\n"
f
attempt)
(f))
(begin
- (simple-format
+ (simple-format/safe
(current-error-port)
"error: ~A:\n ~A,\n attempt ~A of ~A, retrying in ~A\n"
f
@@ -895,27 +824,29 @@ References: ~a~%"
(define (thread-process-job job-args)
(with-exception-handler
(lambda (exn)
- (simple-format (current-error-port)
- "~A work queue, job raised exception ~A: ~A\n"
- name job-args exn))
+ (simple-format/safe
+ (current-error-port)
+ "~A work queue, job raised exception ~A: ~A\n"
+ name job-args exn))
(lambda ()
- (with-throw-handler #t
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format/safe
+ (current-error-port)
+ "~A work queue, exception when handling job: ~A\n"
+ name exn)
+ (let* ((stack (make-stack #t 3))
+ (backtrace
+ (call-with-output-string
+ (lambda (port)
+ (display-backtrace stack port)
+ (newline port)))))
+ (display/safe
+ backtrace
+ (current-error-port)))
+ (raise-exception exn))
(lambda ()
- (apply proc job-args))
- (lambda (key . args)
- (simple-format
- (current-error-port)
- "~A work queue, exception when handling job: ~A ~A\n"
- name key args)
- (let* ((stack (make-stack #t 3))
- (backtrace
- (call-with-output-string
- (lambda (port)
- (display-backtrace stack port)
- (newline port)))))
- (display
- backtrace
- (current-error-port))))))
+ (apply proc job-args))))
#:unwind? #t))
(define (start-thread thread-index)
@@ -1085,36 +1016,29 @@ References: ~a~%"
(define (thread-process-job job-args)
(with-exception-handler
(lambda (exn)
- (with-exception-handler
- (lambda _
- #f)
- (lambda ()
- ;; Logging may raise an exception, so try and just keep going.
- (display
- (simple-format
- #f
- "~A thread pool, job raised exception ~A: ~A\n"
- name job-args exn)
- (current-error-port)))
- #:unwind? #t))
+ (simple-format/safe
+ (current-error-port)
+ "~A thread pool, job raised exception ~A: ~A\n"
+ name job-args exn))
(lambda ()
- (with-throw-handler #t
- (lambda ()
- (apply proc job-args))
- (lambda (key . args)
- (simple-format
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format/safe
(current-error-port)
- "~A thread pool, exception when handling job: ~A ~A\n"
- name key args)
+ "~A thread pool, exception when handling job: ~A\n"
+ name exn)
(let* ((stack (make-stack #t 3))
(backtrace
(call-with-output-string
(lambda (port)
(display-backtrace stack port)
(newline port)))))
- (display
+ (display/safe
backtrace
- (current-error-port))))))
+ (current-error-port)))
+ (raise-exception exn))
+ (lambda ()
+ (apply proc job-args))))
#:unwind? #t))
(define (start-thread thread-index)
@@ -1354,22 +1278,18 @@ References: ~a~%"
(define (check-locale!)
(with-exception-handler
(lambda (exn)
- (display
- (simple-format
- #f
- "exception when calling setlocale: ~A
+ (simple-format/safe
+ (current-error-port)
+ "exception when calling setlocale: ~A
falling back to en_US.utf8\n"
- exn)
- (current-error-port))
+ exn)
(with-exception-handler
(lambda (exn)
- (display
- (simple-format
- #f
- "exception when calling setlocale with en_US.utf8: ~A\n"
- exn)
- (current-error-port))
+ (simple-format/safe
+ (current-error-port)
+ "exception when calling setlocale with en_US.utf8: ~A\n"
+ exn)
(exit 1))
(lambda _
@@ -1378,3 +1298,51 @@ falling back to en_US.utf8\n"
(lambda _
(setlocale LC_ALL ""))
#:unwind? #t))
+
+(define* (display/safe obj #:optional (port (current-output-port)))
+ ;; Try to avoid the dreaded "conversion to port encoding failed" error (#62590)
+ (put-bytevector
+ port
+ (string->utf8
+ (call-with-output-string
+ (lambda (port)
+ (display obj port)))))
+ (force-output port))
+
+(define (simple-format/safe port s . args)
+ (let ((str (apply simple-format #f s args)))
+ (if (eq? #f port)
+ str
+ (display/safe
+ str
+ (if (eq? #t port)
+ (current-output-port)
+ port)))))
+
+(define (format/safe port s . args)
+ (let ((str (apply format #f s args)))
+ (if (eq? #f port)
+ str
+ (display/safe
+ str
+ (if (eq? #t port)
+ (current-output-port)
+ port)))))
+
+(define-class <custom-port-log> (<log-handler>)
+ (port #:init-value #f #:accessor port #:init-keyword #:port))
+
+(define-method (emit-log (self <custom-port-log>) str)
+ (when (port self)
+ (put-bytevector (port self)
+ (string->utf8 str))
+ ;; Even though the port is line buffered, writing to it with
+ ;; put-bytevector doesn't cause the buffer to be flushed.
+ (force-output (port self))))
+
+(define-method (flush-log (self <custom-port-log>))
+ (and=> (port self) force-output))
+
+(define-method (close-log! (self <custom-port-log>))
+ (and=> (port self) close-port)
+ (set! (port self) #f))
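The display/safe, simple-format/safe and format/safe helpers added above mirror their standard counterparts but convert the text to UTF-8 bytes before writing, sidestepping "conversion to port encoding failed" errors on ports with a stricter encoding. The destination argument follows simple-format's convention, roughly as sketched here:

(use-modules (guix-build-coordinator utils))

;; #f as the destination returns the formatted string.
(simple-format/safe #f "~A builds pending\n" 42)

;; #t writes to the current output port.
(simple-format/safe #t "~A builds pending\n" 42)

;; A port argument writes to that port, here the error port.
(simple-format/safe (current-error-port)
                    "failed to process ~A\n"
                    "example-item")

;; display/safe takes an optional port, defaulting to the current
;; output port.
(display/safe "hello\n")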
diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm
index 450c36b..d836ceb 100644
--- a/guix-build-coordinator/utils/fibers.scm
+++ b/guix-build-coordinator/utils/fibers.scm
@@ -1,4 +1,5 @@
(define-module (guix-build-coordinator utils fibers)
+ #:use-module (srfi srfi-9)
#:use-module (ice-9 q)
#:use-module (ice-9 match)
#:use-module (ice-9 atomic)
@@ -12,305 +13,20 @@
#:use-module (fibers scheduler)
#:use-module (fibers operations)
#:use-module (fibers conditions)
+ #:use-module (knots timeout)
#:use-module ((guix build syscalls)
#:select (set-thread-name))
#:use-module (guix-build-coordinator utils)
- #:export (make-worker-thread-channel
- %worker-thread-default-timeout
- call-with-worker-thread
- worker-thread-timeout-error?
+ #:export (spawn-port-monitoring-fiber
- call-with-sigint
+ make-discrete-priority-queueing-channels
- run-server/patched
-
- spawn-port-monitoring-fiber
-
- letpar&
-
- port-timeout-error?
- port-read-timeout-error?
- port-write-timeout-error?
- with-fibers-timeout
- with-fibers-port-timeouts
-
- make-queueing-channel
- make-discrete-priority-queueing-channels)
+ make-reusable-condition
+ reusable-condition?
+ signal-reusable-condition!
+ reusable-condition-wait)
#:replace (retry-on-error))
-(define %worker-thread-args
- (make-parameter #f))
-
-(define* (make-worker-thread-channel initializer
- #:key (parallelism 1)
- (delay-logger (lambda _ #f))
- (duration-logger (const #f))
- destructor
- lifetime
- (log-exception? (const #t))
- (expire-on-exception? #f)
- (name "unnamed"))
- "Return a channel used to offload work to a dedicated thread. ARGS are the
-arguments of the worker thread procedure."
- (define thread-proc-vector
- (make-vector parallelism #f))
-
- (define (initializer/safe)
- (let ((args
- (with-exception-handler
- (lambda (exn)
- (simple-format
- (current-error-port)
- "exception running initializer in worker thread (~A): ~A:\n ~A\n"
- name
- initializer
- exn)
- #f)
- (lambda ()
- (with-throw-handler #t
- initializer
- (lambda args
- (backtrace))))
- #:unwind? #t)))
-
- (if args
- args
- ;; never give up, just keep retrying
- (begin
- (sleep 1)
- (initializer/safe)))))
-
- (define (destructor/safe args)
- (let ((success?
- (with-exception-handler
- (lambda (exn)
- (simple-format
- (current-error-port)
- "exception running destructor in worker thread (~A): ~A:\n ~A\n"
- name
- destructor
- exn)
- #f)
- (lambda ()
- (with-throw-handler #t
- (lambda ()
- (apply destructor args)
- #t)
- (lambda _
- (backtrace))))
- #:unwind? #t)))
-
- (or success?
- #t
- (begin
- (sleep 1)
- (destructor/safe args)))))
-
- (define (process thread-index channel args)
- (let loop ((current-lifetime lifetime))
- (let ((exception?
- (match (get-message channel)
- (((? channel? reply) sent-time (? procedure? proc))
- (let ((time-delay
- (- (get-internal-real-time)
- sent-time)))
- (delay-logger (/ time-delay
- internal-time-units-per-second))
-
- (let* ((start-time (get-internal-real-time))
- (response
- (with-exception-handler
- (lambda (exn)
- (list 'worker-thread-error
- (/ (- (get-internal-real-time)
- start-time)
- internal-time-units-per-second)
- exn))
- (lambda ()
- (vector-set! thread-proc-vector
- thread-index
- proc)
- (with-throw-handler #t
- (lambda ()
- (call-with-values
- (lambda ()
- (start-stack
- 'worker-thread
- (apply proc args)))
- (lambda vals
- (cons (/ (- (get-internal-real-time)
- start-time)
- internal-time-units-per-second)
- vals))))
- (lambda args
- (when (match args
- (('%exception exn)
- (log-exception? exn))
- (_ #t))
- (simple-format
- (current-error-port)
- "worker-thread: exception: ~A\n" args)
- (backtrace)))))
- #:unwind? #t)))
- (put-message reply
- response)
-
- (vector-set! thread-proc-vector
- thread-index
- #f)
-
- (match response
- (('worker-thread-error duration _)
- (when duration-logger
- (duration-logger duration proc))
- #t)
- ((duration . _)
- (when duration-logger
- (duration-logger duration proc))
- #f))))))))
- (unless (and expire-on-exception?
- exception?)
- (if (number? current-lifetime)
- (unless (< current-lifetime 0)
- (loop (if current-lifetime
- (- current-lifetime 1)
- #f)))
- (loop #f))))))
-
- (let ((channel (make-channel)))
- (for-each
- (lambda (thread-index)
- (call-with-new-thread
- (lambda ()
- (catch 'system-error
- (lambda ()
- (set-thread-name
- (string-append
- name " w t "
- (number->string thread-index))))
- (const #t))
-
- (let init ((args (initializer/safe)))
- (with-exception-handler
- (lambda (exn)
- (simple-format
- (current-error-port)
- "worker-thread-channel: exception: ~A\n" exn))
- (lambda ()
- (parameterize ((%worker-thread-args args))
- (process thread-index channel args)))
- #:unwind? #t)
-
- (when destructor
- (destructor/safe args))
-
- (init (initializer/safe))))))
- (iota parallelism))
-
- (values channel
- thread-proc-vector)))
-
-(define &worker-thread-timeout
- (make-exception-type '&worker-thread-timeout
- &error
- '()))
-
-(define make-worker-thread-timeout-error
- (record-constructor &worker-thread-timeout))
-
-(define worker-thread-timeout-error?
- (record-predicate &worker-thread-timeout))
-
-(define %worker-thread-default-timeout
- (make-parameter 30))
-
-(define* (call-with-worker-thread channel proc #:key duration-logger
- (timeout (%worker-thread-default-timeout)))
- "Send PROC to the worker thread through CHANNEL. Return the result of PROC.
-If already in the worker thread, call PROC immediately."
- (let ((args (%worker-thread-args)))
- (if args
- (call-with-delay-logging proc #:args args)
- (let* ((reply (make-channel))
- (operation-success?
- (perform-operation
- (let ((put
- (wrap-operation
- (put-operation channel
- (list reply
- (get-internal-real-time)
- proc))
- (const #t))))
-
- (if timeout
- (choice-operation
- put
- (wrap-operation (sleep-operation timeout)
- (const #f)))
- put)))))
-
- (unless operation-success?
- (raise-exception
- (make-worker-thread-timeout-error)))
-
- (match (get-message reply)
- (('worker-thread-error duration exn)
- (when duration-logger
- (duration-logger duration))
- (raise-exception exn))
- ((duration . result)
- (when duration-logger
- (duration-logger duration))
- (apply values result)))))))
-
-;; Copied from (fibers web server)
-(define (call-with-sigint thunk cvar)
- (let ((handler #f))
- (dynamic-wind
- (lambda ()
- (set! handler
- (sigaction SIGINT (lambda (sig) (signal-condition! cvar)))))
- thunk
- (lambda ()
- (if handler
- ;; restore Scheme handler, SIG_IGN or SIG_DFL.
- (sigaction SIGINT (car handler) (cdr handler))
- ;; restore original C handler.
- (sigaction SIGINT #f))))))
-
-;; This variant of run-server from the fibers library supports running
-;; multiple servers within one process.
-(define run-server/patched
- (let ((fibers-web-server-module
- (resolve-module '(fibers web server))))
-
- (define set-nonblocking!
- (module-ref fibers-web-server-module 'set-nonblocking!))
-
- (define make-default-socket
- (module-ref fibers-web-server-module 'make-default-socket))
-
- (define socket-loop
- (module-ref fibers-web-server-module 'socket-loop))
-
- (lambda* (handler
- #:key
- (host #f)
- (family AF_INET)
- (addr (if host
- (inet-pton family host)
- INADDR_LOOPBACK))
- (port 8080)
- (socket (make-default-socket family addr port)))
- ;; We use a large backlog by default. If the server is suddenly hit
- ;; with a number of connections on a small backlog, clients won't
- ;; receive confirmation for their SYN, leading them to retry --
- ;; probably successfully, but with a large latency.
- (listen socket 1024)
- (set-nonblocking! socket)
- (sigaction SIGPIPE SIG_IGN)
- (spawn-fiber (lambda () (socket-loop socket handler))))))
-
(define (spawn-port-monitoring-fiber port error-condition)
(spawn-fiber
(lambda ()
@@ -326,7 +42,7 @@ If already in the worker thread, call PROC immediately."
"port monitoring fiber error-condition unresponsive")
(primitive-exit 1))
(lambda ()
- (with-fibers-port-timeouts
+ (with-port-timeouts
(lambda ()
(let ((sock
(non-blocking-port
@@ -337,218 +53,6 @@ If already in the worker thread, call PROC immediately."
#:unwind? #t)
(sleep 20)))))
-(define (defer-to-fiber thunk)
- (let ((reply (make-channel)))
- (spawn-fiber
- (lambda ()
- (put-message
- reply
- (with-exception-handler
- (lambda (exn)
- (cons 'worker-fiber-error exn))
- (lambda ()
- (with-exception-handler
- (lambda (exn)
- (simple-format
- (current-error-port)
- "worker fiber: exception: ~A\n"
- exn)
- (backtrace)
- (raise-exception exn))
- (lambda ()
- (call-with-values
- thunk
- (lambda vals
- vals)))))
- #:unwind? #t)))
- #:parallel? #t)
- reply))
-
-(define (fetch-result-of-defered-thunks . reply-channels)
- (let ((responses (map get-message reply-channels)))
- (map
- (match-lambda
- (('worker-thread-error . exn)
- (raise-exception exn))
- (result
- (apply values result)))
- responses)))
-
-(define-syntax parallel-via-fibers
- (lambda (x)
- (syntax-case x ()
- ((_ e0 ...)
- (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
- #'(let ((tmp0 (defer-to-fiber
- (lambda ()
- e0)))
- ...)
- (apply values (fetch-result-of-defered-thunks tmp0 ...))))))))
-
-(define-syntax-rule (letpar& ((v e) ...) b0 b1 ...)
- (call-with-values
- (lambda () (parallel-via-fibers e ...))
- (lambda (v ...)
- b0 b1 ...)))
-
-(define* (with-fibers-timeout thunk #:key timeout on-timeout)
- (let ((channel (make-channel)))
- (spawn-fiber
- (lambda ()
- (with-exception-handler
- (lambda (exn)
- (perform-operation
- (choice-operation
- (put-operation channel (cons 'exception exn))
- (sleep-operation timeout))))
- (lambda ()
- (call-with-values thunk
- (lambda vals
- (perform-operation
- (choice-operation
- (put-operation channel vals)
- (sleep-operation timeout))))))
- #:unwind? #t)))
-
- (match (perform-operation
- (choice-operation
- (get-operation channel)
- (wrap-operation (sleep-operation timeout)
- (const 'timeout))))
- ('timeout
- (on-timeout))
- (('exception . exn)
- (raise-exception exn))
- (vals
- (apply values vals)))))
-
-(define &port-timeout
- (make-exception-type '&port-timeout
- &external-error
- '(thunk port)))
-
-(define make-port-timeout-error
- (record-constructor &port-timeout))
-
-(define port-timeout-error?
- (record-predicate &port-timeout))
-
-(define &port-read-timeout
- (make-exception-type '&port-read-timeout
- &port-timeout
- '()))
-
-(define make-port-read-timeout-error
- (record-constructor &port-read-timeout))
-
-(define port-read-timeout-error?
- (record-predicate &port-read-timeout))
-
-(define &port-write-timeout
- (make-exception-type '&port-write-timeout
- &port-timeout
- '()))
-
-(define make-port-write-timeout-error
- (record-constructor &port-write-timeout))
-
-(define port-write-timeout-error?
- (record-predicate &port-write-timeout))
-
-;; These procedure are subject to spurious wakeups.
-
-(define (readable? port)
- "Test if PORT is writable."
- (= 1 (port-poll port "r" 0)))
-
-(define (writable? port)
- "Test if PORT is writable."
- (= 1 (port-poll port "w" 0)))
-
-(define (make-wait-operation ready? schedule-when-ready port port-ready-fd this-procedure)
- (make-base-operation #f
- (lambda _
- (and (ready? port) values))
- (lambda (flag sched resume)
- (define (commit)
- (match (atomic-box-compare-and-swap! flag 'W 'S)
- ('W (resume values))
- ('C (commit))
- ('S #f)))
- (schedule-when-ready
- sched (port-ready-fd port) commit))))
-
-(define (wait-until-port-readable-operation port)
- "Make an operation that will succeed when PORT is readable."
- (unless (input-port? port)
- (error "refusing to wait forever for input on non-input port"))
- (make-wait-operation readable? schedule-task-when-fd-readable port
- port-read-wait-fd
- wait-until-port-readable-operation))
-
-(define (wait-until-port-writable-operation port)
- "Make an operation that will succeed when PORT is writable."
- (unless (output-port? port)
- (error "refusing to wait forever for output on non-output port"))
- (make-wait-operation writable? schedule-task-when-fd-writable port
- port-write-wait-fd
- wait-until-port-writable-operation))
-
-(define* (with-fibers-port-timeouts thunk
- #:key timeout
- (read-timeout timeout)
- (write-timeout timeout))
- (define (no-fibers-wait thunk port mode timeout)
- (define poll-timeout-ms 200)
-
- ;; When the GC runs, it restarts the poll syscall, but the timeout
- ;; remains unchanged! When the timeout is longer than the time
- ;; between the syscall restarting, I think this renders the
- ;; timeout useless. Therefore, this code uses a short timeout, and
- ;; repeatedly calls poll while watching the clock to see if it has
- ;; timed out overall.
- (let ((timeout-internal
- (+ (get-internal-real-time)
- (* internal-time-units-per-second timeout))))
- (let loop ((poll-value
- (port-poll port mode poll-timeout-ms)))
- (if (= poll-value 0)
- (if (> (get-internal-real-time)
- timeout-internal)
- (raise-exception
- (if (string=? mode "r")
- (make-port-read-timeout-error thunk port)
- (make-port-write-timeout-error thunk port)))
- (loop (port-poll port mode poll-timeout-ms)))
- poll-value))))
-
- (parameterize
- ((current-read-waiter
- (lambda (port)
- (if (current-scheduler)
- (perform-operation
- (choice-operation
- (wait-until-port-readable-operation port)
- (wrap-operation
- (sleep-operation read-timeout)
- (lambda ()
- (raise-exception
- (make-port-read-timeout-error thunk port))))))
- (no-fibers-wait thunk port "r" read-timeout))))
- (current-write-waiter
- (lambda (port)
- (if (current-scheduler)
- (perform-operation
- (choice-operation
- (wait-until-port-writable-operation port)
- (wrap-operation
- (sleep-operation write-timeout)
- (lambda ()
- (raise-exception
- (make-port-write-timeout-error thunk port))))))
- (no-fibers-wait thunk port "w" write-timeout)))))
- (thunk)))
-
;; Use the fibers sleep
(define (retry-on-error . args)
(apply
@@ -557,28 +61,6 @@ If already in the worker thread, call PROC immediately."
args
(list #:sleep-impl sleep))))
-(define (make-queueing-channel channel)
- (define queue (make-q))
-
- (let ((queue-channel (make-channel)))
- (spawn-fiber
- (lambda ()
- (while #t
- (if (q-empty? queue)
- (enq! queue
- (perform-operation
- (get-operation queue-channel)))
- (let ((front (q-front queue)))
- (perform-operation
- (choice-operation
- (wrap-operation (get-operation queue-channel)
- (lambda (val)
- (enq! queue val)))
- (wrap-operation (put-operation channel front)
- (lambda _
- (q-pop! queue))))))))))
- queue-channel))
-
(define (make-discrete-priority-queueing-channels channel num-priorities)
(define all-queues
(map (lambda _ (make-q))
@@ -588,6 +70,11 @@ If already in the worker thread, call PROC immediately."
(map (lambda _ (make-channel))
(iota num-priorities)))
+ (define (stats)
+ (map (lambda (queue)
+ `((length . ,(q-length queue))))
+ all-queues))
+
(spawn-fiber
(lambda ()
(while #t
@@ -620,4 +107,48 @@ If already in the worker thread, call PROC immediately."
(enq! queue val))))
all-queues
queue-channels)))))))))))
- (apply values queue-channels))
+ (values (list-copy queue-channels)
+ stats))
+
+(define-record-type <reusable-condition>
+ (%make-reusable-condition atomic-box channel)
+ reusable-condition?
+ (atomic-box reusable-condition-atomic-box)
+ (channel reusable-condition-channel))
+
+(define (make-reusable-condition)
+ (%make-reusable-condition (make-atomic-box #f)
+ (make-channel)))
+
+(define* (signal-reusable-condition! reusable-condition
+ #:optional (scheduler (current-scheduler)))
+ (match (atomic-box-compare-and-swap!
+ (reusable-condition-atomic-box reusable-condition)
+ #f
+ #t)
+ (#f
+ (spawn-fiber
+ (lambda ()
+ (put-message (reusable-condition-channel reusable-condition)
+ #t))
+ scheduler)
+ #t)
+ (#t #f)))
+
+(define* (reusable-condition-wait reusable-condition
+ #:key (timeout #f))
+ (let ((val
+ (if (atomic-box-ref (reusable-condition-atomic-box reusable-condition))
+ #t
+ ;; Not great as this is subject to race conditions, but it should
+ ;; roughly work
+ (if timeout
+ (perform-operation
+ (choice-operation
+ (get-operation (reusable-condition-channel reusable-condition))
+ (wrap-operation (sleep-operation timeout)
+ (const #f))))
+ (get-message (reusable-condition-channel reusable-condition))))))
+ (atomic-box-set! (reusable-condition-atomic-box reusable-condition)
+ #f)
+ val))
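The <reusable-condition> record added above behaves like a fibers condition variable that can be signalled and waited on repeatedly rather than only once. A rough sketch of the intended pattern, assuming it runs under run-fibers and that work-available? is an illustrative name:

(use-modules (fibers)
             (guix-build-coordinator utils fibers))

(run-fibers
 (lambda ()
   (let ((work-available? (make-reusable-condition)))
     ;; Producer fiber: wake the waiter once there is something to do.
     (spawn-fiber
      (lambda ()
        (sleep 1)
        (signal-reusable-condition! work-available?)))

     ;; Wait up to 30 seconds for the signal; a #f result would mean
     ;; the timeout elapsed first.
     (reusable-condition-wait work-available? #:timeout 30))))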
diff --git a/guix-build-coordinator/utils/timeout.scm b/guix-build-coordinator/utils/timeout.scm
new file mode 100644
index 0000000..bb133d7
--- /dev/null
+++ b/guix-build-coordinator/utils/timeout.scm
@@ -0,0 +1,81 @@
+(define-module (guix-build-coordinator utils timeout)
+ #:use-module (ice-9 exceptions)
+ #:use-module (ice-9 suspendable-ports)
+ #:use-module ((ice-9 ports internal) #:select (port-poll))
+ #:export (&port-timeout
+ &port-read-timeout
+ &port-write-timeout
+
+ port-timeout-error?
+ port-read-timeout-error?
+ port-write-timeout-error?
+
+ with-port-timeouts))
+
+(define &port-timeout
+ (make-exception-type '&port-timeout
+ &external-error
+ '(port)))
+
+(define make-port-timeout-error
+ (record-constructor &port-timeout))
+
+(define port-timeout-error?
+ (record-predicate &port-timeout))
+
+(define &port-read-timeout
+ (make-exception-type '&port-read-timeout
+ &port-timeout
+ '()))
+
+(define make-port-read-timeout-error
+ (record-constructor &port-read-timeout))
+
+(define port-read-timeout-error?
+ (record-predicate &port-read-timeout))
+
+(define &port-write-timeout
+ (make-exception-type '&port-write-timeout
+ &port-timeout
+ '()))
+
+(define make-port-write-timeout-error
+ (record-constructor &port-write-timeout))
+
+(define port-write-timeout-error?
+ (record-predicate &port-write-timeout))
+
+(define* (with-port-timeouts thunk #:key timeout)
+
+ ;; When the GC runs, it restarts the poll syscall, but the timeout remains
+ ;; unchanged! When the timeout is longer than the time between the syscall
+ ;; restarting, I think this renders the timeout useless. Therefore, this
+ ;; code uses a short timeout, and repeatedly calls poll while watching the
+ ;; clock to see if it has timed out overall.
+ (define poll-timeout-ms 200)
+
+ (define (wait port mode)
+ (let ((timeout-internal
+ (+ (get-internal-real-time)
+ (* internal-time-units-per-second timeout))))
+ (let loop ((poll-value
+ (port-poll port mode poll-timeout-ms)))
+ (if (= poll-value 0)
+ (if (> (get-internal-real-time)
+ timeout-internal)
+ (raise-exception
+ (if (string=? mode "r")
+ (make-port-read-timeout-error port)
+ (make-port-write-timeout-error port)))
+ (loop (port-poll port mode poll-timeout-ms)))
+ poll-value))))
+
+ (parameterize
+ ((current-read-waiter
+ (lambda (port)
+ (wait port "r")))
+ (current-write-waiter
+ (lambda (port)
+ (wait port "w"))))
+ (thunk)))
+
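with-port-timeouts installs read and write waiters that poll at short intervals and raise &port-read-timeout or &port-write-timeout once the overall deadline passes, so it only takes effect for ports that actually suspend (suspendable port implementations installed and a non-blocking port). A sketch of wrapping a read on such a port, with SOCK purely illustrative:

(use-modules (guix-build-coordinator utils timeout)
             (ice-9 suspendable-ports)
             (ice-9 textual-ports))

(install-suspendable-ports!)

(define (read-reply-with-timeout sock)
  ;; SOCK is assumed to be a non-blocking socket port.
  (with-exception-handler
    (lambda (exn)
      (if (port-read-timeout-error? exn)
          #f                              ;give up quietly on timeout
          (raise-exception exn)))
    (lambda ()
      (with-port-timeouts
       (lambda ()
         (get-string-all sock))
       #:timeout 30))
    #:unwind? #t))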
diff --git a/guix-dev.scm b/guix-dev.scm
index 35869dc..011d9e0 100644
--- a/guix-dev.scm
+++ b/guix-dev.scm
@@ -44,6 +44,39 @@
(gnu packages texinfo)
(srfi srfi-1))
+(define guile-knots
+ (let ((commit "d572f591a3c136bfc7b23160e16381c92588f8d9")
+ (revision "1"))
+ (package
+ (name "guile-knots")
+ (version (git-version "0" revision commit))
+ (source (origin
+ (method git-fetch)
+ (uri (git-reference
+ (url "https://git.cbaines.net/git/guile/knots")
+ (commit commit)))
+ (sha256
+ (base32
+ "0g85frfniblxb2cl81fg558ic3cxvla7fvml08scjgbbxn8151gv"))
+ (file-name (string-append name "-" version "-checkout"))))
+ (build-system gnu-build-system)
+ (native-inputs
+ (list pkg-config
+ autoconf
+ automake
+ guile-3.0
+ guile-lib
+ guile-fibers))
+ (inputs
+ (list guile-3.0))
+ (propagated-inputs
+ (list guile-fibers))
+ (home-page "https://git.cbaines.net/guile/knots")
+ (synopsis "Patterns and functionality to use with Guile Fibers")
+ (description
+ "")
+ (license license:gpl3+))))
+
(package
(name "guix-build-coordinator")
(version "0.0.0")
@@ -53,6 +86,7 @@
(list guix
guile-json-4
guile-fibers-1.3
+ guile-knots
guile-gcrypt
guile-readline
guile-lib
diff --git a/scripts/guix-build-coordinator.in b/scripts/guix-build-coordinator.in
index 72aa8d4..0c06579 100644
--- a/scripts/guix-build-coordinator.in
+++ b/scripts/guix-build-coordinator.in
@@ -58,15 +58,6 @@
(guix-build-coordinator build-allocator)
(guix-build-coordinator client-communication))
-(install-suspendable-ports!)
-
-;; TODO Work around this causing problems with backtraces
-;; https://github.com/wingo/fibers/issues/76
-(set-record-type-printer!
- (@@ (fibers scheduler) <scheduler>)
- (lambda (scheduler port)
- (display "#<scheduler>" port)))
-
(define %base-options
;; Specifications of the command-line options
(list (option '("secret-key-base-file") #t #f
@@ -83,6 +74,13 @@
(option '("update-database") #f #f
(lambda (opt name _ result)
(alist-cons 'update-database #t result)))
+ (option '("listen-repl") #f #t
+ (lambda (opt name arg result)
+ (alist-cons 'listen-repl
+ (if arg
+ (string->number arg)
+ #t)
+ (alist-delete 'listen-repl result))))
(option '("show-error-details") #f #f
(lambda (opt name _ result)
(alist-cons 'show-error-details #t result)))))
@@ -295,6 +293,16 @@
(or (assq-ref result 'not-systems)
'()))
(alist-delete 'not-systems result))))
+ (option '("created-at-gt") #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'created-at->
+ (datastore-validate-datetime-string arg)
+ result)))
+ (option '("created-at-lt") #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'created-at-<
+ (datastore-validate-datetime-string arg)
+ result)))
(option '("skip-updating-derived-priorities") #f #f
(lambda (opt name _ result)
(alist-cons 'skip-updating-derived-priorities
@@ -665,6 +673,8 @@ tags:
#:not-tags (assq-ref opts 'not-tags)
#:systems (assq-ref opts 'systems)
#:not-systems (assq-ref opts 'not-systems)
+ #:created-at-< (assq-ref opts 'created-at-<)
+ #:created-at-> (assq-ref opts 'created-at->)
#:processed #f
#:canceled #f
#:relationship (assq-ref opts 'relationship)))
@@ -1110,4 +1120,5 @@ tags:
#:pid-file (assq-ref opts 'pid-file)
#:agent-communication-uri (assq-ref opts 'agent-communication)
#:client-communication-uri (assq-ref opts 'client-communication)
- #:parallel-hooks (assq-ref opts 'parallel-hooks)))))))
+ #:parallel-hooks (assq-ref opts 'parallel-hooks)
+ #:listen-repl (assoc-ref opts 'listen-repl)))))))
diff --git a/sqitch/pg/deploy/allocated_builds_submit_outputs.sql b/sqitch/pg/deploy/allocated_builds_submit_outputs.sql
new file mode 100644
index 0000000..a2ebe1e
--- /dev/null
+++ b/sqitch/pg/deploy/allocated_builds_submit_outputs.sql
@@ -0,0 +1,7 @@
+-- Deploy guix-build-coordinator:allocated_builds_submit_outputs to pg
+
+BEGIN;
+
+-- XXX Add DDLs here.
+
+COMMIT;
diff --git a/sqitch/pg/deploy/background-jobs-queue.sql b/sqitch/pg/deploy/background-jobs-queue.sql
new file mode 100644
index 0000000..63dd8d4
--- /dev/null
+++ b/sqitch/pg/deploy/background-jobs-queue.sql
@@ -0,0 +1,7 @@
+-- Deploy guix-build-coordinator:background-jobs-queue to pg
+
+BEGIN;
+
+-- XXX Add DDLs here.
+
+COMMIT;
diff --git a/sqitch/pg/deploy/builds_replace_unprocessed_index.sql b/sqitch/pg/deploy/builds_replace_unprocessed_index.sql
new file mode 100644
index 0000000..782cf6b
--- /dev/null
+++ b/sqitch/pg/deploy/builds_replace_unprocessed_index.sql
@@ -0,0 +1,7 @@
+-- Deploy guix-build-coordinator:builds_replace_unprocessed_index to pg
+
+BEGIN;
+
+-- XXX Add DDLs here.
+
+COMMIT;
diff --git a/sqitch/pg/revert/allocated_builds_submit_outputs.sql b/sqitch/pg/revert/allocated_builds_submit_outputs.sql
new file mode 100644
index 0000000..255efb1
--- /dev/null
+++ b/sqitch/pg/revert/allocated_builds_submit_outputs.sql
@@ -0,0 +1,7 @@
+-- Revert guix-build-coordinator:allocated_builds_submit_outputs from pg
+
+BEGIN;
+
+-- XXX Add DDLs here.
+
+COMMIT;
diff --git a/sqitch/pg/revert/background-jobs-queue.sql b/sqitch/pg/revert/background-jobs-queue.sql
new file mode 100644
index 0000000..46b0761
--- /dev/null
+++ b/sqitch/pg/revert/background-jobs-queue.sql
@@ -0,0 +1,7 @@
+-- Revert guix-build-coordinator:background-jobs-queue from pg
+
+BEGIN;
+
+-- XXX Add DDLs here.
+
+COMMIT;
diff --git a/sqitch/pg/revert/builds_replace_unprocessed_index.sql b/sqitch/pg/revert/builds_replace_unprocessed_index.sql
new file mode 100644
index 0000000..4395f90
--- /dev/null
+++ b/sqitch/pg/revert/builds_replace_unprocessed_index.sql
@@ -0,0 +1,7 @@
+-- Revert guix-build-coordinator:builds_replace_unprocessed_index from pg
+
+BEGIN;
+
+-- XXX Add DDLs here.
+
+COMMIT;
diff --git a/sqitch/pg/verify/allocated_builds_submit_outputs.sql b/sqitch/pg/verify/allocated_builds_submit_outputs.sql
new file mode 100644
index 0000000..e95a717
--- /dev/null
+++ b/sqitch/pg/verify/allocated_builds_submit_outputs.sql
@@ -0,0 +1,7 @@
+-- Verify guix-build-coordinator:allocated_builds_submit_outputs on pg
+
+BEGIN;
+
+-- XXX Add verifications here.
+
+ROLLBACK;
diff --git a/sqitch/pg/verify/background-jobs-queue.sql b/sqitch/pg/verify/background-jobs-queue.sql
new file mode 100644
index 0000000..a36097e
--- /dev/null
+++ b/sqitch/pg/verify/background-jobs-queue.sql
@@ -0,0 +1,7 @@
+-- Verify guix-build-coordinator:background-jobs-queue on pg
+
+BEGIN;
+
+-- XXX Add verifications here.
+
+ROLLBACK;
diff --git a/sqitch/pg/verify/builds_replace_unprocessed_index.sql b/sqitch/pg/verify/builds_replace_unprocessed_index.sql
new file mode 100644
index 0000000..050ef75
--- /dev/null
+++ b/sqitch/pg/verify/builds_replace_unprocessed_index.sql
@@ -0,0 +1,7 @@
+-- Verify guix-build-coordinator:builds_replace_unprocessed_index on pg
+
+BEGIN;
+
+-- XXX Add verifications here.
+
+ROLLBACK;
diff --git a/sqitch/sqitch.plan b/sqitch/sqitch.plan
index cfb5b9d..f4f538b 100644
--- a/sqitch/sqitch.plan
+++ b/sqitch/sqitch.plan
@@ -46,3 +46,6 @@ agent_status_add_processor_count 2023-03-24T09:28:47Z Chris <chris@felis> # Add
remove_build_allocation_plan 2023-04-23T19:50:23Z Chris <chris@felis> # Remove build_allocation_plan
system_uptime 2023-05-05T18:18:35Z Chris <chris@felis> # Add system uptime
build_starts_index 2023-11-24T16:30:13Z Chris <chris@felis> # build_starts index
+background-jobs-queue 2025-02-06T10:49:08Z Chris <chris@fang> # Add background_jobs_queue
+builds_replace_unprocessed_index 2025-02-19T11:19:42Z Chris <chris@fang> # Replace builds_unprocessed
+allocated_builds_submit_outputs 2025-03-02T08:22:48Z Chris <chris@fang> # Add allocated_builds.submit_outputs
diff --git a/sqitch/sqlite/deploy/allocated_builds_submit_outputs.sql b/sqitch/sqlite/deploy/allocated_builds_submit_outputs.sql
new file mode 100644
index 0000000..66d6b45
--- /dev/null
+++ b/sqitch/sqlite/deploy/allocated_builds_submit_outputs.sql
@@ -0,0 +1,7 @@
+-- Deploy guix-build-coordinator:allocated_builds_submit_outputs to sqlite
+
+BEGIN;
+
+ALTER TABLE allocated_builds ADD COLUMN submit_outputs BOOLEAN DEFAULT NULL;
+
+COMMIT;
diff --git a/sqitch/sqlite/deploy/background-jobs-queue.sql b/sqitch/sqlite/deploy/background-jobs-queue.sql
new file mode 100644
index 0000000..1cb35f0
--- /dev/null
+++ b/sqitch/sqlite/deploy/background-jobs-queue.sql
@@ -0,0 +1,11 @@
+-- Deploy guix-build-coordinator:background-jobs-queue to sqlite
+
+BEGIN;
+
+CREATE TABLE background_jobs_queue (
+ id INTEGER PRIMARY KEY,
+ type TEXT NOT NULL,
+ args TEXT NOT NULL
+);
+
+COMMIT;
diff --git a/sqitch/sqlite/deploy/builds_replace_unprocessed_index.sql b/sqitch/sqlite/deploy/builds_replace_unprocessed_index.sql
new file mode 100644
index 0000000..a404f31
--- /dev/null
+++ b/sqitch/sqlite/deploy/builds_replace_unprocessed_index.sql
@@ -0,0 +1,9 @@
+-- Deploy guix-build-coordinator:builds_replace_unprocessed_index to sqlite
+
+BEGIN;
+
+DROP INDEX builds_unprocessed;
+
+CREATE INDEX builds_live ON builds (id) WHERE processed = 0 AND canceled = 0;
+
+COMMIT;
diff --git a/sqitch/sqlite/revert/allocated_builds_submit_outputs.sql b/sqitch/sqlite/revert/allocated_builds_submit_outputs.sql
new file mode 100644
index 0000000..240de22
--- /dev/null
+++ b/sqitch/sqlite/revert/allocated_builds_submit_outputs.sql
@@ -0,0 +1,7 @@
+-- Revert guix-build-coordinator:allocated_builds_submit_outputs from sqlite
+
+BEGIN;
+
+-- XXX Add DDLs here.
+
+COMMIT;
diff --git a/sqitch/sqlite/revert/background-jobs-queue.sql b/sqitch/sqlite/revert/background-jobs-queue.sql
new file mode 100644
index 0000000..10ef1ff
--- /dev/null
+++ b/sqitch/sqlite/revert/background-jobs-queue.sql
@@ -0,0 +1,7 @@
+-- Revert guix-build-coordinator:background-jobs-queue from sqlite
+
+BEGIN;
+
+-- XXX Add DDLs here.
+
+COMMIT;
diff --git a/sqitch/sqlite/revert/builds_replace_unprocessed_index.sql b/sqitch/sqlite/revert/builds_replace_unprocessed_index.sql
new file mode 100644
index 0000000..b02dd3c
--- /dev/null
+++ b/sqitch/sqlite/revert/builds_replace_unprocessed_index.sql
@@ -0,0 +1,7 @@
+-- Revert guix-build-coordinator:builds_replace_unprocessed_index from sqlite
+
+BEGIN;
+
+-- XXX Add DDLs here.
+
+COMMIT;
diff --git a/sqitch/sqlite/verify/allocated_builds_submit_outputs.sql b/sqitch/sqlite/verify/allocated_builds_submit_outputs.sql
new file mode 100644
index 0000000..0b1331e
--- /dev/null
+++ b/sqitch/sqlite/verify/allocated_builds_submit_outputs.sql
@@ -0,0 +1,7 @@
+-- Verify guix-build-coordinator:allocated_builds_submit_outputs on sqlite
+
+BEGIN;
+
+-- XXX Add verifications here.
+
+ROLLBACK;
diff --git a/sqitch/sqlite/verify/background-jobs-queue.sql b/sqitch/sqlite/verify/background-jobs-queue.sql
new file mode 100644
index 0000000..1cf9965
--- /dev/null
+++ b/sqitch/sqlite/verify/background-jobs-queue.sql
@@ -0,0 +1,7 @@
+-- Verify guix-build-coordinator:background-jobs-queue on sqlite
+
+BEGIN;
+
+-- XXX Add verifications here.
+
+ROLLBACK;
diff --git a/sqitch/sqlite/verify/builds_replace_unprocessed_index.sql b/sqitch/sqlite/verify/builds_replace_unprocessed_index.sql
new file mode 100644
index 0000000..d0dd086
--- /dev/null
+++ b/sqitch/sqlite/verify/builds_replace_unprocessed_index.sql
@@ -0,0 +1,7 @@
+-- Verify guix-build-coordinator:builds_replace_unprocessed_index on sqlite
+
+BEGIN;
+
+-- XXX Add verifications here.
+
+ROLLBACK;