;;; Guix Build Coordinator
;;;
;;; Copyright © 2020 Christopher Baines
;;;
;;; This file is part of the guix-build-coordinator.
;;;
;;; The Guix Build Coordinator is free software; you can redistribute
;;; it and/or modify it under the terms of the GNU General Public
;;; License as published by the Free Software Foundation; either
;;; version 3 of the License, or (at your option) any later version.
;;;
;;; The Guix Build Coordinator is distributed in the hope that it will
;;; be useful, but WITHOUT ANY WARRANTY; without even the implied
;;; warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
;;; See the GNU General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with the guix-build-coordinator.  If not, see
;;; <http://www.gnu.org/licenses/>.

(define-module (guix-build-coordinator coordinator)
  #:use-module (srfi srfi-1)
  #:use-module (srfi srfi-9)
  #:use-module (srfi srfi-9 gnu)
  #:use-module (srfi srfi-11)
  #:use-module (srfi srfi-19)
  #:use-module (srfi srfi-26)
  #:use-module (srfi srfi-71)
  #:use-module (ice-9 ftw)
  #:use-module (ice-9 vlist)
  #:use-module (ice-9 match)
  #:use-module (ice-9 rdelim)
  #:use-module (ice-9 binary-ports)
  #:use-module (ice-9 format)
  #:use-module (ice-9 atomic)
  #:use-module (ice-9 control)
  #:use-module (ice-9 threads)
  #:use-module (ice-9 exceptions)
  #:use-module (rnrs bytevectors)
  #:use-module (web uri)
  #:use-module (web http)
  #:use-module (oop goops)
  #:use-module (logging logger)
  #:use-module (logging port-log)
  #:use-module (gcrypt random)
  #:use-module (fibers)
  #:use-module (fibers channels)
  #:use-module (fibers scheduler)
  #:use-module (fibers conditions)
  #:use-module (prometheus)
  #:use-module ((guix build syscalls)
                #:select (set-thread-name))
  #:use-module (guix store)
  #:use-module (guix derivations)
  #:use-module (guix build utils)
  #:use-module (guix-build-coordinator utils)
  #:use-module (guix-build-coordinator utils fibers)
  #:use-module (guix-build-coordinator config)
  #:use-module (guix-build-coordinator datastore)
  #:use-module (guix-build-coordinator build-allocator)
  #:use-module (guix-build-coordinator agent-messaging http server)
  #:use-module (guix-build-coordinator client-communication)
  #:export (make-agent-error ; TODO Remove export once unused
            agent-error?
            agent-error-details

            client-error?
            client-error-details

            make-build-coordinator
            build-coordinator-datastore
            build-coordinator-hooks
            build-coordinator-metrics-registry
            build-coordinator-allocation-strategy
            build-coordinator-logger
            build-coordinator-scheduler

            build-coordinator-listen-for-events
            build-coordinator-get-state-id

            %known-hooks

            %default-agent-uri
            %default-client-uri
            perform-coordinator-service-startup
            run-coordinator-service

            submit-build
            cancel-build
            update-build-priority
            new-agent
            new-agent-password
            set-agent-active
            update-agent-status
            fetch-builds
            agent-details
            trigger-build-allocation
            build-coordinator-prompt-hook-processing-for-event
            start-hook-processing-threads

            build-output-file-location
            build-log-file-destination
            build-log-file-location
            handle-build-start-report
            handle-build-result
            handle-setup-failure-report))

;; Exception raised for errors attributed to an agent; DETAILS carries the
;; payload given to make-agent-error.
(define-exception-type &agent-error &error
  make-agent-error
  agent-error?
  (details agent-error-details))

;; Exception raised for errors attributed to a client request; DETAILS
;; carries the payload given to make-client-error (a symbol in this file).
(define-exception-type &client-error &error
  make-client-error
  client-error?
  (details client-error-details))

;; The coordinator state.  The fields without a constructor argument
;; (hook-condvars, allocator-thread, events-channel, get-state-id and
;; scheduler) are filled in later through their setters during service
;; startup.
(define-record-type <build-coordinator>
  (make-build-coordinator-record datastore hooks metrics-registry
                                 allocation-strategy logger)
  build-coordinator?
  (datastore build-coordinator-datastore)
  (hooks build-coordinator-hooks)
  (hook-condvars build-coordinator-hook-condvars
                 set-build-coordinator-hook-condvars!)
  (metrics-registry build-coordinator-metrics-registry)
  (allocation-strategy build-coordinator-allocation-strategy)
  (allocator-thread build-coordinator-allocator-thread
                    set-build-coordinator-allocator-thread!)
  (logger build-coordinator-logger)
  (events-channel build-coordinator-events-channel
                  set-build-coordinator-events-channel!)
  (get-state-id build-coordinator-get-state-id-proc
                set-build-coordinator-get-state-id-proc!)
  (scheduler build-coordinator-scheduler
             set-build-coordinator-scheduler!))

;; Print coordinators as a short constant tag rather than dumping every
;; field.
(set-record-type-printer!
 <build-coordinator>
 (lambda (build-coordinator port)
   (display "#<build-coordinator>" port)))

