;;; Guix Build Coordinator
;;;
;;; Copyright © 2020 Christopher Baines
;;;
;;; This file is part of the guix-build-coordinator.
;;;
;;; The Guix Build Coordinator is free software; you can redistribute
;;; it and/or modify it under the terms of the GNU General Public
;;; License as published by the Free Software Foundation; either
;;; version 3 of the License, or (at your option) any later version.
;;;
;;; The Guix Build Coordinator is distributed in the hope that it will
;;; be useful, but WITHOUT ANY WARRANTY; without even the implied
;;; warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
;;; See the GNU General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with the guix-data-service.  If not, see
;;; <http://www.gnu.org/licenses/>.

(define-module (guix-build-coordinator client-communication)
  #:use-module (srfi srfi-1)
  #:use-module (srfi srfi-11)
  #:use-module (srfi srfi-19)
  #:use-module (srfi srfi-43)
  #:use-module (srfi srfi-71)
  #:use-module (ice-9 format)
  #:use-module (ice-9 match)
  #:use-module (ice-9 streams)
  #:use-module (ice-9 exceptions)
  #:use-module (rnrs bytevectors)
  #:use-module (json)
  #:use-module (logging logger)
  #:use-module (gcrypt random)
  #:use-module (web uri)
  #:use-module (web client)
  #:use-module (web request)
  #:use-module (web response)
  #:use-module (system repl error-handling)
  #:use-module (guix store)
  #:use-module (guix derivations)
  #:use-module (guix-build-coordinator utils)
  #:use-module (guix-build-coordinator utils fibers)
  #:use-module (guix-build-coordinator datastore)
  #:use-module (guix-build-coordinator coordinator)
  #:export (start-client-request-server

            send-submit-build-request
            send-cancel-build-request
            send-update-build-priority-request
            fold-builds
            request-build-details
            request-output-details
            request-agent-details
            request-agent-build-allocation-plan
            request-agents-list
            request-failed-builds-with-blocking-count-list
            request-setup-failures
            send-create-agent-request
            send-agent-set-active-request
            send-create-agent-password-request
            send-create-dynamic-auth-token-request
            send-replace-agent-tags-request))

;;;
;;; Server side
;;;

