diff options
Diffstat (limited to 'guix-build-coordinator/coordinator.scm')
-rw-r--r-- | guix-build-coordinator/coordinator.scm | 996 |
1 files changed, 661 insertions, 335 deletions
diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm index a7fb664..adb7575 100644 --- a/guix-build-coordinator/coordinator.scm +++ b/guix-build-coordinator/coordinator.scm @@ -32,12 +32,16 @@ #:use-module (ice-9 match) #:use-module (ice-9 rdelim) #:use-module (ice-9 binary-ports) + #:use-module (ice-9 suspendable-ports) #:use-module (ice-9 format) #:use-module (ice-9 atomic) #:use-module (ice-9 control) #:use-module (ice-9 threads) #:use-module (ice-9 exceptions) #:use-module (rnrs bytevectors) + #:use-module (system repl server) + #:use-module (system repl command) + #:use-module (system repl debug) #:use-module (web uri) #:use-module (web http) #:use-module (oop goops) @@ -45,13 +49,18 @@ #:use-module (logging port-log) #:use-module (gcrypt random) #:use-module (fibers) + #:use-module (fibers timers) #:use-module (fibers channels) #:use-module (fibers scheduler) #:use-module (fibers conditions) + #:use-module (fibers operations) + #:use-module (knots) + #:use-module (knots timeout) + #:use-module (knots parallelism) + #:use-module (knots thread-pool) #:use-module (prometheus) - #:use-module ((guix build syscalls) - #:select (set-thread-name)) #:use-module (guix store) + #:use-module (guix progress) #:use-module (guix derivations) #:use-module (guix build utils) #:use-module (guix-build-coordinator utils) @@ -68,6 +77,8 @@ client-error? client-error-details + %build-coordinator + make-build-coordinator build-coordinator-datastore build-coordinator-hooks @@ -99,6 +110,8 @@ build-coordinator-prompt-hook-processing-for-event start-hook-processing-threads + build-coordinator-update-metrics + build-coordinator-allocation-plan-stats build-coordinator-trigger-build-allocation build-coordinator-list-allocation-plan-builds @@ -108,7 +121,9 @@ build-log-file-location handle-build-start-report handle-build-result - handle-setup-failure-report)) + handle-setup-failure-report + + build-coordinator-check-and-correct-unprocessed-builds-all-inputs-built)) (define-exception-type &agent-error &error make-agent-error @@ -120,6 +135,9 @@ client-error? (details client-error-details)) +(define %build-coordinator + (make-parameter #f)) + (define-record-type <build-coordinator> (make-build-coordinator-record datastore hooks metrics-registry allocation-strategy allocator-channel @@ -129,6 +147,9 @@ (hooks build-coordinator-hooks) (hook-condvars build-coordinator-hook-condvars set-build-coordinator-hook-condvars!) + (background-job-conditions + build-coordinator-background-job-conditions + set-build-coordinator-background-job-conditions!) (metrics-registry build-coordinator-metrics-registry) (allocation-strategy build-coordinator-allocation-strategy) (trigger-build-allocation @@ -141,30 +162,17 @@ (get-state-id build-coordinator-get-state-id-proc set-build-coordinator-get-state-id-proc!) (scheduler build-coordinator-scheduler - set-build-coordinator-scheduler!)) + set-build-coordinator-scheduler!) + (utility-thread-pool build-coordinator-utility-thread-pool + set-build-coordinator-utility-thread-pool!)) (set-record-type-printer! <build-coordinator> (lambda (build-coordinator port) (display "#<build-coordinator>" port))) -(define-class <custom-port-log> (<log-handler>) - (port #:init-value #f #:accessor port #:init-keyword #:port)) - -(define-method (emit-log (self <custom-port-log>) str) - (when (port self) - (put-bytevector (port self) - (string->utf8 str)) - ;; Even though the port is line buffered, writing to it with - ;; put-bytevector doesn't cause the buffer to be flushed. - (force-output (port self)))) - -(define-method (flush-log (self <custom-port-log>)) - (and=> (port self) force-output)) - -(define-method (close-log! (self <custom-port-log>)) - (and=> (port self) close-port) - (set! (port self) #f)) +(define %command-duration-histogram-buckets + (list 0.1 0.25 0.5 1 2.5 5 10 15 30 45 60 120 240 (inf))) (define %known-hooks '(build-submitted @@ -210,7 +218,15 @@ #f) (lambda () - (with-throw-handler #t + (with-exception-handler + (lambda (exn) + (if (and + (exception-with-origin? exn) + (string=? (exception-origin exn) + "fport_write")) + #f + (print-backtrace-and-exception/knots exn)) + (raise-exception exn)) (lambda () (match (atomic-box-ref current-state-id-and-event-buffer-index-box) @@ -234,16 +250,7 @@ (iota event-count-to-send (+ 1 last-sent-state-id)))) - current-state-id))) - (lambda (key . args) - (if (and - (eq? key 'system-error) - (match args - (("fport_write" "~A" ("Broken pipe") rest ...) - #t) - (_ #f))) - #f - (backtrace))))) + current-state-id))))) #:unwind? #t))) (unless (eq? #f new-state-id) @@ -285,7 +292,8 @@ (if (> requested-after-state-id current-state-id) current-state-id - requested-after-state-id) + (max 0 + requested-after-state-id)) current-state-id))))) (atomic-box-set! listener-channels-box @@ -355,6 +363,27 @@ (define (build-coordinator-get-state-id build-coordinator) ((build-coordinator-get-state-id-proc build-coordinator))) +(define (build-coordinator-update-metrics build-coordinator) + (define metrics-registry + (build-coordinator-metrics-registry build-coordinator)) + + (let ((utility-thread-pool-used-thread-metric + (or (metrics-registry-fetch-metric + metrics-registry + "utility_thread_pool_used_thread_total") + (make-gauge-metric + metrics-registry + "utility_thread_pool_used_thread_total")))) + + (and=> (build-coordinator-utility-thread-pool build-coordinator) + (lambda (utility-thread-pool) + (metric-set + utility-thread-pool-used-thread-metric + (vector-count + (lambda (_ proc) + (->bool proc)) + (thread-pool-proc-vector utility-thread-pool))))))) + (define* (make-build-coordinator #:key database-uri-string @@ -366,7 +395,7 @@ (database-uri->datastore database-uri-string #:metrics-registry metrics-registry - #:worker-thread-log-exception? + #:thread-pool-log-exception? (lambda (exn) (and (not (agent-error? exn)) (not (client-error? exn)))))) @@ -443,6 +472,16 @@ (setrlimit 'core #f #f)) #:unwind? #t) + (let ((core-file + (string-append (getcwd) "/core")) + (metric + (make-gauge-metric (build-coordinator-metrics-registry + build-coordinator) + "core_dump_file_last_modified_seconds"))) + (when (file-exists? core-file) + (metric-set metric + (stat:mtime (stat core-file))))) + (with-exception-handler (lambda (exn) (simple-format #t "failed increasing open file limit: ~A\n" exn)) @@ -457,10 +496,30 @@ (lambda (scheduler port) (display "#<scheduler>" port))) - (when pid-file - (call-with-output-file pid-file - (lambda (port) - (simple-format port "~A\n" (getpid))))) + (call-with-new-thread + (lambda () + (set-thread-name + (string-append "gc watcher")) + + (add-hook! + after-gc-hook + (let ((last-gc-time-taken + (assq-ref (gc-stats) 'gc-time-taken))) + (lambda () + (let* ((gc-time-taken + (assq-ref (gc-stats) 'gc-time-taken)) + (time-since-last + (/ (- gc-time-taken + last-gc-time-taken) + internal-time-units-per-second))) + (when (> time-since-last 0.1) + (format (current-error-port) + "after gc (additional time taken: ~f)\n" + time-since-last)) + (set! last-gc-time-taken + (assq-ref (gc-stats) 'gc-time-taken)))))) + (while #t + (sleep 0.1)))) (when update-datastore? (datastore-update (build-coordinator-datastore build-coordinator))) @@ -469,17 +528,17 @@ build-coordinator (make-build-allocator-thread build-coordinator)) - (set-build-coordinator-hook-condvars! - build-coordinator - (start-hook-processing-threads build-coordinator - parallel-hooks)) - (when trigger-build-allocation? (trigger-build-allocation build-coordinator))) (define %default-agent-uri (string->uri "http://0.0.0.0:8745")) (define %default-client-uri (string->uri "http://127.0.0.1:8746")) +(define %default-repl-server-port + ;; Default port to run REPL server on, if --listen-repl is provided + ;; but no port is mentioned + 37146) + (define* (run-coordinator-service build-coordinator #:key (update-datastore? #t) @@ -487,7 +546,10 @@ (agent-communication-uri %default-agent-uri) (client-communication-uri %default-client-uri) secret-key-base - (parallel-hooks '())) + (parallel-hooks '()) + listen-repl) + (install-suspendable-ports!) + (with-fluids ((%file-port-name-canonicalization 'none)) (perform-coordinator-service-startup build-coordinator @@ -495,23 +557,45 @@ #:pid-file pid-file #:parallel-hooks parallel-hooks) + (when listen-repl + (parameterize ((%build-coordinator build-coordinator)) + (cond + ((or (eq? #t listen-repl) + (number? listen-repl)) + (let ((listen-repl + (if (eq? #t listen-repl) + %default-repl-server-port + listen-repl))) + (format (current-error-port) + "REPL server listening on port ~a~%" + listen-repl) + (spawn-server (make-tcp-server-socket + #:port + (if (eq? #t listen-repl) + %default-repl-server-port + listen-repl))))) + (else + (format (current-error-port) + "REPL server listening on ~a~%" + listen-repl) + (spawn-server (make-unix-domain-server-socket #:path listen-repl)))))) + ;; Create some worker thread channels, which need to be created prior ;; to run-fibers being called. (let ((output-hash-channel (make-output-hash-channel build-coordinator)) - (utility-thread-pool-channel - (make-worker-thread-channel - (const '()) + (utility-thread-pool + (make-thread-pool + 18 #:name "utility" - #:parallelism 10 #:delay-logger (let ((delay-metric (make-histogram-metric (build-coordinator-metrics-registry build-coordinator) "utility_thread_pool_delay_seconds"))) - (lambda (seconds-delayed) + (lambda (seconds-delayed proc) (log-delay "utility thread channel" seconds-delayed) (metric-observe delay-metric seconds-delayed) @@ -521,6 +605,18 @@ "warning: utility thread channel delayed by ~1,2f seconds~%" seconds-delayed))))))) + (let ((metric + (make-gauge-metric + (build-coordinator-metrics-registry build-coordinator) + "utility_thread_pool_thread_total"))) + (metric-set metric + (vector-length + (thread-pool-proc-vector utility-thread-pool)))) + + (set-build-coordinator-utility-thread-pool! + build-coordinator + utility-thread-pool) + (let ((finished? (make-condition))) (call-with-sigint (lambda () @@ -542,6 +638,9 @@ (iota (length schedulers)) schedulers)) + (set-build-coordinator-scheduler! build-coordinator + (current-scheduler)) + (log-msg (build-coordinator-logger build-coordinator) 'INFO "initialising metrics") @@ -553,13 +652,19 @@ (datastore-spawn-fibers (build-coordinator-datastore build-coordinator)) + (set-build-coordinator-hook-condvars! + build-coordinator + (start-hook-processing-threads build-coordinator + parallel-hooks)) + + (set-build-coordinator-background-job-conditions! + build-coordinator + (start-background-job-processing-fibers build-coordinator)) + (spawn-fiber-to-watch-for-deferred-builds build-coordinator) (spawn-build-allocation-plan-management-fiber build-coordinator) - (set-build-coordinator-scheduler! build-coordinator - (current-scheduler)) - (let ((events-channel get-state-id (make-events-channel @@ -591,7 +696,12 @@ (uri-host client-communication-uri) (uri-port client-communication-uri) build-coordinator - utility-thread-pool-channel) + utility-thread-pool) + + (when pid-file + (call-with-output-file pid-file + (lambda (port) + (simple-format port "~A\n" (getpid))))) ;; Guile seems to just stop listening on ports, so try to ;; monitor that internally and just quit if it happens @@ -650,12 +760,9 @@ derivation-file))) (if (eq? #f system) ; derivation does not exist in database? (build-for-output-already-exists/with-derivation? - (with-fibers-port-timeouts - (lambda () - (call-with-delay-logging read-drv - #:threshold 10 - #:args (list derivation-file))) - #:timeout 240)) + (call-with-delay-logging read-drv + #:threshold 10 + #:args (list derivation-file))) (any (lambda (output-details) (let ((builds-for-output @@ -780,12 +887,9 @@ ;; derivations with no builds works (if (datastore-find-derivation datastore derivation-file) #f - (with-fibers-port-timeouts - (lambda () - (call-with-delay-logging read-drv - #:threshold 10 - #:args (list derivation-file))) - #:timeout 30)))) + (call-with-delay-logging read-drv + #:threshold 10 + #:args (list derivation-file))))) (when drv (datastore-store-derivation datastore drv)) @@ -806,8 +910,9 @@ ;; time too. (cons related-drv (random-v4-uuid))) related-derivations-lacking-builds)) - #:duration-metric-name - "store_build") + #:priority 'low + #:duration-metric-name "store_build" + #:duration-metric-buckets %command-duration-histogram-buckets) (#t ; build submitted (build-coordinator-prompt-hook-processing-for-event build-coordinator @@ -837,7 +942,8 @@ (stop-condition stop-condition))))) (stop-condition - stop-condition))))) + stop-condition))) + #:buckets %command-duration-histogram-buckets)) (define* (cancel-build build-coordinator uuid #:key (ignore-if-build-required-by-another? #t) @@ -850,6 +956,10 @@ datastore (lambda (db) (let ((build-details (datastore-find-build datastore uuid))) + (unless build-details + (raise-exception + (make-client-error 'build-unknown))) + (when (assq-ref build-details 'canceled) (raise-exception (make-client-error 'build-already-canceled))) @@ -872,7 +982,10 @@ (datastore-insert-unprocessed-hook-event datastore "build-canceled" (list uuid)) - 'build-canceled)))) + 'build-canceled) + #:priority 'low + #:duration-metric-name "cancel_build" + #:duration-metric-buckets %command-duration-histogram-buckets))) (when (eq? val 'build-canceled) (unless skip-updating-derived-priorities? @@ -893,6 +1006,10 @@ val)) + (unless (datastore-find-build datastore uuid) + (raise-exception + (make-client-error 'build-unknown))) + (if ignore-if-build-required-by-another? (let ((build-required ;; Do this check here outside the transaction to avoid having to @@ -915,6 +1032,10 @@ datastore (lambda (db) (let ((build-details (datastore-find-build datastore uuid))) + (unless build-details + (raise-exception + (make-client-error 'build-unknown))) + (when (assq-ref build-details 'canceled) (raise-exception (make-client-error 'build-already-canceled))) @@ -929,7 +1050,10 @@ new-priority #:skip-updating-derived-priorities? skip-updating-derived-priorities? - #:override-derived-priority override-derived-priority))) + #:override-derived-priority override-derived-priority)) + #:priority 'low + #:duration-metric-name "update_build_priority" + #:duration-metric-buckets %command-duration-histogram-buckets) (trigger-build-allocation build-coordinator) @@ -1012,7 +1136,17 @@ (and=> (assoc-ref (build-coordinator-hook-condvars build-coordinator) event-name) (lambda (condvar) - (signal-condition-variable condvar) + (cond + ((condition-variable? condvar) + (signal-condition-variable condvar)) + ((reusable-condition? condvar) + (signal-reusable-condition! + condvar + (build-coordinator-scheduler build-coordinator))) + (else + (error + (simple-format #f "unrecognised condvar ~A" + condvar)))) #t))) (define (update-build-allocation-plan build-coordinator) @@ -1074,24 +1208,19 @@ (lambda () (with-exception-handler (lambda (exn) - (simple-format - (current-error-port) - "build-allocator-thread: exception: ~A\n" - exn) (metric-increment failure-counter-metric) (atomic-box-set! allocation-needed #t)) (lambda () - (with-throw-handler #t + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "error in build allocator thread\n") + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) (lambda () (update-build-allocation-plan build-coordinator) - (metric-increment success-counter-metric)) - (lambda (key . args) - (simple-format - (current-error-port) - "error in build allocator thread: ~A ~A\n" - key - args) - (backtrace)))) + (metric-increment success-counter-metric)))) #:unwind? #t)) #:buckets ((@@ (prometheus) exponential-histogram-buckets) ; TODO #:start 1 @@ -1139,12 +1268,14 @@ (lambda () (while #t (with-exception-handler - (lambda (exn) - (simple-format (current-error-port) - "exception in allocation plan fiber: ~A\n" - exn)) + (lambda _ #f) (lambda () - (with-throw-handler #t + (with-exception-handler + (lambda (exn) + (simple-format (current-error-port) + "exception in allocation plan fiber\n") + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) (lambda () (match (get-message (build-coordinator-allocator-channel coordinator)) (('stats reply) @@ -1187,9 +1318,7 @@ (update-build-allocation-plan-metrics!) - (put-message reply #t)))) - (lambda _ - (backtrace)))) + (put-message reply #t)))))) #:unwind? #t))))) (define (build-coordinator-allocation-plan-stats coordinator) @@ -1198,6 +1327,11 @@ (list 'stats reply)) (get-message reply))) +(define (build-coordinator-count-allocation-plan-builds coordinator agent-id) + (or (assoc-ref (build-coordinator-allocation-plan-stats coordinator) + agent-id) + 0)) + (define (build-coordinator-fetch-agent-allocation-plan coordinator agent-id) (let ((reply (make-channel))) (put-message (build-coordinator-allocator-channel coordinator) @@ -1275,7 +1409,8 @@ (define* (build-coordinator-list-allocation-plan-builds coordinator agent-id - #:key limit) + #:key limit + filter?) (define (take* lst i) (if (< (length lst) i) lst @@ -1284,31 +1419,51 @@ (define datastore (build-coordinator-datastore coordinator)) + (define (build-data uuid + derivation-name + derived-priority + build-details) + `((uuid . ,uuid) + (derivation_name . ,derivation-name) + (system . ,(datastore-find-build-derivation-system + datastore + uuid)) + (priority . ,(assq-ref build-details 'priority)) + (derived_priority . ,derived-priority) + (tags . ,(vector-map + (lambda (_ tag) + (match tag + ((key . value) + `((key . ,key) + (value . ,value))))) + (datastore-fetch-build-tags + datastore + uuid))))) + (let ((build-ids (build-coordinator-fetch-agent-allocation-plan coordinator agent-id))) (filter-map (lambda (build-id) - (match (datastore-fetch-build-to-allocate datastore build-id) - (#(uuid derivation_id derivation_name derived_priority) - (let ((build-details (datastore-find-build datastore uuid))) - `((uuid . ,uuid) - (derivation_name . ,derivation_name) - (system . ,(datastore-find-build-derivation-system - datastore - uuid)) - (priority . ,(assq-ref build-details 'priority)) - (derived_priority . ,derived_priority) - (tags . ,(vector-map - (lambda (_ tag) - (match tag - ((key . value) - `((key . ,key) - (value . ,value))))) - (datastore-fetch-build-tags - datastore - uuid)))))) - (#f #f))) + (if filter? + (match (datastore-fetch-build-to-allocate datastore build-id) + (#(uuid derivation_id derivation_name derived_priority) + (let ((build-details (datastore-find-build datastore uuid))) + (build-data uuid + derivation_name + derived_priority + build-details))) + (#f #f)) + (let ((build-details (datastore-find-build datastore build-id)) + (unprocessed-builds-entry + (datastore-find-unprocessed-build-entry + datastore + build-id))) + (build-data build-id + (assq-ref build-details 'derivation-name) + (assq-ref unprocessed-builds-entry + 'derived-priority) + build-details)))) (if limit (take* build-ids limit) build-ids)))) @@ -1359,6 +1514,18 @@ "hook_failure_total" #:labels '(event))) + (define process-events-thread-pool-thread-total-metric + (make-gauge-metric + (build-coordinator-metrics-registry build-coordinator) + "hook_thread_pool_thread_total" + #:labels '(event))) + + (define process-events-thread-pool-used-thread-total-metric + (make-gauge-metric + (build-coordinator-metrics-registry build-coordinator) + "hook_thread_pool_used_thread_total" + #:labels '(event))) + (define (process-event id event arguments handler) (log-msg (build-coordinator-logger build-coordinator) 'DEBUG @@ -1367,10 +1534,6 @@ (and (with-exception-handler (lambda (exn) - (log-msg (build-coordinator-logger build-coordinator) - 'ERROR - "error running " event " (" id ") hook: " - exn) (metric-increment failure-counter-metric #:label-values `((event . ,event))) @@ -1381,25 +1544,34 @@ (build-coordinator-metrics-registry build-coordinator) "hook_duration_seconds" (lambda () - (with-throw-handler #t + (with-exception-handler + (lambda (exn) + (let* ((stack + (match (fluid-ref %stacks) + ((stack-tag . prompt-tag) + (make-stack #t + 0 prompt-tag + 0 (and prompt-tag 1))))) + (backtrace + (call-with-output-string + (lambda (port) + (print-frames (stack->vector stack) + port + #:count (stack-length stack)) + (print-exception + port + (stack-ref stack 4) + '%exception + (list exn)))))) + (log-msg (build-coordinator-logger build-coordinator) + 'ERROR + "error running " event " (" id ") hook\n" + backtrace)) + (raise-exception exn)) (lambda () (start-stack - 'hook - (apply handler build-coordinator arguments))) - (lambda (key . args) - (log-msg (build-coordinator-logger build-coordinator) - 'ERROR - "error running " event " (" id ") hook: " - key " " args) - (let* ((stack (make-stack #t 3)) - (backtrace - (call-with-output-string - (lambda (port) - (display-backtrace stack port) - (newline port))))) - (display - backtrace - (current-output-port)))))) + #t + (apply handler build-coordinator arguments))))) #:labels '(event) #:label-values `((event . ,event))) #t) @@ -1425,84 +1597,116 @@ (define (single-thread-process-events event-name handler) (let ((mtx (make-mutex)) (condvar (make-condition-variable))) - (call-with-new-thread + (call-with-default-io-waiters (lambda () - (catch 'system-error - (lambda () - (set-thread-name - (symbol->string event-name))) - (const #t)) + (call-with-new-thread + (lambda () + (catch 'system-error + (lambda () + (set-thread-name + (symbol->string event-name))) + (const #t)) + + (lock-mutex mtx) + (while #t + (with-exception-handler + (lambda (exn) + (sleep 10)) + (lambda () + (with-exception-handler + (lambda (exn) + (log-msg + (build-coordinator-logger build-coordinator) + 'CRITICAL + "error in " event-name " hook processing thread") + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) + (lambda () + (while #t + (match (datastore-list-unprocessed-hook-events datastore event-name 1) + (() + (wait-condition-variable condvar mtx)) + (((id event arguments)) + (process-event id event arguments handler))))))) + #:unwind? #t)))))) + condvar)) - (lock-mutex mtx) - (while #t - (with-exception-handler - (lambda (exn) - (with-exception-handler - (lambda _ - ;; Things are really going wrong if logging about - ;; the hook processing thread crashing, also raises - ;; an exception, so just try and sleep and hope - ;; things go better next time - (sleep 10)) - (lambda () - (log-msg (build-coordinator-logger build-coordinator) - 'CRITICAL - "hook processing thread " event-name - " exception: " exn)) - #:unwind? #t) - (sleep 10)) - (lambda () - (with-throw-handler #t + (define (thread-pool-process-events event-name handler thread-count) + (let ((thread-pool + (make-thread-pool + thread-count + #:name (simple-format #f "~A" event-name))) + (reusable-condition + (make-reusable-condition)) + (coordination-channel + (make-channel))) + + (metric-set process-events-thread-pool-thread-total-metric + thread-count + #:label-values `((event . ,event-name))) + + (spawn-fiber + (lambda () + (let loop ((running-ids '())) + (metric-set process-events-thread-pool-used-thread-total-metric + (vector-count + (lambda (_ proc) + (->bool proc)) + (thread-pool-proc-vector thread-pool)) + #:label-values `((event . ,event-name))) + (match (get-message coordination-channel) + (('process id event arguments) + (if (member id running-ids) + ;; Ignore already running jobs + (loop running-ids) + (begin + (spawn-fiber + (lambda () + (call-with-thread + thread-pool + (lambda () + (process-event id event arguments handler)) + ;; TODO Make this the default through knots + #:timeout #f) + (put-message coordination-channel + (list 'finished id)))) + (loop (cons id running-ids))))) + (('finished id) + (when (< (length running-ids) + (* 2 thread-count)) + (signal-reusable-condition! reusable-condition)) + (loop (delete id running-ids))) + (('count-running reply) + (let ((count (length running-ids))) + (spawn-fiber (lambda () - (while #t - (match (datastore-list-unprocessed-hook-events datastore event-name 1) - (() - (wait-condition-variable condvar mtx)) - (((id event arguments)) - (process-event id event arguments handler))))) - (lambda (key . args) - (log-msg - (build-coordinator-logger build-coordinator) - 'CRITICAL - "error in " event-name " hook processing thread: " key " " args) - (backtrace)))) - #:unwind? #t)))) - condvar)) + (put-message reply count)))) + (loop running-ids)))))) - (define (work-queue-process-events event-name handler thread-count) - (let-values (((pool-mutex job-available count-threads list-jobs) - (create-thread-pool - (lambda () - (max - 1 - (length - (datastore-list-unprocessed-hook-events - datastore - event-name - thread-count)))) - (lambda (running-jobs) - (let* ((in-progress-ids - (map car running-jobs)) - (potential-jobs - (datastore-list-unprocessed-hook-events - datastore - event-name - (+ 1 (length in-progress-ids)))) - (job - (find - (match-lambda - ((id rest ...) - (not (member id in-progress-ids)))) - potential-jobs))) - (log-msg - (build-coordinator-logger build-coordinator) - 'DEBUG - event-name " work queue, got job " job) - job)) - (lambda (id event arguments) - (process-event id event arguments handler)) - #:name (symbol->string event-name)))) - job-available)) + (spawn-fiber + (lambda () + (while #t + (let ((count + (let ((channel (make-channel))) + (put-message coordination-channel + (list 'count-running channel)) + (get-message channel)))) + (when (< count (* 2 thread-count)) + (log-msg (build-coordinator-logger build-coordinator) + 'DEBUG "submitting batch of " event-name " hook events") + (for-each + (match-lambda + ((id event arguments) + (put-message coordination-channel + (list 'process id event arguments)))) + (datastore-list-unprocessed-hook-events + datastore + event-name + (* 20 thread-count))))) + (reusable-condition-wait reusable-condition + #:timeout 60)))) + + reusable-condition)) (map (match-lambda @@ -1511,15 +1715,14 @@ (or (and=> (assq-ref parallel-hooks event-name) (lambda (thread-count) - (work-queue-process-events event-name - handler - thread-count))) + (thread-pool-process-events event-name + handler + thread-count))) (single-thread-process-events event-name handler))))) (build-coordinator-hooks build-coordinator))) -(define (fetch-builds build-coordinator agent systems - max-builds deprecated-requested-count) +(define (fetch-builds build-coordinator agent systems max-builds) (define datastore (build-coordinator-datastore build-coordinator)) @@ -1528,10 +1731,14 @@ build-coordinator agent-id))) (if build-details (let ((build-id (assq-ref build-details 'uuid))) - (datastore-insert-to-allocated-builds datastore agent-id (list build-id)) + (datastore-insert-to-allocated-builds + datastore + agent-id + build-id) (build-coordinator-remove-build-from-allocation-plan build-coordinator build-id) - build-details) + `(,@build-details + (submit_outputs . null))) #f))) (define (allocate-several-builds agent-id count) @@ -1555,28 +1762,68 @@ (datastore-list-agent-builds datastore agent)) (start-count (length initially-allocated-builds)) - (target-count (or max-builds - (+ start-count - deprecated-requested-count)))) + (target-count max-builds)) (if (< start-count target-count) (let ((new-builds (allocate-several-builds agent (- target-count start-count)))) - ;; Previously allocate builds just returned newly allocated - ;; builds, but if max-builds is provided, return all the - ;; builds. This means the agent can handle this in a idempotent - ;; manor. - (if max-builds - (append initially-allocated-builds - new-builds) - new-builds)) - ;; Previously allocate builds just returned newly allocated builds, - ;; but if max-builds is provided, return all the builds. This means - ;; the agent can handle this in a idempotent manor. - (if max-builds - initially-allocated-builds - '())))) - #:duration-metric-name "allocate_builds_to_agent")) + (if (null? new-builds) + (values initially-allocated-builds + #f) + (values (append initially-allocated-builds + new-builds) + #t))) + (values initially-allocated-builds + #f)))) + #:duration-metric-name "allocate_builds_to_agent" + #:duration-metric-buckets %command-duration-histogram-buckets)) + + (define (send-agent-builds-allocated-event builds) + (build-coordinator-send-event + build-coordinator + "agent-builds-allocated" + `((agent_id . ,agent) + (builds . ,(list->vector + (map + (lambda (build) + `(,@build + (tags + . ,(list->vector + (map + (match-lambda + ((key . value) + `((key . ,key) + (value . ,value)))) + (vector->list + (datastore-fetch-build-tags + datastore + (assq-ref build 'uuid)))))))) + builds)))))) + + (define (submit-outputs? build) + (with-exception-handler + (lambda (exn) + (log-msg (build-coordinator-logger build-coordinator) + 'CRITICAL + "build-submit-outputs hook raised exception") + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) + (lambda () + (let ((hook-result + (call-with-delay-logging + (lambda () + (build-submit-outputs-hook + build-coordinator + (assq-ref build 'uuid)))))) + (if (boolean? hook-result) + hook-result + (begin + (log-msg + (build-coordinator-logger build-coordinator) + 'CRITICAL + "build-submit-outputs hook returned non boolean: " + hook-result) + #t)))))) (call-with-duration-metric (build-coordinator-metrics-registry build-coordinator) @@ -1592,111 +1839,42 @@ (trigger-build-allocation build-coordinator))) (let ((builds - (get-builds))) - - (build-coordinator-send-event - build-coordinator - "agent-builds-allocated" - `((agent_id . ,agent) - (builds . ,(list->vector - (map - (lambda (build) - `(,@build - (tags - . ,(list->vector - (map - (match-lambda - ((key . value) - `((key . ,key) - (value . ,value)))) - (vector->list - (datastore-fetch-build-tags - datastore - (assq-ref build 'uuid)))))))) - builds))))) - - (map (lambda (build) - (define submit-outputs? - (with-exception-handler - (lambda (exn) - (log-msg (build-coordinator-logger build-coordinator) - 'CRITICAL - "build-submit-outputs hook raised exception: " - exn)) - (lambda () - (with-throw-handler #t - (lambda () - (let ((hook-result - (call-with-delay-logging - (lambda () - (build-submit-outputs-hook - build-coordinator - (assq-ref build 'uuid)))))) - (if (boolean? hook-result) - hook-result - (begin - (log-msg - (build-coordinator-logger build-coordinator) - 'CRITICAL - "build-submit-outputs hook returned non boolean: " - hook-result) - #t)))) - (lambda (key . args) - (backtrace)))) - #:unwind? #t)) - - `(,@build - ;; TODO This needs reconsidering when things having been built in - ;; the past doesn't necessarily mean they're still available. - (submit_outputs . ,submit-outputs?))) - builds))))))) + new-builds-allocated? + (if (= 0 + (build-coordinator-count-allocation-plan-builds + build-coordinator + agent)) + (values + (datastore-list-agent-builds datastore agent) + #f) + (get-builds)))) + + (when new-builds-allocated? + (send-agent-builds-allocated-event builds)) + + (map + (lambda (build) + (if (eq? 'null (assq-ref build 'submit_outputs)) + (let ((submit-outputs? (submit-outputs? build))) + (datastore-update-allocated-build-submit-outputs + (build-coordinator-datastore build-coordinator) + (assq-ref build 'uuid) + submit-outputs?) + + `(,@(alist-delete 'submit_outputs build) + (submit_outputs . ,submit-outputs?))) + build)) + builds))))))) (define (agent-details build-coordinator agent-id) (define datastore (build-coordinator-datastore build-coordinator)) - (define build-submit-outputs-hook - (assq-ref (build-coordinator-hooks build-coordinator) - 'build-submit-outputs)) - - (define (submit-outputs? build) - (with-exception-handler - (lambda (exn) - (log-msg (build-coordinator-logger build-coordinator) - 'CRITICAL - "build-submit-outputs hook raised exception: " - exn)) - (lambda () - (with-throw-handler #t - (lambda () - (let ((hook-result - (call-with-delay-logging - (lambda () - (build-submit-outputs-hook - build-coordinator - (assq-ref build 'uuid)))))) - (if (boolean? hook-result) - hook-result - (begin - (log-msg - (build-coordinator-logger build-coordinator) - 'CRITICAL - "build-submit-outputs hook returned non boolean: " - hook-result) - #t)))) - (lambda (key . args) - (backtrace)))) - #:unwind? #t)) - (let ((agent (datastore-find-agent datastore agent-id)) (allocated-builds (datastore-list-agent-builds datastore agent-id))) `(,@agent ; description - (builds . ,(list->vector - (map (lambda (build) - `(,@build - (submit_outputs . ,(submit-outputs? build)))) - allocated-builds)))))) + (builds . ,(list->vector allocated-builds))))) (define (build-data-location build-id ) (string-append (%config 'builds-dir) "/" @@ -1845,10 +2023,11 @@ (list build-id)) (when success? - (datastore-delete-relevant-outputs-from-unbuilt-outputs + (datastore-insert-background-job datastore - build-id) - (datastore-update-unprocessed-builds-for-build-success + 'build-success + (list build-id)) + (datastore-delete-relevant-outputs-from-unbuilt-outputs datastore build-id) (datastore-store-output-metadata @@ -1858,7 +2037,8 @@ (assoc-ref result-json "outputs")))) #f)))) - #:duration-metric-name "store_build_result"))) + #:duration-metric-name "store_build_result" + #:duration-metric-buckets %command-duration-histogram-buckets))) (when exception ;; Raise the exception here to avoid aborting the transaction (raise-exception exception))) @@ -1872,6 +2052,11 @@ 'build-success 'build-failure)) + (when success? + (build-coordinator-trigger-background-job-processing + build-coordinator + 'build-success)) + (build-coordinator-send-event build-coordinator (if success? @@ -1884,7 +2069,8 @@ ;; could change the allocation (trigger-build-allocation build-coordinator) - #t)))) + #t)) + #:buckets %command-duration-histogram-buckets)) (define (handle-build-start-report build-coordinator agent-id @@ -1903,7 +2089,8 @@ build-coordinator 'build-started `((build_id . ,build-id) - (agent_id . ,agent-id)))))) + (agent_id . ,agent-id)))) + #:buckets %command-duration-histogram-buckets)) (define (handle-setup-failure-report build-coordinator agent-id build-id report-json) @@ -1939,3 +2126,142 @@ ;; Trigger build allocation, so that the allocator can handle this setup ;; failure (trigger-build-allocation build-coordinator)) + +(define (build-coordinator-trigger-background-job-processing + build-coordinator + type) + (let ((condition + (assq-ref (build-coordinator-background-job-conditions + build-coordinator) + type))) + (unless condition + (error + (simple-format #f "unknown condition ~A" type))) + (signal-reusable-condition! condition))) + +(define (start-background-job-processing-fibers build-coordinator) + (define %background-job-duration-histogram-buckets + (list 0.1 0.25 0.5 1 2.5 5 10 15 30 45 60 120 240 (inf))) + + (define* (start-job-fibers type proc #:key (parallelism 1)) + (let ((coordination-channel + (make-channel)) + (condition + (make-reusable-condition)) + (process-in-fiber + (fiberize + (lambda args + (call-with-duration-metric + (build-coordinator-metrics-registry build-coordinator) + "coordinator_background_job_duration_seconds" + (lambda () + (call-with-delay-logging proc #:args args)) + #:labels '(name) + #:label-values `((name . ,type)) + #:buckets %background-job-duration-histogram-buckets)) + #:parallelism parallelism)) + (job-exception-counter-metric + (make-counter-metric + (build-coordinator-metrics-registry build-coordinator) + "coordinator_background_job_failures_total" + #:labels '(name)))) + + (define (process id . args) + (spawn-fiber + (lambda () + (let loop ((retry 0)) + (log-msg (build-coordinator-logger build-coordinator) + 'DEBUG + "processing " type " background job (id: " + id ", args: " args ", retry: " retry ")") + (let ((success? + (with-exception-handler + (lambda (exn) + (log-msg (build-coordinator-logger build-coordinator) + 'WARN + type " background job error (id: " + id "): " exn) + #f) + (lambda () + (apply process-in-fiber args)) + #:unwind? #t))) + (if success? + (begin + (datastore-delete-background-job + (build-coordinator-datastore build-coordinator) + id) + (put-message coordination-channel + (list 'job-finished id))) + (begin + (metric-increment job-exception-counter-metric + #:label-values `((name . ,type))) + (sleep 30) + (loop (+ 1 retry))))))))) + + (spawn-fiber + (lambda () + (while #t + (let ((job-details + (datastore-select-background-jobs + (build-coordinator-datastore build-coordinator) + type + #:limit (* 2 parallelism)))) + + (unless (null? job-details) + (put-message coordination-channel + (list 'process job-details))) + + (reusable-condition-wait condition + #:timeout 30))))) + + (spawn-fiber + (lambda () + (let loop ((running-job-ids '())) + (match (get-message coordination-channel) + (('process jobs) + (let* ((job-ids (map (lambda (job) + (assq-ref job 'id)) + jobs)) + (new-ids + (lset-difference = job-ids running-job-ids)) + (jobs-to-start + (take new-ids + (min + (- parallelism + (length running-job-ids)) + (length new-ids))))) + (for-each (lambda (job) + (apply process + (assq-ref job 'id) + (assq-ref job 'args))) + (filter + (lambda (job-details) + (member (assq-ref job-details 'id) + jobs-to-start)) + jobs)) + (loop (append running-job-ids + jobs-to-start)))) + (('job-finished id) + ;; Maybe not very efficient, but should work + (signal-reusable-condition! condition) + (log-msg (build-coordinator-logger build-coordinator) + 'DEBUG type + " background job " id + " finished successfully") + (loop (delete id running-job-ids))))))) + + condition)) + + `((build-success . ,(start-job-fibers + 'build-success + (lambda (build-id) + (datastore-update-unprocessed-builds-for-build-success + (build-coordinator-datastore build-coordinator) + build-id)) + #:parallelism 24)))) + +(define (build-coordinator-check-and-correct-unprocessed-builds-all-inputs-built + build-coordinator) + (datastore-check-and-correct-unprocessed-builds-all-inputs-built + (build-coordinator-datastore build-coordinator) + #:progress-reporter progress-reporter/bar)) |