diff options
author | Christopher Baines <mail@cbaines.net> | 2021-01-15 21:53:54 +0000 |
---|---|---|
committer | Christopher Baines <mail@cbaines.net> | 2021-01-15 22:20:13 +0000 |
commit | f7d3c4bb78cf40702268ceecbecc74d7c839629f (patch) | |
tree | f0052b99517f6de1e038dd4077f03f11ec9e0d72 | |
parent | b4ce30d7e0dc02d7a231dd0bb27c29843f2afb75 (diff) | |
download | build-coordinator-f7d3c4bb78cf40702268ceecbecc74d7c839629f.tar build-coordinator-f7d3c4bb78cf40702268ceecbecc74d7c839629f.tar.gz |
Use methods for the agent messaging
This will allow adding more agent messaging approaches.
-rw-r--r-- | guix-build-coordinator/agent-messaging/http.scm | 407 | ||||
-rw-r--r-- | guix-build-coordinator/agent.scm | 39 | ||||
-rw-r--r-- | scripts/guix-build-coordinator-agent.in | 9 |
3 files changed, 258 insertions, 197 deletions
diff --git a/guix-build-coordinator/agent-messaging/http.scm b/guix-build-coordinator/agent-messaging/http.scm index 3c9283f..b1cacdb 100644 --- a/guix-build-coordinator/agent-messaging/http.scm +++ b/guix-build-coordinator/agent-messaging/http.scm @@ -29,6 +29,7 @@ #:use-module (ice-9 binary-ports) #:use-module (system repl error-handling) #:use-module (rnrs bytevectors) + #:use-module (oop goops) #:use-module (logging logger) #:use-module (json) #:use-module (web http) @@ -48,6 +49,8 @@ #:export (agent-error-from-coordinator? agent-error-from-coordinator-details + make-http-agent-interface + submit-status submit-log-file submit-build-result @@ -61,6 +64,19 @@ agent-error-from-coordinator? (details agent-error-from-coordinator-details)) +(define-class <http-agent-interface> () + (coordinator-uri #:init-keyword #:coordinator-uri) + (agent-uuid #:init-keyword #:agent-uuid) + (password #:init-keyword #:password)) + +(define (make-http-agent-interface coordinator-uri + agent-uuid + password) + (make <http-agent-interface> + #:coordinator-uri coordinator-uri + #:agent-uuid agent-uuid + #:password password)) + (define (coordinator-uri-for-path base-uri-string agent-path) (let* ((base-uri (string->uri base-uri-string)) (scheme (uri-scheme base-uri)) @@ -102,7 +118,7 @@ #f))) (define* (coordinator-http-request log - coordinator-uri agent-uuid password + interface path #:key method body (headers '()) succeed-on-access-denied-retry?) @@ -111,10 +127,12 @@ "Basic " (base64-encode (string->utf8 - (string-append agent-uuid ":" password))))) + (string-append (slot-ref interface 'agent-uuid) + ":" + (slot-ref interface 'password)))))) (define uri - (coordinator-uri-for-path coordinator-uri + (coordinator-uri-for-path (slot-ref interface 'coordinator-uri) path)) (define first-request-failed? #f) @@ -173,186 +191,219 @@ #:delay 10 #:ignore agent-error-from-coordinator?)) -(define* (submit-status coordinator-uri agent-uuid password - status - #:key (log default-log)) - (coordinator-http-request - log - coordinator-uri agent-uuid password - (string-append "/agent/" agent-uuid) - #:method 'PUT ; TODO Should be PATCH - #:body `((status . ,status)))) - -(define* (submit-output coordinator-uri agent-uuid password - build-id output-name file - #:key (log default-log)) - (define auth-value - (string-append - "Basic " - (base64-encode - (string->utf8 - (string-append agent-uuid ":" password))))) +(define-method (submit-status + (interface <http-agent-interface>) + . + args) + (apply + (lambda* (status #:key (log default-log)) + (coordinator-http-request + log + interface + (string-append "/agent/" (slot-ref interface 'agent-uuid)) + #:method 'PUT ; TODO Should be PATCH + #:body `((status . ,status)))) + args)) - (define uri - (coordinator-uri-for-path - coordinator-uri - (string-append "/build/" build-id "/output/" output-name))) - - (define path-info - (with-store store - (query-path-info store file))) - - ;; For small outputs, compress while sending the data, but for bigger store - ;; items, do all the compression up front to hopefully reduce the time to - ;; send them. - (if (< (path-info-nar-size path-info) - 5000000) ; 5MB - (retry-on-error - (lambda () - (with-request-mutex +(define-method (submit-output + (interface <http-agent-interface>) + . + args) + (apply + (lambda* (build-id output-name file #:key (log default-log)) + (define auth-value + (string-append + "Basic " + (base64-encode + (string->utf8 + (string-append (slot-ref interface 'agent-uuid) + ":" + (slot-ref interface 'password)))))) + + (define uri + (coordinator-uri-for-path + (slot-ref interface 'coordinator-uri) + (string-append "/build/" build-id "/output/" output-name))) + + (define path-info + (with-store store + (query-path-info store file))) + + ;; For small outputs, compress while sending the data, but for bigger + ;; store items, do all the compression up front to hopefully reduce the + ;; time to send them. + (if (< (path-info-nar-size path-info) + 5000000) ; 5MB + (retry-on-error (lambda () - (call-with-streaming-http-request - uri + (with-request-mutex + (lambda () + (call-with-streaming-http-request + uri + (lambda (port) + (call-with-lzip-output-port port + (lambda (port) + (write-file file port)) + #:level 9)) + #:headers `((Authorization . ,auth-value)))))) + #:times 6 + #:delay 15) + (let* ((directory (or (getenv "TMPDIR") "/tmp")) + (template (string-append directory + "/guix-build-coordinator-file.XXXXXX")) + (out (mkstemp! template))) + (log 'INFO "compressing " file " -> " template " prior to sending") + (call-with-lzip-output-port out (lambda (port) - (call-with-lzip-output-port port - (lambda (port) - (write-file file port)) - #:level 9)) - #:headers `((Authorization . ,auth-value)))))) - #:times 6 - #:delay 15) - (let* ((directory (or (getenv "TMPDIR") "/tmp")) - (template (string-append directory - "/guix-build-coordinator-file.XXXXXX")) - (out (mkstemp! template))) - (log 'INFO "compressing " file " -> " template " prior to sending") - (call-with-lzip-output-port out - (lambda (port) - (write-file file port)) - #:level 9) - (close-port out) - - (log 'INFO "finished compressing " file ", now sending") - (retry-on-error - (lambda () - (with-request-mutex + (write-file file port)) + #:level 9) + (close-port out) + + (log 'INFO "finished compressing " file ", now sending") + (retry-on-error (lambda () - (call-with-input-file template - (lambda (file-port) - (let-values (((response body) - (call-with-streaming-http-request - uri - (lambda (port) - (with-time-logging - (simple-format #f "sending ~A" file) - (dump-port file-port port - #:buffer-size (expt 2 20)))) - #:headers `((Authorization . ,auth-value))))) - (when (>= (response-code response) 400) - (raise-exception - (make-exception-with-message - (coordinator-handle-failed-request log - 'PUT - (uri-path uri) - response - body)))))))))) - #:times 12 - #:delay (random 15)) - - (delete-file template)))) - -(define* (submit-log-file coordinator-uri agent-uuid password - build-id file - #:key (log default-log)) - (define auth-value - (string-append - "Basic " - (base64-encode - (string->utf8 - (string-append agent-uuid ":" password))))) + (with-request-mutex + (lambda () + (call-with-input-file template + (lambda (file-port) + (let-values (((response body) + (call-with-streaming-http-request + uri + (lambda (port) + (with-time-logging + (simple-format #f "sending ~A" file) + (dump-port file-port port + #:buffer-size (expt 2 20)))) + #:headers `((Authorization . ,auth-value))))) + (when (>= (response-code response) 400) + (raise-exception + (make-exception-with-message + (coordinator-handle-failed-request log + 'PUT + (uri-path uri) + response + body)))))))))) + #:times 12 + #:delay (random 15)) - (define format - (cond - ((string-suffix? ".bz2" file) "bzip2") - ((string-suffix? ".gz" file) "gzip") - (else - (error "unsupported log format for" file)))) + (delete-file template)))) + args)) - (define uri - (coordinator-uri-for-path - coordinator-uri - (string-append "/build/" build-id "/log/" format))) +(define-method (submit-log-file + (interface <http-agent-interface>) + . + args) + (apply + (lambda* (build-id file #:key (log default-log)) + (define auth-value + (string-append + "Basic " + (base64-encode + (string->utf8 + (string-append (slot-ref interface 'agent-uuid) + ":" + (slot-ref interface 'password)))))) + + (define format + (cond + ((string-suffix? ".bz2" file) "bzip2") + ((string-suffix? ".gz" file) "gzip") + (else + (error "unsupported log format for" file)))) - (retry-on-error - (lambda () - (with-request-mutex + (define uri + (coordinator-uri-for-path + (slot-ref interface 'coordinator-uri) + (string-append "/build/" build-id "/log/" format))) + + (retry-on-error (lambda () - (let-values (((response body) - (call-with-streaming-http-request - uri - (lambda (request-port) - (call-with-input-file file - (lambda (file-port) - (dump-port file-port request-port)) - #:binary #t)) - #:headers `((Authorization . ,auth-value))))) - (if (>= (response-code response) 400) - (raise-exception - (make-exception-with-message - (coordinator-handle-failed-request log - 'PUT - (uri-path uri) - response - body))) - (begin - (log 'INFO - "successfully uploaded log file (" - (response-code response) - ")") - #t)))))) - #:times 12 - #:delay (random 15))) - -(define* (submit-build-result coordinator-uri agent-uuid password - build-id result - #:key (log default-log)) - (coordinator-http-request - log - coordinator-uri agent-uuid password - (string-append "/build/" build-id) - #:method 'PUT ; TODO Should be PATCH - #:body result)) - -(define* (report-build-start coordinator-uri agent-uuid password - build-id - #:key (log default-log)) - (coordinator-http-request - log - coordinator-uri agent-uuid password - (string-append "/build/" build-id "/report-build-start") - #:method 'POST)) - -(define* (report-setup-failure coordinator-uri agent-uuid password - build-id report - #:key (log default-log)) - (coordinator-http-request - log - coordinator-uri agent-uuid password - (string-append "/build/" build-id "/report-setup-failure") - #:method 'POST - #:body report - #:succeed-on-access-denied-retry? #t)) - -(define* (fetch-builds-for-agent coordinator-uri agent-uuid password - systems - target-count - #:key (log default-log)) - (vector->list - (assoc-ref (coordinator-http-request - log - coordinator-uri agent-uuid password - (string-append "/agent/" agent-uuid "/fetch-builds") - #:body `((target_count . ,target-count) - (systems . ,(list->vector systems))) - #:method 'POST) - "builds"))) + (with-request-mutex + (lambda () + (let-values (((response body) + (call-with-streaming-http-request + uri + (lambda (request-port) + (call-with-input-file file + (lambda (file-port) + (dump-port file-port request-port)) + #:binary #t)) + #:headers `((Authorization . ,auth-value))))) + (if (>= (response-code response) 400) + (raise-exception + (make-exception-with-message + (coordinator-handle-failed-request log + 'PUT + (uri-path uri) + response + body))) + (begin + (log 'INFO + "successfully uploaded log file (" + (response-code response) + ")") + #t)))))) + #:times 12 + #:delay (random 15))) + args)) + +(define-method (submit-build-result + (interface <http-agent-interface>) + . + args) + (apply + (lambda* (build-id result #:key (log default-log)) + (coordinator-http-request + log + interface + (string-append "/build/" build-id) + #:method 'PUT ; TODO Should be PATCH + #:body result)) + args)) + +(define-method (report-build-start + (interface <http-agent-interface>) + . + args) + (apply + (lambda* (build-id #:key (log default-log)) + (coordinator-http-request + log + interface + (string-append "/build/" build-id "/report-build-start") + #:method 'POST)) + args)) + +(define-method (report-setup-failure + (interface <http-agent-interface>) + . + args) + (apply + (lambda* (build-id report #:key (log default-log)) + (coordinator-http-request + log + interface + (string-append "/build/" build-id "/report-setup-failure") + #:method 'POST + #:body report + #:succeed-on-access-denied-retry? #t)) + args)) + +(define-method (fetch-builds-for-agent + (interface <http-agent-interface>) + . + args) + (apply + (lambda* (systems target-count #:key (log default-log)) + (vector->list + (assoc-ref (coordinator-http-request + log + interface + (string-append "/agent/" + (slot-ref interface 'agent-uuid) + "/fetch-builds") + #:body `((target_count . ,target-count) + (systems . ,(list->vector systems))) + #:method 'POST) + "builds"))) + args)) diff --git a/guix-build-coordinator/agent.scm b/guix-build-coordinator/agent.scm index c7e473b..c290e95 100644 --- a/guix-build-coordinator/agent.scm +++ b/guix-build-coordinator/agent.scm @@ -42,7 +42,8 @@ #:use-module (guix-build-coordinator agent-messaging http) #:export (run-agent)) -(define (run-agent uuid coordinator-uri password +(define (run-agent uuid + coordinator-interface systems max-parallel-builds derivation-substitute-urls @@ -120,21 +121,22 @@ build-id ": setup successful, building: " derivation-name) - (report-build-start coordinator-uri uuid password + (report-build-start coordinator-interface build-id #:log (build-log-procedure lgr build-id)) (let* ((result (perform-build lgr store build-id derivation-name)) ;; TODO Check this handles timezones right (end-time (localtime (time-second (current-time)) "UTC"))) - (agent-submit-log-file lgr uuid coordinator-uri password + (agent-submit-log-file lgr + coordinator-interface build-id derivation-name) ((if result post-build-success post-build-failure) lgr - uuid coordinator-uri password + coordinator-interface build-id derivation-name end-time))) @@ -143,7 +145,7 @@ build-id ": setup failure: " (assq-ref pre-build-status 'failure_reason)) - (report-setup-failure coordinator-uri uuid password + (report-setup-failure coordinator-interface build-id pre-build-status #:log (build-log-procedure lgr @@ -159,7 +161,8 @@ (open-log! lgr) (log-msg lgr 'INFO "starting agent " uuid) - (log-msg lgr 'INFO "connecting to coordinator " coordinator-uri) + (log-msg lgr 'INFO "connecting to coordinator " + (slot-ref coordinator-interface 'coordinator-uri)) (let-values (((process-job-with-queue count-jobs count-threads list-jobs) (create-work-queue current-max-builds process-job @@ -184,7 +187,8 @@ "\n")) (current-error-port))) - (let ((details (submit-status coordinator-uri uuid password 'idle + (let ((details (submit-status coordinator-interface + 'idle #:log (build-log-procedure lgr (assoc-ref build "uuid"))))) @@ -218,7 +222,7 @@ (assoc-ref (car job-args) "uuid")) (list-jobs))) (fetched-builds - (fetch-builds-for-agent coordinator-uri uuid password + (fetch-builds-for-agent coordinator-interface systems (min (max current-threads 1) (+ 1 job-count)) @@ -258,7 +262,7 @@ components) components)))) -(define (agent-submit-log-file lgr uuid coordinator-uri password +(define (agent-submit-log-file lgr coordinator-interface build-id derivation-name) (retry-on-error (lambda () @@ -273,7 +277,7 @@ build-id ": uploading log file " log-file) (submit-log-file - coordinator-uri uuid password + coordinator-interface build-id log-file #:log (build-log-procedure lgr build-id)))) @@ -516,7 +520,8 @@ but the guix-daemon claims it's unavailable" #t) #:unwind? #t))) -(define (post-build-failure lgr uuid coordinator-uri password +(define (post-build-failure lgr + coordinator-interface build-id derivation end-time) (log-msg lgr 'INFO build-id ": build failed") (with-exception-handler @@ -546,14 +551,14 @@ but the guix-daemon claims it's unavailable" (make-exception-with-irritants details)))))) (lambda () - (submit-build-result coordinator-uri uuid password + (submit-build-result coordinator-interface build-id `((result . failure) (end_time . ,(strftime "%F %T" end-time))) #:log (build-log-procedure lgr build-id))) #:unwind? #t)) -(define (post-build-success lgr uuid coordinator-uri password +(define (post-build-success lgr coordinator-interface build-id derivation end-time) (define output-details (map @@ -593,7 +598,8 @@ but the guix-daemon claims it's unavailable" #t) ((string=? details "missing_build_log_file") ;; Retry submitting the log file - (agent-submit-log-file lgr uuid coordinator-uri password + (agent-submit-log-file lgr + coordinator-interface build-id derivation) (attempt-submit-build-result)) @@ -635,7 +641,8 @@ but the guix-daemon claims it's unavailable" details))))))) (lambda () (submit-build-result - coordinator-uri uuid password build-id + coordinator-interface + build-id `((result . success) (end_time . ,(strftime "%F %T" end-time)) (outputs . ,(list->vector output-details))) @@ -646,7 +653,7 @@ but the guix-daemon claims it's unavailable" (log-msg lgr 'INFO build-id ": submitting output " (derivation-output-path output)) - (submit-output coordinator-uri uuid password + (submit-output coordinator-interface build-id output-name (derivation-output-path output) #:log (build-log-procedure lgr build-id))) diff --git a/scripts/guix-build-coordinator-agent.in b/scripts/guix-build-coordinator-agent.in index e8a2f0c..c68f94e 100644 --- a/scripts/guix-build-coordinator-agent.in +++ b/scripts/guix-build-coordinator-agent.in @@ -28,7 +28,8 @@ (ice-9 textual-ports) ((guix config) #:prefix guix-config:) (guix-build-coordinator utils) - (guix-build-coordinator agent)) + (guix-build-coordinator agent) + (guix-build-coordinator agent-messaging http)) (define %options ;; Specifications of the command-line options @@ -117,8 +118,10 @@ %option-defaults (cdr (program-arguments))))) (run-agent (assq-ref opts 'uuid) - (assq-ref opts 'coordinator) - (assq-ref opts 'password) + (make-http-agent-interface + (assq-ref opts 'coordinator) + (assq-ref opts 'uuid) + (assq-ref opts 'password)) (delete-duplicates (assq-ref opts 'systems)) (assq-ref opts 'max-parallel-builds) (or (assq-ref opts 'derivation-substitute-urls) |