diff options
author | Christopher Baines <mail@cbaines.net> | 2023-05-10 10:13:54 +0100 |
---|---|---|
committer | Christopher Baines <mail@cbaines.net> | 2023-05-10 10:13:54 +0100 |
commit | 930e1d5b489797666602674247efe82756ca4082 (patch) | |
tree | 89ec76474d02ee0a1ba75deda899172a9368f1a2 /guix-build-coordinator/agent-messaging | |
parent | 039430a0ef549c36cd67227be2b0ca2b08bb7925 (diff) | |
download | build-coordinator-930e1d5b489797666602674247efe82756ca4082.tar build-coordinator-930e1d5b489797666602674247efe82756ca4082.tar.gz |
Move computing output hashes to dedicated threads
This should help the coordinator and agents ensure hashes are computed and the
agent finds out when this has happened, even in a situation where the
coordinator is restarted/crashes and the connection between the agents and
coordinator are lost.
Diffstat (limited to 'guix-build-coordinator/agent-messaging')
-rw-r--r-- | guix-build-coordinator/agent-messaging/http/server.scm | 335 |
1 files changed, 219 insertions, 116 deletions
diff --git a/guix-build-coordinator/agent-messaging/http/server.scm b/guix-build-coordinator/agent-messaging/http/server.scm index 2ef8500..7fe3dd4 100644 --- a/guix-build-coordinator/agent-messaging/http/server.scm +++ b/guix-build-coordinator/agent-messaging/http/server.scm @@ -41,16 +41,24 @@ #:use-module (lzlib) #:use-module (gcrypt base16) #:use-module (gcrypt hash) + #:use-module (fibers) + #:use-module (fibers timers) + #:use-module (fibers channels) + #:use-module (fibers operations) #:use-module (prometheus) #:use-module (guix base32) #:use-module (guix base64) #:use-module (guix progress) #:use-module (guix build utils) + #:use-module ((guix build syscalls) + #:select (set-thread-name)) #:use-module (guix-build-coordinator utils) #:use-module (guix-build-coordinator utils fibers) #:use-module (guix-build-coordinator datastore) #:use-module (guix-build-coordinator coordinator) - #:export (http-agent-messaging-start-server)) + #:export (make-output-hash-channel + + http-agent-messaging-start-server)) (define (bad-request message . args) (throw 'bad-request message args)) @@ -107,35 +115,27 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." #f)) (forward-invocation)))))) -(define* (progress-reporter/hash size log-port - force-output-to-log-port +(define* (progress-reporter/hash size callback #:key (progress-interval - (make-time time-duration 0 20))) + (make-time time-duration 0 10))) (define total 0) - (define (report-progress transferred) - (define message - (format #f "~a~%" transferred)) - - (display message log-port) ;should be atomic - (force-output-to-log-port)) - (progress-reporter (start (lambda () (set! total 0) - (display "0\n" log-port))) - (report (let ((report (rate-limited report-progress progress-interval))) + (callback 0))) + (report (let ((report (rate-limited callback progress-interval))) (lambda (transferred) (set! total transferred) (report transferred)))) (stop (lambda () (let ((size (or size total))) - (report-progress size) - (display "finished\n" log-port)))))) + (callback size)))))) (define (http-agent-messaging-start-server port host secret-key-base build-coordinator - chunked-request-channel) + chunked-request-channel + output-hash-channel) (define gc-metrics-updater (get-gc-metrics-updater (build-coordinator-metrics-registry build-coordinator))) @@ -177,6 +177,7 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." secret-key-base build-coordinator chunked-request-channel + output-hash-channel update-managed-metrics!))) #:host host #:port port)) @@ -275,45 +276,170 @@ port. Also, the port used can be changed by passing the --port option.\n" `((event . ,(assq-ref event-count 'event))))) (datastore-count-unprocessed-hook-events datastore))))) -(define* (compute-hash-of-uploaded-output logger file-name - #:key - (reporter progress-reporter/silent)) - (bytevector->nix-base32-string - (with-exception-handler - (lambda (exn) - (log-msg logger - 'WARN - "error computing hash: " exn) - - (when (file-exists? file-name) - (let ((md5-hash - (bytevector->base16-string - (file-hash (hash-algorithm md5) file-name))) - (file-bytes - (stat:size (stat file-name)))) - ;; I've seen exceptions happen here from lzip, so try - ;; deleting the tmp file so that it's re-uploaded. - (log-msg logger 'WARN "deleting " file-name) - (delete-file file-name) - - (raise-exception - (make-exception - exn - (make-exception-with-irritants - `((file-bytes . ,file-bytes) - (md5-hash . ,md5-hash))))))) - - (raise-exception exn)) +(define (make-output-hash-channel build-coordinator) + (define logger + (build-coordinator-logger build-coordinator)) + + (define (compute-hash-of-uploaded-output channel filename) + (with-exception-handler + (lambda (exn) + (log-msg logger + 'WARN + "error computing hash: " exn) + + (when (file-exists? filename) + (let ((md5-hash + (bytevector->base16-string + (file-hash (hash-algorithm md5) filename))) + (file-bytes + (stat:size (stat filename)))) + ;; I've seen exceptions happen here from lzip, so try + ;; deleting the tmp file so that it's re-uploaded. + (log-msg logger 'WARN "deleting " filename) + (delete-file filename) + + (raise-exception + (make-exception + exn + (make-exception-with-irritants + `((file-bytes . ,file-bytes) + (md5-hash . ,md5-hash))))))) + + exn) + (lambda () + (bytevector->nix-base32-string + (call-with-input-file filename + (lambda (compressed-port) + (call-with-lzip-input-port + compressed-port + (lambda (port) + (port-hash* (hash-algorithm sha256) + port + #:reporter + (progress-reporter/hash + (stat:size (stat filename)) + (lambda (processed-bytes) + (put-message + channel + `(update ,filename ,processed-bytes)))))))) + #:binary #t))) + #:unwind? #t)) + + (let ((channel (make-channel)) + (update-channels-by-filename + (make-hash-table))) + (call-with-new-thread (lambda () - (call-with-input-file file-name - (lambda (compressed-port) - (call-with-lzip-input-port compressed-port - (lambda (port) - (port-hash* (hash-algorithm sha256) - port - #:reporter reporter)))) - #:binary #t)) - #:unwind? #t))) + (while #t + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "exception in output hash thread: " exn)) + (lambda () + (match (get-message channel) + (('request build-uuid filename update-channel) + (or (and=> + (hash-ref update-channels-by-filename + filename) + (lambda (existing-channels) + (hash-set! update-channels-by-filename + filename + (cons update-channel + existing-channels)))) + (begin + (hash-set! update-channels-by-filename + filename + (list update-channel)) + + (call-with-new-thread + (lambda () + (set-thread-name "hash output") + (log-msg logger 'DEBUG build-uuid ": computing hash of " filename) + + (put-message + channel + (list 'result + filename + (compute-hash-of-uploaded-output channel + filename)))))))) + (('update filename bytes-processed) + (for-each + (lambda (update-channel) + (spawn-fiber + (lambda () + (perform-operation + (choice-operation + (put-operation update-channel bytes-processed) + (sleep-operation 5)))) + (build-coordinator-scheduler build-coordinator) + #:parallel? #t)) + (hash-ref update-channels-by-filename filename))) + (('result filename result) + (for-each + (lambda (update-channel) + (spawn-fiber + (lambda () + (perform-operation + (choice-operation + (put-operation update-channel (list 'result result)) + (sleep-operation 60)))) + (build-coordinator-scheduler build-coordinator) + #:parallel? #t)) + (hash-ref update-channels-by-filename filename)) + + (hash-remove! update-channels-by-filename filename)))) + #:unwind? #t)))) + channel)) + +(define (compute-output-hash-via-channel output-hash-channel + request + response-port + build-uuid + tmp-output-file-name) + (let ((channel (make-channel))) + (define (write-to-response-port response) + (display response response-port) + (force-output response-port) + ;; TODO because the chunked output port + ;; doesn't call force-output on the + ;; underlying port, do that here. We + ;; want this event to be sent now, + ;; rather than when some buffer fills + ;; up. + (force-output (request-port request))) + + (define (get-message* channel) + (perform-operation + (choice-operation + (get-operation channel) + (wrap-operation + (sleep-operation 20) + (const 'timeout))))) + + (put-message output-hash-channel + (list 'request build-uuid tmp-output-file-name channel)) + + (let loop ((previous-bytes-processed 0) + (message (get-message* channel))) + (match message + (('result result) result) + ('timeout + (write-to-response-port + (simple-format #f "~A\n" previous-bytes-processed)) + (loop previous-bytes-processed + (get-message* channel))) + (bytes-processed + (if (> bytes-processed previous-bytes-processed) + (begin + (write-to-response-port + (simple-format #f "~A\n" bytes-processed)) + (loop bytes-processed + (get-message* channel))) + (begin + ;; Still write to keep the connection open + (write-to-response-port + (simple-format #f "~A\n" previous-bytes-processed)) + (loop previous-bytes-processed + (get-message* channel))))))))) (define* (receive-file body length @@ -385,6 +511,7 @@ port. Also, the port used can be changed by passing the --port option.\n" secret-key-base build-coordinator chunked-request-channel + output-hash-channel update-managed-metrics!) (define (authenticated? uuid request) (let* ((authorization-base64 @@ -643,37 +770,25 @@ port. Also, the port used can be changed by passing the --port option.\n" ;; Make sure NGinx gets the response headers (force-output (request-port request)) - ;; Compute the hash of the file - (let* ((reporter (progress-reporter/hash - (stat:size (stat tmp-output-file-name)) - response-port - (lambda () - (force-output response-port) - ;; TODO because the chunked output port - ;; doesn't call force-output on the - ;; underlying port, do that here. We - ;; want this event to be sent now, - ;; rather than when some buffer fills - ;; up. - (force-output (request-port request))))) - (hash - (call-with-worker-thread - chunked-request-channel - (lambda () - (compute-hash-of-uploaded-output - logger - tmp-output-file-name - #:reporter reporter))))) - (log-msg logger - 'DEBUG - "computed the hash of " tmp-output-file-name ", renaming") - - (call-with-output-file (string-append output-file-name ".hash") - (lambda (port) - (simple-format port "~A\n" hash)))) - - (rename-file tmp-output-file-name - output-file-name)))) + (let ((result (compute-output-hash-via-channel + output-hash-channel + request + response-port + uuid + tmp-output-file-name))) + ;; TODO: Maybe do something different here to + ;; indicate the issue to the agent? + (unless (exception? result) + (log-msg logger + 'DEBUG + "computed the hash of " tmp-output-file-name ", renaming") + + (call-with-output-file (string-append output-file-name ".hash") + (lambda (port) + (simple-format port "~A\n" result))) + + (rename-file tmp-output-file-name + output-file-name)))))) (render-json '(("error" . "access denied")) #:code 403)))) @@ -736,37 +851,25 @@ port. Also, the port used can be changed by passing the --port option.\n" ;; Make sure NGinx gets the response headers (force-output (request-port request)) - ;; Compute the hash of the file - (let* ((reporter (progress-reporter/hash - (stat:size (stat tmp-output-file-name)) - response-port - (lambda () - (force-output response-port) - ;; TODO because the chunked output port - ;; doesn't call force-output on the - ;; underlying port, do that here. We - ;; want this event to be sent now, - ;; rather than when some buffer fills - ;; up. - (force-output (request-port request))))) - (hash - (call-with-worker-thread - chunked-request-channel - (lambda () - (compute-hash-of-uploaded-output - logger - tmp-output-file-name - #:reporter reporter))))) - (log-msg logger - 'DEBUG - "computed the hash of " tmp-output-file-name ", renaming") - - (call-with-output-file (string-append output-file-name ".hash") - (lambda (port) - (simple-format port "~A\n" hash)))) - - (rename-file tmp-output-file-name - output-file-name)))) + (let ((result (compute-output-hash-via-channel + output-hash-channel + request + response-port + uuid + tmp-output-file-name))) + ;; TODO: Maybe do something different here to + ;; indicate the issue to the agent? + (unless (exception? result) + (log-msg logger + 'DEBUG + "computed the hash of " tmp-output-file-name ", renaming") + + (call-with-output-file (string-append output-file-name ".hash") + (lambda (port) + (simple-format port "~A\n" result))) + + (rename-file tmp-output-file-name + output-file-name)))))) (render-json '(("error" . "access denied")) #:code 403)))) |