;;; Guix Build Coordinator
;;;
;;; Copyright © 2020 Christopher Baines
;;;
;;; This file is part of the guix-build-coordinator.
;;;
;;; The Guix Build Coordinator is free software; you can redistribute
;;; it and/or modify it under the terms of the GNU General Public
;;; License as published by the Free Software Foundation; either
;;; version 3 of the License, or (at your option) any later version.
;;;
;;; The Guix Build Coordinator is distributed in the hope that it will
;;; be useful, but WITHOUT ANY WARRANTY; without even the implied
;;; warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
;;; See the GNU General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with the guix-data-service.  If not, see
;;; <http://www.gnu.org/licenses/>.

(define-module (guix-build-coordinator agent)
  #:use-module (srfi srfi-1)
  #:use-module (srfi srfi-9)
  #:use-module (srfi srfi-11)
  #:use-module (srfi srfi-19)
  #:use-module (srfi srfi-43)
  #:use-module (srfi srfi-71)
  #:use-module (ice-9 ftw)
  #:use-module (ice-9 match)
  #:use-module (ice-9 format)
  #:use-module (ice-9 futures)
  #:use-module (ice-9 atomic)
  #:use-module (ice-9 threads)
  #:use-module (ice-9 exceptions)
  #:use-module (ice-9 textual-ports)
  #:use-module (rnrs bytevectors)
  #:use-module (web http)
  #:use-module (oop goops)
  #:use-module (logging logger)
  #:use-module (logging port-log)
  #:use-module (prometheus)
  #:use-module (lzlib)
  #:use-module (guix store)
  #:use-module (guix progress)
  #:use-module (guix derivations)
  #:use-module (guix base32)
  #:use-module (guix serialization)
  #:use-module ((guix build syscalls)
                #:select (set-thread-name free-disk-space))
  #:use-module (guix-build-coordinator utils)
  #:use-module (guix-build-coordinator agent-messaging)
  #:use-module (guix-build-coordinator agent-messaging abstract)
  #:export (run-agent))

;; Mutable record tracking the upload of one output.  FILE is #f while no
;; upload is in progress for the job that owns the record.
(define-record-type <upload-progress>
  (make-upload-progress file bytes-sent bytes-hashed total-bytes)
  upload-progress?
  (file         upload-progress-file
                set-upload-progress-file!)
  (bytes-sent   upload-progress-bytes-sent
                set-upload-progress-bytes-sent!)
  (bytes-hashed upload-progress-bytes-hashed
                set-upload-progress-bytes-hashed!)
  (total-bytes  upload-progress-total-bytes
                set-upload-progress-total-bytes!))

;; Directory used for the compressed output files, read from TMPDIR when
;; this module is loaded.
(define temporary-directory
  (or (getenv "TMPDIR") "/tmp"))

;; Entry point for the agent.  Sets up logging and metrics, removes
;; compressed output files left over in TEMPORARY-DIRECTORY, creates one
;; work queue for builds and one for post-build actions (uploads), starts
;; the info and status threads, and then loops forever fetching builds from
;; the coordinator.  This procedure does not return.
(define (run-agent uuid
                   coordinator-interface
                   systems
                   max-parallel-builds
                   max-allocated-builds
                   max-parallel-uploads
                   derivation-substitute-urls
                   non-derivation-substitute-urls
                   metrics-file
                   max-1min-load-average
                   timestamp-log-output?)
  (define lgr (make <logger>))
  (define port-log
    (make <port-log>
      #:port (current-output-port)
      #:formatter
      ;; In guile-lib v0.2.8 onwards, the formatter is
      ;; called with more arguments
      (lambda args                      ; lvl, time, str
        (format #f "~a(~5a): ~a~%"
                (if timestamp-log-output?
                    (strftime "%F %H:%M:%S " (localtime (second args)))
                    "")
                (first args)
                (third args)))))

  ;; True only when METRICS-FILE is non-empty, its directory exists, and a
  ;; test file next to it can be written and deleted.
  (define metrics-enabled?
    (and
     (not (string-null? metrics-file))
     (let ((directory (dirname metrics-file)))
       (or (file-exists? directory)
           (begin
             (simple-format
              (current-error-port)
              "skipping writing metrics as ~A does not exist\n"
              directory)
             #f)))
     (with-exception-handler
         (lambda (exn)
           (simple-format
            (current-error-port)
            "skipping writing metrics, encountered exception ~A\n"
            exn)
           #f)
       (lambda ()
         (let ((test-file (string-append metrics-file "-tmp")))
           (call-with-output-file test-file
             (lambda (port)
               (display "test" port)))
           (delete-file test-file)
           #t))
       #:unwind? #t)))

  (define metrics-registry
    (make-metrics-registry
     #:namespace "guixbuildcoordinator_agent"))

  (define (write-metrics)
    (when metrics-enabled?
      (write-textfile metrics-registry metrics-file)))

  ;; Return a thunk that logs the progress of every job returned by
  ;; LIST-JOBS whose upload-progress record currently has a file set.
  (define (make-uploads-updater list-jobs)
    (lambda ()
      (let ((upload-progress-records
             (filter-map
              (match-lambda
                ((build upload-progress thunk)
                 (if (upload-progress-file upload-progress)
                     upload-progress
                     #f)))
              (list-jobs))))
        (log-msg lgr 'INFO (length upload-progress-records)
                 " uploads in progress")
        (for-each
         (lambda (upload-progress)
           (when upload-progress
             (log-msg
              lgr 'INFO
              (upload-progress-file upload-progress) ": "
              (let ((total-bytes
                     (upload-progress-total-bytes upload-progress))
                    (bytes-sent
                     (upload-progress-bytes-sent upload-progress))
                    (bytes-hashed
                     (upload-progress-bytes-hashed upload-progress)))
                (if (and (= bytes-sent total-bytes)
                         (> bytes-hashed 0))
                    (format #f "uploaded, ~2,2f/~2,2fMB hashed"
                            (/ bytes-hashed 1000000)
                            (/ total-bytes 1000000))
                    (format #f "~2,2f/~2,2fMB sent"
                            (/ bytes-sent 1000000)
                            (/ total-bytes 1000000)))))))
         upload-progress-records))))

  ;; Run the pre-build setup for BUILD, perform the build, then hand the
  ;; log/result/output submission to PERFORM-POST-BUILD-ACTIONS.  A setup
  ;; failure is reported to the coordinator straight away.
  (define (process-job build perform-post-build-actions uploads-updater)
    (let ((build-id        (assoc-ref build "uuid"))
          (derivation-name (or (assoc-ref build "derivation_name")
                               (assoc-ref build "derivation-name")))
          (submit-outputs? (match (assoc "submit_outputs" build)
                             ((_ . val) val)
                             (#f #t)))) ; default to submitting outputs

      ;; Compress each output to a file in TEMPORARY-DIRECTORY and return
      ;; an alist of output name to compressed file name.
      (define (get-compressed-outputs store)
        (let ((outputs
               (derivation-outputs
                (read-derivation-from-file* derivation-name))))
          (for-each
           (match-lambda
             ((_ . output)
              (add-temp-root store (derivation-output-path output))))
           outputs)

          (map
           (match-lambda
             ((output-name . output)
              (let* ((file (derivation-output-path output))
                     ;; Use the same directory that run-agent scans for
                     ;; left over files at startup.
                     (directory temporary-directory)
                     (template
                      (string-append
                       directory
                       "/guix-build-coordinator-file.XXXXXX"))
                     ;; mkstemp! replaces the XXXXXX in TEMPLATE in place
                     (out (mkstemp! template)))
                (log-msg lgr 'INFO
                         build-id ": compressing " file " -> "
                         template " prior to sending")
                (call-with-lzip-output-port out
                  (lambda (port)
                    (write-file file port))
                  #:level 9)
                (close-port out)
                (log-msg lgr 'INFO
                         build-id ": finished compressing " file)

                (cons output-name template))))
           outputs)))

      ;; Build the alists describing each output (name, hash, size and
      ;; references) from the store's path info.
      (define (get-output-details store outputs)
        (map
         (match-lambda
           ((output-name . output)
            (let ((path-info
                   (query-path-info store
                                    (derivation-output-path output))))
              `((name . ,output-name)
                (hash . ,(bytevector->nix-base32-string
                          (path-info-hash path-info)))
                (size . ,(path-info-nar-size path-info))
                (references . ,(list->vector
                                (map basename
                                     (path-info-references path-info))))))))
         outputs))

      (log-msg lgr 'INFO
               build-id ": setting up to build: "
               derivation-name)
      (with-store/non-blocking store
        (let ((pre-build-status
               (call-with-duration-metric
                metrics-registry
                "pre_build_duration_seconds"
                (lambda ()
                  (pre-build-process lgr
                                     store
                                     build-id
                                     derivation-substitute-urls
                                     non-derivation-substitute-urls
                                     derivation-name))
                #:buckets
                (list 1 2.5 5 10 25 50 100 200 500 1000 (inf)))))
          (write-metrics)

          (if (eq? (assq-ref pre-build-status 'result) 'success)
              (begin
                (log-msg lgr 'INFO
                         build-id ": setup successful, building: "
                         derivation-name)
                (report-build-start coordinator-interface
                                    build-id
                                    #:log (build-log-procedure lgr build-id))
                (let* ((derivation
                        (read-derivation-from-file* derivation-name))
                       (result
                        (perform-build lgr store build-id derivation))
                       ;; TODO Check this handles timezones right
                       (end-time
                        (localtime (time-second (current-time)) "UTC"))
                       (output-details
                        (if result
                            (get-output-details
                             store
                             (derivation-outputs derivation))
                            #f))
                       (compressed-outputs
                        (and result
                             submit-outputs?
                             (get-compressed-outputs store))))
                  (perform-post-build-actions
                   (list
                    build
                    (make-upload-progress #f 0 0 0)
                    (lambda (upload-progress)
                      (agent-submit-log-file lgr
                                             coordinator-interface
                                             build-id
                                             derivation-name)

                      (if result
                          (post-build-success lgr
                                              coordinator-interface
                                              build-id
                                              derivation
                                              end-time
                                              submit-outputs?
                                              output-details
                                              compressed-outputs
                                              upload-progress
                                              uploads-updater)
                          (post-build-failure lgr
                                              coordinator-interface
                                              build-id
                                              end-time))
                      (log-msg lgr 'INFO
                               build-id ": finished processing: "
                               derivation-name)))
                   ;; Build priority, then the total size in bytes of the
                   ;; compressed outputs (0 when nothing is uploaded).
                   #:priority
                   (list
                    (assoc-ref build "derived_priority")
                    (if (and result submit-outputs?)
                        (fold (lambda (output result)
                                (let ((file (cdr output)))
                                  (+ (stat:size (stat file))
                                     result)))
                              0
                              compressed-outputs)
                        0)))))
              (begin
                (log-msg lgr 'INFO
                         build-id ": setup failure: "
                         (assq-ref pre-build-status 'failure_reason))
                (report-setup-failure
                 coordinator-interface
                 build-id
                 pre-build-status
                 #:log (build-log-procedure lgr build-id))))))))

  ;; Limit to a single build while the 1 minute load average is at or
  ;; above MAX-1MIN-LOAD-AVERAGE.
  (define (current-max-builds)
    (let ((current-load (get-load-average #:period 1)))
      (if (>= current-load max-1min-load-average)
          1
          max-parallel-builds)))

  (add-handler! lgr port-log)
  (open-log! lgr)

  (log-msg lgr 'INFO "starting agent " uuid)

  (let ((left-over-temporary-files
         ;; scandir returns #f when the directory cannot be opened, treat
         ;; that the same as there being nothing to delete.
         (or (scandir
              temporary-directory
              (lambda (name)
                (string-prefix? "guix-build-coordinator-file" name)))
             '())))
    (unless (null? left-over-temporary-files)
      (log-msg lgr 'INFO "deleting " (length left-over-temporary-files)
               " left over temporary files")
      (for-each
       (lambda (name)
         (let ((full-filename
                (string-append temporary-directory "/" name)))
           (log-msg lgr 'INFO "deleting " full-filename)
           (delete-file full-filename)))
       left-over-temporary-files)))

  (log-msg lgr 'INFO "connecting to coordinator "
           (slot-ref coordinator-interface 'coordinator-uri))

  ;; Each create-work-queue call returns four values, bound here through
  ;; the SRFI-71 multiple value let*.
  (let* ((perform-post-build-actions
          count-post-build-jobs
          count-post-build-threads
          list-post-build-jobs
          (create-work-queue max-parallel-uploads
                             (lambda (build upload-progress proc)
                               (proc upload-progress))
                             #:name "upload"
                             ;; The priority here is a list where the
                             ;; first element is the build priority,
                             ;; and the second is the number of bytes
                             ;; to upload
                             ;;
                             ;; NOTE(review): the text between
                             ;; "#:priority" and "(second a)" was missing
                             ;; from the copy of this file under review.
                             ;; The procedure below was reconstructed from
                             ;; the surviving fragments, confirm it
                             ;; against the project history.
                             #:priority<?
                             (lambda (a b)
                               (let ((a-priority (first a))
                                     (b-priority (first b)))
                                 (if (= a-priority b-priority)
                                     (> (second a)
                                        (second b))
                                     ;; Prioritise uploading smaller
                                     ;; files first
                                     (< a-priority b-priority))))))
         (uploads-updater
          (make-uploads-updater list-post-build-jobs))
         (process-job-with-queue
          count-jobs
          count-threads
          list-jobs
          (create-work-queue current-max-builds
                             (lambda (build)
                               (process-job build
                                            perform-post-build-actions
                                            uploads-updater))
                             #:thread-start-delay
                             (make-time
                              time-duration
                              0
                              (max 5
                                   (- 135
                                      (* 120
                                         (/ max-parallel-builds 64)))))
                             #:thread-stop-delay
                             (make-time
                              time-duration
                              0
                              20)
                             #:name "job")))

    ;; Print the thread and job counts, followed by one line per job in
    ;; each queue, to the current error port.
    (define (display-info)
      (display
       (simple-format
        #f "current threads: ~A current jobs: ~A\n~A\n"
        (count-threads)
        (+ (count-jobs) (count-post-build-jobs))
        (string-append
         (string-join
          (map (match-lambda
                 ((build-details)
                  (simple-format
                   #f " - ~A (derived priority: ~A) ~A"
                   (assoc-ref build-details "uuid")
                   (assoc-ref build-details "derived_priority")
                   (or
                    (assoc-ref build-details "derivation_name")
                    (assoc-ref build-details "derivation-name")))))
               (list-jobs))
          "\n")
         "\n"
         (string-join
          (map (match-lambda
                 ((build-details upload-progress _)
                  (simple-format
                   #f " - ~A (derived priority: ~A) ~A~A"
                   (assoc-ref build-details "uuid")
                   (assoc-ref build-details "derived_priority")
                   (or
                    (assoc-ref build-details "derivation_name")
                    (assoc-ref build-details "derivation-name"))
                   (if (upload-progress-file upload-progress)
                       (simple-format
                        #f " uploading ~A (~A/~A)"
                        (upload-progress-file upload-progress)
                        (upload-progress-bytes-sent upload-progress)
                        (upload-progress-total-bytes upload-progress))
                       ""))))
               (list-post-build-jobs))
          "\n")))
       (current-error-port)))

    (let ((details (submit-status coordinator-interface
                                  'idle
                                  #:1min-load-average
                                  (get-load-average #:period 1)
                                  #:system-uptime (get-uptime)
                                  #:initial-status-update? #t
                                  #:log (build-log-procedure lgr)
                                  #:retry-times 60)))
      ;; Start on the builds listed in the response to the initial status
      ;; update.
      (for-each
       (lambda (job-args)
         (process-job-with-queue job-args))
       (vector->list (assoc-ref details "builds")))

      (unless (running-on-the-hurd?)
        (call-with-new-thread
         (lambda ()
           (set-thread-name "signal info")

           (sigaction SIGUSR1
             (lambda _
               (display-info)))

           (while #t
             (sleep 100000))))

        (call-with-new-thread
         (lambda ()
           (set-thread-name "console info")

           ;; Display the info each time a line is read from the current
           ;; input port, stopping at end of file.
           (let loop ((line (get-line (current-input-port))))
             (unless (eof-object? line)
               (display-info)
               (loop (get-line (current-input-port))))))))

      (call-with-new-thread
       (lambda ()
         (set-thread-name "submit status")

         (while #t
           (with-exception-handler
               (lambda (exn)
                 (log-msg lgr 'WARN
                          "exception submitting status: " exn))
             (lambda ()
               (submit-status coordinator-interface
                              (if (= (+ (count-jobs)
                                        (count-post-build-jobs))
                                     0)
                                  'idle
                                  'active)
                              #:1min-load-average
                              (get-load-average #:period 1)
                              #:system-uptime (get-uptime)
                              #:log (build-log-procedure lgr)))
             #:unwind? #t)

           (sleep 30))))

      (while #t
        (let ((current-threads (count-threads))
              (job-count (count-jobs)))
          (if (and
               (or (not max-allocated-builds)
                   (< (+ job-count (count-post-build-jobs))
                      max-allocated-builds))
               (or (< job-count current-threads)
                   (= job-count 0)))
              (let* ((queued-build-ids
                      (map (lambda (job-args)
                             (assoc-ref (car job-args) "uuid"))
                           (append (list-jobs)
                                   (list-post-build-jobs))))
                     (fetched-builds
                      (with-exception-handler
                          (lambda (exn)
                            (log-msg lgr 'WARN
                                     "error fetching builds: " exn)
                            '())
                        (lambda ()
                          (let ((free-space
                                 (free-disk-space "/gnu/store")))
                            (if (< free-space (* 2 (expt 2 30))) ; 2G
                                (begin
                                  (log-msg lgr 'WARN
                                           "low space on /gnu/store, "
                                           "not fetching new builds")
                                  (sleep 30)
                                  '())
                                (fetch-builds-for-agent
                                 coordinator-interface
                                 systems
                                 (+ (max current-threads 1)
                                    (count-post-build-jobs))
                                 #:log (build-log-procedure lgr)))))
                        #:unwind? #t))
                     ;; Skip the builds that are already in either queue
                     (new-builds
                      (remove (lambda (build)
                                (member (assoc-ref build "uuid")
                                        queued-build-ids))
                              fetched-builds)))
                (log-msg lgr 'INFO "running " current-threads
                         " threads, currently allocated "
                         (length fetched-builds) " builds")
                (log-msg lgr 'INFO "starting " (length new-builds) " new "
                         (if (= (length new-builds) 1)
                             "build"
                             "builds"))
                (for-each
                 (lambda (job-args)
                   (process-job-with-queue job-args))
                 new-builds)
                (when (null? new-builds)
                  (sleep 5)))
              (sleep 3)))))))

;; Return a procedure taking a level and any number of message components,
;; which logs them through LGR.  When BUILD-ID is given, the message is
;; prefixed with it.
(define* (build-log-procedure lgr #:optional build-id)
  (lambda (level . components)
    (apply log-msg lgr level
           (if build-id
               (cons* build-id ": " components)
               components))))

;; Find the log file for DERIVATION-NAME and submit it to the coordinator.
;; Both steps go through retry-on-error; an exception is raised while the
;; log file can't be found.
(define (agent-submit-log-file lgr coordinator-interface
                               build-id derivation-name)
  (let ((log-file
         ;; TODO Not sure if retrying here is useful?
         (retry-on-error
          (lambda ()
            (let ((file (derivation-log-file derivation-name)))
              (or file
                  (raise-exception
                   (make-exception-with-message
                    (simple-format #f "log file missing for ~A (~A)"
                                   derivation-name
                                   build-id))))))
          #:times 3
          #:delay 3)))
    (retry-on-error
     (lambda ()
       (log-msg lgr 'INFO build-id ": uploading log file " log-file)
       (submit-log-file
        coordinator-interface
        build-id
        log-file
        #:log (build-log-procedure lgr build-id)))
     #:times 12
     #:delay 10)))

;; Prepare STORE for building DERIVATION-NAME: make sure the derivation is
;; present (substituting it if necessary), delete any existing outputs and
;; fetch the missing inputs.  Returns an alist with a 'result entry of
;; success or failure; failures also have a 'failure_reason entry, and a
;; 'missing_inputs entry where relevant.
(define (pre-build-process lgr store build-id
                           derivation-substitute-urls
                           non-derivation-substitute-urls
                           derivation-name)
  ;; Return the list of input paths that couldn't be made available, which
  ;; is empty when everything needed is in the store.
  (define (find-missing-inputs derivation inputs)
    (log-msg lgr 'DEBUG
             build-id ": checking the availability of build inputs")
    (let* ((output-paths
            (append-map derivation-input-output-paths inputs))
           (missing-paths
            (remove (lambda (path)
                      (if (valid-path? store path)
                          (begin
                            (add-temp-root store path)
                            #t)
                          #f))
                    output-paths))
           ;; One boolean per missing path, #f where no substitute is
           ;; available.
           (path-substitutes
            (begin
              (with-store/non-blocking substitute-store
                (apply set-build-options
                       substitute-store
                       `(,@(if non-derivation-substitute-urls
                               (list #:substitute-urls
                                     non-derivation-substitute-urls)
                               '())
                         #:max-silent-time 120
                         #:timeout ,(* 10 60)))

                (unless non-derivation-substitute-urls
                  (log-msg lgr 'WARN
                           "non-derivation-substitute-urls unset, unable to query substitute servers without caching"))

                (map
                 (lambda (file)
                   (log-msg lgr 'DEBUG
                            build-id ": looking for missing input " file)
                   (let ((substitute-urls-with-substitute
                          (if (list? non-derivation-substitute-urls)
                              (retry-on-error
                               (lambda ()
                                 (with-port-timeouts
                                  (lambda ()
                                    (has-substiutes-no-cache?
                                     non-derivation-substitute-urls
                                     file))
                                  #:timeout 60))
                               #:times 20
                               #:delay (random 15))
                              #f)))
                     (and
                      (if (eq? substitute-urls-with-substitute #f)
                          #t            ; keep going
                          (not (null? substitute-urls-with-substitute)))
                      (let ((log-port (open-output-string)))
                        ;; TODO Do something with the logged output
                        (parameterize
                            ((current-build-output-port log-port))
                          (if (has-substitutes? substitute-store file)
                              #t
                              (begin
                                (log-msg
                                 lgr 'WARN
                                 "a substitute should be available for "
                                 file
                                 ", but the guix-daemon claims it's unavailable"
                                 (if substitute-urls-with-substitute
                                     (string-append
                                      ":\n"
                                      (string-join
                                       substitute-urls-with-substitute
                                       "\n"))
                                     ""))
                                #f)))))))
                 missing-paths)))))

      (if (null? missing-paths)
          '()
          (if (member #f path-substitutes)
              ;; Some substitutes are unavailable, so return those paths
              ;; without attempting to fetch anything.
              (fold
               (lambda (file substitute-available? result)
                 (if substitute-available?
                     result
                     (cons file result)))
               '()
               missing-paths
               path-substitutes)
              (begin
                (log-msg lgr 'DEBUG
                         build-id ": fetching " (length missing-paths)
                         " missing paths")
                (retry-on-error
                 (lambda ()
                   (with-store/non-blocking fetch-substitute-store
                     ;; Download the substitutes
                     (apply set-build-options
                            fetch-substitute-store
                            `(,@(if non-derivation-substitute-urls
                                    (list #:substitute-urls
                                          non-derivation-substitute-urls)
                                    '())
                              #:max-silent-time 120
                              #:timeout ,(* 60 60)))

                     (let ((log-port (open-output-string)))
                       (with-throw-handler #t
                         (lambda ()
                           (with-port-timeouts
                            (lambda ()
                              (parameterize
                                  ((current-build-output-port log-port))
                                (build-things fetch-substitute-store
                                              missing-paths)))
                            #:timeout (* 60 10)))
                         (lambda (key . args)
                           (log-msg
                            lgr 'ERROR
                            "exception when fetching missing paths "
                            key ": " args)
                           (display (get-output-string log-port))
                           ;; Previously (display (newline)), which also
                           ;; printed the unspecified value newline
                           ;; returns.
                           (newline)
                           (close-output-port log-port)))))
                   (for-each (lambda (missing-path)
                               (add-temp-root store missing-path))
                             missing-paths))
                 #:times 12
                 #:delay (random 15))

                ;; Double check everything is actually present.
                (let ((missing-files
                       (remove (lambda (path)
                                 (valid-path? store path))
                               output-paths)))
                  (if (null? missing-files)
                      '()
                      (begin
                        (log-msg lgr 'WARN
                                 "failed to fetch substitutes for "
                                 missing-files)
                        (let ((unavailable-outputs
                               (delete-duplicates
                                (append-map
                                 (lambda (missing-output)
                                   (find-missing-substitutes-for-output
                                    store
                                    non-derivation-substitute-urls
                                    missing-output))
                                 missing-files))))
                          (log-msg
                           lgr 'WARN
                           "the following outputs are missing: "
                           (string-join
                            (map (lambda (output)
                                   (string-append " - " output))
                                 unavailable-outputs)))

                          (if (null? unavailable-outputs)
                              ;; TODO This probably shouldn't happen
                              missing-files
                              unavailable-outputs))))))))))

  ;; Delete the outputs of DERIVATION where any exist.  Returns #t on
  ;; success (or when there's nothing to delete) and #f if deleting them
  ;; throws.
  (define (delete-outputs store derivation)
    (let* ((outputs (derivation-outputs derivation))
           (output-file-names
            (map derivation-output-path (map cdr outputs))))
      (if (any file-exists? output-file-names)
          (let ((log-port (open-output-string)))
            (catch #t
              (lambda ()
                (log-msg lgr 'DEBUG build-id
                         ": deleting "
                         (if (= (length output-file-names) 1)
                             "output"
                             "outputs"))
                ;; There can be issues deleting links when collecting garbage
                ;; from multiple threads
                (monitor
                 ;; TODO Do something with the logged output
                 (parameterize ((current-build-output-port log-port))
                   (delete-paths store output-file-names)))
                #t)
              ;; Handlers are passed the key plus however many arguments
              ;; were thrown, so take them as a rest argument.
              (lambda (key . args)
                (display (get-output-string log-port))
                (log-msg lgr 'ERROR
                         "delete-outputs: "
                         key args)
                #f)))
          #t)))

  (let ((derivation
         (if (valid-path? store derivation-name)
             (begin
               (add-temp-root store derivation-name)
               (read-derivation-from-file* derivation-name))
             ;; The exception handler returns #f, in which case the
             ;; derivation isn't read and DERIVATION is #f.
             (and
              (with-exception-handler
                  (lambda (exn)
                    (log-msg
                     lgr 'ERROR
                     "exception when reading/fetching derivation: "
                     exn)
                    (log-msg lgr 'ERROR
                             "derivation-substitute-urls: "
                             derivation-substitute-urls)
                    #f)
                (lambda ()
                  (log-msg lgr 'DEBUG
                           build-id ": substituting derivation")
                  (retry-on-error
                   (lambda ()
                     (with-store/non-blocking fetch-substitute-store
                       ;; substitute-derivation uses set-build-options, so use
                       ;; a temporary store connection
                       (substitute-derivation
                        fetch-substitute-store
                        derivation-name
                        #:substitute-urls derivation-substitute-urls))
                     (add-temp-root store derivation-name))
                   #:times 20
                   #:delay (random 15))
                  #t)
                #:unwind? #t)
              (read-derivation-from-file* derivation-name)))))
    (if derivation
        (begin
          (log-msg lgr 'DEBUG
                   build-id ": derivation read from file")
          (match (delete-outputs store derivation)
            (#t
             (let ((missing-inputs
                    (find-missing-inputs
                     derivation
                     (derivation-inputs derivation))))
               (if (null? missing-inputs)
                   '((result . success))
                   (begin
                     (log-msg lgr 'DEBUG
                              build-id ": missing inputs: "
                              missing-inputs)
                     `((result . failure)
                       (failure_reason . missing_inputs)
                       (missing_inputs
                        . ,(list->vector missing-inputs)))))))
            (failure
             '((result . failure)
               (failure_reason . could_not_delete_outputs)))))
        '((result . failure)
          (failure_reason . error_fetching_derivation)))))

;; Build DERIVATION using STORE with substitutes disabled.  Returns #t when
;; the build succeeds and #f when it fails with a store protocol error
;; status of 1, 100 or 101.  Any other exception is logged and re-raised.
(define (perform-build lgr store build-id derivation)
  (set-build-options store
                     #:use-substitutes? #f)

  (parameterize ((current-build-output-port (%make-void-port "w")))
    (with-exception-handler
        (lambda (exn)
          (if (store-protocol-error? exn)
              (cond
               ((= (store-protocol-error-status exn) 1)
                (log-msg lgr 'INFO build-id ": build failed"))
               ((= (store-protocol-error-status exn) 100)
                (log-msg lgr 'INFO
                         build-id ": build failed (permanent failure)"))
               ((= (store-protocol-error-status exn) 101)
                (log-msg lgr 'INFO
                         build-id ": build failed due to a timeout"))
               (else
                (log-msg lgr 'ERROR
                         build-id ": unknown store protocol error: "
                         (store-protocol-error-message exn)
                         "(status: "
                         (store-protocol-error-status exn)
                         ")")
                (raise-exception exn)))
              (begin
                (log-msg lgr 'ERROR
                         build-id ": unknown exception " exn)
                (raise-exception exn)))
          #f)
      (lambda ()
        ;; The throw handler only prints a backtrace, and only for
        ;; exceptions other than the three recognised build failures.
        (with-throw-handler #t
          (lambda ()
            (build-things store
                          (list (derivation-file-name derivation)))
            (for-each (lambda (output)
                        (add-temp-root store output))
                      (map derivation-output-path
                           (map cdr (derivation-outputs derivation)))))
          (lambda (key . args)
            (unless (and (eq? key '%exception)
                         (store-protocol-error? (car args))
                         (let ((status
                                (store-protocol-error-status (car args))))
                           (or (= status 1)
                               (= status 100)
                               (= status 101))))
              (simple-format (current-error-port)
                             "exception when performing build: ~A ~A\n"
                             key args)
              (backtrace))))
        #t)
      #:unwind? #t)))

;; Submit a failure result for BUILD-ID.  A "build_already_processed" error
;; from the coordinator is logged and treated as success; any other
;; coordinator error is raised as an unrecognised error.
(define (post-build-failure lgr coordinator-interface build-id end-time)
  (log-msg lgr 'INFO build-id ": build failed")
  (with-exception-handler
      (lambda (exn)
        (unless (agent-error-from-coordinator? exn)
          (raise-exception exn))

        (let ((details (agent-error-from-coordinator-details exn)))
          (if (string? details)
              (cond
               ((string=? details "build_already_processed")
                (log-msg lgr 'WARN
                         build-id ": coordinator indicates this build is already marked as processed")
                #t)
               (else
                (raise-exception
                 (make-exception
                  (make-exception-with-message
                   "unrecognised error from coordinator")
                  (make-exception-with-irritants details)))))
              (raise-exception
               (make-exception
                (make-exception-with-message
                 "unrecognised error from coordinator")
                (make-exception-with-irritants details))))))
    (lambda ()
      (submit-build-result
       coordinator-interface
       build-id
       `((result   . failure)
         (end_time . ,(strftime "%F %T" end-time)))
       #:log (build-log-procedure lgr build-id)))
    #:unwind? #t))

;; When SUBMIT-OUTPUTS? is true, upload each compressed output, then submit
;; the success result.  Errors from the coordinator about a missing output
;; or log file are handled by sending the missing item and trying again.
(define (post-build-success lgr coordinator-interface build-id
                            derivation end-time
                            submit-outputs? output-details
                            compressed-outputs
                            upload-progress uploads-updater)
  (define outputs
    (derivation-outputs derivation))

  ;; Delete the compressed output files if outputs were being submitted,
  ;; logging rather than propagating any exception.
  (define (maybe-delete-compressed-outputs)
    (with-exception-handler
        (lambda (exn)
          (log-msg lgr 'ERROR
                   build-id ": exception deleting compressed outputs: "
                   exn))
      (lambda ()
        (when submit-outputs?
          (for-each
           (lambda (file)
             (log-msg lgr 'INFO build-id ": deleting " file)
             (delete-file file))
           (map cdr compressed-outputs))))
      #:unwind? #t))

  (define (attempt-submit-build-result)
    (with-exception-handler
        (lambda (exn)
          (unless (agent-error-from-coordinator? exn)
            (raise-exception exn))

          (let ((details (agent-error-from-coordinator-details exn)))
            (if (string? details)
                (cond
                 ((string=? details "build_already_processed")
                  (log-msg lgr 'WARN
                           build-id ": coordinator indicates this build is already marked as processed")
                  (maybe-delete-compressed-outputs)
                  #t)
                 ((string=? details
                            "cannot_store_result_for_canceled_build")
                  (log-msg lgr 'WARN
                           build-id ": coordinator indicates this build is now canceled")
                  (maybe-delete-compressed-outputs)
                  #t)
                 ((string=? details "missing_build_log_file")
                  ;; Retry submitting the log file
                  (agent-submit-log-file lgr
                                         coordinator-interface
                                         build-id
                                         (derivation-file-name derivation))
                  (attempt-submit-build-result))
                 (else
                  (maybe-delete-compressed-outputs)
                  (raise-exception
                   (make-exception
                    (make-exception-with-message
                     "unrecognised error from coordinator")
                    (make-exception-with-irritants details)))))
                ;; Otherwise DETAILS is looked up as an alist, and the
                ;; only entry handled is "missing_output".
                (or
                 (and=>
                  (assoc-ref details "missing_output")
                  (lambda (missing-output-name)
                    (let ((missing-output
                           (any
                            (match-lambda
                              ((name . output)
                               (if (string=? name missing-output-name)
                                   output
                                   #f)))
                            outputs)))
                      (unless missing-output
                        (raise-exception
                         (make-exception
                          (make-exception-with-message
                           "unknown missing output")
                          (make-exception-with-irritants
                           missing-output-name))))

                      (log-msg lgr 'DEBUG
                               build-id ": missing output: "
                               missing-output)
                      (log-msg lgr 'DEBUG
                               build-id
                               ": pausing before attempting to send")
                      (sleep 30)
                      (log-msg lgr 'DEBUG
                               build-id ": reattempting submitting: "
                               missing-output)
                      (let ((compressed-file
                             (assoc-ref compressed-outputs
                                        missing-output-name)))
                        (submit-one-output
                         missing-output-name
                         missing-output
                         (stat:size (stat compressed-file))
                         compressed-file)))
                    (attempt-submit-build-result)))
                 (raise-exception
                  (make-exception
                   (make-exception-with-message
                    "unrecognised error from coordinator")
                   (make-exception-with-irritants details)))))))
      (lambda ()
        (submit-build-result
         coordinator-interface
         build-id
         `((result   . success)
           (end_time . ,(strftime "%F %T" end-time))
           (outputs  . ,(list->vector output-details)))
         #:log (build-log-procedure lgr build-id))
        (maybe-delete-compressed-outputs))
      #:unwind? #t))

  ;; Upload COMPRESSED-FILE (BYTES in size) as OUTPUT-NAME, keeping
  ;; UPLOAD-PROGRESS up to date while that happens.
  (define (submit-one-output output-name output bytes compressed-file)
    (define bytes-already-sent 0)

    (define reporter
      (make-progress-reporter
       (lambda ()
         (set-upload-progress-bytes-sent! upload-progress
                                          bytes-already-sent))
       (lambda (bytes)
         (set-upload-progress-bytes-sent! upload-progress
                                          (+ bytes-already-sent bytes)))
       (lambda ()
         (set-upload-progress-bytes-sent! upload-progress
                                          bytes))))

    (define last-progress-update-bytes-hashed 0)
    (define last-progress-update-time
      (time-second (current-time)))

    ;; Record the hashing progress, and call UPLOADS-UPDATER once more
    ;; than 10000000 further bytes have been hashed or more than 15
    ;; seconds have passed since the last call.
    (define (report-bytes-hashed bytes-now-hashed)
      (set-upload-progress-bytes-hashed! upload-progress
                                         bytes-now-hashed)
      (when (or (> bytes-now-hashed
                   (+ last-progress-update-bytes-hashed 10000000))
                (and (> (time-second (current-time))
                        (+ last-progress-update-time 15))))
        (set! last-progress-update-bytes-hashed bytes-now-hashed)
        (set! last-progress-update-time (time-second (current-time)))
        (uploads-updater)))

    (set-upload-progress-file! upload-progress
                               (derivation-output-path output))
    (set-upload-progress-total-bytes! upload-progress bytes)
    (log-msg lgr 'INFO
             build-id ": submitting output "
             (derivation-output-path output))
    (submit-output coordinator-interface
                   build-id
                   output-name
                   compressed-file
                   #:log (build-log-procedure lgr build-id)
                   #:reporter-set-bytes-already-sent
                   (lambda (bytes)
                     (set! bytes-already-sent bytes))
                   #:reporter reporter
                   #:report-bytes-hashed report-bytes-hashed)
    ;; Clearing the file marks the upload as no longer in progress
    (set-upload-progress-file! upload-progress #f)
    (log-msg lgr 'INFO
             build-id ": finished submitting output "
             (derivation-output-path output)))

  (if submit-outputs?
      (begin
        (log-msg lgr 'INFO
                 build-id ": build successful, submitting outputs")
        (for-each
         (match-lambda
           ((output-name . output)
            (let ((compressed-file
                   (assoc-ref compressed-outputs output-name)))
              (submit-one-output
               output-name
               output
               (stat:size (stat compressed-file))
               compressed-file))))
         outputs)

        (log-msg lgr 'INFO
                 build-id
                 ": finished submitting outputs, reporting build success"))
      (begin
        (log-msg lgr 'INFO
                 build-id
                 ": build successful, skipping submitting outputs")
        (log-msg lgr 'INFO
                 build-id
                 ": reporting build success")))
  (attempt-submit-build-result))