diff options
Diffstat (limited to 'guix-build-coordinator/agent-messaging')
-rw-r--r-- | guix-build-coordinator/agent-messaging/http/server.scm | 328 |
1 files changed, 192 insertions, 136 deletions
diff --git a/guix-build-coordinator/agent-messaging/http/server.scm b/guix-build-coordinator/agent-messaging/http/server.scm index 12139c0..4cf1e97 100644 --- a/guix-build-coordinator/agent-messaging/http/server.scm +++ b/guix-build-coordinator/agent-messaging/http/server.scm @@ -21,6 +21,7 @@ (define-module (guix-build-coordinator agent-messaging http server) #:use-module (srfi srfi-11) #:use-module (srfi srfi-19) + #:use-module (srfi srfi-71) #:use-module (ice-9 match) #:use-module (ice-9 ftw) #:use-module (ice-9 threads) @@ -43,6 +44,7 @@ #:use-module (prometheus) #:use-module (guix base32) #:use-module (guix base64) + #:use-module (guix progress) #:use-module (guix build utils) #:use-module (guix-build-coordinator utils) #:use-module (guix-build-coordinator utils fibers) @@ -75,6 +77,55 @@ if there was no request body." 'read-request-body fixed/read-request-body) +(define* (port-hash* algorithm port + #:key (reporter progress-reporter/silent)) + (let ((out get (open-hash-port algorithm))) + (dump-port* port out #:reporter reporter) + (close-port out) + (get))) + +(define (rate-limited proc interval) + "Return a procedure that will forward the invocation to PROC when the time +elapsed since the previous forwarded invocation is greater or equal to +INTERVAL (a time-duration object), otherwise does nothing and returns #f." + (let ((previous-at #f)) + (lambda args + (let* ((now (current-time time-monotonic)) + (forward-invocation (lambda () + (set! previous-at now) + (apply proc args)))) + (if previous-at + (let ((elapsed (time-difference now previous-at))) + (if (time>=? elapsed interval) + (forward-invocation) + #f)) + (forward-invocation)))))) + +(define* (progress-reporter/hash size log-port + #:key (progress-interval + (make-time time-duration 0 20))) + (define total 0) + + (define (report-progress transferred) + (define message + (format #f "~a~%" transferred)) + + (display message log-port) ;should be atomic + (force-output log-port)) + + (progress-reporter + (start (lambda () + (set! total 0) + (display "0\n" log-port))) + (report (let ((report (rate-limited report-progress progress-interval))) + (lambda (transferred) + (set! total transferred) + (report transferred)))) + (stop (lambda () + (let ((size (or size total))) + (report-progress size) + (display "finished\n" log-port)))))) + (define (http-agent-messaging-start-server port host secret-key-base build-coordinator chunked-request-channel) @@ -217,6 +268,46 @@ port. Also, the port used can be changed by passing the --port option.\n" `((event . ,(assq-ref event-count 'event))))) (datastore-count-unprocessed-hook-events datastore))))) +(define* (compute-hash-of-uploaded-output logger file-name + #:key + (reporter progress-reporter/silent)) + (bytevector->nix-base32-string + (with-exception-handler + (lambda (exn) + (log-msg logger + 'WARN + "error computing hash: " exn) + + (when (file-exists? file-name) + (let ((md5-hash + (bytevector->base16-string + (file-hash (hash-algorithm md5) file-name))) + (file-bytes + (stat:size (stat file-name)))) + ;; I've seen exceptions happen here from lzip, so try + ;; deleting the tmp file so that it's re-uploaded. + (log-msg logger 'WARN "deleting " file-name) + (delete-file file-name) + + (raise-exception + (make-exception + exn + (make-exception-with-irritants + `((file-bytes . ,file-bytes) + (md5-hash . ,md5-hash))))))) + + (raise-exception exn)) + (lambda () + (call-with-input-file file-name + (lambda (compressed-port) + (call-with-lzip-input-port compressed-port + (lambda (port) + (port-hash* (hash-algorithm sha256) + port + #:reporter reporter)))) + #:binary #t)) + #:unwind? #t))) + (define (controller request method-and-path-components body @@ -431,40 +522,6 @@ port. Also, the port used can be changed by passing the --port option.\n" (let ((agent-id-for-build (datastore-agent-for-build datastore uuid))) - (define (compute-hash file-name) - (bytevector->nix-base32-string - (with-exception-handler - (lambda (exn) - (log-msg logger - 'WARN - "error computing hash: " exn) - - (when (file-exists? file-name) - (let ((md5-hash - (bytevector->base16-string - (file-hash (hash-algorithm md5) file-name))) - (file-bytes - (stat:size (stat file-name)))) - ;; I've seen exceptions happen here from lzip, so try - ;; deleting the tmp file so that it's re-uploaded. - (log-msg logger 'WARN "deleting " file-name) - (delete-file file-name) - - (raise-exception - (make-exception - exn - (make-exception-with-irritants - `((file-bytes . ,file-bytes) - (md5-hash . ,md5-hash))))))) - - (raise-exception exn)) - (lambda () - (call-with-input-file file-name - (lambda (compressed-port) - (call-with-lzip-input-port compressed-port - port-sha256)))) - #:unwind? #t))) - (define (receive-file output-file-name tmp-output-file-name) (call-with-worker-thread chunked-request-channel @@ -516,17 +573,7 @@ port. Also, the port used can be changed by passing the --port option.\n" (/ bytes-read 1000000)))) bytes-read) - last-progress-update-bytes-read)))))))) - - ;; Compute the hash of the file - (let ((hash (compute-hash tmp-output-file-name))) - (log-msg logger - 'DEBUG - "computed the hash of " tmp-output-file-name ", renaming") - - (rename-file tmp-output-file-name - output-file-name) - hash)))) + last-progress-update-bytes-read))))))))))) (if (authenticated? agent-id-for-build request) (let* ((output-file-name @@ -548,30 +595,56 @@ port. Also, the port used can be changed by passing the --port option.\n" "deleting " tmp-output-file-name) (delete-file tmp-output-file-name)) - (let ((hash - (if (bytevector? body) - (begin - (call-with-output-file tmp-output-file-name - (lambda (output-port) - (put-bytevector output-port body))) - - (let ((hash (compute-hash tmp-output-file-name))) - (log-msg logger - 'DEBUG - "computed the hash of " tmp-output-file-name - ", renaming") - - (rename-file tmp-output-file-name - output-file-name) - hash)) - (receive-file output-file-name - tmp-output-file-name)))) - - (call-with-output-file (string-append output-file-name ".hash") - (lambda (port) - (simple-format port "~A\n" hash))) - - (no-content))) + (if (bytevector? body) + (begin + (call-with-output-file tmp-output-file-name + (lambda (output-port) + (put-bytevector output-port body))) + + (let ((hash (compute-hash-of-uploaded-output + logger + tmp-output-file-name))) + (log-msg logger + 'DEBUG + "computed the hash of " tmp-output-file-name + ", renaming") + + (rename-file tmp-output-file-name + output-file-name) + + (call-with-output-file (string-append output-file-name ".hash") + (lambda (port) + (simple-format port "~A\n" hash)))) + + (no-content)) + (begin + (receive-file output-file-name + tmp-output-file-name) + + (list + (build-response + #:code 200 + #:headers '((content-type . (text/plain)) + (Transfer-Encoding . "chunked"))) + (lambda (response-port) + ;; Compute the hash of the file + (let* ((reporter (progress-reporter/hash + (stat:size (stat tmp-output-file-name)) + response-port)) + (hash (compute-hash-of-uploaded-output + logger + tmp-output-file-name + #:reporter reporter))) + (log-msg logger + 'DEBUG + "computed the hash of " tmp-output-file-name ", renaming") + + (call-with-output-file (string-append output-file-name ".hash") + (lambda (port) + (simple-format port "~A\n" hash)))) + + (rename-file tmp-output-file-name + output-file-name)))))) (render-json '(("error" . "access denied")) #:code 403)))) @@ -597,38 +670,6 @@ port. Also, the port used can be changed by passing the --port option.\n" (let ((agent-id-for-build (datastore-agent-for-build datastore uuid))) - (define (compute-hash file-name) - (bytevector->nix-base32-string - (with-exception-handler - (lambda (exn) - (log-msg logger 'WARN "error computing hash: " exn) - - (when (file-exists? file-name) - (let ((md5-hash - (bytevector->base16-string - (file-hash (hash-algorithm md5) file-name))) - (file-bytes - (stat:size (stat file-name)))) - ;; I've seen exceptions happen here from lzip, so try - ;; deleting the tmp file so that it's re-uploaded. - (log-msg logger 'WARN "deleting " file-name) - (delete-file file-name) - - (raise-exception - (make-exception - exn - (make-exception-with-irritants - `((file-bytes . ,file-bytes) - (md5-hash . ,md5-hash))))))) - - (raise-exception exn)) - (lambda () - (call-with-input-file file-name - (lambda (compressed-port) - (call-with-lzip-input-port compressed-port - port-sha256)))) - #:unwind? #t))) - (define (receive-file output-file-name tmp-output-file-name) (call-with-worker-thread chunked-request-channel @@ -681,18 +722,7 @@ port. Also, the port used can be changed by passing the --port option.\n" 1000000)))) bytes-read) last-progress-update-bytes-read)))))) - (close-port output-port)) - - ;; Compute the hash of the file - (let ((hash (compute-hash tmp-output-file-name))) - (log-msg logger - 'DEBUG - "computed the hash of " tmp-output-file-name - ", renaming") - - (rename-file tmp-output-file-name - output-file-name) - hash)))) + (close-port output-port))))) (if (authenticated? agent-id-for-build request) (let* ((output-file-name @@ -709,29 +739,55 @@ port. Also, the port used can be changed by passing the --port option.\n" "deleting " output-file-name) (delete-file output-file-name)) - (let ((hash - (if (bytevector? body) - (let ((output-port (open-file tmp-output-file-name "a"))) - (put-bytevector output-port body) - (close-port output-port) - - (let ((hash (compute-hash tmp-output-file-name))) - (log-msg logger - 'DEBUG - "computed the hash of " tmp-output-file-name - ", renaming") - - (rename-file tmp-output-file-name - output-file-name) - hash)) - (receive-file output-file-name - tmp-output-file-name)))) - - (call-with-output-file (string-append output-file-name ".hash") - (lambda (port) - (simple-format port "~A\n" hash))) - - (no-content))) + (if (bytevector? body) + (let ((output-port (open-file tmp-output-file-name "a"))) + (put-bytevector output-port body) + (close-port output-port) + + (let ((hash (compute-hash-of-uploaded-output + logger + tmp-output-file-name))) + (log-msg logger + 'DEBUG + "computed the hash of " tmp-output-file-name + ", renaming") + + (rename-file tmp-output-file-name + output-file-name) + + (call-with-output-file (string-append output-file-name ".hash") + (lambda (port) + (simple-format port "~A\n" hash)))) + + (no-content)) + (begin + (receive-file output-file-name + tmp-output-file-name) + + (list + (build-response + #:code 200 + #:headers '((content-type . (text/plain)) + (Transfer-Encoding . "chunked"))) + (lambda (response-port) + ;; Compute the hash of the file + (let* ((reporter (progress-reporter/hash + (stat:size (stat tmp-output-file-name)) + response-port)) + (hash (compute-hash-of-uploaded-output + logger + tmp-output-file-name + #:reporter reporter))) + (log-msg logger + 'DEBUG + "computed the hash of " tmp-output-file-name ", renaming") + + (call-with-output-file (string-append output-file-name ".hash") + (lambda (port) + (simple-format port "~A\n" hash)))) + + (rename-file tmp-output-file-name + output-file-name)))))) (render-json '(("error" . "access denied")) #:code 403)))) |