;; Log handler that writes each formatted message to PORT as UTF-8 bytes.
(define-class <custom-port-log> (<log-handler>)
  (port #:init-value #f #:accessor port #:init-keyword #:port))

(define-method (emit-log (self <custom-port-log>) str)
  (when (port self)
    (put-bytevector (port self)
                    (string->utf8 str))
    ;; Even though the port is line buffered, writing to it with
    ;; put-bytevector doesn't cause the buffer to be flushed.
    (force-output (port self))))

(define-method (flush-log (self <custom-port-log>))
  (and=> (port self) force-output))

(define-method (close-log! (self <custom-port-log>))
  (and=> (port self) close-port)
  (set! (port self) #f))

;; Hook names accepted in the HOOKS alist given to make-build-coordinator.
(define %known-hooks
  '(build-submitted
    build-started
    build-success
    build-failure
    build-canceled
    build-missing-inputs
    build-submit-outputs))

;; Create the event distribution machinery and spawn the fiber serving it.
;; Returns two values: the channel events and listener requests are
;; submitted to, and a thunk returning the current state id.
;;
;; Events are stored in a fixed size vector used as a ring buffer
;; (buffer-size entries), each as (state-id event-name data).  The current
;; state id and the buffer index it was written to are published together
;; as a pair in one atomic box, so listeners read a consistent pair.
(define (make-events-channel datastore)
  (let* ((submission-channel (make-channel))
         (listener-channels-box (make-atomic-box vlist-null))
         (buffer-size 10000)
         (event-buffer (make-vector buffer-size))
         (current-state-id-and-event-buffer-index-box
          (make-atomic-box (cons 0 -1))))

    (define (get-state-id)
      (match (atomic-box-ref
              current-state-id-and-event-buffer-index-box)
        ((current-state-id . event-buffer-index)
         current-state-id)))

    ;; Spawn a fiber which calls CALLBACK for every stored event after
    ;; AFTER-STATE-ID, then waits on LISTENER-CHANNEL to be told about new
    ;; events.  When an exception is raised, the listener is removed and
    ;; LISTENING-FINISHED-CHANNEL is sent #t.
    (define (spawn-fiber-for-listener callback
                                      after-state-id
                                      submission-channel
                                      listener-channel
                                      listening-finished-channel)
      (spawn-fiber
       (lambda ()
         (let loop ((last-sent-state-id after-state-id))
           (let ((new-state-id
                  (with-exception-handler
                      (lambda (exn)
                        (put-message submission-channel
                                     (list 'remove-listener listener-channel))
                        (put-message listening-finished-channel #t)
                        #f)
                    (lambda ()
                      (with-throw-handler #t
                        (lambda ()
                          (match (atomic-box-ref
                                  current-state-id-and-event-buffer-index-box)
                            ((current-state-id . event-buffer-index)
                             (let ((event-count-to-send
                                    (- current-state-id last-sent-state-id)))
                               (for-each
                                (lambda (index index-state-id)
                                  (match (vector-ref event-buffer index)
                                    ((event-state-id event-name data)
                                     ;; The slot holds a different event
                                     ;; than expected, so it's been
                                     ;; overwritten
                                     (when (not (= event-state-id
                                                   index-state-id))
                                       (error "listener behind"))
                                     (callback event-state-id
                                               event-name
                                               data))))
                                ;; Buffer indexes of the events to send,
                                ;; oldest first, wrapping around the buffer
                                (map (lambda (i) (modulo i buffer-size))
                                     (iota event-count-to-send
                                           (- event-buffer-index
                                              (- event-count-to-send 1))))
                                ;; The state ids expected at those indexes
                                (iota event-count-to-send
                                      (+ 1 last-sent-state-id))))
                             current-state-id)))
                        (lambda (key . args)
                          ;; Don't print a backtrace for broken pipe write
                          ;; errors
                          (if (and (eq? key 'system-error)
                                   (match args
                                     (("fport_write" "~A" ("Broken pipe")
                                       rest ...)
                                      #t)
                                     (_ #f)))
                              #f
                              (backtrace)))))
                    #:unwind? #t)))
             (unless (eq? #f new-state-id)
               (get-message listener-channel)
               (loop new-state-id)))))
       #:parallel? #t))

    ;; Store the event in the next buffer slot, then publish the new state
    ;; id and index.
    (define (store-event event-name data)
      (match (atomic-box-ref
              current-state-id-and-event-buffer-index-box)
        ((current-state-id . event-buffer-index)
         (let ((new-state-id (+ 1 current-state-id))
               (new-event-index (modulo (+ 1 event-buffer-index)
                                        buffer-size)))
           (vector-set! event-buffer
                        new-event-index
                        (list new-state-id event-name data))
           (atomic-box-set!
            current-state-id-and-event-buffer-index-box
            (cons new-state-id new-event-index))))))

    (spawn-fiber
     (lambda ()
       (while #t
         (match (get-message submission-channel)
           (('new-listener callback requested-after-state-id
                           listening-finished-channel)
            (let ((listener-channel (make-channel))
                  (after-state-id
                   (match (atomic-box-ref
                           current-state-id-and-event-buffer-index-box)
                     ((current-state-id . event-buffer-index)
                      ;; Use the current state id if none was requested,
                      ;; or the requested one is in the future
                      (if requested-after-state-id
                          (if (> requested-after-state-id
                                 current-state-id)
                              current-state-id
                              requested-after-state-id)
                          current-state-id)))))
              (atomic-box-set!
               listener-channels-box
               (vhash-consq listener-channel
                            #t
                            (atomic-box-ref listener-channels-box)))
              (spawn-fiber-for-listener callback
                                        after-state-id
                                        submission-channel
                                        listener-channel
                                        listening-finished-channel)))
           (('remove-listener listener-channel)
            (atomic-box-set!
             listener-channels-box
             (vhash-delq listener-channel
                         (atomic-box-ref listener-channels-box))))
           ((event-name data)
            (store-event event-name data)
            ;; Notify each listener from its own fiber
            (vhash-fold
             (lambda (listener-channel val res)
               (spawn-fiber
                (lambda ()
                  (put-message listener-channel #t))
                #:parallel? #t)
               #f)
             #f
             (atomic-box-ref listener-channels-box)))))))

    (values submission-channel
            get-state-id)))

;; Submit EVENT-NAME with DATA (an alist, to which a timestamp entry is
;; appended) to the events channel, from a new fiber on the coordinator's
;; scheduler.  Does nothing, returning #f, when the scheduler isn't set.
(define (build-coordinator-send-event build-coordinator event-name data)
  (and=> (build-coordinator-scheduler build-coordinator)
         (lambda (scheduler)
           (spawn-fiber
            (lambda ()
              (put-message
               (build-coordinator-events-channel build-coordinator)
               (list event-name
                     `(,@data
                       (timestamp
                        . ,(date->string
                            (time-utc->date (current-time time-utc))
                            "~1 ~3"))))))
            scheduler
            #:parallel? #t))))

;; Register CALLBACK as an event listener, optionally starting after
;; AFTER-STATE-ID, and block until the listener has finished.  CALLBACK is
;; called with the state id, event name and data of each event.
(define* (build-coordinator-listen-for-events build-coordinator callback
                                              #:key after-state-id)
  (let ((listening-finished-channel (make-channel)))
    (put-message
     (build-coordinator-events-channel build-coordinator)
     (list 'new-listener
           callback
           after-state-id
           listening-finished-channel))
    ;; This is designed to be useful in the controller, so this procedure
    ;; shouldn't exit until callback has finished being called
    (get-message listening-finished-channel)))

;; Return the current event state id, by calling the stored thunk.
(define (build-coordinator-get-state-id build-coordinator)
  ((build-coordinator-get-state-id-proc build-coordinator)))

;; Create a build coordinator record.  HOOKS is expected to be an alist of
;; (hook-name . procedure); problems found with it are printed as warnings
;; and otherwise ignored.  A logger writing to the current output port is
;; created and opened.
(define* (make-build-coordinator
          #:key
          database-uri-string
          (metrics-registry (make-metrics-registry
                             #:namespace
                             "guixbuildcoordinator"))
          (datastore
           (database-uri->datastore
            database-uri-string
            #:metrics-registry metrics-registry
            #:worker-thread-log-exception?
            (lambda (exn)
              (and (not (agent-error? exn))
                   (not (client-error? exn))))))
          hooks
          (allocation-strategy
           basic-build-allocation-strategy))
  (and (or (list? hooks)
           (begin
             (simple-format
              #t
              "warning: guix-build-coordinator: hooks should be a list\n")
             #f))
       (for-each
        (match-lambda
          ((hook-name . hook)
           (unless (member hook-name %known-hooks)
             (simple-format
              #t
              "warning: guix-build-coordinator: hook name ~A not recognised it should be one of: ~A\n"
              hook-name
              (string-join (map symbol->string %known-hooks) ", ")))
           (unless (procedure? hook)
             (simple-format
              #t
              "warning: guix-build-coordinator: hook ~A value is not a procedure ~A\n"
              hook-name
              hook)))
          (unknown
           (simple-format
            #t
            "warning: guix-build-coordinator: hooks entry is not a pair: ~A\n"
            unknown)))
        hooks))

  (let* ((lgr (make <logger>))
         (port-log (make <custom-port-log>
                     #:port (current-output-port)
                     #:formatter
                     ;; In guile-lib v0.2.8 onwards, the formatter is
                     ;; called with more arguments
                     (lambda args       ; lvl, time, str
                       (format #f "~a (~5a): ~a~%"
                               (strftime "%F %H:%M:%S"
                                         (localtime
                                          (second args)))
                               (first args)
                               (third args)))))
         (build-coordinator
          (make-build-coordinator-record datastore
                                         hooks
                                         metrics-registry
                                         allocation-strategy
                                         lgr)))

    (add-handler! lgr port-log)
    (open-log! lgr)

    build-coordinator))

;; Perform the startup steps that don't need fibers to be running: process
;; setup (port encoding, resource limits, pid file), optionally updating
;; the datastore, then starting the allocator thread and the hook
;; processing threads, and optionally triggering an initial allocation.
(define* (perform-coordinator-service-startup build-coordinator
                                              #:key
                                              (update-datastore? #t)
                                              (pid-file #f)
                                              (trigger-build-allocation? #t)
                                              (parallel-hooks '()))
  ;; The logger assumes this
  (set-port-encoding! (current-output-port) "UTF-8")

  ;; Work around my broken with-store/non-blocking in Guix
  (let ((socket-file (%daemon-socket-uri)))
    (%daemon-socket-uri
     (string-append "file://" socket-file)))

  (with-exception-handler
      (lambda (exn)
        (simple-format #t "failed enabling core dumps: ~A\n" exn))
    (lambda ()
      (setrlimit 'core #f #f))
    #:unwind? #t)

  (with-exception-handler
      (lambda (exn)
        (simple-format #t "failed increasing open file limit: ~A\n" exn))
    (lambda ()
      (setrlimit 'nofile 4096 4096))
    #:unwind? #t)

  ;; TODO Work around this causing problems with backtraces
  ;; https://github.com/wingo/fibers/issues/76
  (set-record-type-printer!
   (@@ (fibers scheduler) <scheduler>)
   (lambda (scheduler port)
     (display "#<scheduler>" port)))

  (when pid-file
    (call-with-output-file pid-file
      (lambda (port)
        (simple-format port "~A\n" (getpid)))))

  (when update-datastore?
    (datastore-update (build-coordinator-datastore build-coordinator)))

  (set-build-coordinator-allocator-thread!
   build-coordinator
   (make-build-allocator-thread build-coordinator))

  (set-build-coordinator-hook-condvars!
   build-coordinator
   (start-hook-processing-threads build-coordinator
                                  parallel-hooks))

  (when trigger-build-allocation?
    (trigger-build-allocation build-coordinator)))

(define %default-agent-uri (string->uri "http://0.0.0.0:8745"))
(define %default-client-uri (string->uri "http://127.0.0.1:8746"))

;; Run the coordinator: perform the startup steps, then, inside run-fibers,
;; set up metrics, the events channel, the agent messaging server and the
;; client request server, and wait until the FINISHED? condition is
;; signalled.
(define* (run-coordinator-service build-coordinator
                                  #:key
                                  (update-datastore? #t)
                                  (pid-file #f)
                                  (agent-communication-uri
                                   %default-agent-uri)
                                  (client-communication-uri
                                   %default-client-uri)
                                  secret-key-base
                                  (parallel-hooks '()))
  (with-fluids ((%file-port-name-canonicalization 'none))
    (perform-coordinator-service-startup
     build-coordinator
     #:update-datastore? update-datastore?
     #:pid-file pid-file
     #:parallel-hooks parallel-hooks)

    ;; Create some worker thread channels, which need to be created prior
    ;; to run-fibers being called.
    (let ((chunked-request-channel
           ;; There are fibers issues when trying to read the chunked
           ;; requests, so do this in dedicated threads.
           (make-worker-thread-channel
            (const '())
            #:name "chunked request"
            #:parallelism 16
            #:log-exception?
            (lambda (exn)
              (not
               (chunked-input-ended-prematurely-error?
                exn)))
            #:delay-logger
            (lambda (seconds-delayed)
              (log-delay "chunked request channel"
                         seconds-delayed)
              (when (> seconds-delayed 0.1)
                (format
                 (current-error-port)
                 "warning: chunked request channel delayed by ~1,2f seconds~%"
                 seconds-delayed)))))

          (output-hash-channel
           (make-output-hash-channel
            build-coordinator)))

      (let ((finished? (make-condition)))
        (call-with-sigint
         (lambda ()
           (run-fibers
            (lambda ()
              ;; Name the thread of each fibers scheduler
              (let* ((current (current-scheduler))
                     (schedulers
                      (cons current (scheduler-remote-peers current))))
                (for-each
                 (lambda (i sched)
                   (spawn-fiber
                    (lambda ()
                      (catch 'system-error
                        (lambda ()
                          (set-thread-name
                           (string-append "fibers " (number->string i))))
                        (const #t)))
                    sched))
                 (iota (length schedulers))
                 schedulers))

              (log-msg (build-coordinator-logger build-coordinator)
                       'INFO
                       "initialising metrics")
              (with-time-logging
                  "datastore initialise metrics"
                (datastore-initialise-metrics!
                 (build-coordinator-datastore build-coordinator)))

              (datastore-spawn-fibers
               (build-coordinator-datastore build-coordinator))

              (spawn-fiber-to-watch-for-deferred-builds build-coordinator)

              (set-build-coordinator-scheduler! build-coordinator
                                                (current-scheduler))

              (let ((events-channel
                     get-state-id
                     (make-events-channel
                      (build-coordinator-datastore build-coordinator))))

                (set-build-coordinator-events-channel!
                 build-coordinator
                 events-channel)
                (set-build-coordinator-get-state-id-proc!
                 build-coordinator
                 get-state-id))

              ;; Start the agent messaging server
              (match (uri-scheme agent-communication-uri)
                ('http
                 (let ((host (uri-host agent-communication-uri))
                       (port (uri-port agent-communication-uri)))
                   (http-agent-messaging-start-server
                    port
                    host
                    secret-key-base
                    build-coordinator
                    chunked-request-channel
                    output-hash-channel)
                   ;; Pass the logger explicitly, as no default logger is
                   ;; set up by this module
                   (log-msg (build-coordinator-logger build-coordinator)
                            'INFO
                            "listening on " host ":" port))))

              ;; Start the client messaging server
              (start-client-request-server
               secret-key-base
               (uri-host client-communication-uri)
               (uri-port client-communication-uri)
               build-coordinator)

              ;; Guile seems to just stop listening on ports, so try to
              ;; monitor that internally and just quit if it happens
              (spawn-port-monitoring-fiber
               (uri-port agent-communication-uri)
               finished?)
              (spawn-port-monitoring-fiber
               (uri-port client-communication-uri)
               finished?)

              (wait finished?))
            #:hz 0
            #:parallelism 1))
         finished?)))))

;; Submit a build of DERIVATION-FILE.  Returns ((build-submitted . uuid))
;; when a build was stored, or ((no-build-submitted . reason)) when one of
;; the ignore-if-... checks applies.  The checks are made once before, and
;; again inside, the transaction storing the build.
(define* (submit-build build-coordinator derivation-file
                       #:key
                       requested-uuid
                       (priority 0)
                       (ignore-if-build-for-derivation-exists? #f)
                       (ignore-if-build-for-outputs-exists? #f)
                       (ensure-all-related-derivation-outputs-have-builds? #f)
                       (tags '())
                       defer-until
                       (read-drv read-derivation-from-file*))
  (define datastore (build-coordinator-datastore build-coordinator))

  (define (build-for-derivation-exists?)
    (> (datastore-count-builds-for-derivation
        datastore
        derivation-file
        #:include-canceled? #f)
       0))

  (define (build-for-output-already-exists?)
    ;; Handle the derivation not existing in the database here, so that adding
    ;; it to the database isn't required for this code to work
    (let* ((system-from-database
            (datastore-find-derivation-system datastore
                                              derivation-file))

           (derivation-exists-in-database?
            (not (eq? #f system-from-database)))

           (derivation
            (if derivation-exists-in-database?
                #f ; unnecessary to fetch derivation
                ;; Bit of a hack, but offload reading the derivation to a
                ;; thread so that it doesn't block the fibers thread, since
                ;; local I/O doesn't cooperate with fibers
                (datastore-call-with-transaction
                 datastore
                 (lambda _
                   (with-fibers-port-timeouts
                    (lambda ()
                      (call-with-delay-logging
                       read-drv
                       #:threshold 10
                       #:args (list derivation-file)))
                    #:timeout 240))
                 #:readonly? #t)))

           (system
            (or system-from-database
                (derivation-system derivation)))

           (outputs
            (if derivation-exists-in-database?
                (datastore-find-derivation-outputs datastore
                                                   derivation-file)
                (map
                 (match-lambda
                   ((name . output)
                    `((name . ,name)
                      (output . ,(derivation-output-path output)))))
                 (derivation-outputs derivation)))))
      (any
       (lambda (output-details)
         (let ((builds-for-output
                (datastore-list-builds-for-output-and-system
                 datastore
                 (assq-ref output-details 'output)
                 system
                 #:include-canceled? #f)))
           (not (null? builds-for-output))))
       outputs)))

  ;; Returns 'continue, or the alist to return from submit-build
  (define (check-whether-to-store-build)
    (cond
     ((and ignore-if-build-for-derivation-exists?
           (build-for-derivation-exists?))
      '((no-build-submitted . build-already-exists-for-this-derivation)))
     ((and ignore-if-build-for-outputs-exists?
           (call-with-delay-logging build-for-output-already-exists?))
      '((no-build-submitted . build-already-exists-for-a-output)))
     (else
      'continue)))

  (define* (store-build derivation-name
                        uuid
                        priority
                        tags
                        #:key skip-updating-other-build-derived-priorities)
    (datastore-insert-build
     datastore
     uuid
     derivation-name
     priority
     defer-until
     #:skip-updating-other-build-derived-priorities
     skip-updating-other-build-derived-priorities)
    (datastore-insert-unprocessed-hook-event
     datastore
     "build-submitted"
     (list uuid))
    (unless (null? tags)
      (datastore-insert-build-tags datastore
                                   uuid
                                   tags))
    #t)

  (define build-id
    (or requested-uuid
        (random-v4-uuid)))

  (define (build-perform-datastore-changes derivations-lacking-builds)
    (lambda (_)
      ;; Check again now, since new builds could have been added since the
      ;; checks were made before the start of the transaction.
      (match (check-whether-to-store-build)
        ('continue
         ;; Actually create a build, do this first so the derived priorities
         ;; for the builds inserted below are informed by this build.
         (store-build derivation-file
                      build-id
                      priority
                      tags)

         (for-each
          (match-lambda
            ((related-derivation . related-uuid)
             ;; Double check at this point, within the transaction that no build
             ;; exists for this related derivation.
             ;;
             ;; This stops duplicate related builds from being submitted when
             ;; simultaneous submit build requests are being processed.
             (unless (datastore-build-exists-for-derivation-outputs?
                      datastore
                      related-derivation)
               (simple-format #t "submitting ~A for related ~A\n"
                              related-uuid
                              related-derivation)
               (call-with-delay-logging
                store-build
                #:args
                (list related-derivation
                      related-uuid
                      ;; Let the scheduler take care of
                      ;; the prioritisation
                      0
                      tags
                      ;; Since this build's priority isn't important, this
                      ;; expensive part of inserting builds can be skipped
                      #:skip-updating-other-build-derived-priorities #t)))))
          derivations-lacking-builds)
         #t)
        (stop-reason
         stop-reason))))

  (call-with-duration-metric
   (build-coordinator-metrics-registry build-coordinator)
   "coordinator_submit_build_duration_seconds"
   (lambda ()
     (match (check-whether-to-store-build)
       ('continue
        ;; Store the derivation first, so that listing related derivations
        ;; with no builds works
        (unless (datastore-find-derivation datastore derivation-file)
          (datastore-store-derivation
           datastore
           ;; Bit of a hack, but offload reading the derivation to a thread so
           ;; that it doesn't block the fibers thread, since local I/O doesn't
           ;; cooperate with fibers
           (datastore-call-with-transaction
            datastore
            (lambda _
              (with-fibers-port-timeouts
               (lambda ()
                 (call-with-delay-logging
                  read-drv
                  #:threshold 10
                  #:args (list derivation-file)))
               #:timeout 30))
            #:readonly? #t)))

        (let ((related-derivations-lacking-builds
               (if ensure-all-related-derivation-outputs-have-builds?
                   (datastore-list-related-derivations-with-no-build-for-outputs
                    datastore
                    derivation-file)
                   '())))
          (match (datastore-call-with-transaction
                  datastore
                  (build-perform-datastore-changes
                   ;; Do this here so it doesn't take time in the writer thread
                   (map
                    (lambda (drv)
                      ;; Generate the UUID's outside the transaction to save
                      ;; time too.
                      (cons drv (random-v4-uuid)))
                    related-derivations-lacking-builds))
                  #:duration-metric-name
                  "store_build")
            (#t                         ; build submitted
             (build-coordinator-prompt-hook-processing-for-event
              build-coordinator
              'build-submitted)

             (build-coordinator-send-event
              build-coordinator
              'build-submitted
              `((id . ,build-id)
                (derivation . ,derivation-file)
                (priority . ,priority)
                (tags
                 . ,(list->vector
                     (map
                      (match-lambda
                        ((key . value)
                         `((key . ,key)
                           (value . ,value))))
                      (if (vector? tags)
                          (vector->list tags)
                          tags))))
                (defer_until . ,defer-until)))

             (trigger-build-allocation build-coordinator)

             `((build-submitted . ,build-id)))
            (stop-condition
             stop-condition))))
       (stop-condition
        stop-condition)))))

;; Cancel the build UUID.  Returns 'build-canceled, or
;; 'skipped-as-build-required-by-another when the pre-transaction check
;; finds another build requiring this one; otherwise the value of the
;; transaction is returned.  A client error is raised when the build is
;; already canceled or processed.
(define* (cancel-build build-coordinator uuid
                       #:key (ignore-if-build-required-by-another? #t)
                       skip-updating-derived-priorities?)
  (define datastore (build-coordinator-datastore build-coordinator))

  (define (perform-operation)
    (let ((val
           (datastore-call-with-transaction
            datastore
            (lambda (db)
              (let ((build-details (datastore-find-build datastore uuid)))
                (when (assq-ref build-details 'canceled)
                  (raise-exception
                   (make-client-error 'build-already-canceled)))

                (when (assq-ref build-details 'processed)
                  (raise-exception
                   (make-client-error 'build-already-processed))))

              (when (and ignore-if-build-required-by-another?
                         (datastore-build-required-by-another? datastore
                                                               uuid))
                (raise-exception
                 (make-transaction-rollback-exception
                  'skipped-as-build-required-by-another)))

              (datastore-remove-build-from-allocation-plan datastore uuid)
              (datastore-cancel-build datastore uuid)
              (datastore-insert-unprocessed-hook-event datastore
                                                       "build-canceled"
                                                       (list uuid))
              'build-canceled))))

      (when (eq? val 'build-canceled)
        (unless skip-updating-derived-priorities?
          (datastore-update-unprocessed-builds-with-lower-derived-priorities
           datastore
           uuid
           #f))

        (trigger-build-allocation build-coordinator)

        (build-coordinator-prompt-hook-processing-for-event
         build-coordinator
         'build-canceled)

        (build-coordinator-send-event
         build-coordinator
         'build-canceled
         `((id . ,uuid))))

      val))

  (if ignore-if-build-required-by-another?
      (let ((build-required
             ;; Do this check here outside the transaction to avoid having to
             ;; start a transaction if there are builds requiring this one
             ;;
             ;; It's important to repeat this check inside the transaction for
             ;; correctness.
             (datastore-build-required-by-another? datastore uuid)))
        (if build-required
            'skipped-as-build-required-by-another
            (perform-operation)))
      (perform-operation)))

;; Change the priority of the build UUID to NEW-PRIORITY, then trigger
;; build allocation.  Returns #t.  A client error is raised when the build
;; is already canceled or processed.
(define* (update-build-priority build-coordinator uuid new-priority
                                #:key skip-updating-derived-priorities?
                                override-derived-priority)
  (define datastore (build-coordinator-datastore build-coordinator))

  (datastore-call-with-transaction
   datastore
   (lambda (db)
     (let ((build-details (datastore-find-build datastore uuid)))
       (when (assq-ref build-details 'canceled)
         (raise-exception
          (make-client-error 'build-already-canceled)))

       (when (assq-ref build-details 'processed)
         (raise-exception
          (make-client-error 'build-already-processed))))

     (datastore-update-build-priority
      datastore
      uuid
      new-priority
      #:skip-updating-derived-priorities?
      skip-updating-derived-priorities?
      #:override-derived-priority override-derived-priority)))

  (trigger-build-allocation build-coordinator)

  #t)

;; Store a new agent, using REQUESTED-UUID or a random UUID.  Returns the
;; UUID used.
(define* (new-agent datastore
                    #:key
                    requested-uuid
                    name
                    description)
  (let ((uuid (or requested-uuid
                  (random-v4-uuid))))
    (datastore-new-agent datastore
                         uuid
                         name
                         description)
    uuid))

;; Generate a random token, store it as a password for AGENT and return it.
(define* (new-agent-password datastore #:key agent)
  (let ((password (random-token)))
    (datastore-new-agent-password datastore
                                  agent
                                  password)
    password))

;; Store whether the agent is active, then trigger build allocation.
(define (set-agent-active coordinator agent-uuid active?)
  (datastore-set-agent-active (build-coordinator-datastore coordinator)
                              agent-uuid
                              active?)
  (trigger-build-allocation coordinator))

;; Store the status reported by an agent and send an "agent-status-update"
;; event.  When INITIAL-STATUS-UPDATE? is true, allocations of canceled
;; builds to this agent are removed first.
(define* (update-agent-status coordinator agent-uuid
                              status 1min-load-average
                              system-uptime
                              processor-count
                              #:key initial-status-update?)
  (define datastore
    (build-coordinator-datastore coordinator))

  (when initial-status-update?
    (datastore-call-with-transaction
     datastore
     (lambda _
       (for-each
        (lambda (build)
          (when (assq-ref build 'canceled)
            (datastore-remove-build-allocation datastore
                                               (assq-ref build 'uuid)
                                               agent-uuid)))
        (datastore-list-agent-builds datastore agent-uuid)))))

  (datastore-update-agent-status datastore
                                 agent-uuid
                                 status
                                 1min-load-average
                                 system-uptime
                                 processor-count)

  (build-coordinator-send-event
   coordinator
   "agent-status-update"
   `((agent_id . ,agent-uuid)
     (status . ,status)
     (load_average . ((1 . ,1min-load-average)))
     (system_uptime . ,system-uptime)
     (processor_count . ,processor-count))))

;; Request a new allocation plan.  The allocator-thread field holds the
;; trigger procedure returned by make-build-allocator-thread.
(define (trigger-build-allocation build-coordinator)
  ((build-coordinator-allocator-thread build-coordinator)))

;; Signal the condition variable stored for EVENT-NAME.  Returns #t, or #f
;; when there's no entry for EVENT-NAME.
(define (build-coordinator-prompt-hook-processing-for-event build-coordinator
                                                            event-name)
  (and=> (assoc-ref (build-coordinator-hook-condvars build-coordinator)
                    event-name)
         (lambda (condvar)
           (signal-condition-variable condvar)
           #t)))

;; Compute a plan with the allocation strategy, store it in place of the
;; previous plan and send an "allocation-plan-update" event.  Returns #t.
(define (update-build-allocation-plan build-coordinator)
  (define datastore
    (build-coordinator-datastore build-coordinator))

  (let* ((allocator-proc
          (build-coordinator-allocation-strategy build-coordinator))
         (new-plan
          (with-time-logging "computing new build allocation plan"
            (allocator-proc
             datastore
             #:metrics-registry (build-coordinator-metrics-registry
                                 build-coordinator)))))
    (datastore-replace-build-allocation-plan datastore new-plan)

    (let ((build-count-per-agent
           (datastore-count-build-allocation-plan-entries
            datastore)))
      (build-coordinator-send-event
       build-coordinator
       "allocation-plan-update"
       `((allocation_plan_counts . ,build-count-per-agent)))))
  #t)

;; Start the thread which recomputes the allocation plan whenever it's
;; requested, and return the procedure used to request it.
(define (make-build-allocator-thread build-coordinator)
  (define mtx (make-mutex))
  (define v (make-condition-variable))
  (define allocation-needed (make-atomic-box #f))

  (define (trigger-build-allocation)
    (atomic-box-set! allocation-needed #t)
    (signal-condition-variable v))

  (define success-counter-metric
    (make-counter-metric
     (build-coordinator-metrics-registry build-coordinator)
     "allocator_allocations_total"))

  (define failure-counter-metric
    (make-counter-metric
     (build-coordinator-metrics-registry build-coordinator)
     "allocator_failures_total"))

  (define (update-build-allocation-plan-loop)
    (while #t
      (with-mutex mtx
        ;; Only wait when allocation wasn't requested while the previous
        ;; plan was being computed
        (let ((previous-allocation-needed-value
               (atomic-box-swap! allocation-needed #f)))
          (when (eq? #f previous-allocation-needed-value)
            (wait-condition-variable v mtx)
            (atomic-box-set! allocation-needed #f)))
        (call-with-duration-metric
         (build-coordinator-metrics-registry build-coordinator)
         "allocate_builds_duration_seconds"
         (lambda ()
           (with-exception-handler
               (lambda (exn)
                 (simple-format
                  (current-error-port)
                  "build-allocator-thread: exception: ~A\n"
                  exn)
                 (metric-increment failure-counter-metric)
                 ;; Mark allocation as needed, so it's attempted again on
                 ;; the next iteration
                 (atomic-box-set! allocation-needed #t))
             (lambda ()
               (with-throw-handler #t
                 (lambda ()
                   (update-build-allocation-plan build-coordinator)
                   (metric-increment success-counter-metric))
                 (lambda (key . args)
                   (simple-format
                    (current-error-port)
                    "error in build allocator thread: ~A ~A\n"
                    key
                    args)
                   (backtrace))))
             #:unwind? #t))
         #:buckets ((@@ (prometheus) exponential-histogram-buckets) ; TODO
                    #:start 1
                    #:end (* 30 60))))))

  (call-with-new-thread
   (lambda ()
     (catch 'system-error
       (lambda ()
         (set-thread-name "allocator"))
       (const #t))

     (with-exception-handler
         (lambda (exn)
           (simple-format (current-error-port)
                          "error: allocator thread: ~A\n"
                          exn)
           (exit 1))
       (lambda ()
         (let ((build-allocation-plan-total
                (make-gauge-metric
                 (build-coordinator-metrics-registry build-coordinator)
                 "build_allocation_plan_total"
                 #:labels '(agent_id))))
           (with-time-logging
               "allocator initialise metrics"
             (for-each (match-lambda
                         ((agent-id . count)
                          (metric-set build-allocation-plan-total
                                      count
                                      #:label-values
                                      `((agent_id . ,agent-id)))))
                       (datastore-count-build-allocation-plan-entries
                        (build-coordinator-datastore build-coordinator)))))

         (update-build-allocation-plan-loop)))))

  trigger-build-allocation)

;; Spawn a fiber which, once a minute, triggers build allocation when the
;; first unallocated deferred build's deferred-until time has been reached.
(define (spawn-fiber-to-watch-for-deferred-builds coordinator)
  (spawn-fiber
   (lambda ()
     (while #t
       (sleep 60) ; 1 minute
       (with-exception-handler
           (lambda (exn)
             (simple-format
              (current-error-port)
              "exception when watching for deferred builds: ~A\n"
              exn))
         (lambda ()
           (let ((first-deferred-build
                  (datastore-find-first-unallocated-deferred-build
                   (build-coordinator-datastore coordinator))))
             (when (and first-deferred-build
                        (time<=? (date->time-utc
                                  (assq-ref first-deferred-build
                                            'deferred-until))
                                 (current-time)))
               (simple-format
                #t
                "guix-build-coordinator: triggering build allocation for deferred build: ~A\n"
                (assq-ref first-deferred-build 'uuid))
               (trigger-build-allocation coordinator))))
         #:unwind? #t)))
   #:parallel? #t))

;; Start the processing of unprocessed hook events for each entry in the
;; coordinator's hooks.  Hooks with an entry in PARALLEL-HOOKS (an alist of
;; event name to thread count) are processed by a thread pool, the others
;; by a single thread each.  Returns an alist of event name to the object
;; used to prompt processing for that event.
(define (start-hook-processing-threads build-coordinator parallel-hooks)
  (define datastore (build-coordinator-datastore build-coordinator))

  (define success-counter-metric
    (make-counter-metric
     (build-coordinator-metrics-registry build-coordinator)
     "hook_success_total"
     #:labels '(event)))

  (define failure-counter-metric
    (make-counter-metric
     (build-coordinator-metrics-registry build-coordinator)
     "hook_failure_total"
     #:labels '(event)))

  ;; Run HANDLER for the event.  The unprocessed hook event is only deleted
  ;; when the handler returns without raising an exception; on failure,
  ;; this sleeps for 10 seconds and returns #f.
  (define (process-event id event arguments handler)
    (log-msg (build-coordinator-logger build-coordinator)
             'DEBUG
             "running " event " handler (id: " id ", arguments: " arguments ")")
    (and
     (with-exception-handler
         (lambda (exn)
           (log-msg (build-coordinator-logger build-coordinator)
                    'ERROR
                    "error running " event " (" id ") hook: "
                    exn)
           (metric-increment failure-counter-metric
                             #:label-values
                             `((event . ,event)))
           (sleep 10)
           #f)
       (lambda ()
         (call-with-duration-metric
          (build-coordinator-metrics-registry build-coordinator)
          "hook_duration_seconds"
          (lambda ()
            (with-throw-handler #t
              (lambda ()
                (start-stack
                 'hook
                 (apply handler build-coordinator arguments)))
              (lambda (key . args)
                (log-msg (build-coordinator-logger build-coordinator)
                         'ERROR
                         "error running " event " (" id ") hook: "
                         key " " args)
                (let* ((stack (make-stack #t 3))
                       (backtrace
                        (call-with-output-string
                          (lambda (port)
                            (display-backtrace stack port)
                            (newline port)))))
                  (display
                   backtrace
                   (current-output-port))))))
          #:labels '(event)
          #:label-values `((event . ,event)))
         #t)
       #:unwind? #t)
     (begin
       (log-msg (build-coordinator-logger build-coordinator)
                'DEBUG
                event " (" id ") handler finished")
       (datastore-delete-unprocessed-hook-event datastore id)

       ;; If this is the hook for a successful build, once the hook
       ;; completed successfully, delete the nar files for this build.
       (when (eq? 'build-success event)
         (match arguments
           ((build-id)
            (let ((data-location (build-data-location build-id)))
              (when (file-exists? data-location)
                (delete-file-recursively data-location))))))

       (metric-increment success-counter-metric
                         #:label-values
                         `((event . ,event))))))

  ;; Process the events for EVENT-NAME one at a time in a new thread, which
  ;; waits on a condition variable when there are none.  Returns that
  ;; condition variable.
  (define (single-thread-process-events event-name handler)
    (let ((mtx (make-mutex))
          (condvar (make-condition-variable)))
      (call-with-new-thread
       (lambda ()
         (catch 'system-error
           (lambda ()
             (set-thread-name
              (symbol->string event-name)))
           (const #t))

         (lock-mutex mtx)
         (while #t
           (with-exception-handler
               (lambda (exn)
                 (with-exception-handler
                     (lambda _
                       ;; Things are really going wrong if logging about
                       ;; the hook processing thread crashing, also raises
                       ;; an exception, so just try and sleep and hope
                       ;; things go better next time
                       (sleep 10))
                   (lambda ()
                     (log-msg (build-coordinator-logger build-coordinator)
                              'CRITICAL
                              "hook processing thread " event-name
                              " exception: " exn))
                   #:unwind? #t)
                 (sleep 10))
             (lambda ()
               (with-throw-handler #t
                 (lambda ()
                   (while #t
                     (match (datastore-list-unprocessed-hook-events
                             datastore
                             event-name
                             1)
                       (()
                        (wait-condition-variable condvar mtx))
                       (((id event arguments))
                        (process-event id event arguments handler)))))
                 (lambda (key . args)
                   (log-msg (build-coordinator-logger build-coordinator)
                            'CRITICAL
                            "error in " event-name
                            " hook processing thread: "
                            key " " args)
                   (backtrace))))
             #:unwind? #t))))
      condvar))

  ;; Process the events for EVENT-NAME using a thread pool.  Returns the
  ;; job-available value from create-thread-pool.
  (define (work-queue-process-events event-name handler thread-count)
    (let-values (((pool-mutex job-available count-threads list-jobs)
                  (create-thread-pool
                   (lambda ()
                     (max
                      1
                      (length
                       (datastore-list-unprocessed-hook-events
                        datastore
                        event-name
                        thread-count))))
                   (lambda (running-jobs)
                     ;; Fetch one more event than there are running jobs,
                     ;; so that if there's an event not being processed,
                     ;; it's found
                     (let* ((in-progress-ids
                             (map car running-jobs))
                            (potential-jobs
                             (datastore-list-unprocessed-hook-events
                              datastore
                              event-name
                              (+ 1 (length in-progress-ids)))))
                       (find
                        (match-lambda
                          ((id rest ...)
                           (not (member id in-progress-ids))))
                        potential-jobs)))
                   (lambda (id event arguments)
                     (process-event id event arguments handler))
                   #:name (symbol->string event-name))))
      job-available))

  (map
   (match-lambda
     ((event-name . handler)
      (cons event-name
            (or (and=>
                 (assq-ref parallel-hooks event-name)
                 (lambda (thread-count)
                   (work-queue-process-events event-name
                                              handler
                                              thread-count)))
                (single-thread-process-events event-name
                                              handler)))))
   (build-coordinator-hooks build-coordinator)))

;; Allocate builds from the allocation plan to AGENT and return the builds,
;; each extended with a submit_outputs entry.  When MAX-BUILDS is given,
;; all the builds allocated to the agent are returned, otherwise up to
;; DEPRECATED-REQUESTED-COUNT newly allocated builds are.
(define (fetch-builds build-coordinator agent systems
                      max-builds deprecated-requested-count)
  (define datastore
    (build-coordinator-datastore build-coordinator))

  (define (allocate-one-build agent-id)
    (let ((build-details (datastore-fetch-build-to-allocate datastore
                                                            agent-id)))
      (if build-details
          (let ((build-id (assq-ref build-details 'uuid)))
            (datastore-insert-to-allocated-builds datastore
                                                  agent-id
                                                  (list build-id))
            (datastore-remove-builds-from-plan datastore
                                               (list build-id))
            build-details)
          #f)))

  ;; Allocate up to COUNT builds, stopping early when there's no build to
  ;; allocate
  (define (allocate-several-builds agent-id count)
    (let loop ((builds '()))
      (if (= (length builds) count)
          builds
          (let ((build-details (allocate-one-build agent-id)))
            (if build-details
                (loop (cons build-details builds))
                builds)))))

  (define build-submit-outputs-hook
    (assq-ref (build-coordinator-hooks build-coordinator)
              'build-submit-outputs))

  (define (get-builds)
    (datastore-call-with-transaction
     datastore
     (lambda _
       (let* ((initially-allocated-builds
               (datastore-list-agent-builds datastore agent))
              (start-count
               (length initially-allocated-builds))
              (target-count (or max-builds
                                (+ start-count
                                   deprecated-requested-count))))
         (if (< start-count target-count)
             (let ((new-builds
                    (allocate-several-builds agent
                                             (- target-count start-count))))
               (unless (null? new-builds)
                 (let ((allocation-plan-metric
                        (metrics-registry-fetch-metric
                         (slot-ref datastore 'metrics-registry)
                         "build_allocation_plan_total")))
                   (for-each
                    (match-lambda
                      ((agent-id . count)
                       (metric-set allocation-plan-metric
                                   count
                                   #:label-values
                                   `((agent_id . ,agent-id)))))
                    (datastore-count-build-allocation-plan-entries
                     datastore))))

               ;; Previously allocate builds just returned newly allocated
               ;; builds, but if max-builds is provided, return all the
               ;; builds. This means the agent can handle this in a idempotent
               ;; manor.
               (if max-builds
                   (append initially-allocated-builds
                           new-builds)
                   new-builds))
             ;; Previously allocate builds just returned newly allocated builds,
             ;; but if max-builds is provided, return all the builds. This means
             ;; the agent can handle this in a idempotent manor.
             (if max-builds
                 initially-allocated-builds
                 '()))))
     #:duration-metric-name "allocate_builds_to_agent"))

  (call-with-duration-metric
   (build-coordinator-metrics-registry build-coordinator)
   "coordinator_fetch_builds_duration_seconds"
   (lambda ()
     (call-with-delay-logging
      (lambda ()
        (let ((update-made (datastore-update-agent-requested-systems
                            (build-coordinator-datastore build-coordinator)
                            agent
                            systems)))
          (when update-made
            (trigger-build-allocation build-coordinator)))

        (let ((builds (get-builds)))
          (build-coordinator-send-event
           build-coordinator
           "agent-builds-allocated"
           `((agent_id . ,agent)
             (builds
              . ,(list->vector
                  (map
                   (lambda (build)
                     `(,@build
                       (tags
                        . ,(list->vector
                            (map
                             (match-lambda
                               ((key . value)
                                `((key . ,key)
                                  (value . ,value))))
                             (vector->list
                              (datastore-fetch-build-tags
                               datastore
                               (assq-ref build 'uuid))))))))
                   builds)))))

          (map
           (lambda (build)
             ;; NOTE(review): with #:unwind? #t, when the hook raises an
             ;; exception, the value used here is whatever log-msg
             ;; returns, rather than a boolean -- confirm that's intended.
             (define submit-outputs?
               (with-exception-handler
                   (lambda (exn)
                     (log-msg (build-coordinator-logger build-coordinator)
                              'CRITICAL
                              "build-submit-outputs hook raised exception: "
                              exn))
                 (lambda ()
                   (with-throw-handler #t
                     (lambda ()
                       (let ((hook-result
                              (call-with-delay-logging
                               (lambda ()
                                 (build-submit-outputs-hook
                                  build-coordinator
                                  (assq-ref build 'uuid))))))
                         (if (boolean? hook-result)
                             hook-result
                             (begin
                               (log-msg
                                (build-coordinator-logger build-coordinator)
                                'CRITICAL
                                "build-submit-outputs hook returned non boolean: "
                                hook-result)
                               #t))))
                     (lambda (key . args)
                       (backtrace))))
                 #:unwind? #t))

             `(,@build
               ;; TODO This needs reconsidering when things having been built in
               ;; the past doesn't necessarily mean they're still available.
               (submit_outputs . ,submit-outputs?)))
           builds))))))

(define (agent-details build-coordinator agent-id)
  (define datastore
    (build-coordinator-datastore build-coordinator))

  (define build-submit-outputs-hook
    (assq-ref (build-coordinator-hooks build-coordinator)
              'build-submit-outputs))

  (define (submit-outputs? build)
    (with-exception-handler
        (lambda (exn)
          (log-msg (build-coordinator-logger build-coordinator)
                   'CRITICAL
                   "build-submit-outputs hook raised exception: "
                   exn))
      (lambda ()
        (with-throw-handler #t
          (lambda ()
            (let ((hook-result
                   (call-with-delay-logging
                    (lambda ()
                      (build-submit-outputs-hook
                       build-coordinator
                       (assq-ref build 'uuid))))))
              (if (boolean? hook-result)
                  hook-result
                  (begin
                    (log-msg
                     (build-coordinator-logger build-coordinator)
                     'CRITICAL
                     "build-submit-outputs hook returned non boolean: "
                     hook-result)
                    #t))))
          (lambda (key . args)
            (backtrace))))
      #:unwind? #t))

  (let ((agent (datastore-find-agent datastore agent-id))
        (allocated-builds (datastore-list-agent-builds datastore agent-id)))

    `(,@agent ; description
      (builds . ,(list->vector
                  (map (lambda (build)
                         `(,@build
                           (submit_outputs . ,(submit-outputs?
build)))) allocated-builds)))))) (define (build-data-location build-id ) (string-append (%config 'builds-dir) "/" build-id)) (define (build-output-file-location datastore build-id output-name) (let ((output (datastore-find-build-output datastore build-id output-name))) (string-append (build-data-location build-id) "/outputs/" output-name "/" (basename output) ".nar.lz"))) (define (build-log-file-directory build-id) (string-append (%config 'build-logs-dir) "/" build-id)) (define (build-log-file-destination build-id format) (string-append (build-log-file-directory build-id) "/" (cond ((string=? format "bzip2") "log.bz2") ((string=? format "gzip") "log.gz") (else (error "unknown log format" format))))) (define (build-log-file-location build-id) (let* ((directory (build-log-file-directory build-id)) (potential-files (scandir directory (lambda (file) (and (not (member file '("." ".."))) (not (string-suffix? ".tmp" file))))))) (match potential-files ((file) (string-append directory "/" file)) (() #f) (#f #f) ; directory doesn't exist (files (error (simple-format #f "found multiple files for ~A: ~A" build-id files)))))) (define (handle-build-result build-coordinator agent-id build-id result-json) (define datastore (build-coordinator-datastore build-coordinator)) (call-with-duration-metric (build-coordinator-metrics-registry build-coordinator) "coordinator_handle_build_result_duration_seconds" (lambda () (let* ((result (assoc-ref result-json "result")) (success? (string=? result "success"))) (let ((build-details (datastore-find-build datastore build-id))) (when (assq-ref build-details 'processed) (raise-exception (make-agent-error 'build_already_processed))) (when success? 
(unless (build-log-file-location build-id) (raise-exception (make-agent-error 'missing_build_log_file))) (let ((outputs (vector->list (assoc-ref result-json "outputs")))) (for-each (lambda (output) (let* ((output-name (assq-ref output 'name)) (output-location (build-output-file-location datastore build-id output-name)) (hash-file (string-append output-location ".hash"))) (unless (and (file-exists? hash-file) (file-exists? output-location)) (raise-exception (make-agent-error `((missing_output . ,output-name))))) (let ((hash (call-with-input-file hash-file read-line)) (expected-hash (assoc-ref (find (lambda (output-details) (string=? (assoc-ref output-details "name") output-name)) outputs) "hash"))) (unless (and (string? hash) ; it can be eof (string=? hash expected-hash)) (delete-file output-location) (delete-file hash-file) (log-msg (build-coordinator-logger build-coordinator) 'WARN build-id ": hash mismatch for output: " output-name) (raise-exception (make-agent-error `((missing_output . ,output-name)))))))) ;; TODO This requires that agents only submit outputs that are ;; considered unbuilt. This matches up with the criteria in ;; fetch-builds for telling agents whether to submit outputs. ;; ;; I'm hoping this will be useful, but I'm not sure how elegant ;; this approach is. (datastore-list-unbuilt-derivation-outputs datastore (assq-ref build-details 'derivation-name)))))) (let ((exception (datastore-call-with-transaction datastore (lambda _ (let ((build-details (datastore-find-build datastore build-id))) (cond ((assq-ref build-details 'processed) (make-agent-error 'build_already_processed)) ((assq-ref build-details 'canceled) (datastore-remove-build-allocation datastore build-id agent-id) (make-agent-error 'cannot_store_result_for_canceled_build)) (else (datastore-insert-build-result datastore build-id agent-id (if success? 
"success" "failure") #f) ; failure-reason TODO (datastore-remove-build-allocation datastore build-id agent-id) (datastore-mark-build-as-processed datastore build-id ;; TODO Check what the value of this is (assoc-ref result-json "end_time")) (datastore-insert-unprocessed-hook-event datastore (if (string=? result "success") "build-success" "build-failure") (list build-id)) (when success? (datastore-delete-relevant-outputs-from-unbuilt-outputs datastore build-id) (datastore-update-unprocessed-builds-for-build-success datastore build-id) (datastore-store-output-metadata datastore build-id (vector->list (assoc-ref result-json "outputs")))) #f)))) #:duration-metric-name "store_build_result"))) (when exception ;; Raise the exception here to avoid aborting the transaction (raise-exception exception))) (log-msg (build-coordinator-logger build-coordinator) 'INFO build-id ": processed result: " result) (build-coordinator-prompt-hook-processing-for-event build-coordinator (if success? 'build-success 'build-failure)) (build-coordinator-send-event build-coordinator (if success? 'build-success 'build-failure) `((build_id . ,build-id) (agent_id . ,agent-id))) ;; Trigger build allocation, as the result of this build ;; could change the allocation (trigger-build-allocation build-coordinator) #t)))) (define (handle-build-start-report build-coordinator agent-id build-id) (call-with-duration-metric (build-coordinator-metrics-registry build-coordinator) "coordinator_handle_build_start_report_duration_seconds" (lambda () (datastore-store-build-start (build-coordinator-datastore build-coordinator) build-id agent-id) (build-coordinator-prompt-hook-processing-for-event build-coordinator 'build-started) (build-coordinator-send-event build-coordinator 'build-started `((build_id . ,build-id) (agent_id . 
,agent-id)))))) (define (handle-setup-failure-report build-coordinator agent-id build-id report-json) (define datastore (build-coordinator-datastore build-coordinator)) (let ((failure-reason (assoc-ref report-json "failure_reason"))) (if (string=? failure-reason "missing_inputs") ;; For missing inputs, we need to store the inputs that were missing, ;; so that has a special function (let ((missing-inputs (vector->list (assoc-ref report-json "missing_inputs")))) (datastore-store-setup-failure/missing-inputs datastore build-id agent-id missing-inputs) (build-coordinator-prompt-hook-processing-for-event build-coordinator 'build-missing-inputs)) (datastore-store-setup-failure datastore build-id agent-id failure-reason))) (build-coordinator-send-event build-coordinator 'build-setup-failure `((build_id . ,build-id) (agent_id . ,agent-id) (report . ,report-json))) ;; Trigger build allocation, so that the allocator can handle this setup ;; failure (trigger-build-allocation build-coordinator))