about summary refs log tree commit diff
diff options
context:
space:
mode:
author Christopher Baines <mail@cbaines.net> 2024-03-13 14:52:25 +0000
committer Christopher Baines <mail@cbaines.net> 2024-03-13 15:30:52 +0000
commit 667e174e6f11d3f015d4335d9801386c2db962e3 (patch)
tree efce39326443a956162bff1f65ca47293357f873
parent 1c80525d21f183ed2556def6fcd021f6ea9a88a4 (diff)
download build-coordinator-667e174e6f11d3f015d4335d9801386c2db962e3.tar
build-coordinator-667e174e6f11d3f015d4335d9801386c2db962e3.tar.gz
Remove mutexes from the upload monitoring
Instead, store the state in the work queue.
-rw-r--r-- guix-build-coordinator/agent.scm 342
1 files changed, 145 insertions, 197 deletions
diff --git a/guix-build-coordinator/agent.scm b/guix-build-coordinator/agent.scm
index 1b19fd7..8144947 100644
--- a/guix-build-coordinator/agent.scm
+++ b/guix-build-coordinator/agent.scm
@@ -24,6 +24,7 @@
#:use-module (srfi srfi-11)
#:use-module (srfi srfi-19)
#:use-module (srfi srfi-43)
+ #:use-module (srfi srfi-71)
#:use-module (ice-9 ftw)
#:use-module (ice-9 match)
#:use-module (ice-9 format)
@@ -55,12 +56,14 @@
(make-upload-progress file bytes-sent bytes-hashed
total-bytes)
upload-progress?
- (file upload-progress-file)
+ (file upload-progress-file
+ set-upload-progress-file!)
(bytes-sent upload-progress-bytes-sent
set-upload-progress-bytes-sent!)
(bytes-hashed upload-progress-bytes-hashed
set-upload-progress-bytes-hashed!)
- (total-bytes upload-progress-total-bytes))
+ (total-bytes upload-progress-total-bytes
+ set-upload-progress-total-bytes!))
(define temporary-directory
(or (getenv "TMPDIR") "/tmp"))
@@ -119,33 +122,21 @@
(write-textfile metrics-registry
metrics-file)))
- (define upload-slots
- (make-vector max-parallel-uploads #f))
-
- (define uploads-mutex
- (make-mutex))
- (define uploads-condition-variable
- (make-condition-variable))
-
- (define (with-upload-monitoring lgr file total-bytes p)
- (define upload-progress-record
- (make-upload-progress file 0 0 total-bytes))
-
- (define bytes-already-sent 0)
-
- (define last-progress-update-bytes-sent 0)
- (define last-progress-update-bytes-hashed 0)
- (define last-progress-update-time 0)
-
- (define (display-update)
- (let ((uploads-count
- (vector-count (lambda (_ slot)
- (not (eq? #f slot)))
- upload-slots)))
- (log-msg lgr 'INFO uploads-count " uploads in progress")
-
- (vector-for-each
- (lambda (_ upload-progress)
+ (define (make-uploads-updater list-jobs)
+ (lambda ()
+ (let ((upload-progress-records
+ (filter-map
+ (match-lambda
+ ((build upload-progress thunk)
+ (if (upload-progress-file upload-progress)
+ upload-progress
+ #f)))
+ (list-jobs))))
+ (log-msg lgr 'INFO (length upload-progress-records)
+ " uploads in progress")
+
+ (for-each
+ (lambda (upload-progress)
(when upload-progress
(log-msg lgr 'INFO
(upload-progress-file upload-progress)
@@ -153,12 +144,9 @@
(let ((total-bytes
(upload-progress-total-bytes upload-progress))
(bytes-sent
- (+ bytes-already-sent
- (upload-progress-bytes-sent
- upload-progress)))
+ (upload-progress-bytes-sent upload-progress))
(bytes-hashed
- (upload-progress-bytes-hashed
- upload-progress)))
+ (upload-progress-bytes-hashed upload-progress)))
(if (and (= bytes-sent total-bytes)
(> bytes-hashed 0))
(format
@@ -171,101 +159,9 @@
"~2,2f/~2,2fMB sent"
(/ bytes-sent 1000000)
(/ total-bytes 1000000)))))))
- upload-slots)))
-
- (define (report-bytes-sent bytes-now-sent)
- (set-upload-progress-bytes-sent!
- upload-progress-record
- bytes-now-sent)
-
- (when (or (> bytes-now-sent
- (+ last-progress-update-bytes-sent 10000000))
- (and (> (time-second (current-time))
- (+ last-progress-update-time 15))))
- (set! last-progress-update-bytes-sent
- bytes-now-sent)
- (set! last-progress-update-time
- (time-second (current-time)))
-
- (display-update)))
-
- (define (reporter-set-bytes-already-sent bytes)
- (set! bytes-already-sent bytes))
-
- (define reporter
- (make-progress-reporter
- (lambda ()
- (report-bytes-sent 0))
- report-bytes-sent
- (lambda ()
- (report-bytes-sent total-bytes))))
-
- (define (report-bytes-hashed bytes-now-hashed)
- (set-upload-progress-bytes-hashed! upload-progress-record
- bytes-now-hashed)
- (when (or (> bytes-now-hashed
- (+ last-progress-update-bytes-hashed 10000000))
- (and (> (time-second (current-time))
- (+ last-progress-update-time 15))))
- (set! last-progress-update-bytes-hashed
- bytes-now-hashed)
- (set! last-progress-update-time
- (time-second (current-time)))
-
- (display-update)))
+ upload-progress-records))))
- (define (free-slot index)
- (with-mutex uploads-mutex
- (vector-set! upload-slots
- index
- #f))
-
- (signal-condition-variable uploads-condition-variable))
-
- (lock-mutex uploads-mutex)
-
- (let loop ()
- (let ((free-index
- (any (lambda (index)
- (if (eq? (vector-ref upload-slots index)
- #f)
- index
- #f))
- (iota (vector-length upload-slots)
- 0))))
-
- (if free-index
- (begin
- (vector-set! upload-slots
- free-index
- upload-progress-record)
-
- (unlock-mutex uploads-mutex)
-
- (call-with-values
- (lambda ()
- (with-exception-handler
- (lambda (exn)
- (free-slot free-index)
-
- (raise-exception exn))
- (lambda ()
- (p reporter-set-bytes-already-sent
- reporter
- report-bytes-hashed))
- #:unwind? #t))
- (lambda vals
- (free-slot free-index)
-
- (apply values vals))))
- (begin
- (wait-condition-variable
- uploads-condition-variable
- uploads-mutex)
-
- (loop))))))
-
- (define (process-job build perform-post-build-actions)
+ (define (process-job build perform-post-build-actions uploads-updater)
(let ((build-id (assoc-ref build "uuid"))
(derivation-name (or (assoc-ref build "derivation_name")
(assoc-ref build "derivation-name")))
@@ -369,7 +265,8 @@
(perform-post-build-actions
(list
build
- (lambda ()
+ (make-upload-progress #f 0 0 0)
+ (lambda (upload-progress)
(agent-submit-log-file lgr
coordinator-interface
build-id derivation-name)
@@ -383,7 +280,8 @@
submit-outputs?
output-details
compressed-outputs
- with-upload-monitoring)
+ upload-progress
+ uploads-updater)
(post-build-failure lgr
coordinator-interface
build-id
@@ -442,50 +340,58 @@
(log-msg lgr 'INFO "connecting to coordinator "
(slot-ref coordinator-interface 'coordinator-uri))
- (let*-values (((perform-post-build-actions count-post-build-jobs
- count-post-build-threads
- list-post-build-jobs)
- (create-work-queue max-parallel-uploads
- (lambda (build thunk)
- (thunk))
- #:name "upload"
-
- ;; The priority here is a list where the
- ;; first element is the build priority,
- ;; and the second is the number of bytes
- ;; to upload
- #:priority<?
- (lambda (a b)
- (let ((a-priority (first a))
- (b-priority (first b)))
- (if (= a-priority b-priority)
- (> (second a)
- (second b))
-
- ;; Prioritise uploading smaller
- ;; files first
- (< a-priority
- b-priority))))))
-
- ((process-job-with-queue count-jobs count-threads list-jobs)
- (create-work-queue current-max-builds
- (lambda (build)
- (process-job build
- perform-post-build-actions))
- #:thread-start-delay
- (make-time
- time-duration
- 0
- (max 5
- (- 135
- (* 120
- (/ max-parallel-builds 64)))))
- #:thread-stop-delay
- (make-time
- time-duration
- 0
- 20)
- #:name "job")))
+ (let* ((perform-post-build-actions
+ count-post-build-jobs
+ count-post-build-threads
+ list-post-build-jobs
+ (create-work-queue max-parallel-uploads
+ (lambda (build upload-progress proc)
+ (proc upload-progress))
+ #:name "upload"
+
+ ;; The priority here is a list where the
+ ;; first element is the build priority,
+ ;; and the second is the number of bytes
+ ;; to upload
+ #:priority<?
+ (lambda (a b)
+ (let ((a-priority (first a))
+ (b-priority (first b)))
+ (if (= a-priority b-priority)
+ (> (second a)
+ (second b))
+
+ ;; Prioritise uploading smaller
+ ;; files first
+ (< a-priority
+ b-priority))))))
+
+ (uploads-updater
+ (make-uploads-updater list-post-build-jobs))
+
+ (process-job-with-queue
+ count-jobs
+ count-threads
+ list-jobs
+ (create-work-queue current-max-builds
+ (lambda (build)
+ (process-job build
+ perform-post-build-actions
+ uploads-updater))
+ #:thread-start-delay
+ (make-time
+ time-duration
+ 0
+ (max 5
+ (- 135
+ (* 120
+ (/ max-parallel-builds 64)))))
+ #:thread-stop-delay
+ (make-time
+ time-duration
+ 0
+ 20)
+ #:name "job")))
(define (display-info)
(display
(simple-format
@@ -509,15 +415,23 @@
"\n"
(string-join
(map (match-lambda
- ((build-details _)
+ ((build-details upload-progress _)
(simple-format
#f " - ~A (derived priority: ~A)
- ~A"
+ ~A~A"
(assoc-ref build-details "uuid")
(assoc-ref build-details "derived_priority")
(or
(assoc-ref build-details "derivation_name")
- (assoc-ref build-details "derivation-name")))))
+ (assoc-ref build-details "derivation-name"))
+ (if (upload-progress-file upload-progress)
+ (simple-format
+ #f "
+ uploading ~A (~A/~A)"
+ (upload-progress-file upload-progress)
+ (upload-progress-bytes-sent upload-progress)
+ (upload-progress-total-bytes upload-progress))
+ ""))))
(list-post-build-jobs))
"\n")))
(current-error-port)))
@@ -1001,7 +915,8 @@ but the guix-daemon claims it's unavailable"
submit-outputs?
output-details
compressed-outputs
- with-upload-monitoring)
+ upload-progress
+ uploads-updater)
(define outputs
(derivation-outputs derivation))
@@ -1113,25 +1028,58 @@ but the guix-daemon claims it's unavailable"
#:unwind? #t))
(define (submit-one-output output-name output bytes compressed-file)
- (with-upload-monitoring
- lgr
- (derivation-output-path output)
- bytes
- (lambda (reporter-set-bytes-already-sent reporter report-bytes-hashed)
- (log-msg lgr 'INFO
- build-id ": submitting output "
- (derivation-output-path output))
- (submit-output coordinator-interface
- build-id output-name
- compressed-file
- #:log (build-log-procedure lgr build-id)
- #:reporter-set-bytes-already-sent
- reporter-set-bytes-already-sent
- #:reporter reporter
- #:report-bytes-hashed report-bytes-hashed)
- (log-msg lgr 'INFO
- build-id ": finished submitting output "
- (derivation-output-path output)))))
+ (define bytes-already-sent 0)
+
+ (define reporter
+ (make-progress-reporter
+ (lambda ()
+ (set-upload-progress-bytes-sent! upload-progress
+ bytes-already-sent))
+ (lambda (bytes)
+ (set-upload-progress-bytes-sent! upload-progress
+ (+ bytes-already-sent bytes)))
+ (lambda ()
+ (set-upload-progress-bytes-sent! upload-progress bytes))))
+
+ (define last-progress-update-bytes-sent 0)
+ (define last-progress-update-bytes-hashed 0)
+ (define last-progress-update-time (time-second (current-time)))
+
+ (define (report-bytes-hashed bytes-now-hashed)
+ (set-upload-progress-bytes-hashed! upload-progress
+ bytes-now-hashed)
+ (when (or (> bytes-now-hashed
+ (+ last-progress-update-bytes-hashed 10000000))
+ (and (> (time-second (current-time))
+ (+ last-progress-update-time 15))))
+ (set! last-progress-update-bytes-hashed
+ bytes-now-hashed)
+ (set! last-progress-update-time
+ (time-second (current-time)))
+
+ (uploads-updater)))
+
+ (set-upload-progress-file! upload-progress (derivation-output-path output))
+ (set-upload-progress-total-bytes! upload-progress bytes)
+
+ (log-msg lgr 'INFO
+ build-id ": submitting output "
+ (derivation-output-path output))
+ (submit-output coordinator-interface
+ build-id output-name
+ compressed-file
+ #:log (build-log-procedure lgr build-id)
+ #:reporter-set-bytes-already-sent
+ (lambda (bytes)
+ (set! bytes-already-sent bytes))
+ #:reporter reporter
+ #:report-bytes-hashed report-bytes-hashed)
+
+ (set-upload-progress-file! upload-progress #f)
+
+ (log-msg lgr 'INFO
+ build-id ": finished submitting output "
+ (derivation-output-path output)))
(if submit-outputs?
(begin