From 667e174e6f11d3f015d4335d9801386c2db962e3 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Wed, 13 Mar 2024 14:52:25 +0000 Subject: Remove mutexes from the upload monitoring Instead, store the state in the work queue. --- guix-build-coordinator/agent.scm | 342 +++++++++++++++++---------------------- 1 file changed, 145 insertions(+), 197 deletions(-) diff --git a/guix-build-coordinator/agent.scm b/guix-build-coordinator/agent.scm index 1b19fd7..8144947 100644 --- a/guix-build-coordinator/agent.scm +++ b/guix-build-coordinator/agent.scm @@ -24,6 +24,7 @@ #:use-module (srfi srfi-11) #:use-module (srfi srfi-19) #:use-module (srfi srfi-43) + #:use-module (srfi srfi-71) #:use-module (ice-9 ftw) #:use-module (ice-9 match) #:use-module (ice-9 format) @@ -55,12 +56,14 @@ (make-upload-progress file bytes-sent bytes-hashed total-bytes) upload-progress? - (file upload-progress-file) + (file upload-progress-file + set-upload-progress-file!) (bytes-sent upload-progress-bytes-sent set-upload-progress-bytes-sent!) (bytes-hashed upload-progress-bytes-hashed set-upload-progress-bytes-hashed!) - (total-bytes upload-progress-total-bytes)) + (total-bytes upload-progress-total-bytes + set-upload-progress-total-bytes!)) (define temporary-directory (or (getenv "TMPDIR") "/tmp")) @@ -119,33 +122,21 @@ (write-textfile metrics-registry metrics-file))) - (define upload-slots - (make-vector max-parallel-uploads #f)) - - (define uploads-mutex - (make-mutex)) - (define uploads-condition-variable - (make-condition-variable)) - - (define (with-upload-monitoring lgr file total-bytes p) - (define upload-progress-record - (make-upload-progress file 0 0 total-bytes)) - - (define bytes-already-sent 0) - - (define last-progress-update-bytes-sent 0) - (define last-progress-update-bytes-hashed 0) - (define last-progress-update-time 0) - - (define (display-update) - (let ((uploads-count - (vector-count (lambda (_ slot) - (not (eq? #f slot))) - upload-slots))) - (log-msg lgr 'INFO uploads-count " uploads in progress") - - (vector-for-each - (lambda (_ upload-progress) + (define (make-uploads-updater list-jobs) + (lambda () + (let ((upload-progress-records + (filter-map + (match-lambda + ((build upload-progress thunk) + (if (upload-progress-file upload-progress) + upload-progress + #f))) + (list-jobs)))) + (log-msg lgr 'INFO (length upload-progress-records) + " uploads in progress") + + (for-each + (lambda (upload-progress) (when upload-progress (log-msg lgr 'INFO (upload-progress-file upload-progress) @@ -153,12 +144,9 @@ (let ((total-bytes (upload-progress-total-bytes upload-progress)) (bytes-sent - (+ bytes-already-sent - (upload-progress-bytes-sent - upload-progress))) + (upload-progress-bytes-sent upload-progress)) (bytes-hashed - (upload-progress-bytes-hashed - upload-progress))) + (upload-progress-bytes-hashed upload-progress))) (if (and (= bytes-sent total-bytes) (> bytes-hashed 0)) (format @@ -171,101 +159,9 @@ "~2,2f/~2,2fMB sent" (/ bytes-sent 1000000) (/ total-bytes 1000000))))))) - upload-slots))) - - (define (report-bytes-sent bytes-now-sent) - (set-upload-progress-bytes-sent! - upload-progress-record - bytes-now-sent) - - (when (or (> bytes-now-sent - (+ last-progress-update-bytes-sent 10000000)) - (and (> (time-second (current-time)) - (+ last-progress-update-time 15)))) - (set! last-progress-update-bytes-sent - bytes-now-sent) - (set! last-progress-update-time - (time-second (current-time))) - - (display-update))) - - (define (reporter-set-bytes-already-sent bytes) - (set! bytes-already-sent bytes)) - - (define reporter - (make-progress-reporter - (lambda () - (report-bytes-sent 0)) - report-bytes-sent - (lambda () - (report-bytes-sent total-bytes)))) - - (define (report-bytes-hashed bytes-now-hashed) - (set-upload-progress-bytes-hashed! upload-progress-record - bytes-now-hashed) - (when (or (> bytes-now-hashed - (+ last-progress-update-bytes-hashed 10000000)) - (and (> (time-second (current-time)) - (+ last-progress-update-time 15)))) - (set! last-progress-update-bytes-hashed - bytes-now-hashed) - (set! last-progress-update-time - (time-second (current-time))) - - (display-update))) + upload-progress-records)))) - (define (free-slot index) - (with-mutex uploads-mutex - (vector-set! upload-slots - index - #f)) - - (signal-condition-variable uploads-condition-variable)) - - (lock-mutex uploads-mutex) - - (let loop () - (let ((free-index - (any (lambda (index) - (if (eq? (vector-ref upload-slots index) - #f) - index - #f)) - (iota (vector-length upload-slots) - 0)))) - - (if free-index - (begin - (vector-set! upload-slots - free-index - upload-progress-record) - - (unlock-mutex uploads-mutex) - - (call-with-values - (lambda () - (with-exception-handler - (lambda (exn) - (free-slot free-index) - - (raise-exception exn)) - (lambda () - (p reporter-set-bytes-already-sent - reporter - report-bytes-hashed)) - #:unwind? #t)) - (lambda vals - (free-slot free-index) - - (apply values vals)))) - (begin - (wait-condition-variable - uploads-condition-variable - uploads-mutex) - - (loop)))))) - - (define (process-job build perform-post-build-actions) + (define (process-job build perform-post-build-actions uploads-updater) (let ((build-id (assoc-ref build "uuid")) (derivation-name (or (assoc-ref build "derivation_name") (assoc-ref build "derivation-name"))) @@ -369,7 +265,8 @@ (perform-post-build-actions (list build - (lambda () + (make-upload-progress #f 0 0 0) + (lambda (upload-progress) (agent-submit-log-file lgr coordinator-interface build-id derivation-name) @@ -383,7 +280,8 @@ submit-outputs? output-details compressed-outputs - with-upload-monitoring) + upload-progress + uploads-updater) (post-build-failure lgr coordinator-interface build-id @@ -442,50 +340,58 @@ (log-msg lgr 'INFO "connecting to coordinator " (slot-ref coordinator-interface 'coordinator-uri)) - (let*-values (((perform-post-build-actions count-post-build-jobs - count-post-build-threads - list-post-build-jobs) - (create-work-queue max-parallel-uploads - (lambda (build thunk) - (thunk)) - #:name "upload" - - ;; The priority here is a list where the - ;; first element is the build priority, - ;; and the second is the number of bytes - ;; to upload - #:priority (second a) - (second b)) - - ;; Prioritise uploading smaller - ;; files first - (< a-priority - b-priority)))))) - - ((process-job-with-queue count-jobs count-threads list-jobs) - (create-work-queue current-max-builds - (lambda (build) - (process-job build - perform-post-build-actions)) - #:thread-start-delay - (make-time - time-duration - 0 - (max 5 - (- 135 - (* 120 - (/ max-parallel-builds 64))))) - #:thread-stop-delay - (make-time - time-duration - 0 - 20) - #:name "job"))) + (let* ((perform-post-build-actions + count-post-build-jobs + count-post-build-threads + list-post-build-jobs + (create-work-queue max-parallel-uploads + (lambda (build upload-progress proc) + (proc upload-progress)) + #:name "upload" + + ;; The priority here is a list where the + ;; first element is the build priority, + ;; and the second is the number of bytes + ;; to upload + #:priority (second a) + (second b)) + + ;; Prioritise uploading smaller + ;; files first + (< a-priority + b-priority)))))) + + (uploads-updater + (make-uploads-updater list-post-build-jobs)) + + (process-job-with-queue + count-jobs + count-threads + list-jobs + (create-work-queue current-max-builds + (lambda (build) + (process-job build + perform-post-build-actions + uploads-updater)) + #:thread-start-delay + (make-time + time-duration + 0 + (max 5 + (- 135 + (* 120 + (/ max-parallel-builds 64))))) + #:thread-stop-delay + (make-time + time-duration + 0 + 20) + #:name "job"))) (define (display-info) (display (simple-format @@ -509,15 +415,23 @@ "\n" (string-join (map (match-lambda - ((build-details _) + ((build-details upload-progress _) (simple-format #f " - ~A (derived priority: ~A) - ~A" + ~A~A" (assoc-ref build-details "uuid") (assoc-ref build-details "derived_priority") (or (assoc-ref build-details "derivation_name") - (assoc-ref build-details "derivation-name"))))) + (assoc-ref build-details "derivation-name")) + (if (upload-progress-file upload-progress) + (simple-format + #f " + uploading ~A (~A/~A)" + (upload-progress-file upload-progress) + (upload-progress-bytes-sent upload-progress) + (upload-progress-total-bytes upload-progress)) + "")))) (list-post-build-jobs)) "\n"))) (current-error-port))) @@ -1001,7 +915,8 @@ but the guix-daemon claims it's unavailable" submit-outputs? output-details compressed-outputs - with-upload-monitoring) + upload-progress + uploads-updater) (define outputs (derivation-outputs derivation)) @@ -1113,25 +1028,58 @@ but the guix-daemon claims it's unavailable" #:unwind? #t)) (define (submit-one-output output-name output bytes compressed-file) - (with-upload-monitoring - lgr - (derivation-output-path output) - bytes - (lambda (reporter-set-bytes-already-sent reporter report-bytes-hashed) - (log-msg lgr 'INFO - build-id ": submitting output " - (derivation-output-path output)) - (submit-output coordinator-interface - build-id output-name - compressed-file - #:log (build-log-procedure lgr build-id) - #:reporter-set-bytes-already-sent - reporter-set-bytes-already-sent - #:reporter reporter - #:report-bytes-hashed report-bytes-hashed) - (log-msg lgr 'INFO - build-id ": finished submitting output " - (derivation-output-path output))))) + (define bytes-already-sent 0) + + (define reporter + (make-progress-reporter + (lambda () + (set-upload-progress-bytes-sent! upload-progress + bytes-already-sent)) + (lambda (bytes) + (set-upload-progress-bytes-sent! upload-progress + (+ bytes-already-sent bytes))) + (lambda () + (set-upload-progress-bytes-sent! upload-progress bytes)))) + + (define last-progress-update-bytes-sent 0) + (define last-progress-update-bytes-hashed 0) + (define last-progress-update-time (time-second (current-time))) + + (define (report-bytes-hashed bytes-now-hashed) + (set-upload-progress-bytes-hashed! upload-progress + bytes-now-hashed) + (when (or (> bytes-now-hashed + (+ last-progress-update-bytes-hashed 10000000)) + (and (> (time-second (current-time)) + (+ last-progress-update-time 15)))) + (set! last-progress-update-bytes-hashed + bytes-now-hashed) + (set! last-progress-update-time + (time-second (current-time))) + + (uploads-updater))) + + (set-upload-progress-file! upload-progress (derivation-output-path output)) + (set-upload-progress-total-bytes! upload-progress bytes) + + (log-msg lgr 'INFO + build-id ": submitting output " + (derivation-output-path output)) + (submit-output coordinator-interface + build-id output-name + compressed-file + #:log (build-log-procedure lgr build-id) + #:reporter-set-bytes-already-sent + (lambda (bytes) + (set! bytes-already-sent bytes)) + #:reporter reporter + #:report-bytes-hashed report-bytes-hashed) + + (set-upload-progress-file! upload-progress #f) + + (log-msg lgr 'INFO + build-id ": finished submitting output " + (derivation-output-path output))) (if submit-outputs? (begin -- cgit v1.2.3