;; Start the HTTP server for client requests on HOST and PORT.  Each request
;; is logged at INFO level (method and path) and then dispatched to
;; `controller'; the list that `controller' returns is spread into multiple
;; values (response, body) for the server.
(define (start-client-request-server secret-key-base
                                     host
                                     port
                                     build-coordinator)
  (run-server/patched
   (lambda (request body)
     (log-msg (build-coordinator-logger build-coordinator)
              'INFO
              (format #f "~4a ~a\n"
                      (request-method request)
                      (uri-path (request-uri request))))
     (apply values
            (controller request
                        (cons (request-method request)
                              (split-and-decode-uri-path
                               (uri-path (request-uri request))))
                        body
                        secret-key-base
                        build-coordinator)))
   #:host host
   #:port port))

;; Route a single client request.
;;
;; METHOD-AND-PATH-COMPONENTS is the request method consed on to the decoded
;; path components, e.g. (GET "build" "<uuid>").  RAW-BODY is the request body
;; as a bytevector, or #f; when present it is parsed as JSON.
;;
;; Returns a two element list: the response object, and either a string body
;; or a procedure taking the output port (used by the streaming routes).
;;
;; Exceptions raised while handling a route are turned in to a 400 response
;; for client errors, a 503 for worker thread timeouts, and a 500 otherwise.
;; Anything other than client errors and timeouts is also logged with a
;; backtrace to the current error port.
(define (controller request
                    method-and-path-components
                    raw-body
                    secret-key-base
                    build-coordinator)
  (define datastore
    (build-coordinator-datastore build-coordinator))

  (define body
    (if raw-body
        (json-string->scm (utf8->string raw-body))
        '()))

  ;; Convert a vector of (key . value) tag pairs in to a vector of alists
  ;; with key and value entries, ready to be rendered as JSON objects.
  ;; vector-map here is the SRFI-43 variant, which passes the index first.
  (define (tags->json-vector tags)
    (vector-map
     (lambda (_ tag)
       (match tag
         ((key . value)
          `((key . ,key)
            (value . ,value)))))
     tags))

  ;; Format TIME with strftime as "%F %T", or return the JSON null marker
  ;; when TIME is #f.
  (define (timestamp->json time)
    (if time
        (strftime "%F %T" time)
        'null))

  ;; #t only when the query parameter NAME is present with the value "true".
  (define (query-parameter-true? query-parameters name)
    (string=? (or (assq-ref query-parameters name)
                  "")
              "true"))

  ;; All the values given for the (possibly repeated) query parameter NAME.
  (define (query-parameter-values query-parameters name)
    (filter-map
     (match-lambda
       ((key . value)
        (and (eq? key name)
             value)))
     query-parameters))

  ;; Parse the (possibly repeated) query parameter NAME as tag filters:
  ;; "key:value" becomes (key . value) and a bare "key" stays a string.
  (define (query-parameter-tag-filters query-parameters name)
    (filter-map
     (match-lambda
       ((key . value)
        (and (eq? key name)
             (match (string-split value #\:)
               ((tag-key tag-value) (cons tag-key tag-value))
               ((tag-key) tag-key)))))
     query-parameters))

  (define (controller-thunk)
    (match method-and-path-components
      ;; Details for a single build, including the builds for each of its
      ;; derivation inputs, any setup failures, tags and result
      (('GET "build" uuid)
       (match (datastore-find-build datastore uuid)
         (#f
          (render-json
           '((error . "no build found"))
           #:code 404))
         (build-details
          (let ((derivation-inputs
                 (map
                  (lambda (derivation-input-details)
                    (let ((builds (datastore-list-builds-for-output
                                   datastore
                                   (assq-ref derivation-input-details
                                             'output))))
                      `(,@derivation-input-details
                        (builds . ,(list->vector builds)))))
                  (datastore-find-derivation-inputs
                   datastore
                   (assq-ref build-details 'derivation-name))))
                (setup-failures
                 (map
                  (lambda (setup-failure)
                    `(,@setup-failure
                      ;; Only missing_inputs failures carry a list of the
                      ;; inputs that were missing
                      ,@(if (string=? (assq-ref setup-failure
                                                'failure-reason)
                                      "missing_inputs")
                            `((missing-inputs
                               . ,(list->vector
                                   (map
                                    (lambda (missing-input)
                                      (let ((builds-for-missing-input
                                             (datastore-list-builds-for-output
                                              datastore
                                              missing-input)))
                                        `((missing-input . ,missing-input)
                                          (builds
                                           . ,(list->vector
                                               builds-for-missing-input)))))
                                    (datastore-list-setup-failure-missing-inputs
                                     datastore
                                     (assq-ref setup-failure 'id))))))
                            '())))
                  (datastore-list-setup-failures-for-build
                   datastore
                   (assq-ref build-details 'uuid))))
                (tags
                 (datastore-fetch-build-tags datastore uuid))
                (result
                 (datastore-find-build-result datastore uuid)))
            (render-json
             `(,@(alist-delete
                  'created-at
                  (alist-delete 'end-time build-details))
               ,@(if (assq-ref build-details 'processed)
                     '()
                     (datastore-find-unprocessed-build-entry datastore
                                                             uuid))
               (created-at
                . ,(timestamp->json (assq-ref build-details 'created-at)))
               (end-time
                . ,(timestamp->json (assq-ref build-details 'end-time)))
               (tags . ,(tags->json-vector tags))
               ,@(or (and=> result
                            (lambda (result)
                              `((result . ,result))))
                     '())
               (derivation-inputs . ,(list->vector derivation-inputs))
               (setup-failures . ,(list->vector setup-failures))))))))

      (('POST "build" uuid "cancel")
       (let ((query-parameters (request-query-parameters request)))
         (render-json
          `((result
             . ,(cancel-build
                 build-coordinator
                 uuid
                 #:ignore-if-build-required-by-another?
                 (query-parameter-true?
                  query-parameters
                  'ignore-if-build-required-by-another)
                 #:skip-updating-derived-priorities?
                 (query-parameter-true?
                  query-parameters
                  'skip-updating-derived-priorities)))))))

      (('POST "build" uuid "update-priority")
       (let ((query-parameters (request-query-parameters request)))
         (update-build-priority
          build-coordinator
          uuid
          (assoc-ref body "new_priority")
          #:skip-updating-derived-priorities?
          (query-parameter-true? query-parameters
                                 'skip-updating-derived-priorities)
          #:override-derived-priority
          (assoc-ref body "override_derived_priority"))
         (render-json
          `((result . "build-priority-updated")))))

      (('GET "builds" "blocking")
       (let* ((query-parameters (request-query-parameters request))
              ;; request-failed-builds-with-blocking-count-list sends
              ;; include_cancelled, so look for that first.  The key with a
              ;; trailing ? that this handler used to look for is still
              ;; accepted.
              (include-cancelled
               (or (assq-ref query-parameters 'include_cancelled)
                   (assq-ref query-parameters 'include_cancelled?))))
         (render-json
          `((builds
             . ,(list->vector
                 (datastore-list-failed-builds-with-blocking-count
                  datastore
                  (assq-ref query-parameters 'system)
                  #:include-cancelled?
                  ;; Any value other than "false" enables the option
                  (and include-cancelled
                       (not (equal? include-cancelled "false"))))))))))

      ;; The remaining path components are joined back together to form the
      ;; output path
      (('GET "output" output-components ...)
       (let* ((output (string-append
                       "/"
                       (string-join output-components "/")))
              (builds (datastore-list-builds-for-output datastore output)))
         (render-json
          `((builds . ,(list->vector builds))))))

      (('GET "agents")
       (render-json
        `((agents
           . ,(list->vector
               (map
                (lambda (agent)
                  (let ((agent-id (assq-ref agent 'uuid)))
                    `(,@agent
                      (allocated_builds
                       . ,(list->vector
                           (datastore-list-agent-builds datastore agent-id)))
                      (tags
                       . ,(tags->json-vector
                           (datastore-fetch-agent-tags datastore agent-id)))
                      (requested_systems
                       . ,(list->vector
                           (datastore-agent-requested-systems
                            datastore
                            agent-id))))))
                (datastore-list-agents datastore)))))))

      (('POST "agents")
       (let ((uuid (new-agent
                    datastore
                    #:requested-uuid (assoc-ref body "requested-uuid")
                    #:description (assoc-ref body "description"))))
         (render-json
          `((agent-id . ,uuid)))))

      (('POST "dynamic-auth-tokens")
       (let ((token (random-token)))
         (datastore-insert-dynamic-auth-token datastore token)
         (render-json
          `((token . ,token)))))

      (('GET "agent" agent-id)
       (match (datastore-find-agent datastore agent-id)
         ;; Respond in the same way as the tags and active routes for an
         ;; unknown agent, rather than failing when splicing #f below
         (#f
          (render-json
           `((error . 404))
           #:code 404))
         (agent-details
          (render-json
           `((id . ,agent-id)
             ,@agent-details
             (tags
              . ,(tags->json-vector
                  (datastore-fetch-agent-tags datastore agent-id)))
             (allocated_builds
              . ,(list->vector
                  (datastore-list-agent-builds datastore agent-id))))))))

      (('GET "agent" agent-id (or "build-allocation-plan"
                                  ;; TODO Remove _ variant
                                  "build_allocation_plan"))
       ;; NOTE(review): agent-details isn't used in the response, the lookup
       ;; is kept as it was
       (let ((agent-details (datastore-find-agent datastore agent-id)))
         (render-json
          `((build_allocation_plan
             . ,(list->vector
                 (datastore-list-allocation-plan-builds
                  datastore
                  agent-id)))))))

      (('POST "agent" agent-id "passwords")
       (let ((password (new-agent-password
                        datastore
                        #:agent agent-id)))
         (render-json
          `((new-password . ,password)))))

      (('POST "agent" agent-id "tags")
       (let ((agent-details (datastore-find-agent datastore agent-id)))
         (if agent-details
             (begin
               (datastore-replace-agent-tags
                datastore
                agent-id
                (vector-map
                 (lambda (_ tag)
                   `((key . ,(assoc-ref tag "key"))
                     (value . ,(assoc-ref tag "value"))))
                 (assoc-ref body "tags")))
               (render-json
                `((result . success))))
             (render-json
              `((error . 404))
              #:code 404))))

      (('POST "agent" agent-id "active")
       (let ((agent-details (datastore-find-agent datastore agent-id)))
         (if agent-details
             ;; assoc rather than assoc-ref, so that an explicit false can
             ;; be told apart from the key being missing
             (let ((active?
                    (match (assoc "active" body)
                      (("active" . active?) active?)
                      ;; When the key is missing, use a value that fails
                      ;; the boolean? check below, so that the response is
                      ;; a 400 rather than an unhandled match error
                      (_ 'unset))))
               (if (boolean? active?)
                   (begin
                     (set-agent-active build-coordinator agent-id active?)
                     (render-json
                      `((result . success))))
                   (render-json
                    `((error . "active must be a boolean"))
                    #:code 400)))
             (render-json
              `((error . 404))
              #:code 404))))

      (('GET "setup-failures")
       (let ((query-parameters (request-query-parameters request)))
         (render-json
          `((setup_failures
             . ,(hash-map->list
                 (lambda (k v)
                   (cons k (list->vector v)))
                 (datastore-fetch-setup-failures
                  datastore
                  #:agent-id (assq-ref query-parameters 'agent-id))))))))

      ;; Stream builds matching the filters in the query string, as a JSON
      ;; text sequence
      (('GET "builds")
       (let* ((query-parameters (request-query-parameters request))
              (fold-builds-args
               (list
                #:tags
                (query-parameter-tag-filters query-parameters 'tag)
                #:not-tags
                (query-parameter-tag-filters query-parameters 'not_tag)
                #:systems
                (query-parameter-values query-parameters 'system)
                #:not-systems
                (query-parameter-values query-parameters 'not-system)
                #:processed
                (match (assq 'processed query-parameters)
                  ((_ . val)
                   (string=? val "true"))
                  (#f 'unset))
                #:canceled
                (match (assq 'canceled query-parameters)
                  ((_ . val)
                   (string=? val "true"))
                  (#f 'unset))
                #:priority->
                (or (and=> (assq-ref query-parameters 'priority_gt)
                           string->number)
                    'unset)
                #:priority-<
                (or (and=> (assq-ref query-parameters 'priority_lt)
                           string->number)
                    'unset)
                #:relationship
                (or (and=> (assq-ref query-parameters 'relationship)
                           string->symbol)
                    'unset)
                #:after-id
                (assq-ref query-parameters 'after_id)
                #:limit
                (and=> (assq-ref query-parameters 'limit)
                       string->number))))
         (list
          (build-response
           #:code 200
           #:headers '((content-type . (application/json-seq))))
          (lambda (port)
            (apply
             datastore-fold-builds
             datastore
             ;; Each build is written to the port as it's processed, the
             ;; fold result itself isn't used
             (lambda (build-details _)
               (scm->json-seq
                `((,@(alist-delete
                      'created-at
                      (alist-delete 'end-time build-details))
                   (created-at
                    . ,(timestamp->json
                        (assq-ref build-details 'created-at)))
                   (end-time
                    . ,(timestamp->json
                        (assq-ref build-details 'end-time)))
                   (tags
                    . ,(tags->json-vector
                        (datastore-fetch-build-tags
                         datastore
                         (assq-ref build-details 'uuid))))))
                port)
               #t)
             #t
             fold-builds-args)))))

      (('POST "builds")
       (let ((derivation-file (assoc-ref body "derivation"))
             (substitute-urls
              (and=> (assoc-ref body "substitute-urls")
                     vector->list)))

         ;; Substitute the derivation file if it isn't a valid store path,
         ;; then read it
         (define (read-drv/substitute derivation-file)
           (with-store/non-blocking store
             (unless (valid-path? store derivation-file)
               (substitute-derivation store
                                      derivation-file
                                      #:substitute-urls substitute-urls)))
           (read-derivation-from-file* derivation-file))

         (unless (string? derivation-file)
           (raise-exception
            (make-exception-with-message
             (simple-format
              #f
              "derivation must be a string: ~A\n"
              ;; Include the value that was sent, this used to refer to
              ;; `derivation', which isn't the value from the request
              derivation-file))))

         (let ((submit-build-result
                (call-with-delay-logging
                 submit-build
                 #:args
                 `(,build-coordinator
                   ,derivation-file
                   #:read-drv
                   ,(lambda (derivation-file)
                      (with-exception-handler
                          (lambda (exn)
                            (log-msg
                             (build-coordinator-logger build-coordinator)
                             'WARN
                             "exception substituting derivation "
                             derivation-file
                             ": " exn)

                            (if (null? (or substitute-urls '()))
                                ;; Try again
                                (read-drv/substitute derivation-file)
                                (read-derivation-through-substitutes
                                 derivation-file
                                 substitute-urls)))
                        (lambda ()
                          (read-drv/substitute derivation-file))
                        #:unwind? #t))
                   ;; The optional arguments are only passed through when
                   ;; they're present in the request body
                   ,@(let ((priority (assoc-ref body "priority")))
                       (if priority
                           `(#:priority ,priority)
                           '()))
                   ,@(if (assoc-ref
                          body
                          "ignore-if-build-for-derivation-exists")
                         '(#:ignore-if-build-for-derivation-exists? #t)
                         '())
                   ,@(if (assoc-ref
                          body
                          "ignore-if-build-for-outputs-exists")
                         '(#:ignore-if-build-for-outputs-exists? #t)
                         '())
                   ,@(if (assoc-ref
                          body
                          "ensure-all-related-derivation-outputs-have-builds")
                         '(#:ensure-all-related-derivation-outputs-have-builds?
                           #t)
                         '())
                   ,@(if (assoc-ref body "tags")
                         `(#:tags
                           ,(map
                             (lambda (tag)
                               (cons (assoc-ref tag "key")
                                     (assoc-ref tag "value")))
                             (vector->list (assoc-ref body "tags"))))
                         '())
                   ,@(or (and=> (assoc-ref body "defer-until")
                                (lambda (date)
                                  `(#:defer-until
                                    ,(string->date
                                      date
                                      "~Y-~m-~d ~H:~M:~S"))))
                         '()))
                 #:threshold 10)))
           (render-json submit-build-result))))

      (('GET "state")
       ;; Use a write transaction here to get a snapshot of the state plus the
       ;; state-id
       (render-json
        (datastore-call-with-transaction
         datastore
         (lambda (_)
           (let ((allocation-plan-counts
                  (datastore-count-build-allocation-plan-entries datastore)))
             `((state_id
                . ,(build-coordinator-get-state-id build-coordinator))
               (agents
                . ,(list->vector
                    (map
                     (lambda (agent-details)
                       (let ((agent-id (assq-ref agent-details 'uuid)))
                         `(,@agent-details
                           (last_status_update
                            . ,(datastore-find-agent-status datastore
                                                            agent-id))
                           (requested_systems
                            . ,(list->vector
                                (datastore-agent-requested-systems
                                 datastore
                                 agent-id)))
                           (allocation_plan
                            . ((count
                                . ,(or (assoc-ref allocation-plan-counts
                                                  agent-id)
                                       0))))
                           (tags
                            . ,(tags->json-vector
                                (datastore-fetch-agent-tags datastore
                                                            agent-id)))
                           (builds
                            . ,(list->vector
                                (map
                                 (lambda (build)
                                   `(,@build
                                     (tags
                                      . ,(tags->json-vector
                                          (datastore-fetch-build-tags
                                           datastore
                                           (assq-ref build 'uuid))))))
                                 (datastore-list-agent-builds
                                  datastore
                                  (assq-ref agent-details 'uuid))))))))
                     (datastore-list-agents datastore))))))))))

      ;; Stream coordinator events using the text/event-stream format
      (('GET "events")
       (let ((headers (request-headers request)))
         (list (build-response
                #:code 200
                #:headers '((content-type . (text/event-stream))))
               (lambda (port)
                 (build-coordinator-listen-for-events
                  build-coordinator
                  (lambda (state-id event-name data)
                    (display
                     (simple-format #f "id: ~A\nevent: ~A\ndata: ~A\n\n"
                                    state-id
                                    event-name
                                    (scm->json-string data))
                     port)
                    (force-output port)
                    ;; TODO because the chunked output port doesn't call
                    ;; force-output on the underlying port, do that here. We
                    ;; want this event to be sent now, rather than when some
                    ;; buffer fills up.
                    (force-output (request-port request)))
                  #:after-state-id
                  (with-exception-handler
                      (lambda (exn)
                        (simple-format
                         (current-error-port)
                         "error: when processing Last-Event-ID header value: ~A\n"
                         (assoc-ref headers 'last-event-id))
                        ;; With #:unwind? #t, the value returned here is
                        ;; what's used as the after-state-id, so return #f
                        ;; rather than the result of simple-format
                        #f)
                    (lambda ()
                      (and=> (assoc-ref headers 'last-event-id)
                             string->number))
                    #:unwind? #t))))))

      (_
       (render-json
        "not-found"
        #:code 404))))

  (with-exception-handler
      (lambda (exn)
        (cond
         ((client-error? exn)
          (render-json
           `((error . ,(client-error-details exn)))
           #:code 400))
         ((worker-thread-timeout-error? exn)
          (render-json
           `((error . ,(simple-format #f "~A" exn)))
           #:code 503))
         (else
          (render-json
           `((error . 500))
           #:code 500))))
    (lambda ()
      ;; The throw handler runs before the stack is unwound, so the
      ;; backtrace can be captured for unexpected errors
      (with-throw-handler #t
        controller-thunk
        (lambda (key . args)
          ;; Client errors and worker thread timeouts are expected, so
          ;; they're not logged
          (unless (and (eq? '%exception key)
                       (or (worker-thread-timeout-error? (car args))
                           (client-error? (car args))))
            (match method-and-path-components
              ((method path-components ...)
               (simple-format
                (current-error-port)
                ;; The leading / belongs to the path, not the method
                "error: when processing client request: ~A /~A\n ~A ~A\n"
                method
                (string-join path-components "/")
                key
                args)))

            (let* ((stack (make-stack #t 4))
                   (backtrace
                    (call-with-output-string
                      (lambda (port)
                        (display "\nBacktrace:\n" port)
                        (display-backtrace stack port)
                        (newline port)
                        (newline port)))))
              (display backtrace (current-error-port)))))))
    #:unwind? #t))

;; Build the (response body) list for a JSON response.  JSON is the value to
;; serialise, CODE the HTTP status code (200 by default) and EXTRA-HEADERS an
;; alist of headers placed before the content-type and vary headers.
(define* (render-json json #:key (extra-headers '())
                      (code 200))
  (list (build-response
         #:code code
         #:headers (append extra-headers
                           '((content-type . (application/json))
                             (vary . (accept)))))
        (scm->json-string json)))

;;;
;;; Client side
;;;

;; Make a request to the coordinator at COORDINATOR-URI, with PATH appended
;; directly to the URI.  REQUEST-BODY, if given, is serialised as JSON.
;;
;; Returns two values: the parsed response body and the response object.  For
;; an application/json-seq response the body is whatever json-seq->scm returns
;; for the still open port; otherwise the body is parsed as JSON and the port
;; is closed.
;;
;; For responses with a status code of 400 or above, a message is written to
;; the current error port and an exception is raised, with the decoded body
;; (or #f if decoding failed) as the message.
(define* (send-request coordinator-uri
                       method
                       path
                       #:optional
                       request-body)
  ;; NOTE(review): socket isn't used
  (let* ((uri (string->uri (string-append coordinator-uri path)))
         (port socket (open-socket-for-uri* uri)))
    (let ((response
           body
           (http-request uri
                         #:port port
                         #:method method
                         #:body (and=> request-body scm->json-string)
                         #:decode-body? #f
                         #:streaming? #t)))
      (if (>= (response-code response) 400)
          (begin
            (simple-format
             (current-error-port)
             "error: coordinator-http-request: ~A ~A: ~A\n"
             method path (response-code response))
            (let ((parsed-body
                   (catch #t
                     (lambda ()
                       (if (equal? '(application/json (charset . "utf-8"))
                                   (response-content-type response))
                           (json->scm body)
                           (utf8->string (read-response-body response))))
                     (lambda (key . args)
                       (simple-format
                        (current-error-port)
                        "error decoding body ~A ~A\n"
                        key args)
                       #f))))
              (close-port body)
              (raise-exception
               (make-exception-with-message parsed-body))))
          (begin
            (set-port-encoding! body "UTF-8")

            (values
             (if (equal? '(application/json-seq)
                         (response-content-type response))
                 ;; TODO There's no mechanism to close the port after the
                 ;; stream/sequence has finished
                 (json-seq->scm
                  body
                  ;; TODO I would like to use 'throw, but it always raises an
                  ;; exception, so this needs fixing upstream first
                  #:handle-truncate 'replace)
                 (let ((parsed-body (json->scm body)))
                   (close-port body)
                   parsed-body))
             response))))))

;; Submit a build for DERIVATION-FILE-NAME.  SUBSTITUTE-URLS is a list or #f,
;; TAGS is a list.  DEFER-UNTIL, if given, is a SRFI-19 date, sent formatted
;; with "~1 ~3".
;;
;; NOTE(review): requested-uuid is accepted, but isn't included in the
;; request body.
(define* (send-submit-build-request
          coordinator-uri
          derivation-file-name
          substitute-urls
          requested-uuid
          priority
          ignore-if-build-for-derivation-exists?
          ignore-if-build-for-outputs-exists?
          ensure-all-related-derivation-outputs-have-builds?
          tags
          #:key
          defer-until)
  (send-request
   coordinator-uri
   'POST
   "/builds"
   `((derivation . ,derivation-file-name)
     (priority . ,priority)
     ,@(if substitute-urls
           `((substitute-urls . ,(list->vector substitute-urls)))
           '())
     ,@(if ignore-if-build-for-derivation-exists?
           '((ignore-if-build-for-derivation-exists . #t))
           '())
     ,@(if ignore-if-build-for-outputs-exists?
           '((ignore-if-build-for-outputs-exists . #t))
           '())
     ,@(if ensure-all-related-derivation-outputs-have-builds?
           '((ensure-all-related-derivation-outputs-have-builds . #t))
           '())
     ,@(if (null? tags)
           '()
           `((tags . ,(list->vector tags))))
     ,@(if defer-until
           `((defer-until . ,(date->string defer-until "~1 ~3")))
           '()))))

;; Request that the build BUILD-ID is cancelled.  The options are sent as
;; query parameters.
(define* (send-cancel-build-request
          coordinator-uri
          build-id
          #:key
          (ignore-if-build-required-by-another? #t)
          skip-updating-derived-priorities?)
  (send-request
   coordinator-uri
   'POST
   (string-append
    "/build/" build-id "/cancel"
    "?ignore-if-build-required-by-another="
    (if ignore-if-build-required-by-another? "true" "false")
    (if skip-updating-derived-priorities?
        "&skip-updating-derived-priorities=true"
        ""))))

;; Request that the priority of the build BUILD-ID is set to NEW-PRIORITY.
(define* (send-update-build-priority-request
          coordinator-uri
          build-id
          new-priority
          #:key
          skip-updating-derived-priorities?
          override-derived-priority)
  (send-request
   coordinator-uri
   'POST
   (string-append
    "/build/" build-id "/update-priority"
    (if skip-updating-derived-priorities?
        "?skip-updating-derived-priorities=true"
        ""))
   `((new_priority . ,new-priority)
     ,@(if override-derived-priority
           `((override_derived_priority . ,override-derived-priority))
           '()))))

;; Fetch the details for the build with the given UUID.
(define (request-build-details coordinator-uri uuid)
  (send-request coordinator-uri
                'GET
                (string-append "/build/" uuid)))

;; Fold PROC over the builds matching the given filters, starting with INIT.
;; The filters are sent as query parameters, and the builds come from the
;; stream returned for the response.  The response port is closed once the
;; fold finishes, or if an exception is raised.
;;
;; Entries in TAGS and NOT-TAGS are either an alist with key and value
;; entries, or just the tag key.  Filters left as 'unset (or #f for AFTER-ID
;; and LIMIT) aren't sent.
(define* (fold-builds coordinator-uri
                      proc
                      init
                      #:key
                      (tags '())
                      (not-tags '())
                      (systems '())
                      (not-systems '())
                      (processed 'unset)
                      (canceled 'unset)
                      (priority-> 'unset)
                      (priority-< 'unset)
                      (relationship 'unset)
                      (after-id #f)
                      (limit #f))
  (let ((query-parameters
         `(,@(if (null? tags)
                 '()
                 (map
                  (match-lambda
                    ((('key . key)
                      ('value . value))
                     (simple-format #f "tag=~A:~A" key value))
                    (key
                     (simple-format #f "tag=~A" key)))
                  tags))
           ,@(if (null? not-tags)
                 '()
                 (map
                  (match-lambda
                    ((('key . key)
                      ('value . value))
                     (simple-format #f "not_tag=~A:~A" key value))
                    (key
                     (simple-format #f "not_tag=~A" key)))
                  not-tags))
           ,@(if (null? systems)
                 '()
                 (map
                  (lambda (system)
                    (simple-format #f "system=~A" system))
                  systems))
           ,@(if (null? not-systems)
                 '()
                 (map
                  (lambda (system)
                    (simple-format #f "not-system=~A" system))
                  not-systems))
           ,@(if (boolean? processed)
                 (if processed
                     '("processed=true")
                     '("processed=false"))
                 '())
           ,@(if (boolean? canceled)
                 (if canceled
                     '("canceled=true")
                     '("canceled=false"))
                 '())
           ,@(if (number? priority->)
                 (list (simple-format #f "priority_gt=~A" priority->))
                 '())
           ,@(if (number? priority-<)
                 (list (simple-format #f "priority_lt=~A" priority-<))
                 '())
           ,@(if (and relationship
                      (not (eq? 'unset relationship)))
                 (list (simple-format #f "relationship=~A" relationship))
                 '())
           ,@(if after-id
                 (list (string-append "after_id=" after-id))
                 '())
           ,@(if limit
                 (list (simple-format #f "limit=~A" limit))
                 '()))))
    (let ((builds-stream
           response
           (send-request
            coordinator-uri
            'GET
            (string-append
             "/builds"
             (if (null? query-parameters)
                 ""
                 (string-append
                  "?"
                  (string-join query-parameters "&")))))))
      (with-exception-handler
          (lambda (exn)
            (close-port (response-port response))
            (raise-exception exn))
        (lambda ()
          (call-with-values
              (lambda ()
                (stream-fold proc init builds-stream))
            (lambda vals
              (close-port (response-port response))
              (apply values vals))))
        #:unwind? #t))))

;; Fetch the builds for OUTPUT, which is appended directly to "/output".
(define (request-output-details coordinator-uri output)
  (send-request coordinator-uri
                'GET
                (string-append "/output" output)))

;; Fetch the details for the agent AGENT-ID.
(define (request-agent-details coordinator-uri agent-id)
  (send-request coordinator-uri
                'GET
                (string-append "/agent/" agent-id)))

;; Fetch the build allocation plan for the agent AGENT-ID.
(define (request-agent-build-allocation-plan coordinator-uri agent-id)
  (send-request coordinator-uri
                'GET
                (string-append "/agent/" agent-id
                               "/build-allocation-plan")))

;; Fetch the list of agents.
(define (request-agents-list coordinator-uri)
  (send-request coordinator-uri
                'GET
                "/agents"))

;; Fetch the failed builds along with their blocking counts.  SYSTEM may be
;; #f, in which case no system parameter is sent.  include_cancelled=true is
;; only sent when INCLUDE-CANCELLED? is true.
(define* (request-failed-builds-with-blocking-count-list
          coordinator-uri
          system
          #:key include-cancelled?)
  (send-request coordinator-uri
                'GET
                (string-append
                 "/builds/blocking"
                 (if system
                     (simple-format #f "?system=~A" system)
                     "")
                 (if include-cancelled?
                     (string-append
                      ;; Use ? if this is the first query parameter
                      (if system "&" "?")
                      "include_cancelled=true")
                     ""))))

;; Fetch the setup failures, optionally just those for AGENT-ID.
(define* (request-setup-failures coordinator-uri
                                 #:key agent-id)
  (send-request coordinator-uri
                'GET
                (string-append
                 "/setup-failures"
                 (if agent-id
                     (simple-format #f "?agent-id=~A" agent-id)
                     ""))))

;; Request the creation of a new agent.  REQUESTED-UUID and DESCRIPTION are
;; only included in the request body when given.
(define* (send-create-agent-request coordinator-uri
                                    #:key
                                    requested-uuid
                                    description)
  (send-request coordinator-uri
                'POST
                "/agents"
                `(,@(if requested-uuid
                        `((requested-uuid . ,requested-uuid))
                        '())
                  ,@(if description
                        `((description . ,description))
                        '()))))

;; Set whether the agent AGENT-UUID is active.  The server side handler
;; responds with a 400 unless ACTIVE? is a boolean.
(define* (send-agent-set-active-request coordinator-uri
                                        agent-uuid
                                        active?)
  (send-request coordinator-uri
                'POST
                (simple-format #f "/agent/~A/active" agent-uuid)
                `((active . ,active?))))

;; Request a new password for the agent AGENT-ID.
(define (send-create-agent-password-request coordinator-uri agent-id)
  (send-request coordinator-uri
                'POST
                (string-append "/agent/" agent-id "/passwords")))

;; Request a new dynamic auth token.
(define (send-create-dynamic-auth-token-request coordinator-uri)
  (send-request coordinator-uri
                'POST
                "/dynamic-auth-tokens"))

;; Replace the tags for the agent AGENT-ID.  The server side handler maps
;; over the tags as a vector, reading "key" and "value" from each entry.
(define (send-replace-agent-tags-request coordinator-uri agent-id tags)
  (send-request coordinator-uri
                'POST
                (string-append "/agent/" agent-id "/tags")
                `((tags . ,tags))))