aboutsummaryrefslogtreecommitdiff
path: root/guix-build-coordinator/agent-messaging
diff options
context:
space:
mode:
Diffstat (limited to 'guix-build-coordinator/agent-messaging')
-rw-r--r--guix-build-coordinator/agent-messaging/http/server.scm328
1 files changed, 192 insertions, 136 deletions
diff --git a/guix-build-coordinator/agent-messaging/http/server.scm b/guix-build-coordinator/agent-messaging/http/server.scm
index 12139c0..4cf1e97 100644
--- a/guix-build-coordinator/agent-messaging/http/server.scm
+++ b/guix-build-coordinator/agent-messaging/http/server.scm
@@ -21,6 +21,7 @@
(define-module (guix-build-coordinator agent-messaging http server)
#:use-module (srfi srfi-11)
#:use-module (srfi srfi-19)
+ #:use-module (srfi srfi-71)
#:use-module (ice-9 match)
#:use-module (ice-9 ftw)
#:use-module (ice-9 threads)
@@ -43,6 +44,7 @@
#:use-module (prometheus)
#:use-module (guix base32)
#:use-module (guix base64)
+ #:use-module (guix progress)
#:use-module (guix build utils)
#:use-module (guix-build-coordinator utils)
#:use-module (guix-build-coordinator utils fibers)
@@ -75,6 +77,55 @@ if there was no request body."
'read-request-body
fixed/read-request-body)
+(define* (port-hash* algorithm port
+ #:key (reporter progress-reporter/silent))
+ (let ((out get (open-hash-port algorithm)))
+ (dump-port* port out #:reporter reporter)
+ (close-port out)
+ (get)))
+
+(define (rate-limited proc interval)
+ "Return a procedure that will forward the invocation to PROC when the time
+elapsed since the previous forwarded invocation is greater or equal to
+INTERVAL (a time-duration object), otherwise does nothing and returns #f."
+ (let ((previous-at #f))
+ (lambda args
+ (let* ((now (current-time time-monotonic))
+ (forward-invocation (lambda ()
+ (set! previous-at now)
+ (apply proc args))))
+ (if previous-at
+ (let ((elapsed (time-difference now previous-at)))
+ (if (time>=? elapsed interval)
+ (forward-invocation)
+ #f))
+ (forward-invocation))))))
+
+(define* (progress-reporter/hash size log-port
+ #:key (progress-interval
+ (make-time time-duration 0 20)))
+ (define total 0)
+
+ (define (report-progress transferred)
+ (define message
+ (format #f "~a~%" transferred))
+
+ (display message log-port) ;should be atomic
+ (force-output log-port))
+
+ (progress-reporter
+ (start (lambda ()
+ (set! total 0)
+ (display "0\n" log-port)))
+ (report (let ((report (rate-limited report-progress progress-interval)))
+ (lambda (transferred)
+ (set! total transferred)
+ (report transferred))))
+ (stop (lambda ()
+ (let ((size (or size total)))
+ (report-progress size)
+ (display "finished\n" log-port))))))
+
(define (http-agent-messaging-start-server port host secret-key-base
build-coordinator
chunked-request-channel)
@@ -217,6 +268,46 @@ port. Also, the port used can be changed by passing the --port option.\n"
`((event . ,(assq-ref event-count 'event)))))
(datastore-count-unprocessed-hook-events datastore)))))
+(define* (compute-hash-of-uploaded-output logger file-name
+ #:key
+ (reporter progress-reporter/silent))
+ (bytevector->nix-base32-string
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg logger
+ 'WARN
+ "error computing hash: " exn)
+
+ (when (file-exists? file-name)
+ (let ((md5-hash
+ (bytevector->base16-string
+ (file-hash (hash-algorithm md5) file-name)))
+ (file-bytes
+ (stat:size (stat file-name))))
+ ;; I've seen exceptions happen here from lzip, so try
+ ;; deleting the tmp file so that it's re-uploaded.
+ (log-msg logger 'WARN "deleting " file-name)
+ (delete-file file-name)
+
+ (raise-exception
+ (make-exception
+ exn
+ (make-exception-with-irritants
+ `((file-bytes . ,file-bytes)
+ (md5-hash . ,md5-hash)))))))
+
+ (raise-exception exn))
+ (lambda ()
+ (call-with-input-file file-name
+ (lambda (compressed-port)
+ (call-with-lzip-input-port compressed-port
+ (lambda (port)
+ (port-hash* (hash-algorithm sha256)
+ port
+ #:reporter reporter))))
+ #:binary #t))
+ #:unwind? #t)))
+
(define (controller request
method-and-path-components
body
@@ -431,40 +522,6 @@ port. Also, the port used can be changed by passing the --port option.\n"
(let ((agent-id-for-build
(datastore-agent-for-build datastore uuid)))
- (define (compute-hash file-name)
- (bytevector->nix-base32-string
- (with-exception-handler
- (lambda (exn)
- (log-msg logger
- 'WARN
- "error computing hash: " exn)
-
- (when (file-exists? file-name)
- (let ((md5-hash
- (bytevector->base16-string
- (file-hash (hash-algorithm md5) file-name)))
- (file-bytes
- (stat:size (stat file-name))))
- ;; I've seen exceptions happen here from lzip, so try
- ;; deleting the tmp file so that it's re-uploaded.
- (log-msg logger 'WARN "deleting " file-name)
- (delete-file file-name)
-
- (raise-exception
- (make-exception
- exn
- (make-exception-with-irritants
- `((file-bytes . ,file-bytes)
- (md5-hash . ,md5-hash)))))))
-
- (raise-exception exn))
- (lambda ()
- (call-with-input-file file-name
- (lambda (compressed-port)
- (call-with-lzip-input-port compressed-port
- port-sha256))))
- #:unwind? #t)))
-
(define (receive-file output-file-name tmp-output-file-name)
(call-with-worker-thread
chunked-request-channel
@@ -516,17 +573,7 @@ port. Also, the port used can be changed by passing the --port option.\n"
(/ bytes-read
1000000))))
bytes-read)
- last-progress-update-bytes-read))))))))
-
- ;; Compute the hash of the file
- (let ((hash (compute-hash tmp-output-file-name)))
- (log-msg logger
- 'DEBUG
- "computed the hash of " tmp-output-file-name ", renaming")
-
- (rename-file tmp-output-file-name
- output-file-name)
- hash))))
+ last-progress-update-bytes-read)))))))))))
(if (authenticated? agent-id-for-build request)
(let* ((output-file-name
@@ -548,30 +595,56 @@ port. Also, the port used can be changed by passing the --port option.\n"
"deleting " tmp-output-file-name)
(delete-file tmp-output-file-name))
- (let ((hash
- (if (bytevector? body)
- (begin
- (call-with-output-file tmp-output-file-name
- (lambda (output-port)
- (put-bytevector output-port body)))
-
- (let ((hash (compute-hash tmp-output-file-name)))
- (log-msg logger
- 'DEBUG
- "computed the hash of " tmp-output-file-name
- ", renaming")
-
- (rename-file tmp-output-file-name
- output-file-name)
- hash))
- (receive-file output-file-name
- tmp-output-file-name))))
-
- (call-with-output-file (string-append output-file-name ".hash")
- (lambda (port)
- (simple-format port "~A\n" hash)))
-
- (no-content)))
+ (if (bytevector? body)
+ (begin
+ (call-with-output-file tmp-output-file-name
+ (lambda (output-port)
+ (put-bytevector output-port body)))
+
+ (let ((hash (compute-hash-of-uploaded-output
+ logger
+ tmp-output-file-name)))
+ (log-msg logger
+ 'DEBUG
+ "computed the hash of " tmp-output-file-name
+ ", renaming")
+
+ (rename-file tmp-output-file-name
+ output-file-name)
+
+ (call-with-output-file (string-append output-file-name ".hash")
+ (lambda (port)
+ (simple-format port "~A\n" hash))))
+
+ (no-content))
+ (begin
+ (receive-file output-file-name
+ tmp-output-file-name)
+
+ (list
+ (build-response
+ #:code 200
+ #:headers '((content-type . (text/plain))
+ (Transfer-Encoding . "chunked")))
+ (lambda (response-port)
+ ;; Compute the hash of the file
+ (let* ((reporter (progress-reporter/hash
+ (stat:size (stat tmp-output-file-name))
+ response-port))
+ (hash (compute-hash-of-uploaded-output
+ logger
+ tmp-output-file-name
+ #:reporter reporter)))
+ (log-msg logger
+ 'DEBUG
+ "computed the hash of " tmp-output-file-name ", renaming")
+
+ (call-with-output-file (string-append output-file-name ".hash")
+ (lambda (port)
+ (simple-format port "~A\n" hash))))
+
+ (rename-file tmp-output-file-name
+ output-file-name))))))
(render-json
'(("error" . "access denied"))
#:code 403))))
@@ -597,38 +670,6 @@ port. Also, the port used can be changed by passing the --port option.\n"
(let ((agent-id-for-build
(datastore-agent-for-build datastore uuid)))
- (define (compute-hash file-name)
- (bytevector->nix-base32-string
- (with-exception-handler
- (lambda (exn)
- (log-msg logger 'WARN "error computing hash: " exn)
-
- (when (file-exists? file-name)
- (let ((md5-hash
- (bytevector->base16-string
- (file-hash (hash-algorithm md5) file-name)))
- (file-bytes
- (stat:size (stat file-name))))
- ;; I've seen exceptions happen here from lzip, so try
- ;; deleting the tmp file so that it's re-uploaded.
- (log-msg logger 'WARN "deleting " file-name)
- (delete-file file-name)
-
- (raise-exception
- (make-exception
- exn
- (make-exception-with-irritants
- `((file-bytes . ,file-bytes)
- (md5-hash . ,md5-hash)))))))
-
- (raise-exception exn))
- (lambda ()
- (call-with-input-file file-name
- (lambda (compressed-port)
- (call-with-lzip-input-port compressed-port
- port-sha256))))
- #:unwind? #t)))
-
(define (receive-file output-file-name tmp-output-file-name)
(call-with-worker-thread
chunked-request-channel
@@ -681,18 +722,7 @@ port. Also, the port used can be changed by passing the --port option.\n"
1000000))))
bytes-read)
last-progress-update-bytes-read))))))
- (close-port output-port))
-
- ;; Compute the hash of the file
- (let ((hash (compute-hash tmp-output-file-name)))
- (log-msg logger
- 'DEBUG
- "computed the hash of " tmp-output-file-name
- ", renaming")
-
- (rename-file tmp-output-file-name
- output-file-name)
- hash))))
+ (close-port output-port)))))
(if (authenticated? agent-id-for-build request)
(let* ((output-file-name
@@ -709,29 +739,55 @@ port. Also, the port used can be changed by passing the --port option.\n"
"deleting " output-file-name)
(delete-file output-file-name))
- (let ((hash
- (if (bytevector? body)
- (let ((output-port (open-file tmp-output-file-name "a")))
- (put-bytevector output-port body)
- (close-port output-port)
-
- (let ((hash (compute-hash tmp-output-file-name)))
- (log-msg logger
- 'DEBUG
- "computed the hash of " tmp-output-file-name
- ", renaming")
-
- (rename-file tmp-output-file-name
- output-file-name)
- hash))
- (receive-file output-file-name
- tmp-output-file-name))))
-
- (call-with-output-file (string-append output-file-name ".hash")
- (lambda (port)
- (simple-format port "~A\n" hash)))
-
- (no-content)))
+ (if (bytevector? body)
+ (let ((output-port (open-file tmp-output-file-name "a")))
+ (put-bytevector output-port body)
+ (close-port output-port)
+
+ (let ((hash (compute-hash-of-uploaded-output
+ logger
+ tmp-output-file-name)))
+ (log-msg logger
+ 'DEBUG
+ "computed the hash of " tmp-output-file-name
+ ", renaming")
+
+ (rename-file tmp-output-file-name
+ output-file-name)
+
+ (call-with-output-file (string-append output-file-name ".hash")
+ (lambda (port)
+ (simple-format port "~A\n" hash))))
+
+ (no-content))
+ (begin
+ (receive-file output-file-name
+ tmp-output-file-name)
+
+ (list
+ (build-response
+ #:code 200
+ #:headers '((content-type . (text/plain))
+ (Transfer-Encoding . "chunked")))
+ (lambda (response-port)
+ ;; Compute the hash of the file
+ (let* ((reporter (progress-reporter/hash
+ (stat:size (stat tmp-output-file-name))
+ response-port))
+ (hash (compute-hash-of-uploaded-output
+ logger
+ tmp-output-file-name
+ #:reporter reporter)))
+ (log-msg logger
+ 'DEBUG
+ "computed the hash of " tmp-output-file-name ", renaming")
+
+ (call-with-output-file (string-append output-file-name ".hash")
+ (lambda (port)
+ (simple-format port "~A\n" hash))))
+
+ (rename-file tmp-output-file-name
+ output-file-name))))))
(render-json
'(("error" . "access denied"))
#:code 403))))