aboutsummaryrefslogtreecommitdiff
path: root/guix-build-coordinator/coordinator.scm
diff options
context:
space:
mode:
Diffstat (limited to 'guix-build-coordinator/coordinator.scm')
-rw-r--r--guix-build-coordinator/coordinator.scm996
1 files changed, 661 insertions, 335 deletions
diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm
index a7fb664..adb7575 100644
--- a/guix-build-coordinator/coordinator.scm
+++ b/guix-build-coordinator/coordinator.scm
@@ -32,12 +32,16 @@
#:use-module (ice-9 match)
#:use-module (ice-9 rdelim)
#:use-module (ice-9 binary-ports)
+ #:use-module (ice-9 suspendable-ports)
#:use-module (ice-9 format)
#:use-module (ice-9 atomic)
#:use-module (ice-9 control)
#:use-module (ice-9 threads)
#:use-module (ice-9 exceptions)
#:use-module (rnrs bytevectors)
+ #:use-module (system repl server)
+ #:use-module (system repl command)
+ #:use-module (system repl debug)
#:use-module (web uri)
#:use-module (web http)
#:use-module (oop goops)
@@ -45,13 +49,18 @@
#:use-module (logging port-log)
#:use-module (gcrypt random)
#:use-module (fibers)
+ #:use-module (fibers timers)
#:use-module (fibers channels)
#:use-module (fibers scheduler)
#:use-module (fibers conditions)
+ #:use-module (fibers operations)
+ #:use-module (knots)
+ #:use-module (knots timeout)
+ #:use-module (knots parallelism)
+ #:use-module (knots thread-pool)
#:use-module (prometheus)
- #:use-module ((guix build syscalls)
- #:select (set-thread-name))
#:use-module (guix store)
+ #:use-module (guix progress)
#:use-module (guix derivations)
#:use-module (guix build utils)
#:use-module (guix-build-coordinator utils)
@@ -68,6 +77,8 @@
client-error?
client-error-details
+ %build-coordinator
+
make-build-coordinator
build-coordinator-datastore
build-coordinator-hooks
@@ -99,6 +110,8 @@
build-coordinator-prompt-hook-processing-for-event
start-hook-processing-threads
+ build-coordinator-update-metrics
+
build-coordinator-allocation-plan-stats
build-coordinator-trigger-build-allocation
build-coordinator-list-allocation-plan-builds
@@ -108,7 +121,9 @@
build-log-file-location
handle-build-start-report
handle-build-result
- handle-setup-failure-report))
+ handle-setup-failure-report
+
+ build-coordinator-check-and-correct-unprocessed-builds-all-inputs-built))
(define-exception-type &agent-error &error
make-agent-error
@@ -120,6 +135,9 @@
client-error?
(details client-error-details))
+(define %build-coordinator
+ (make-parameter #f))
+
(define-record-type <build-coordinator>
(make-build-coordinator-record datastore hooks metrics-registry
allocation-strategy allocator-channel
@@ -129,6 +147,9 @@
(hooks build-coordinator-hooks)
(hook-condvars build-coordinator-hook-condvars
set-build-coordinator-hook-condvars!)
+ (background-job-conditions
+ build-coordinator-background-job-conditions
+ set-build-coordinator-background-job-conditions!)
(metrics-registry build-coordinator-metrics-registry)
(allocation-strategy build-coordinator-allocation-strategy)
(trigger-build-allocation
@@ -141,30 +162,17 @@
(get-state-id build-coordinator-get-state-id-proc
set-build-coordinator-get-state-id-proc!)
(scheduler build-coordinator-scheduler
- set-build-coordinator-scheduler!))
+ set-build-coordinator-scheduler!)
+ (utility-thread-pool build-coordinator-utility-thread-pool
+ set-build-coordinator-utility-thread-pool!))
(set-record-type-printer!
<build-coordinator>
(lambda (build-coordinator port)
(display "#<build-coordinator>" port)))
-(define-class <custom-port-log> (<log-handler>)
- (port #:init-value #f #:accessor port #:init-keyword #:port))
-
-(define-method (emit-log (self <custom-port-log>) str)
- (when (port self)
- (put-bytevector (port self)
- (string->utf8 str))
- ;; Even though the port is line buffered, writing to it with
- ;; put-bytevector doesn't cause the buffer to be flushed.
- (force-output (port self))))
-
-(define-method (flush-log (self <custom-port-log>))
- (and=> (port self) force-output))
-
-(define-method (close-log! (self <custom-port-log>))
- (and=> (port self) close-port)
- (set! (port self) #f))
+(define %command-duration-histogram-buckets
+ (list 0.1 0.25 0.5 1 2.5 5 10 15 30 45 60 120 240 (inf)))
(define %known-hooks
'(build-submitted
@@ -210,7 +218,15 @@
#f)
(lambda ()
- (with-throw-handler #t
+ (with-exception-handler
+ (lambda (exn)
+ (if (and
+ (exception-with-origin? exn)
+ (string=? (exception-origin exn)
+ "fport_write"))
+ #f
+ (print-backtrace-and-exception/knots exn))
+ (raise-exception exn))
(lambda ()
(match (atomic-box-ref
current-state-id-and-event-buffer-index-box)
@@ -234,16 +250,7 @@
(iota event-count-to-send
(+ 1 last-sent-state-id))))
- current-state-id)))
- (lambda (key . args)
- (if (and
- (eq? key 'system-error)
- (match args
- (("fport_write" "~A" ("Broken pipe") rest ...)
- #t)
- (_ #f)))
- #f
- (backtrace)))))
+ current-state-id)))))
#:unwind? #t)))
(unless (eq? #f new-state-id)
@@ -285,7 +292,8 @@
(if (> requested-after-state-id
current-state-id)
current-state-id
- requested-after-state-id)
+ (max 0
+ requested-after-state-id))
current-state-id)))))
(atomic-box-set!
listener-channels-box
@@ -355,6 +363,27 @@
(define (build-coordinator-get-state-id build-coordinator)
((build-coordinator-get-state-id-proc build-coordinator)))
+(define (build-coordinator-update-metrics build-coordinator)
+ (define metrics-registry
+ (build-coordinator-metrics-registry build-coordinator))
+
+ (let ((utility-thread-pool-used-thread-metric
+ (or (metrics-registry-fetch-metric
+ metrics-registry
+ "utility_thread_pool_used_thread_total")
+ (make-gauge-metric
+ metrics-registry
+ "utility_thread_pool_used_thread_total"))))
+
+ (and=> (build-coordinator-utility-thread-pool build-coordinator)
+ (lambda (utility-thread-pool)
+ (metric-set
+ utility-thread-pool-used-thread-metric
+ (vector-count
+ (lambda (_ proc)
+ (->bool proc))
+ (thread-pool-proc-vector utility-thread-pool)))))))
+
(define* (make-build-coordinator
#:key
database-uri-string
@@ -366,7 +395,7 @@
(database-uri->datastore
database-uri-string
#:metrics-registry metrics-registry
- #:worker-thread-log-exception?
+ #:thread-pool-log-exception?
(lambda (exn)
(and (not (agent-error? exn))
(not (client-error? exn))))))
@@ -443,6 +472,16 @@
(setrlimit 'core #f #f))
#:unwind? #t)
+ (let ((core-file
+ (string-append (getcwd) "/core"))
+ (metric
+ (make-gauge-metric (build-coordinator-metrics-registry
+ build-coordinator)
+ "core_dump_file_last_modified_seconds")))
+ (when (file-exists? core-file)
+ (metric-set metric
+ (stat:mtime (stat core-file)))))
+
(with-exception-handler
(lambda (exn)
(simple-format #t "failed increasing open file limit: ~A\n" exn))
@@ -457,10 +496,30 @@
(lambda (scheduler port)
(display "#<scheduler>" port)))
- (when pid-file
- (call-with-output-file pid-file
- (lambda (port)
- (simple-format port "~A\n" (getpid)))))
+ (call-with-new-thread
+ (lambda ()
+ (set-thread-name
+ (string-append "gc watcher"))
+
+ (add-hook!
+ after-gc-hook
+ (let ((last-gc-time-taken
+ (assq-ref (gc-stats) 'gc-time-taken)))
+ (lambda ()
+ (let* ((gc-time-taken
+ (assq-ref (gc-stats) 'gc-time-taken))
+ (time-since-last
+ (/ (- gc-time-taken
+ last-gc-time-taken)
+ internal-time-units-per-second)))
+ (when (> time-since-last 0.1)
+ (format (current-error-port)
+ "after gc (additional time taken: ~f)\n"
+ time-since-last))
+ (set! last-gc-time-taken
+ (assq-ref (gc-stats) 'gc-time-taken))))))
+ (while #t
+ (sleep 0.1))))
(when update-datastore?
(datastore-update (build-coordinator-datastore build-coordinator)))
@@ -469,17 +528,17 @@
build-coordinator
(make-build-allocator-thread build-coordinator))
- (set-build-coordinator-hook-condvars!
- build-coordinator
- (start-hook-processing-threads build-coordinator
- parallel-hooks))
-
(when trigger-build-allocation?
(trigger-build-allocation build-coordinator)))
(define %default-agent-uri (string->uri "http://0.0.0.0:8745"))
(define %default-client-uri (string->uri "http://127.0.0.1:8746"))
+(define %default-repl-server-port
+ ;; Default port to run REPL server on, if --listen-repl is provided
+ ;; but no port is mentioned
+ 37146)
+
(define* (run-coordinator-service build-coordinator
#:key
(update-datastore? #t)
@@ -487,7 +546,10 @@
(agent-communication-uri %default-agent-uri)
(client-communication-uri %default-client-uri)
secret-key-base
- (parallel-hooks '()))
+ (parallel-hooks '())
+ listen-repl)
+ (install-suspendable-ports!)
+
(with-fluids ((%file-port-name-canonicalization 'none))
(perform-coordinator-service-startup
build-coordinator
@@ -495,23 +557,45 @@
#:pid-file pid-file
#:parallel-hooks parallel-hooks)
+ (when listen-repl
+ (parameterize ((%build-coordinator build-coordinator))
+ (cond
+ ((or (eq? #t listen-repl)
+ (number? listen-repl))
+ (let ((listen-repl
+ (if (eq? #t listen-repl)
+ %default-repl-server-port
+ listen-repl)))
+ (format (current-error-port)
+ "REPL server listening on port ~a~%"
+ listen-repl)
+ (spawn-server (make-tcp-server-socket
+ #:port
+ (if (eq? #t listen-repl)
+ %default-repl-server-port
+ listen-repl)))))
+ (else
+ (format (current-error-port)
+ "REPL server listening on ~a~%"
+ listen-repl)
+ (spawn-server (make-unix-domain-server-socket #:path listen-repl))))))
+
;; Create some worker thread channels, which need to be created prior
;; to run-fibers being called.
(let ((output-hash-channel
(make-output-hash-channel
build-coordinator))
- (utility-thread-pool-channel
- (make-worker-thread-channel
- (const '())
+ (utility-thread-pool
+ (make-thread-pool
+ 18
#:name "utility"
- #:parallelism 10
#:delay-logger
(let ((delay-metric
(make-histogram-metric
(build-coordinator-metrics-registry build-coordinator)
"utility_thread_pool_delay_seconds")))
- (lambda (seconds-delayed)
+ (lambda (seconds-delayed proc)
(log-delay "utility thread channel"
seconds-delayed)
(metric-observe delay-metric seconds-delayed)
@@ -521,6 +605,18 @@
"warning: utility thread channel delayed by ~1,2f seconds~%"
seconds-delayed)))))))
+ (let ((metric
+ (make-gauge-metric
+ (build-coordinator-metrics-registry build-coordinator)
+ "utility_thread_pool_thread_total")))
+ (metric-set metric
+ (vector-length
+ (thread-pool-proc-vector utility-thread-pool))))
+
+ (set-build-coordinator-utility-thread-pool!
+ build-coordinator
+ utility-thread-pool)
+
(let ((finished? (make-condition)))
(call-with-sigint
(lambda ()
@@ -542,6 +638,9 @@
(iota (length schedulers))
schedulers))
+ (set-build-coordinator-scheduler! build-coordinator
+ (current-scheduler))
+
(log-msg (build-coordinator-logger build-coordinator)
'INFO
"initialising metrics")
@@ -553,13 +652,19 @@
(datastore-spawn-fibers
(build-coordinator-datastore build-coordinator))
+ (set-build-coordinator-hook-condvars!
+ build-coordinator
+ (start-hook-processing-threads build-coordinator
+ parallel-hooks))
+
+ (set-build-coordinator-background-job-conditions!
+ build-coordinator
+ (start-background-job-processing-fibers build-coordinator))
+
(spawn-fiber-to-watch-for-deferred-builds build-coordinator)
(spawn-build-allocation-plan-management-fiber build-coordinator)
- (set-build-coordinator-scheduler! build-coordinator
- (current-scheduler))
-
(let ((events-channel
get-state-id
(make-events-channel
@@ -591,7 +696,12 @@
(uri-host client-communication-uri)
(uri-port client-communication-uri)
build-coordinator
- utility-thread-pool-channel)
+ utility-thread-pool)
+
+ (when pid-file
+ (call-with-output-file pid-file
+ (lambda (port)
+ (simple-format port "~A\n" (getpid)))))
;; Guile seems to just stop listening on ports, so try to
;; monitor that internally and just quit if it happens
@@ -650,12 +760,9 @@
derivation-file)))
(if (eq? #f system) ; derivation does not exist in database?
(build-for-output-already-exists/with-derivation?
- (with-fibers-port-timeouts
- (lambda ()
- (call-with-delay-logging read-drv
- #:threshold 10
- #:args (list derivation-file)))
- #:timeout 240))
+ (call-with-delay-logging read-drv
+ #:threshold 10
+ #:args (list derivation-file)))
(any
(lambda (output-details)
(let ((builds-for-output
@@ -780,12 +887,9 @@
;; derivations with no builds works
(if (datastore-find-derivation datastore derivation-file)
#f
- (with-fibers-port-timeouts
- (lambda ()
- (call-with-delay-logging read-drv
- #:threshold 10
- #:args (list derivation-file)))
- #:timeout 30))))
+ (call-with-delay-logging read-drv
+ #:threshold 10
+ #:args (list derivation-file)))))
(when drv
(datastore-store-derivation datastore drv))
@@ -806,8 +910,9 @@
;; time too.
(cons related-drv (random-v4-uuid)))
related-derivations-lacking-builds))
- #:duration-metric-name
- "store_build")
+ #:priority 'low
+ #:duration-metric-name "store_build"
+ #:duration-metric-buckets %command-duration-histogram-buckets)
(#t ; build submitted
(build-coordinator-prompt-hook-processing-for-event
build-coordinator
@@ -837,7 +942,8 @@
(stop-condition
stop-condition)))))
(stop-condition
- stop-condition)))))
+ stop-condition)))
+ #:buckets %command-duration-histogram-buckets))
(define* (cancel-build build-coordinator uuid
#:key (ignore-if-build-required-by-another? #t)
@@ -850,6 +956,10 @@
datastore
(lambda (db)
(let ((build-details (datastore-find-build datastore uuid)))
+ (unless build-details
+ (raise-exception
+ (make-client-error 'build-unknown)))
+
(when (assq-ref build-details 'canceled)
(raise-exception
(make-client-error 'build-already-canceled)))
@@ -872,7 +982,10 @@
(datastore-insert-unprocessed-hook-event datastore
"build-canceled"
(list uuid))
- 'build-canceled))))
+ 'build-canceled)
+ #:priority 'low
+ #:duration-metric-name "cancel_build"
+ #:duration-metric-buckets %command-duration-histogram-buckets)))
(when (eq? val 'build-canceled)
(unless skip-updating-derived-priorities?
@@ -893,6 +1006,10 @@
val))
+ (unless (datastore-find-build datastore uuid)
+ (raise-exception
+ (make-client-error 'build-unknown)))
+
(if ignore-if-build-required-by-another?
(let ((build-required
;; Do this check here outside the transaction to avoid having to
@@ -915,6 +1032,10 @@
datastore
(lambda (db)
(let ((build-details (datastore-find-build datastore uuid)))
+ (unless build-details
+ (raise-exception
+ (make-client-error 'build-unknown)))
+
(when (assq-ref build-details 'canceled)
(raise-exception
(make-client-error 'build-already-canceled)))
@@ -929,7 +1050,10 @@
new-priority
#:skip-updating-derived-priorities?
skip-updating-derived-priorities?
- #:override-derived-priority override-derived-priority)))
+ #:override-derived-priority override-derived-priority))
+ #:priority 'low
+ #:duration-metric-name "update_build_priority"
+ #:duration-metric-buckets %command-duration-histogram-buckets)
(trigger-build-allocation build-coordinator)
@@ -1012,7 +1136,17 @@
(and=> (assoc-ref (build-coordinator-hook-condvars build-coordinator)
event-name)
(lambda (condvar)
- (signal-condition-variable condvar)
+ (cond
+ ((condition-variable? condvar)
+ (signal-condition-variable condvar))
+ ((reusable-condition? condvar)
+ (signal-reusable-condition!
+ condvar
+ (build-coordinator-scheduler build-coordinator)))
+ (else
+ (error
+ (simple-format #f "unrecognised condvar ~A"
+ condvar))))
#t)))
(define (update-build-allocation-plan build-coordinator)
@@ -1074,24 +1208,19 @@
(lambda ()
(with-exception-handler
(lambda (exn)
- (simple-format
- (current-error-port)
- "build-allocator-thread: exception: ~A\n"
- exn)
(metric-increment failure-counter-metric)
(atomic-box-set! allocation-needed #t))
(lambda ()
- (with-throw-handler #t
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "error in build allocator thread\n")
+ (print-backtrace-and-exception/knots exn)
+ (raise-exception exn))
(lambda ()
(update-build-allocation-plan build-coordinator)
- (metric-increment success-counter-metric))
- (lambda (key . args)
- (simple-format
- (current-error-port)
- "error in build allocator thread: ~A ~A\n"
- key
- args)
- (backtrace))))
+ (metric-increment success-counter-metric))))
#:unwind? #t))
#:buckets ((@@ (prometheus) exponential-histogram-buckets) ; TODO
#:start 1
@@ -1139,12 +1268,14 @@
(lambda ()
(while #t
(with-exception-handler
- (lambda (exn)
- (simple-format (current-error-port)
- "exception in allocation plan fiber: ~A\n"
- exn))
+ (lambda _ #f)
(lambda ()
- (with-throw-handler #t
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format (current-error-port)
+ "exception in allocation plan fiber\n")
+ (print-backtrace-and-exception/knots exn)
+ (raise-exception exn))
(lambda ()
(match (get-message (build-coordinator-allocator-channel coordinator))
(('stats reply)
@@ -1187,9 +1318,7 @@
(update-build-allocation-plan-metrics!)
- (put-message reply #t))))
- (lambda _
- (backtrace))))
+ (put-message reply #t))))))
#:unwind? #t)))))
(define (build-coordinator-allocation-plan-stats coordinator)
@@ -1198,6 +1327,11 @@
(list 'stats reply))
(get-message reply)))
+(define (build-coordinator-count-allocation-plan-builds coordinator agent-id)
+ (or (assoc-ref (build-coordinator-allocation-plan-stats coordinator)
+ agent-id)
+ 0))
+
(define (build-coordinator-fetch-agent-allocation-plan coordinator agent-id)
(let ((reply (make-channel)))
(put-message (build-coordinator-allocator-channel coordinator)
@@ -1275,7 +1409,8 @@
(define* (build-coordinator-list-allocation-plan-builds coordinator
agent-id
- #:key limit)
+ #:key limit
+ filter?)
(define (take* lst i)
(if (< (length lst) i)
lst
@@ -1284,31 +1419,51 @@
(define datastore
(build-coordinator-datastore coordinator))
+ (define (build-data uuid
+ derivation-name
+ derived-priority
+ build-details)
+ `((uuid . ,uuid)
+ (derivation_name . ,derivation-name)
+ (system . ,(datastore-find-build-derivation-system
+ datastore
+ uuid))
+ (priority . ,(assq-ref build-details 'priority))
+ (derived_priority . ,derived-priority)
+ (tags . ,(vector-map
+ (lambda (_ tag)
+ (match tag
+ ((key . value)
+ `((key . ,key)
+ (value . ,value)))))
+ (datastore-fetch-build-tags
+ datastore
+ uuid)))))
+
(let ((build-ids
(build-coordinator-fetch-agent-allocation-plan coordinator
agent-id)))
(filter-map
(lambda (build-id)
- (match (datastore-fetch-build-to-allocate datastore build-id)
- (#(uuid derivation_id derivation_name derived_priority)
- (let ((build-details (datastore-find-build datastore uuid)))
- `((uuid . ,uuid)
- (derivation_name . ,derivation_name)
- (system . ,(datastore-find-build-derivation-system
- datastore
- uuid))
- (priority . ,(assq-ref build-details 'priority))
- (derived_priority . ,derived_priority)
- (tags . ,(vector-map
- (lambda (_ tag)
- (match tag
- ((key . value)
- `((key . ,key)
- (value . ,value)))))
- (datastore-fetch-build-tags
- datastore
- uuid))))))
- (#f #f)))
+ (if filter?
+ (match (datastore-fetch-build-to-allocate datastore build-id)
+ (#(uuid derivation_id derivation_name derived_priority)
+ (let ((build-details (datastore-find-build datastore uuid)))
+ (build-data uuid
+ derivation_name
+ derived_priority
+ build-details)))
+ (#f #f))
+ (let ((build-details (datastore-find-build datastore build-id))
+ (unprocessed-builds-entry
+ (datastore-find-unprocessed-build-entry
+ datastore
+ build-id)))
+ (build-data build-id
+ (assq-ref build-details 'derivation-name)
+ (assq-ref unprocessed-builds-entry
+ 'derived-priority)
+ build-details))))
(if limit
(take* build-ids limit)
build-ids))))
@@ -1359,6 +1514,18 @@
"hook_failure_total"
#:labels '(event)))
+ (define process-events-thread-pool-thread-total-metric
+ (make-gauge-metric
+ (build-coordinator-metrics-registry build-coordinator)
+ "hook_thread_pool_thread_total"
+ #:labels '(event)))
+
+ (define process-events-thread-pool-used-thread-total-metric
+ (make-gauge-metric
+ (build-coordinator-metrics-registry build-coordinator)
+ "hook_thread_pool_used_thread_total"
+ #:labels '(event)))
+
(define (process-event id event arguments handler)
(log-msg (build-coordinator-logger build-coordinator)
'DEBUG
@@ -1367,10 +1534,6 @@
(and
(with-exception-handler
(lambda (exn)
- (log-msg (build-coordinator-logger build-coordinator)
- 'ERROR
- "error running " event " (" id ") hook: "
- exn)
(metric-increment failure-counter-metric
#:label-values
`((event . ,event)))
@@ -1381,25 +1544,34 @@
(build-coordinator-metrics-registry build-coordinator)
"hook_duration_seconds"
(lambda ()
- (with-throw-handler #t
+ (with-exception-handler
+ (lambda (exn)
+ (let* ((stack
+ (match (fluid-ref %stacks)
+ ((stack-tag . prompt-tag)
+ (make-stack #t
+ 0 prompt-tag
+ 0 (and prompt-tag 1)))))
+ (backtrace
+ (call-with-output-string
+ (lambda (port)
+ (print-frames (stack->vector stack)
+ port
+ #:count (stack-length stack))
+ (print-exception
+ port
+ (stack-ref stack 4)
+ '%exception
+ (list exn))))))
+ (log-msg (build-coordinator-logger build-coordinator)
+ 'ERROR
+ "error running " event " (" id ") hook\n"
+ backtrace))
+ (raise-exception exn))
(lambda ()
(start-stack
- 'hook
- (apply handler build-coordinator arguments)))
- (lambda (key . args)
- (log-msg (build-coordinator-logger build-coordinator)
- 'ERROR
- "error running " event " (" id ") hook: "
- key " " args)
- (let* ((stack (make-stack #t 3))
- (backtrace
- (call-with-output-string
- (lambda (port)
- (display-backtrace stack port)
- (newline port)))))
- (display
- backtrace
- (current-output-port))))))
+ #t
+ (apply handler build-coordinator arguments)))))
#:labels '(event)
#:label-values `((event . ,event)))
#t)
@@ -1425,84 +1597,116 @@
(define (single-thread-process-events event-name handler)
(let ((mtx (make-mutex))
(condvar (make-condition-variable)))
- (call-with-new-thread
+ (call-with-default-io-waiters
(lambda ()
- (catch 'system-error
- (lambda ()
- (set-thread-name
- (symbol->string event-name)))
- (const #t))
+ (call-with-new-thread
+ (lambda ()
+ (catch 'system-error
+ (lambda ()
+ (set-thread-name
+ (symbol->string event-name)))
+ (const #t))
+
+ (lock-mutex mtx)
+ (while #t
+ (with-exception-handler
+ (lambda (exn)
+ (sleep 10))
+ (lambda ()
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg
+ (build-coordinator-logger build-coordinator)
+ 'CRITICAL
+ "error in " event-name " hook processing thread")
+ (print-backtrace-and-exception/knots exn)
+ (raise-exception exn))
+ (lambda ()
+ (while #t
+ (match (datastore-list-unprocessed-hook-events datastore event-name 1)
+ (()
+ (wait-condition-variable condvar mtx))
+ (((id event arguments))
+ (process-event id event arguments handler)))))))
+ #:unwind? #t))))))
+ condvar))
- (lock-mutex mtx)
- (while #t
- (with-exception-handler
- (lambda (exn)
- (with-exception-handler
- (lambda _
- ;; Things are really going wrong if logging about
- ;; the hook processing thread crashing, also raises
- ;; an exception, so just try and sleep and hope
- ;; things go better next time
- (sleep 10))
- (lambda ()
- (log-msg (build-coordinator-logger build-coordinator)
- 'CRITICAL
- "hook processing thread " event-name
- " exception: " exn))
- #:unwind? #t)
- (sleep 10))
- (lambda ()
- (with-throw-handler #t
+ (define (thread-pool-process-events event-name handler thread-count)
+ (let ((thread-pool
+ (make-thread-pool
+ thread-count
+ #:name (simple-format #f "~A" event-name)))
+ (reusable-condition
+ (make-reusable-condition))
+ (coordination-channel
+ (make-channel)))
+
+ (metric-set process-events-thread-pool-thread-total-metric
+ thread-count
+ #:label-values `((event . ,event-name)))
+
+ (spawn-fiber
+ (lambda ()
+ (let loop ((running-ids '()))
+ (metric-set process-events-thread-pool-used-thread-total-metric
+ (vector-count
+ (lambda (_ proc)
+ (->bool proc))
+ (thread-pool-proc-vector thread-pool))
+ #:label-values `((event . ,event-name)))
+ (match (get-message coordination-channel)
+ (('process id event arguments)
+ (if (member id running-ids)
+ ;; Ignore already running jobs
+ (loop running-ids)
+ (begin
+ (spawn-fiber
+ (lambda ()
+ (call-with-thread
+ thread-pool
+ (lambda ()
+ (process-event id event arguments handler))
+ ;; TODO Make this the default through knots
+ #:timeout #f)
+ (put-message coordination-channel
+ (list 'finished id))))
+ (loop (cons id running-ids)))))
+ (('finished id)
+ (when (< (length running-ids)
+ (* 2 thread-count))
+ (signal-reusable-condition! reusable-condition))
+ (loop (delete id running-ids)))
+ (('count-running reply)
+ (let ((count (length running-ids)))
+ (spawn-fiber
(lambda ()
- (while #t
- (match (datastore-list-unprocessed-hook-events datastore event-name 1)
- (()
- (wait-condition-variable condvar mtx))
- (((id event arguments))
- (process-event id event arguments handler)))))
- (lambda (key . args)
- (log-msg
- (build-coordinator-logger build-coordinator)
- 'CRITICAL
- "error in " event-name " hook processing thread: " key " " args)
- (backtrace))))
- #:unwind? #t))))
- condvar))
+ (put-message reply count))))
+ (loop running-ids))))))
- (define (work-queue-process-events event-name handler thread-count)
- (let-values (((pool-mutex job-available count-threads list-jobs)
- (create-thread-pool
- (lambda ()
- (max
- 1
- (length
- (datastore-list-unprocessed-hook-events
- datastore
- event-name
- thread-count))))
- (lambda (running-jobs)
- (let* ((in-progress-ids
- (map car running-jobs))
- (potential-jobs
- (datastore-list-unprocessed-hook-events
- datastore
- event-name
- (+ 1 (length in-progress-ids))))
- (job
- (find
- (match-lambda
- ((id rest ...)
- (not (member id in-progress-ids))))
- potential-jobs)))
- (log-msg
- (build-coordinator-logger build-coordinator)
- 'DEBUG
- event-name " work queue, got job " job)
- job))
- (lambda (id event arguments)
- (process-event id event arguments handler))
- #:name (symbol->string event-name))))
- job-available))
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (let ((count
+ (let ((channel (make-channel)))
+ (put-message coordination-channel
+ (list 'count-running channel))
+ (get-message channel))))
+ (when (< count (* 2 thread-count))
+ (log-msg (build-coordinator-logger build-coordinator)
+ 'DEBUG "submitting batch of " event-name " hook events")
+ (for-each
+ (match-lambda
+ ((id event arguments)
+ (put-message coordination-channel
+ (list 'process id event arguments))))
+ (datastore-list-unprocessed-hook-events
+ datastore
+ event-name
+ (* 20 thread-count)))))
+ (reusable-condition-wait reusable-condition
+ #:timeout 60))))
+
+ reusable-condition))
(map
(match-lambda
@@ -1511,15 +1715,14 @@
(or (and=>
(assq-ref parallel-hooks event-name)
(lambda (thread-count)
- (work-queue-process-events event-name
- handler
- thread-count)))
+ (thread-pool-process-events event-name
+ handler
+ thread-count)))
(single-thread-process-events event-name
handler)))))
(build-coordinator-hooks build-coordinator)))
-(define (fetch-builds build-coordinator agent systems
- max-builds deprecated-requested-count)
+(define (fetch-builds build-coordinator agent systems max-builds)
(define datastore
(build-coordinator-datastore build-coordinator))
@@ -1528,10 +1731,14 @@
build-coordinator agent-id)))
(if build-details
(let ((build-id (assq-ref build-details 'uuid)))
- (datastore-insert-to-allocated-builds datastore agent-id (list build-id))
+ (datastore-insert-to-allocated-builds
+ datastore
+ agent-id
+ build-id)
(build-coordinator-remove-build-from-allocation-plan
build-coordinator build-id)
- build-details)
+ `(,@build-details
+ (submit_outputs . null)))
#f)))
(define (allocate-several-builds agent-id count)
@@ -1555,28 +1762,68 @@
(datastore-list-agent-builds datastore agent))
(start-count
(length initially-allocated-builds))
- (target-count (or max-builds
- (+ start-count
- deprecated-requested-count))))
+ (target-count max-builds))
(if (< start-count target-count)
(let ((new-builds
(allocate-several-builds agent
(- target-count start-count))))
- ;; Previously allocate builds just returned newly allocated
- ;; builds, but if max-builds is provided, return all the
- ;; builds. This means the agent can handle this in a idempotent
- ;; manor.
- (if max-builds
- (append initially-allocated-builds
- new-builds)
- new-builds))
- ;; Previously allocate builds just returned newly allocated builds,
- ;; but if max-builds is provided, return all the builds. This means
- ;; the agent can handle this in a idempotent manor.
- (if max-builds
- initially-allocated-builds
- '()))))
- #:duration-metric-name "allocate_builds_to_agent"))
+ (if (null? new-builds)
+ (values initially-allocated-builds
+ #f)
+ (values (append initially-allocated-builds
+ new-builds)
+ #t)))
+ (values initially-allocated-builds
+ #f))))
+ #:duration-metric-name "allocate_builds_to_agent"
+ #:duration-metric-buckets %command-duration-histogram-buckets))
+
+ (define (send-agent-builds-allocated-event builds)
+ (build-coordinator-send-event
+ build-coordinator
+ "agent-builds-allocated"
+ `((agent_id . ,agent)
+ (builds . ,(list->vector
+ (map
+ (lambda (build)
+ `(,@build
+ (tags
+ . ,(list->vector
+ (map
+ (match-lambda
+ ((key . value)
+ `((key . ,key)
+ (value . ,value))))
+ (vector->list
+ (datastore-fetch-build-tags
+ datastore
+ (assq-ref build 'uuid))))))))
+ builds))))))
+
+ (define (submit-outputs? build)
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg (build-coordinator-logger build-coordinator)
+ 'CRITICAL
+ "build-submit-outputs hook raised exception")
+ (print-backtrace-and-exception/knots exn)
+ (raise-exception exn))
+ (lambda ()
+ (let ((hook-result
+ (call-with-delay-logging
+ (lambda ()
+ (build-submit-outputs-hook
+ build-coordinator
+ (assq-ref build 'uuid))))))
+ (if (boolean? hook-result)
+ hook-result
+ (begin
+ (log-msg
+ (build-coordinator-logger build-coordinator)
+ 'CRITICAL
+ "build-submit-outputs hook returned non boolean: "
+ hook-result)
+ #t))))))
(call-with-duration-metric
(build-coordinator-metrics-registry build-coordinator)
@@ -1592,111 +1839,42 @@
(trigger-build-allocation build-coordinator)))
(let ((builds
- (get-builds)))
-
- (build-coordinator-send-event
- build-coordinator
- "agent-builds-allocated"
- `((agent_id . ,agent)
- (builds . ,(list->vector
- (map
- (lambda (build)
- `(,@build
- (tags
- . ,(list->vector
- (map
- (match-lambda
- ((key . value)
- `((key . ,key)
- (value . ,value))))
- (vector->list
- (datastore-fetch-build-tags
- datastore
- (assq-ref build 'uuid))))))))
- builds)))))
-
- (map (lambda (build)
- (define submit-outputs?
- (with-exception-handler
- (lambda (exn)
- (log-msg (build-coordinator-logger build-coordinator)
- 'CRITICAL
- "build-submit-outputs hook raised exception: "
- exn))
- (lambda ()
- (with-throw-handler #t
- (lambda ()
- (let ((hook-result
- (call-with-delay-logging
- (lambda ()
- (build-submit-outputs-hook
- build-coordinator
- (assq-ref build 'uuid))))))
- (if (boolean? hook-result)
- hook-result
- (begin
- (log-msg
- (build-coordinator-logger build-coordinator)
- 'CRITICAL
- "build-submit-outputs hook returned non boolean: "
- hook-result)
- #t))))
- (lambda (key . args)
- (backtrace))))
- #:unwind? #t))
-
- `(,@build
- ;; TODO This needs reconsidering when things having been built in
- ;; the past doesn't necessarily mean they're still available.
- (submit_outputs . ,submit-outputs?)))
- builds)))))))
+ new-builds-allocated?
+ (if (= 0
+ (build-coordinator-count-allocation-plan-builds
+ build-coordinator
+ agent))
+ (values
+ (datastore-list-agent-builds datastore agent)
+ #f)
+ (get-builds))))
+
+ (when new-builds-allocated?
+ (send-agent-builds-allocated-event builds))
+
+ (map
+ (lambda (build)
+ (if (eq? 'null (assq-ref build 'submit_outputs))
+ (let ((submit-outputs? (submit-outputs? build)))
+ (datastore-update-allocated-build-submit-outputs
+ (build-coordinator-datastore build-coordinator)
+ (assq-ref build 'uuid)
+ submit-outputs?)
+
+ `(,@(alist-delete 'submit_outputs build)
+ (submit_outputs . ,submit-outputs?)))
+ build))
+ builds)))))))
(define (agent-details build-coordinator agent-id)
(define datastore
(build-coordinator-datastore build-coordinator))
- (define build-submit-outputs-hook
- (assq-ref (build-coordinator-hooks build-coordinator)
- 'build-submit-outputs))
-
- (define (submit-outputs? build)
- (with-exception-handler
- (lambda (exn)
- (log-msg (build-coordinator-logger build-coordinator)
- 'CRITICAL
- "build-submit-outputs hook raised exception: "
- exn))
- (lambda ()
- (with-throw-handler #t
- (lambda ()
- (let ((hook-result
- (call-with-delay-logging
- (lambda ()
- (build-submit-outputs-hook
- build-coordinator
- (assq-ref build 'uuid))))))
- (if (boolean? hook-result)
- hook-result
- (begin
- (log-msg
- (build-coordinator-logger build-coordinator)
- 'CRITICAL
- "build-submit-outputs hook returned non boolean: "
- hook-result)
- #t))))
- (lambda (key . args)
- (backtrace))))
- #:unwind? #t))
-
(let ((agent (datastore-find-agent datastore agent-id))
(allocated-builds (datastore-list-agent-builds datastore agent-id)))
`(,@agent ; description
- (builds . ,(list->vector
- (map (lambda (build)
- `(,@build
- (submit_outputs . ,(submit-outputs? build))))
- allocated-builds))))))
+ (builds . ,(list->vector allocated-builds)))))
(define (build-data-location build-id )
(string-append (%config 'builds-dir) "/"
@@ -1845,10 +2023,11 @@
(list build-id))
(when success?
- (datastore-delete-relevant-outputs-from-unbuilt-outputs
+ (datastore-insert-background-job
datastore
- build-id)
- (datastore-update-unprocessed-builds-for-build-success
+ 'build-success
+ (list build-id))
+ (datastore-delete-relevant-outputs-from-unbuilt-outputs
datastore
build-id)
(datastore-store-output-metadata
@@ -1858,7 +2037,8 @@
(assoc-ref result-json "outputs"))))
#f))))
- #:duration-metric-name "store_build_result")))
+ #:duration-metric-name "store_build_result"
+ #:duration-metric-buckets %command-duration-histogram-buckets)))
(when exception
;; Raise the exception here to avoid aborting the transaction
(raise-exception exception)))
@@ -1872,6 +2052,11 @@
'build-success
'build-failure))
+ (when success?
+ (build-coordinator-trigger-background-job-processing
+ build-coordinator
+ 'build-success))
+
(build-coordinator-send-event
build-coordinator
(if success?
@@ -1884,7 +2069,8 @@
;; could change the allocation
(trigger-build-allocation build-coordinator)
- #t))))
+ #t))
+ #:buckets %command-duration-histogram-buckets))
(define (handle-build-start-report build-coordinator
agent-id
@@ -1903,7 +2089,8 @@
build-coordinator
'build-started
`((build_id . ,build-id)
- (agent_id . ,agent-id))))))
+ (agent_id . ,agent-id))))
+ #:buckets %command-duration-histogram-buckets))
(define (handle-setup-failure-report build-coordinator
agent-id build-id report-json)
@@ -1939,3 +2126,142 @@
;; Trigger build allocation, so that the allocator can handle this setup
;; failure
(trigger-build-allocation build-coordinator))
+
+(define (build-coordinator-trigger-background-job-processing
+ build-coordinator
+ type)
+ (let ((condition
+ (assq-ref (build-coordinator-background-job-conditions
+ build-coordinator)
+ type)))
+ (unless condition
+ (error
+ (simple-format #f "unknown condition ~A" type)))
+ (signal-reusable-condition! condition)))
+
+(define (start-background-job-processing-fibers build-coordinator)
+ (define %background-job-duration-histogram-buckets
+ (list 0.1 0.25 0.5 1 2.5 5 10 15 30 45 60 120 240 (inf)))
+
+ (define* (start-job-fibers type proc #:key (parallelism 1))
+ (let ((coordination-channel
+ (make-channel))
+ (condition
+ (make-reusable-condition))
+ (process-in-fiber
+ (fiberize
+ (lambda args
+ (call-with-duration-metric
+ (build-coordinator-metrics-registry build-coordinator)
+ "coordinator_background_job_duration_seconds"
+ (lambda ()
+ (call-with-delay-logging proc #:args args))
+ #:labels '(name)
+ #:label-values `((name . ,type))
+ #:buckets %background-job-duration-histogram-buckets))
+ #:parallelism parallelism))
+ (job-exception-counter-metric
+ (make-counter-metric
+ (build-coordinator-metrics-registry build-coordinator)
+ "coordinator_background_job_failures_total"
+ #:labels '(name))))
+
+ (define (process id . args)
+ (spawn-fiber
+ (lambda ()
+ (let loop ((retry 0))
+ (log-msg (build-coordinator-logger build-coordinator)
+ 'DEBUG
+ "processing " type " background job (id: "
+ id ", args: " args ", retry: " retry ")")
+ (let ((success?
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg (build-coordinator-logger build-coordinator)
+ 'WARN
+ type " background job error (id: "
+ id "): " exn)
+ #f)
+ (lambda ()
+ (apply process-in-fiber args))
+ #:unwind? #t)))
+ (if success?
+ (begin
+ (datastore-delete-background-job
+ (build-coordinator-datastore build-coordinator)
+ id)
+ (put-message coordination-channel
+ (list 'job-finished id)))
+ (begin
+ (metric-increment job-exception-counter-metric
+ #:label-values `((name . ,type)))
+ (sleep 30)
+ (loop (+ 1 retry)))))))))
+
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (let ((job-details
+ (datastore-select-background-jobs
+ (build-coordinator-datastore build-coordinator)
+ type
+ #:limit (* 2 parallelism))))
+
+ (unless (null? job-details)
+ (put-message coordination-channel
+ (list 'process job-details)))
+
+ (reusable-condition-wait condition
+ #:timeout 30)))))
+
+ (spawn-fiber
+ (lambda ()
+ (let loop ((running-job-ids '()))
+ (match (get-message coordination-channel)
+ (('process jobs)
+ (let* ((job-ids (map (lambda (job)
+ (assq-ref job 'id))
+ jobs))
+ (new-ids
+ (lset-difference = job-ids running-job-ids))
+ (jobs-to-start
+ (take new-ids
+ (min
+ (- parallelism
+ (length running-job-ids))
+ (length new-ids)))))
+ (for-each (lambda (job)
+ (apply process
+ (assq-ref job 'id)
+ (assq-ref job 'args)))
+ (filter
+ (lambda (job-details)
+ (member (assq-ref job-details 'id)
+ jobs-to-start))
+ jobs))
+ (loop (append running-job-ids
+ jobs-to-start))))
+ (('job-finished id)
+ ;; Maybe not very efficient, but should work
+ (signal-reusable-condition! condition)
+ (log-msg (build-coordinator-logger build-coordinator)
+ 'DEBUG type
+ " background job " id
+ " finished successfully")
+ (loop (delete id running-job-ids)))))))
+
+ condition))
+
+ `((build-success . ,(start-job-fibers
+ 'build-success
+ (lambda (build-id)
+ (datastore-update-unprocessed-builds-for-build-success
+ (build-coordinator-datastore build-coordinator)
+ build-id))
+ #:parallelism 24))))
+
+(define (build-coordinator-check-and-correct-unprocessed-builds-all-inputs-built
+ build-coordinator)
+ (datastore-check-and-correct-unprocessed-builds-all-inputs-built
+ (build-coordinator-datastore build-coordinator)
+ #:progress-reporter progress-reporter/bar))