path: root/guix-build-coordinator/agent-messaging/http
diff options
authorChristopher Baines <mail@cbaines.net>2023-05-10 10:13:54 +0100
committerChristopher Baines <mail@cbaines.net>2023-05-10 10:13:54 +0100
commit930e1d5b489797666602674247efe82756ca4082 (patch)
tree89ec76474d02ee0a1ba75deda899172a9368f1a2 /guix-build-coordinator/agent-messaging/http
parent039430a0ef549c36cd67227be2b0ca2b08bb7925 (diff)
Move computing output hashes to dedicated threads
This should help the coordinator and agents ensure hashes are computed and the agent finds out when this has happened, even in a situation where the coordinator is restarted/crashes and the connection between the agents and coordinator are lost.
Diffstat (limited to 'guix-build-coordinator/agent-messaging/http')
1 files changed, 219 insertions, 116 deletions
diff --git a/guix-build-coordinator/agent-messaging/http/server.scm b/guix-build-coordinator/agent-messaging/http/server.scm
index 2ef8500..7fe3dd4 100644
--- a/guix-build-coordinator/agent-messaging/http/server.scm
+++ b/guix-build-coordinator/agent-messaging/http/server.scm
@@ -41,16 +41,24 @@
#:use-module (lzlib)
#:use-module (gcrypt base16)
#:use-module (gcrypt hash)
+ #:use-module (fibers)
+ #:use-module (fibers timers)
+ #:use-module (fibers channels)
+ #:use-module (fibers operations)
#:use-module (prometheus)
#:use-module (guix base32)
#:use-module (guix base64)
#:use-module (guix progress)
#:use-module (guix build utils)
+ #:use-module ((guix build syscalls)
+ #:select (set-thread-name))
#:use-module (guix-build-coordinator utils)
#:use-module (guix-build-coordinator utils fibers)
#:use-module (guix-build-coordinator datastore)
#:use-module (guix-build-coordinator coordinator)
- #:export (http-agent-messaging-start-server))
+ #:export (make-output-hash-channel
+ http-agent-messaging-start-server))
(define (bad-request message . args)
(throw 'bad-request message args))
@@ -107,35 +115,27 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
-(define* (progress-reporter/hash size log-port
- force-output-to-log-port
+(define* (progress-reporter/hash size callback
#:key (progress-interval
- (make-time time-duration 0 20)))
+ (make-time time-duration 0 10)))
(define total 0)
- (define (report-progress transferred)
- (define message
- (format #f "~a~%" transferred))
- (display message log-port) ;should be atomic
- (force-output-to-log-port))
(start (lambda ()
(set! total 0)
- (display "0\n" log-port)))
- (report (let ((report (rate-limited report-progress progress-interval)))
+ (callback 0)))
+ (report (let ((report (rate-limited callback progress-interval)))
(lambda (transferred)
(set! total transferred)
(report transferred))))
(stop (lambda ()
(let ((size (or size total)))
- (report-progress size)
- (display "finished\n" log-port))))))
+ (callback size))))))
(define (http-agent-messaging-start-server port host secret-key-base
- chunked-request-channel)
+ chunked-request-channel
+ output-hash-channel)
(define gc-metrics-updater
(build-coordinator-metrics-registry build-coordinator)))
@@ -177,6 +177,7 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
+ output-hash-channel
#:host host
#:port port))
@@ -275,45 +276,170 @@ port. Also, the port used can be changed by passing the --port option.\n"
`((event . ,(assq-ref event-count 'event)))))
(datastore-count-unprocessed-hook-events datastore)))))
-(define* (compute-hash-of-uploaded-output logger file-name
- #:key
- (reporter progress-reporter/silent))
- (bytevector->nix-base32-string
- (with-exception-handler
- (lambda (exn)
- (log-msg logger
- "error computing hash: " exn)
- (when (file-exists? file-name)
- (let ((md5-hash
- (bytevector->base16-string
- (file-hash (hash-algorithm md5) file-name)))
- (file-bytes
- (stat:size (stat file-name))))
- ;; I've seen exceptions happen here from lzip, so try
- ;; deleting the tmp file so that it's re-uploaded.
- (log-msg logger 'WARN "deleting " file-name)
- (delete-file file-name)
- (raise-exception
- (make-exception
- exn
- (make-exception-with-irritants
- `((file-bytes . ,file-bytes)
- (md5-hash . ,md5-hash)))))))
- (raise-exception exn))
+(define (make-output-hash-channel build-coordinator)
+ (define logger
+ (build-coordinator-logger build-coordinator))
+ (define (compute-hash-of-uploaded-output channel filename)
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg logger
+ "error computing hash: " exn)
+ (when (file-exists? filename)
+ (let ((md5-hash
+ (bytevector->base16-string
+ (file-hash (hash-algorithm md5) filename)))
+ (file-bytes
+ (stat:size (stat filename))))
+ ;; I've seen exceptions happen here from lzip, so try
+ ;; deleting the tmp file so that it's re-uploaded.
+ (log-msg logger 'WARN "deleting " filename)
+ (delete-file filename)
+ (raise-exception
+ (make-exception
+ exn
+ (make-exception-with-irritants
+ `((file-bytes . ,file-bytes)
+ (md5-hash . ,md5-hash)))))))
+ exn)
+ (lambda ()
+ (bytevector->nix-base32-string
+ (call-with-input-file filename
+ (lambda (compressed-port)
+ (call-with-lzip-input-port
+ compressed-port
+ (lambda (port)
+ (port-hash* (hash-algorithm sha256)
+ port
+ #:reporter
+ (progress-reporter/hash
+ (stat:size (stat filename))
+ (lambda (processed-bytes)
+ (put-message
+ channel
+ `(update ,filename ,processed-bytes))))))))
+ #:binary #t)))
+ #:unwind? #t))
+ (let ((channel (make-channel))
+ (update-channels-by-filename
+ (make-hash-table)))
+ (call-with-new-thread
(lambda ()
- (call-with-input-file file-name
- (lambda (compressed-port)
- (call-with-lzip-input-port compressed-port
- (lambda (port)
- (port-hash* (hash-algorithm sha256)
- port
- #:reporter reporter))))
- #:binary #t))
- #:unwind? #t)))
+ (while #t
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "exception in output hash thread: " exn))
+ (lambda ()
+ (match (get-message channel)
+ (('request build-uuid filename update-channel)
+ (or (and=>
+ (hash-ref update-channels-by-filename
+ filename)
+ (lambda (existing-channels)
+ (hash-set! update-channels-by-filename
+ filename
+ (cons update-channel
+ existing-channels))))
+ (begin
+ (hash-set! update-channels-by-filename
+ filename
+ (list update-channel))
+ (call-with-new-thread
+ (lambda ()
+ (set-thread-name "hash output")
+ (log-msg logger 'DEBUG build-uuid ": computing hash of " filename)
+ (put-message
+ channel
+ (list 'result
+ filename
+ (compute-hash-of-uploaded-output channel
+ filename))))))))
+ (('update filename bytes-processed)
+ (for-each
+ (lambda (update-channel)
+ (spawn-fiber
+ (lambda ()
+ (perform-operation
+ (choice-operation
+ (put-operation update-channel bytes-processed)
+ (sleep-operation 5))))
+ (build-coordinator-scheduler build-coordinator)
+ #:parallel? #t))
+ (hash-ref update-channels-by-filename filename)))
+ (('result filename result)
+ (for-each
+ (lambda (update-channel)
+ (spawn-fiber
+ (lambda ()
+ (perform-operation
+ (choice-operation
+ (put-operation update-channel (list 'result result))
+ (sleep-operation 60))))
+ (build-coordinator-scheduler build-coordinator)
+ #:parallel? #t))
+ (hash-ref update-channels-by-filename filename))
+ (hash-remove! update-channels-by-filename filename))))
+ #:unwind? #t))))
+ channel))
+(define (compute-output-hash-via-channel output-hash-channel
+ request
+ response-port
+ build-uuid
+ tmp-output-file-name)
+ (let ((channel (make-channel)))
+ (define (write-to-response-port response)
+ (display response response-port)
+ (force-output response-port)
+ ;; TODO because the chunked output port
+ ;; doesn't call force-output on the
+ ;; underlying port, do that here. We
+ ;; want this event to be sent now,
+ ;; rather than when some buffer fills
+ ;; up.
+ (force-output (request-port request)))
+ (define (get-message* channel)
+ (perform-operation
+ (choice-operation
+ (get-operation channel)
+ (wrap-operation
+ (sleep-operation 20)
+ (const 'timeout)))))
+ (put-message output-hash-channel
+ (list 'request build-uuid tmp-output-file-name channel))
+ (let loop ((previous-bytes-processed 0)
+ (message (get-message* channel)))
+ (match message
+ (('result result) result)
+ ('timeout
+ (write-to-response-port
+ (simple-format #f "~A\n" previous-bytes-processed))
+ (loop previous-bytes-processed
+ (get-message* channel)))
+ (bytes-processed
+ (if (> bytes-processed previous-bytes-processed)
+ (begin
+ (write-to-response-port
+ (simple-format #f "~A\n" bytes-processed))
+ (loop bytes-processed
+ (get-message* channel)))
+ (begin
+ ;; Still write to keep the connection open
+ (write-to-response-port
+ (simple-format #f "~A\n" previous-bytes-processed))
+ (loop previous-bytes-processed
+ (get-message* channel)))))))))
(define* (receive-file body
@@ -385,6 +511,7 @@ port. Also, the port used can be changed by passing the --port option.\n"
+ output-hash-channel
(define (authenticated? uuid request)
(let* ((authorization-base64
@@ -643,37 +770,25 @@ port. Also, the port used can be changed by passing the --port option.\n"
;; Make sure NGinx gets the response headers
(force-output (request-port request))
- ;; Compute the hash of the file
- (let* ((reporter (progress-reporter/hash
- (stat:size (stat tmp-output-file-name))
- response-port
- (lambda ()
- (force-output response-port)
- ;; TODO because the chunked output port
- ;; doesn't call force-output on the
- ;; underlying port, do that here. We
- ;; want this event to be sent now,
- ;; rather than when some buffer fills
- ;; up.
- (force-output (request-port request)))))
- (hash
- (call-with-worker-thread
- chunked-request-channel
- (lambda ()
- (compute-hash-of-uploaded-output
- logger
- tmp-output-file-name
- #:reporter reporter)))))
- (log-msg logger
- "computed the hash of " tmp-output-file-name ", renaming")
- (call-with-output-file (string-append output-file-name ".hash")
- (lambda (port)
- (simple-format port "~A\n" hash))))
- (rename-file tmp-output-file-name
- output-file-name))))
+ (let ((result (compute-output-hash-via-channel
+ output-hash-channel
+ request
+ response-port
+ uuid
+ tmp-output-file-name)))
+ ;; TODO: Maybe do something different here to
+ ;; indicate the issue to the agent?
+ (unless (exception? result)
+ (log-msg logger
+ "computed the hash of " tmp-output-file-name ", renaming")
+ (call-with-output-file (string-append output-file-name ".hash")
+ (lambda (port)
+ (simple-format port "~A\n" result)))
+ (rename-file tmp-output-file-name
+ output-file-name))))))
'(("error" . "access denied"))
#:code 403))))
@@ -736,37 +851,25 @@ port. Also, the port used can be changed by passing the --port option.\n"
;; Make sure NGinx gets the response headers
(force-output (request-port request))
- ;; Compute the hash of the file
- (let* ((reporter (progress-reporter/hash
- (stat:size (stat tmp-output-file-name))
- response-port
- (lambda ()
- (force-output response-port)
- ;; TODO because the chunked output port
- ;; doesn't call force-output on the
- ;; underlying port, do that here. We
- ;; want this event to be sent now,
- ;; rather than when some buffer fills
- ;; up.
- (force-output (request-port request)))))
- (hash
- (call-with-worker-thread
- chunked-request-channel
- (lambda ()
- (compute-hash-of-uploaded-output
- logger
- tmp-output-file-name
- #:reporter reporter)))))
- (log-msg logger
- "computed the hash of " tmp-output-file-name ", renaming")
- (call-with-output-file (string-append output-file-name ".hash")
- (lambda (port)
- (simple-format port "~A\n" hash))))
- (rename-file tmp-output-file-name
- output-file-name))))
+ (let ((result (compute-output-hash-via-channel
+ output-hash-channel
+ request
+ response-port
+ uuid
+ tmp-output-file-name)))
+ ;; TODO: Maybe do something different here to
+ ;; indicate the issue to the agent?
+ (unless (exception? result)
+ (log-msg logger
+ "computed the hash of " tmp-output-file-name ", renaming")
+ (call-with-output-file (string-append output-file-name ".hash")
+ (lambda (port)
+ (simple-format port "~A\n" result)))
+ (rename-file tmp-output-file-name
+ output-file-name))))))
'(("error" . "access denied"))
#:code 403))))