;;; Guix Build Coordinator
;;;
;;; Copyright © 2020 Christopher Baines
;;;
;;; This file is part of the guix-build-coordinator.
;;;
;;; The Guix Build Coordinator is free software; you can redistribute
;;; it and/or modify it under the terms of the GNU General Public
;;; License as published by the Free Software Foundation; either
;;; version 3 of the License, or (at your option) any later version.
;;;
;;; The Guix Build Coordinator is distributed in the hope that it will
;;; be useful, but WITHOUT ANY WARRANTY; without even the implied
;;; warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
;;; See the GNU General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with the guix-build-coordinator.  If not, see
;;; <http://www.gnu.org/licenses/>.

(define-module (guix-build-coordinator agent)
  #:use-module (srfi srfi-1)
  #:use-module (srfi srfi-9)
  #:use-module (srfi srfi-11)
  #:use-module (srfi srfi-19)
  #:use-module (srfi srfi-43)
  #:use-module (ice-9 match)
  #:use-module (ice-9 format)
  #:use-module (ice-9 futures)
  #:use-module (ice-9 atomic)
  #:use-module (ice-9 threads)
  #:use-module (ice-9 exceptions)
  #:use-module (ice-9 textual-ports)
  #:use-module (rnrs bytevectors)
  #:use-module (web http)
  #:use-module (oop goops)
  #:use-module (logging logger)
  #:use-module (logging port-log)
  #:use-module (prometheus)
  #:use-module (guix store)
  #:use-module (guix derivations)
  #:use-module (guix base32)
  #:use-module (guix-build-coordinator utils)
  #:use-module (guix-build-coordinator agent-messaging)
  #:use-module (guix-build-coordinator agent-messaging abstract)
  #:export (run-agent))

;; Tracks how many bytes of FILE have been reported as sent for one
;; in-progress upload.
(define-record-type <upload-progress>
  (make-upload-progress file bytes-sent)
  upload-progress?
  (file       upload-progress-file)
  (bytes-sent upload-progress-bytes-sent
              set-upload-progress-bytes-sent!))

;; Entry point for the agent.  Sets up logging and metrics, reports an
;; idle status to the coordinator, starts any builds returned with that
;; status, then loops forever fetching and processing builds.
;;
;; This procedure does not return: the final expression is an infinite
;; `while' loop.
(define (run-agent uuid
                   coordinator-interface
                   systems
                   max-parallel-builds
                   derivation-substitute-urls
                   non-derivation-substitute-urls
                   metrics-file
                   max-1min-load-average)
  (define lgr (make <logger>))
  (define port-log
    (make <port-log>
      #:port (current-output-port)
      #:formatter
      (lambda (lvl time str)
        (format #f "~a (~5a): ~a~%"
                (strftime "%F %H:%M:%S" (localtime time))
                lvl
                str))))

  ;; Metrics are only written when METRICS-FILE is a non-empty string,
  ;; its directory exists, and a test file can be created and deleted
  ;; next to it.
  (define metrics-enabled?
    (and
     (not (string-null? metrics-file))
     (let ((directory (dirname metrics-file)))
       (or (file-exists? directory)
           (begin
             (simple-format
              (current-error-port)
              "skipping writing metrics as ~A does not exist\n"
              directory)
             #f)))
     (with-exception-handler
         (lambda (exn)
           (simple-format
            (current-error-port)
            "skipping writing metrics, encountered exception ~A\n"
            exn)
           #f)
       (lambda ()
         (let ((test-file (string-append metrics-file "-tmp")))
           (call-with-output-file test-file
             (lambda (port)
               (display "test" port)))
           (delete-file test-file)
           #t))
       #:unwind? #t)))

  (define metrics-registry
    (make-metrics-registry
     #:namespace "guixbuildcoordinator_agent"))

  (define (write-metrics)
    (when metrics-enabled?
      (write-textfile metrics-registry metrics-file)))

  ;; Upload slot bookkeeping.  Each element of UPLOAD-SLOTS is either #f
  ;; (free) or the <upload-progress> record of the upload occupying it.
  ;; QUEUED-UPLOADS lists the files still waiting for a slot.
  (define parallel-uploads 3)
  (define upload-slots (make-vector parallel-uploads #f))
  (define queued-uploads '())
  (define uploads-mutex (make-mutex))
  (define uploads-condition-variable (make-condition-variable))

  ;; Wait for a free upload slot, then call P with a procedure that the
  ;; upload should call with the number of additional bytes sent.  The
  ;; slot is freed when P returns or raises, and the values returned by
  ;; P are returned.
  (define (with-upload-slot lgr file p)
    (define upload-progress-record
      (make-upload-progress file 0))

    (define last-progress-update-bytes-sent 0)
    (define last-progress-update-time 0)

    (define (report-bytes-sent bytes)
      (let ((bytes-now-sent
             (+ (upload-progress-bytes-sent upload-progress-record)
                bytes)))
        (set-upload-progress-bytes-sent! upload-progress-record
                                         bytes-now-sent)
        (let ((uploads-count
               (vector-count (lambda (_ slot)
                               (not (eq? #f slot)))
                             upload-slots))
              (queued-uploads-count
               (length queued-uploads)))
          ;; Log progress after every further 20,000,000 bytes, or, when
          ;; other uploads are queued, once more than 30 seconds have
          ;; passed since the last progress log.
          (when (or (> bytes-now-sent
                       (+ last-progress-update-bytes-sent 20000000))
                    (and (> queued-uploads-count 0)
                         ;; The original negated the current time with a
                         ;; one-argument `-', so this could never be true.
                         (> (time-second (current-time))
                            (+ last-progress-update-time 30))))
            (set! last-progress-update-bytes-sent bytes-now-sent)
            (set! last-progress-update-time
                  (time-second (current-time)))

            (log-msg lgr 'INFO
                     uploads-count " uploads in progress, "
                     queued-uploads-count " queued")
            (vector-for-each
             (lambda (_ upload-progress)
               (when upload-progress
                 (log-msg lgr 'INFO
                          (upload-progress-file upload-progress)
                          ": "
                          (rationalize
                           (exact->inexact
                            (/ (upload-progress-bytes-sent upload-progress)
                               1000000))
                           0.1)
                          "MB sent")))
             upload-slots)))))

    (define (free-slot index)
      (with-mutex uploads-mutex
        (vector-set! upload-slots index #f))
      (signal-condition-variable uploads-condition-variable))

    (lock-mutex uploads-mutex)
    (set! queued-uploads (cons file queued-uploads))
    (let loop ()
      (let ((free-index
             (any (lambda (index)
                    (if (eq? (vector-ref upload-slots index) #f)
                        index
                        #f))
                  (iota (vector-length upload-slots) 0))))
        (if free-index
            (begin
              (vector-set! upload-slots free-index upload-progress-record)
              (set! queued-uploads
                    (delete file queued-uploads string=?))
              (unlock-mutex uploads-mutex)
              (call-with-values
                  (lambda ()
                    (with-exception-handler
                        (lambda (exn)
                          (free-slot free-index)
                          (raise-exception exn))
                      (lambda ()
                        (p report-bytes-sent))
                      #:unwind? #t))
                (lambda vals
                  (free-slot free-index)
                  (apply values vals))))
            (begin
              ;; Wait to be signalled that a slot was freed, with a
              ;; timeout of 240 seconds from now, then check again.
              (wait-condition-variable
               uploads-condition-variable
               uploads-mutex
               (+ 240 (time-second (current-time))))
              (loop))))))

  ;; Run the pre-build setup for BUILD, perform the build when setup
  ;; succeeds, and hand the log and result submission to
  ;; PERFORM-POST-BUILD-ACTIONS.  Setup failures are reported to the
  ;; coordinator instead.
  (define (process-job build perform-post-build-actions)
    (let ((build-id (assoc-ref build "uuid"))
          (derivation-name (or (assoc-ref build "derivation_name")
                               (assoc-ref build "derivation-name")))
          (submit-outputs? (match (assoc "submit_outputs" build)
                             ((_ . val) val)
                             (#f #t)))) ; default to submitting outputs
      (log-msg lgr 'INFO
               build-id ": setting up to build: " derivation-name)
      (with-store store
        (let ((pre-build-status
               (call-with-duration-metric
                metrics-registry
                "pre_build_duration_seconds"
                (lambda ()
                  (pre-build-process lgr
                                     store
                                     build-id
                                     derivation-substitute-urls
                                     non-derivation-substitute-urls
                                     derivation-name))
                #:buckets
                (list 1 2.5 5 10 25 50 100 200 500 1000 (inf)))))
          (write-metrics)

          (if (eq? (assq-ref pre-build-status 'result) 'success)
              (begin
                (log-msg lgr 'INFO
                         build-id
                         ": setup successful, building: "
                         derivation-name)
                (report-build-start coordinator-interface
                                    build-id
                                    #:log
                                    (build-log-procedure lgr build-id))
                (let* ((result
                        (perform-build lgr store build-id derivation-name))
                       ;; TODO Check this handles timezones right
                       (end-time
                        (localtime (time-second (current-time)) "UTC")))
                  (perform-post-build-actions
                   build
                   (lambda ()
                     (agent-submit-log-file lgr
                                            coordinator-interface
                                            build-id
                                            derivation-name)
                     (if result
                         (post-build-success lgr
                                             coordinator-interface
                                             build-id
                                             derivation-name
                                             end-time
                                             submit-outputs?
                                             with-upload-slot)
                         (post-build-failure lgr
                                             coordinator-interface
                                             build-id
                                             derivation-name
                                             end-time))
                     (log-msg lgr 'INFO
                              build-id
                              ": finished processing: "
                              derivation-name)))))
              (begin
                (log-msg lgr 'INFO
                         build-id
                         ": setup failure: "
                         (assq-ref pre-build-status 'failure_reason))
                (report-setup-failure coordinator-interface
                                      build-id
                                      pre-build-status
                                      #:log
                                      (build-log-procedure lgr
                                                           build-id))))))))

  ;; Allow only one build when the 1 minute load average has reached
  ;; MAX-1MIN-LOAD-AVERAGE, otherwise MAX-PARALLEL-BUILDS.
  (define (current-max-builds)
    (let ((current-load (get-load-average #:period 1)))
      (if (>= current-load max-1min-load-average)
          1
          max-parallel-builds)))

  (add-handler! lgr port-log)
  (open-log! lgr)

  (log-msg lgr 'INFO "starting agent " uuid)
  (log-msg lgr 'INFO
           "connecting to coordinator "
           (slot-ref coordinator-interface 'coordinator-uri))

  (let*-values (((perform-post-build-actions
                  count-post-build-jobs
                  count-post-build-threads
                  list-post-build-jobs)
                 (create-work-queue #f ;; One thread per job
                                    (lambda (build thunk)
                                      (thunk))))
                ((process-job-with-queue count-jobs
                                         count-threads
                                         list-jobs)
                 (create-work-queue
                  current-max-builds
                  (lambda (build)
                    (process-job build perform-post-build-actions))
                  #:thread-start-delay
                  (make-time
                   time-duration
                   0
                   (max 5
                        (- 135
                           (* 120 (/ max-parallel-builds 64)))))
                  #:thread-stop-delay
                  (make-time time-duration 0 20))))

    ;; Print the thread and job counts, followed by one line per queued
    ;; build and post-build job, to the current error port.
    (define (display-info)
      (define (describe-build build-details)
        (simple-format
         #f " - ~A (priority: ~A) ~A"
         (assoc-ref build-details "uuid")
         (assoc-ref build-details "priority")
         (assoc-ref build-details "derivation-name")))

      (display
       (simple-format
        #f
        "current threads: ~A current jobs: ~A\n~A\n"
        (count-threads)
        (+ (count-jobs) (count-post-build-jobs))
        (string-append
         (string-join
          (map (match-lambda
                 ((build-details)
                  (describe-build build-details)))
               (list-jobs))
          "\n")
         "\n"
         (string-join
          (map (match-lambda
                 ((build-details _)
                  (describe-build build-details)))
               (list-post-build-jobs))
          "\n")))
       (current-error-port)))

    ;; Report the agent as idle.  No build is in scope at this point, so
    ;; the log procedure is created without a build id (the original
    ;; referenced an unbound `build' variable here).
    (let ((details (submit-status coordinator-interface
                                  'idle
                                  #:log (build-log-procedure lgr))))
      ;; Start processing the builds returned with the status response.
      (for-each
       (lambda (job-args)
         (process-job-with-queue job-args))
       (vector->list (assoc-ref details "builds")))

      (unless (running-on-the-hurd?)
        ;; Display info on SIGUSR1.
        (call-with-new-thread
         (lambda ()
           (sigaction SIGUSR1
             (lambda _
               (display-info)))
           (while #t
             (sleep 100000))))
        ;; Display info for each line read from the current input port.
        (call-with-new-thread
         (lambda ()
           (let loop ((line (get-line (current-input-port))))
             (unless (eof-object? line)
               (display-info)
               (loop (get-line (current-input-port))))))))

      (while #t
        (let ((current-threads (count-threads))
              (job-count (count-jobs)))
          (if (or (< job-count current-threads)
                  (= job-count 0))
              (let* ((queued-build-ids
                      (map (lambda (job-args)
                             (assoc-ref (car job-args) "uuid"))
                           (append (list-jobs)
                                   (list-post-build-jobs))))
                     (fetched-builds
                      (fetch-builds-for-agent
                       coordinator-interface
                       systems
                       (+ (max current-threads 1)
                          (count-post-build-jobs))
                       #:log (build-log-procedure lgr)))
                     ;; Skip fetched builds that are already queued.
                     (new-builds
                      (remove (lambda (build)
                                (member (assoc-ref build "uuid")
                                        queued-build-ids))
                              fetched-builds)))
                (log-msg lgr 'INFO
                         "running " current-threads
                         " threads, currently allocated "
                         (length fetched-builds) " builds")
                (log-msg lgr 'INFO
                         "starting " (length new-builds) " new "
                         (if (= (length new-builds) 1)
                             "build"
                             "builds"))
                (for-each
                 (lambda (job-args)
                   (process-job-with-queue job-args))
                 new-builds)
                (when (null? new-builds)
                  (sleep 5)))
              (sleep 3)))))))

;; Return a procedure taking a log level and message components, which
;; logs them through LGR.  When BUILD-ID is given, each message is
;; prefixed with it and ": ".
(define* (build-log-procedure lgr #:optional build-id)
  (lambda (level . components)
    (apply log-msg
           lgr
           level
           (if build-id
               (cons* build-id ": " components)
               components))))

;; Find the build log file for DERIVATION-NAME and submit it to the
;; coordinator.  Both steps are wrapped in retry-on-error; an exception
;; is raised inside the first step when no log file is found.
(define (agent-submit-log-file lgr coordinator-interface
                               build-id derivation-name)
  (let ((log-file
         ;; TODO Not sure if retrying here is useful?
         (retry-on-error
          (lambda ()
            (let ((file (derivation-log-file derivation-name)))
              (or file
                  (raise-exception
                   (make-exception-with-message
                    (simple-format
                     #f "log file missing for ~A (~A)"
                     derivation-name
                     build-id))))))
          #:times 3
          #:delay 3)))
    (retry-on-error
     (lambda ()
       (log-msg lgr 'INFO build-id ": uploading log file " log-file)
       (submit-log-file
        coordinator-interface
        build-id
        log-file
        #:log (build-log-procedure lgr build-id)))
     #:times 12
     #:delay 10)))

;; Prepare to build DERIVATION-NAME: make sure the derivation is
;; available (substituting it if it is not a valid store path), delete
;; any existing outputs, and fetch missing inputs.
;;
;; Returns an alist with a `result' key that is either `success' or
;; `failure'.  Failures also carry a `failure_reason', and for missing
;; inputs a `missing_inputs' vector.
(define (pre-build-process lgr store build-id
                           derivation-substitute-urls
                           non-derivation-substitute-urls
                           derivation-name)
  ;; Return the list of inputs that could not be made available, or the
  ;; empty list when every input output path is present.
  (define (find-missing-inputs derivation inputs)
    (log-msg lgr 'DEBUG
             build-id ": checking the availability of build inputs")
    (let* ((output-paths
            (append-map derivation-input-output-paths inputs))
           (missing-paths
            (remove (lambda (path)
                      (valid-path? store path))
                    output-paths))
           ;; One boolean per entry in MISSING-PATHS, #t when a
           ;; substitute appears to be available for that path.
           (path-substitutes
            (begin
              (with-store store
                (apply set-build-options
                       store
                       `(,@(if non-derivation-substitute-urls
                               (list #:substitute-urls
                                     non-derivation-substitute-urls)
                               '())
                         #:max-silent-time 120
                         #:timeout ,(* 10 60)))

                (unless non-derivation-substitute-urls
                  (log-msg lgr 'WARN
                           "non-derivation-substitute-urls unset, unable to query substitute servers without caching"))

                (map
                 (lambda (file)
                   (let ((substitute-urls-with-substitute
                          (if (list? non-derivation-substitute-urls)
                              (retry-on-error
                               (lambda ()
                                 (has-substiutes-no-cache?
                                  non-derivation-substitute-urls
                                  file))
                               #:times 20
                               #:delay (random 15))
                              #f)))
                     (and
                      (if (eq? substitute-urls-with-substitute #f)
                          #t ; keep going
                          (not (null? substitute-urls-with-substitute)))
                      (let ((log-port (open-output-string)))
                        ;; TODO Do something with the logged output
                        (parameterize ((current-build-output-port
                                        log-port))
                          (if (has-substitutes? store file)
                              #t
                              (begin
                                (log-msg lgr 'WARN
                                         "a substitute should be available for "
                                         file
                                         ", but the guix-daemon claims it's unavailable"
                                         (if substitute-urls-with-substitute
                                             (string-append
                                              ":\n"
                                              (string-join
                                               substitute-urls-with-substitute
                                               "\n"))
                                             ""))
                                #f)))))))
                 missing-paths)))))

      (if (null? missing-paths)
          '()
          (if (member #f path-substitutes)
              ;; Some paths have no substitute available; return them.
              (fold (lambda (file substitute-available? result)
                      (if substitute-available?
                          result
                          (cons file result)))
                    '()
                    missing-paths
                    path-substitutes)
              (begin
                (retry-on-error
                 (lambda ()
                   (with-timeout (* 1000 60 11) ; 11 minutes
                     (raise-exception
                      (make-exception-with-message
                       "timeout fetching inputs"))
                     (begin
                       (with-store store
                         ;; Download the substitutes
                         (apply set-build-options
                                store
                                `(,@(if non-derivation-substitute-urls
                                        (list
                                         #:substitute-urls
                                         non-derivation-substitute-urls)
                                        '())
                                  #:max-silent-time 120
                                  #:timeout ,(* 60 60)))

                         (let ((log-port (open-output-string)))
                           (with-throw-handler #t
                             (lambda ()
                               (parameterize
                                   ((current-build-output-port log-port))
                                 (build-things store missing-paths)))
                             (lambda (key . args)
                               (log-msg lgr 'ERROR
                                        "exception when fetching missing paths "
                                        key ": " args)
                               (display (get-output-string log-port))
                               ;; The original called
                               ;; (display (newline)), which also
                               ;; printed the unspecified value.
                               (newline)
                               (close-output-port log-port))))))))
                 #:times 12
                 #:delay (random 15))

                ;; Double check everything is actually present.
                (let ((missing-files
                       (remove (lambda (path)
                                 (valid-path? store path))
                               output-paths)))
                  (if (null? missing-files)
                      '()
                      (begin
                        (log-msg lgr 'WARN
                                 "failed to fetch substitutes for "
                                 missing-files)
                        (let ((unavailable-outputs
                               (delete-duplicates
                                (append-map
                                 (lambda (missing-output)
                                   (find-missing-substitutes-for-output
                                    store
                                    non-derivation-substitute-urls
                                    missing-output))
                                 missing-files))))
                          (log-msg lgr 'WARN
                                   "the following outputs are missing: "
                                   (string-join
                                    (map (lambda (output)
                                           (string-append " - " output))
                                         unavailable-outputs)))
                          (if (null? unavailable-outputs)
                              ;; TODO This probably shouldn't happen
                              missing-files
                              unavailable-outputs))))))))))

  ;; Delete any outputs of DERIVATION that already exist as files.
  ;; Returns #t when there was nothing to delete or deletion succeeded,
  ;; and #f when an exception was caught.
  (define (delete-outputs derivation)
    (let* ((outputs (derivation-outputs derivation))
           (output-file-names
            (map derivation-output-path (map cdr outputs))))
      (if (any file-exists? output-file-names)
          (let ((log-port (open-output-string)))
            (catch #t
              (lambda ()
                (log-msg lgr 'DEBUG
                         build-id ": deleting "
                         (if (= (length output-file-names) 1)
                             "output"
                             "outputs"))
                ;; There can be issues deleting links when collecting
                ;; garbage from multiple threads
                (monitor
                 (with-store store
                   ;; TODO Do something with the logged output
                   (parameterize ((current-build-output-port log-port))
                     (delete-paths store output-file-names))))
                #t)
              ;; A catch handler receives the key followed by any number
              ;; of arguments, so take them as a rest list.
              (lambda (key . args)
                (display (get-output-string log-port))
                (log-msg lgr 'ERROR "delete-outputs: " key args)
                #f)))
          #t)))

  (let ((derivation
         (if (valid-path? store derivation-name)
             (read-derivation-from-file derivation-name)
             (and
              (with-exception-handler
                  (lambda (exn)
                    (log-msg lgr 'ERROR
                             "exception when reading/fetching derivation: "
                             exn)
                    #f)
                (lambda ()
                  (log-msg lgr 'DEBUG
                           build-id ": substituting derivation")
                  (retry-on-error
                   (lambda ()
                     (substitute-derivation
                      derivation-name
                      #:substitute-urls derivation-substitute-urls))
                   #:times 20
                   #:delay (random 15))
                  #t)
                #:unwind? #t)
              (read-derivation-from-file derivation-name)))))
    (if derivation
        (begin
          (log-msg lgr 'DEBUG build-id ": derivation read from file")
          (match (delete-outputs derivation)
            (#t
             (let ((missing-inputs
                    (find-missing-inputs derivation
                                         (derivation-inputs derivation))))
               (if (null? missing-inputs)
                   '((result . success))
                   `((result . failure)
                     (failure_reason . missing_inputs)
                     (missing_inputs
                      . ,(list->vector missing-inputs))))))
            (failure
             '((result . failure)
               (failure_reason . could_not_delete_outputs)))))
        '((result . failure)
          (failure_reason . error_fetching_derivation)))))

;; Build DERIVATION-NAME with substitutes disabled.  Returns #t when
;; build-things completes and #f when it raises; the exception is
;; logged.
(define (perform-build lgr store build-id derivation-name)
  (set-build-options store
                     #:use-substitutes? #f)

  (parameterize ((current-build-output-port (%make-void-port "w")))
    (with-exception-handler
        (lambda (exn)
          (if (store-protocol-error? exn)
              (cond
               ((eq? (store-protocol-error-status exn) 100)
                (log-msg lgr 'INFO build-id ": build failed"))
               ((eq? (store-protocol-error-status exn) 101)
                (log-msg lgr 'INFO
                         build-id ": build failed due to a timeout"))
               (else
                (log-msg lgr 'ERROR
                         build-id ": unknown error status "
                         (store-protocol-error-status exn))))
              (log-msg lgr 'ERROR
                       build-id ": unknown exception " exn))
          #f)
      (lambda ()
        (build-things store (list derivation-name))
        #t)
      #:unwind? #t)))

;; Submit a failure result for BUILD-ID to the coordinator.  A
;; "build_already_processed" error from the coordinator is logged and
;; treated as success; other coordinator errors are re-raised as
;; "unrecognised error from coordinator".
(define (post-build-failure lgr coordinator-interface build-id
                            derivation end-time)
  (log-msg lgr 'INFO build-id ": build failed")

  (with-exception-handler
      (lambda (exn)
        (unless (agent-error-from-coordinator? exn)
          (raise-exception exn))

        (let ((details (agent-error-from-coordinator-details exn)))
          (if (string? details)
              (cond
               ((string=? details "build_already_processed")
                (log-msg lgr 'WARN
                         build-id
                         ": coordinator indicates this build is already marked as processed")
                #t)
               (else
                (raise-exception
                 (make-exception
                  (make-exception-with-message
                   "unrecognised error from coordinator")
                  (make-exception-with-irritants details)))))
              (raise-exception
               (make-exception
                (make-exception-with-message
                 "unrecognised error from coordinator")
                (make-exception-with-irritants details))))))
    (lambda ()
      (submit-build-result
       coordinator-interface
       build-id
       `((result   . failure)
         (end_time . ,(strftime "%F %T" end-time)))
       #:log (build-log-procedure lgr build-id)))
    #:unwind? #t))

;; Submit the outputs of DERIVATION (when SUBMIT-OUTPUTS? is true) and
;; then the success result for BUILD-ID.  If the coordinator reports a
;; missing log file or a missing output, that item is submitted and the
;; result submission is attempted again.
(define (post-build-success lgr coordinator-interface build-id
                            derivation end-time
                            submit-outputs?
                            with-upload-slot)
  ;; Name, hash, size and references for each output of the derivation.
  (define output-details
    (map
     (match-lambda
       ((output-name . output)
        (let ((path-info
               (with-store store
                 (query-path-info
                  store
                  (derivation-output-path output)))))
          `((name . ,output-name)
            (hash . ,(bytevector->nix-base32-string
                      (path-info-hash path-info)))
            (size . ,(path-info-nar-size path-info))
            (references
             . ,(list->vector
                 (map basename
                      (path-info-references path-info))))))))
     (derivation-outputs
      (read-derivation-from-file derivation))))

  (define (attempt-submit-build-result)
    (with-exception-handler
        (lambda (exn)
          (unless (agent-error-from-coordinator? exn)
            (raise-exception exn))

          (let ((details (agent-error-from-coordinator-details exn)))
            (if (string? details)
                (cond
                 ((string=? details "build_already_processed")
                  (log-msg lgr 'WARN
                           build-id
                           ": coordinator indicates this build is already marked as processed")
                  #t)
                 ((string=? details
                            "cannot_store_result_for_canceled_build")
                  (log-msg lgr 'WARN
                           build-id
                           ": coordinator indicates this build is now canceled")
                  #t)
                 ((string=? details "missing_build_log_file")
                  ;; Retry submitting the log file
                  (agent-submit-log-file lgr
                                         coordinator-interface
                                         build-id
                                         derivation)
                  (attempt-submit-build-result))
                 (else
                  (raise-exception
                   (make-exception
                    (make-exception-with-message
                     "unrecognised error from coordinator")
                    (make-exception-with-irritants details)))))
                (or
                 (and=>
                  (assoc-ref details "missing_output")
                  (lambda (missing-output-name)
                    (let ((missing-output
                           (any
                            (match-lambda
                              ((name . output)
                               (if (string=? name missing-output-name)
                                   output
                                   #f)))
                            (derivation-outputs
                             (read-derivation-from-file derivation)))))
                      (unless missing-output
                        (raise-exception
                         (make-exception
                          (make-exception-with-message
                           "unknown missing output")
                          (make-exception-with-irritants
                           missing-output-name))))

                      (submit-one-output missing-output-name
                                         missing-output))

                    (attempt-submit-build-result)))
                 (raise-exception
                  (make-exception
                   (make-exception-with-message
                    "unrecognised error from coordinator")
                   (make-exception-with-irritants details)))))))
      (lambda ()
        (submit-build-result
         coordinator-interface
         build-id
         `((result   . success)
           (end_time . ,(strftime "%F %T" end-time))
           (outputs  . ,(list->vector output-details)))
         #:log (build-log-procedure lgr build-id)))
      #:unwind? #t))

  ;; Upload a single output once an upload slot is available, wrapped in
  ;; retry-on-error.
  (define (submit-one-output output-name output)
    (retry-on-error
     (lambda ()
       (with-upload-slot
        lgr
        (derivation-output-path output)
        (lambda (report-bytes-sent)
          (log-msg lgr 'INFO
                   build-id ": submitting output "
                   (derivation-output-path output))
          (submit-output
           coordinator-interface
           build-id
           output-name
           (derivation-output-path output)
           #:log (build-log-procedure lgr build-id)
           #:report-bytes-sent report-bytes-sent))))
     #:times 48
     #:delay (random 15)))

  (if submit-outputs?
      (begin
        (log-msg lgr 'INFO
                 build-id ": build successful, submitting outputs")
        (for-each
         (match-lambda
           ((output-name . output)
            (submit-one-output output-name output)))
         (derivation-outputs
          (read-derivation-from-file derivation)))
        (log-msg lgr 'INFO
                 build-id
                 ": finished submitting outputs, reporting build success"))
      (begin
        (log-msg lgr 'INFO
                 build-id
                 ": build successful, skipping submitting outputs")
        (log-msg lgr 'INFO build-id ": reporting build success")))

  (attempt-submit-build-result))