aboutsummaryrefslogtreecommitdiff
path: root/guix-build-coordinator/agent-messaging
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2023-05-10 10:13:54 +0100
committerChristopher Baines <mail@cbaines.net>2023-05-10 10:13:54 +0100
commit930e1d5b489797666602674247efe82756ca4082 (patch)
tree89ec76474d02ee0a1ba75deda899172a9368f1a2 /guix-build-coordinator/agent-messaging
parent039430a0ef549c36cd67227be2b0ca2b08bb7925 (diff)
downloadbuild-coordinator-930e1d5b489797666602674247efe82756ca4082.tar
build-coordinator-930e1d5b489797666602674247efe82756ca4082.tar.gz
Move computing output hashes to dedicated threads
This should help the coordinator and agents ensure hashes are computed, and that the agent finds out when this has happened, even in a situation where the coordinator is restarted/crashes and the connection between the agents and coordinator is lost.
Diffstat (limited to 'guix-build-coordinator/agent-messaging')
-rw-r--r--guix-build-coordinator/agent-messaging/http/server.scm335
1 files changed, 219 insertions, 116 deletions
diff --git a/guix-build-coordinator/agent-messaging/http/server.scm b/guix-build-coordinator/agent-messaging/http/server.scm
index 2ef8500..7fe3dd4 100644
--- a/guix-build-coordinator/agent-messaging/http/server.scm
+++ b/guix-build-coordinator/agent-messaging/http/server.scm
@@ -41,16 +41,24 @@
#:use-module (lzlib)
#:use-module (gcrypt base16)
#:use-module (gcrypt hash)
+ #:use-module (fibers)
+ #:use-module (fibers timers)
+ #:use-module (fibers channels)
+ #:use-module (fibers operations)
#:use-module (prometheus)
#:use-module (guix base32)
#:use-module (guix base64)
#:use-module (guix progress)
#:use-module (guix build utils)
+ #:use-module ((guix build syscalls)
+ #:select (set-thread-name))
#:use-module (guix-build-coordinator utils)
#:use-module (guix-build-coordinator utils fibers)
#:use-module (guix-build-coordinator datastore)
#:use-module (guix-build-coordinator coordinator)
- #:export (http-agent-messaging-start-server))
+ #:export (make-output-hash-channel
+
+ http-agent-messaging-start-server))
;; Throw to the 'bad-request key, passing MESSAGE and the list of any
;; additional ARGS as the throw arguments.
(define (bad-request message . args)
(throw 'bad-request message args))
@@ -107,35 +115,27 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
#f))
(forward-invocation))))))
+;; Return a progress reporter that passes the number of bytes processed
+;; to CALLBACK, a procedure of one argument.
+;;
+;; On start, the running total is reset and CALLBACK is called with 0.
+;; On each report, the transferred count is recorded as the total and
+;; handed to CALLBACK through `rate-limited' with PROGRESS-INTERVAL
+;; (presumably so that CALLBACK runs at most once per interval; confirm
+;; against the definition of `rate-limited').  On stop, CALLBACK is
+;; called once more with SIZE, or with the last recorded total when SIZE
+;; is #f.
-(define* (progress-reporter/hash size log-port
- force-output-to-log-port
+(define* (progress-reporter/hash size callback
#:key (progress-interval
- (make-time time-duration 0 20)))
+ (make-time time-duration 0 10)))
(define total 0)
- (define (report-progress transferred)
- (define message
- (format #f "~a~%" transferred))
-
- (display message log-port) ;should be atomic
- (force-output-to-log-port))
-
(progress-reporter
(start (lambda ()
(set! total 0)
- (display "0\n" log-port)))
- (report (let ((report (rate-limited report-progress progress-interval)))
+ (callback 0)))
+ (report (let ((report (rate-limited callback progress-interval)))
(lambda (transferred)
(set! total transferred)
(report transferred))))
(stop (lambda ()
(let ((size (or size total)))
- (report-progress size)
- (display "finished\n" log-port))))))
+ (callback size))))))
(define (http-agent-messaging-start-server port host secret-key-base
build-coordinator
- chunked-request-channel)
+ chunked-request-channel
+ output-hash-channel)
(define gc-metrics-updater
(get-gc-metrics-updater
(build-coordinator-metrics-registry build-coordinator)))
@@ -177,6 +177,7 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
secret-key-base
build-coordinator
chunked-request-channel
+ output-hash-channel
update-managed-metrics!)))
#:host host
#:port port))
@@ -275,45 +276,170 @@ port. Also, the port used can be changed by passing the --port option.\n"
`((event . ,(assq-ref event-count 'event)))))
(datastore-count-unprocessed-hook-events datastore)))))
-(define* (compute-hash-of-uploaded-output logger file-name
- #:key
- (reporter progress-reporter/silent))
- (bytevector->nix-base32-string
- (with-exception-handler
- (lambda (exn)
- (log-msg logger
- 'WARN
- "error computing hash: " exn)
-
- (when (file-exists? file-name)
- (let ((md5-hash
- (bytevector->base16-string
- (file-hash (hash-algorithm md5) file-name)))
- (file-bytes
- (stat:size (stat file-name))))
- ;; I've seen exceptions happen here from lzip, so try
- ;; deleting the tmp file so that it's re-uploaded.
- (log-msg logger 'WARN "deleting " file-name)
- (delete-file file-name)
-
- (raise-exception
- (make-exception
- exn
- (make-exception-with-irritants
- `((file-bytes . ,file-bytes)
- (md5-hash . ,md5-hash)))))))
-
- (raise-exception exn))
+;; Create the channel through which hashes of uploaded outputs are
+;; requested, start the thread that services it, and return the channel.
+;;
+;; The servicing thread understands three kinds of message:
+;;
+;;   (request BUILD-UUID FILENAME UPDATE-CHANNEL)
+;;     Register UPDATE-CHANNEL as interested in FILENAME.  If no
+;;     computation is registered for FILENAME yet, start one in a new
+;;     thread.
+;;   (update FILENAME BYTES-PROCESSED)
+;;     Progress sent by a computation thread; BYTES-PROCESSED is
+;;     forwarded to every update channel registered for FILENAME.
+;;   (result FILENAME RESULT)
+;;     Sent by a computation thread when it's done.  (result RESULT) is
+;;     forwarded to every registered update channel, then FILENAME is
+;;     forgotten.  RESULT is the hash string, or an exception object if
+;;     computing the hash failed.
+(define (make-output-hash-channel build-coordinator)
+  (define logger
+    (build-coordinator-logger build-coordinator))
+
+  ;; Return the sha256 hash, as a nix-base32 string, of the data read
+  ;; from FILENAME through `call-with-lzip-input-port', sending
+  ;; (update FILENAME BYTES) messages to CHANNEL as it progresses.
+  ;;
+  ;; On error an exception object is *returned*, never raised: this runs
+  ;; in a dedicated thread, and an exception escaping it would mean no
+  ;; result message is ever sent, leaving the requesters waiting.
+  (define (compute-hash-of-uploaded-output channel filename)
+    (with-exception-handler
+        (lambda (exn)
+          (log-msg logger
+                   'WARN
+                   "error computing hash: " exn)
+
+          (if (file-exists? filename)
+              (let ((md5-hash
+                     (bytevector->base16-string
+                      (file-hash (hash-algorithm md5) filename)))
+                    (file-bytes
+                     (stat:size (stat filename))))
+                ;; I've seen exceptions happen here from lzip, so try
+                ;; deleting the tmp file so that it's re-uploaded.
+                (log-msg logger 'WARN "deleting " filename)
+                (delete-file filename)
+
+                ;; Attach details of the file that failed to hash
+                (make-exception
+                 exn
+                 (make-exception-with-irritants
+                  `((file-bytes . ,file-bytes)
+                    (md5-hash . ,md5-hash)))))
+              exn))
+      (lambda ()
+        (bytevector->nix-base32-string
+         (call-with-input-file filename
+           (lambda (compressed-port)
+             (call-with-lzip-input-port
+              compressed-port
+              (lambda (port)
+                (port-hash* (hash-algorithm sha256)
+                            port
+                            #:reporter
+                            (progress-reporter/hash
+                             (stat:size (stat filename))
+                             (lambda (processed-bytes)
+                               (put-message
+                                channel
+                                `(update ,filename ,processed-bytes))))))))
+           #:binary #t)))
+      #:unwind? #t))
+
+  ;; Try to put MESSAGE on each of UPDATE-CHANNELS, using one fiber per
+  ;; channel, giving up on a channel after TIMEOUT-SECONDS so that a
+  ;; receiver that has gone away doesn't leave a fiber blocked forever.
+  (define (notify-update-channels update-channels message timeout-seconds)
+    (for-each
+     (lambda (update-channel)
+       (spawn-fiber
+        (lambda ()
+          (perform-operation
+           (choice-operation
+            (put-operation update-channel message)
+            (sleep-operation timeout-seconds))))
+        (build-coordinator-scheduler build-coordinator)
+        #:parallel? #t))
+     update-channels))
+
+  (let ((channel (make-channel))
+        (update-channels-by-filename
+         (make-hash-table)))
+    (call-with-new-thread
(lambda ()
- (call-with-input-file file-name
- (lambda (compressed-port)
- (call-with-lzip-input-port compressed-port
- (lambda (port)
- (port-hash* (hash-algorithm sha256)
- port
- #:reporter reporter))))
- #:binary #t))
- #:unwind? #t)))
+       (while #t
+         (with-exception-handler
+             (lambda (exn)
+               (log-msg logger
+                        'ERROR
+                        "exception in output hash thread: " exn))
+           (lambda ()
+             (match (get-message channel)
+               (('request build-uuid filename update-channel)
+                (let ((existing-channels
+                       (hash-ref update-channels-by-filename filename)))
+                  (if existing-channels
+                      ;; The hash is already being computed, so just
+                      ;; register this channel for the updates/result
+                      (hash-set! update-channels-by-filename
+                                 filename
+                                 (cons update-channel
+                                       existing-channels))
+                      (begin
+                        (hash-set! update-channels-by-filename
+                                   filename
+                                   (list update-channel))
+
+                        (call-with-new-thread
+                         (lambda ()
+                           (set-thread-name "hash output")
+                           (log-msg logger 'DEBUG build-uuid ": computing hash of " filename)
+
+                           (put-message
+                            channel
+                            (list 'result
+                                  filename
+                                  ;; Make sure a result is always sent,
+                                  ;; even if something goes wrong while
+                                  ;; handling a hashing error
+                                  (with-exception-handler
+                                      (lambda (exn)
+                                        (log-msg logger
+                                                 'ERROR
+                                                 "exception computing hash of "
+                                                 filename ": " exn)
+                                        exn)
+                                    (lambda ()
+                                      (compute-hash-of-uploaded-output
+                                       channel
+                                       filename))
+                                    #:unwind? #t)))))))))
+               (('update filename bytes-processed)
+                (notify-update-channels
+                 (hash-ref update-channels-by-filename filename '())
+                 bytes-processed
+                 5))
+               (('result filename result)
+                (notify-update-channels
+                 (hash-ref update-channels-by-filename filename '())
+                 (list 'result result)
+                 60)
+
+                (hash-remove! update-channels-by-filename filename))))
+           #:unwind? #t))))
+    channel))
+
+;; Request the hash of TMP-OUTPUT-FILE-NAME through OUTPUT-HASH-CHANNEL
+;; and wait for the (result ...) message, returning the value it
+;; carries.
+;;
+;; While waiting, a line containing the highest number of processed
+;; bytes seen so far is written to RESPONSE-PORT for every update
+;; received, and also whenever 20 seconds pass without a message.
+(define (compute-output-hash-via-channel output-hash-channel
+                                         request
+                                         response-port
+                                         build-uuid
+                                         tmp-output-file-name)
+  (define reply-channel (make-channel))
+
+  (define (send-progress-line bytes)
+    (display (simple-format #f "~A\n" bytes) response-port)
+    (force-output response-port)
+    ;; TODO because the chunked output port doesn't call force-output
+    ;; on the underlying port, do that here. We want this event to be
+    ;; sent now, rather than when some buffer fills up.
+    (force-output (request-port request)))
+
+  ;; Wait for the next message, giving the symbol timeout if nothing
+  ;; arrives in time
+  (define (next-message)
+    (perform-operation
+     (choice-operation
+      (get-operation reply-channel)
+      (wrap-operation
+       (sleep-operation 20)
+       (const 'timeout)))))
+
+  (put-message output-hash-channel
+               (list 'request build-uuid tmp-output-file-name reply-channel))
+
+  (let loop ((reported-bytes 0))
+    (match (next-message)
+      (('result result) result)
+      (message
+       ;; Only ever move the reported figure forwards. On a timeout or
+       ;; an update that isn't an advance, the previous figure is
+       ;; written again, to keep the connection open.
+       (let ((bytes-to-report
+              (if (and (not (eq? message 'timeout))
+                       (> message reported-bytes))
+                  message
+                  reported-bytes)))
+         (send-progress-line bytes-to-report)
+         (loop bytes-to-report))))))
(define* (receive-file body
length
@@ -385,6 +511,7 @@ port. Also, the port used can be changed by passing the --port option.\n"
secret-key-base
build-coordinator
chunked-request-channel
+ output-hash-channel
update-managed-metrics!)
(define (authenticated? uuid request)
(let* ((authorization-base64
@@ -643,37 +770,25 @@ port. Also, the port used can be changed by passing the --port option.\n"
;; Make sure NGinx gets the response headers
(force-output (request-port request))
- ;; Compute the hash of the file
- (let* ((reporter (progress-reporter/hash
- (stat:size (stat tmp-output-file-name))
- response-port
- (lambda ()
- (force-output response-port)
- ;; TODO because the chunked output port
- ;; doesn't call force-output on the
- ;; underlying port, do that here. We
- ;; want this event to be sent now,
- ;; rather than when some buffer fills
- ;; up.
- (force-output (request-port request)))))
- (hash
- (call-with-worker-thread
- chunked-request-channel
- (lambda ()
- (compute-hash-of-uploaded-output
- logger
- tmp-output-file-name
- #:reporter reporter)))))
- (log-msg logger
- 'DEBUG
- "computed the hash of " tmp-output-file-name ", renaming")
-
- (call-with-output-file (string-append output-file-name ".hash")
- (lambda (port)
- (simple-format port "~A\n" hash))))
-
- (rename-file tmp-output-file-name
- output-file-name))))
+ (let ((result (compute-output-hash-via-channel
+ output-hash-channel
+ request
+ response-port
+ uuid
+ tmp-output-file-name)))
+ ;; TODO: Maybe do something different here to
+ ;; indicate the issue to the agent?
+ (unless (exception? result)
+ (log-msg logger
+ 'DEBUG
+ "computed the hash of " tmp-output-file-name ", renaming")
+
+ (call-with-output-file (string-append output-file-name ".hash")
+ (lambda (port)
+ (simple-format port "~A\n" result)))
+
+ (rename-file tmp-output-file-name
+ output-file-name))))))
(render-json
'(("error" . "access denied"))
#:code 403))))
@@ -736,37 +851,25 @@ port. Also, the port used can be changed by passing the --port option.\n"
;; Make sure NGinx gets the response headers
(force-output (request-port request))
- ;; Compute the hash of the file
- (let* ((reporter (progress-reporter/hash
- (stat:size (stat tmp-output-file-name))
- response-port
- (lambda ()
- (force-output response-port)
- ;; TODO because the chunked output port
- ;; doesn't call force-output on the
- ;; underlying port, do that here. We
- ;; want this event to be sent now,
- ;; rather than when some buffer fills
- ;; up.
- (force-output (request-port request)))))
- (hash
- (call-with-worker-thread
- chunked-request-channel
- (lambda ()
- (compute-hash-of-uploaded-output
- logger
- tmp-output-file-name
- #:reporter reporter)))))
- (log-msg logger
- 'DEBUG
- "computed the hash of " tmp-output-file-name ", renaming")
-
- (call-with-output-file (string-append output-file-name ".hash")
- (lambda (port)
- (simple-format port "~A\n" hash))))
-
- (rename-file tmp-output-file-name
- output-file-name))))
+ (let ((result (compute-output-hash-via-channel
+ output-hash-channel
+ request
+ response-port
+ uuid
+ tmp-output-file-name)))
+ ;; TODO: Maybe do something different here to
+ ;; indicate the issue to the agent?
+ (unless (exception? result)
+ (log-msg logger
+ 'DEBUG
+ "computed the hash of " tmp-output-file-name ", renaming")
+
+ (call-with-output-file (string-append output-file-name ".hash")
+ (lambda (port)
+ (simple-format port "~A\n" result)))
+
+ (rename-file tmp-output-file-name
+ output-file-name))))))
(render-json
'(("error" . "access denied"))
#:code 403))))