;;; Guix Build Coordinator
;;;
;;; Copyright © 2020 Christopher Baines
;;;
;;; This file is part of the guix-build-coordinator.
;;;
;;; The Guix Build Coordinator is free software; you can redistribute
;;; it and/or modify it under the terms of the GNU General Public
;;; License as published by the Free Software Foundation; either
;;; version 3 of the License, or (at your option) any later version.
;;;
;;; The Guix Build Coordinator is distributed in the hope that it will
;;; be useful, but WITHOUT ANY WARRANTY; without even the implied
;;; warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
;;; See the GNU General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with the guix-build-coordinator.  If not, see
;;; <http://www.gnu.org/licenses/>.

(define-module (guix-build-coordinator agent-messaging http)
  #:use-module (srfi srfi-11)
  #:use-module (srfi srfi-19)
  #:use-module (ice-9 match)
  #:use-module (ice-9 format)
  #:use-module (ice-9 threads)
  #:use-module (ice-9 exceptions)
  #:use-module (ice-9 textual-ports)
  #:use-module (ice-9 binary-ports)
  #:use-module (system repl error-handling)
  #:use-module (fibers web server)
  #:use-module (rnrs bytevectors)
  #:use-module (json)
  #:use-module (web http)
  #:use-module (web client)
  #:use-module (web request)
  #:use-module (web response)
  #:use-module (web uri)
  #:use-module (fibers channels)
  #:use-module (guix store)
  #:use-module (guix lzlib)
  #:use-module (guix base64)
  #:use-module (guix serialization)
  #:use-module (guix build utils)
  #:use-module (guix-build-coordinator utils)
  #:use-module (guix-build-coordinator metrics)
  #:use-module (guix-build-coordinator datastore)
  #:use-module (guix-build-coordinator metrics)
  #:use-module (guix-build-coordinator coordinator)
  #:export (http-agent-messaging-start-server

            submit-status
            submit-log-file
            submit-build-result
            report-build-start
            report-setup-failure
            submit-output
            fetch-builds-for-agent))

(define (fixed/read-request-body r)
  "Read the request body from R.  When R uses chunked transfer encoding,
return the port produced by make-chunked-input-port* over the request
port (presumably yielding the de-chunked body); otherwise return the body
as a bytevector, or ‘#f’ if there was no request body."
  (cond
   ((member '(chunked) (request-transfer-encoding r))
    (make-chunked-input-port* (request-port r)
                              ;; closing the port is handled elsewhere
                              #:keep-alive? #t))
   (else
    (let ((nbytes (request-content-length r)))
      (and nbytes
           (let ((bv (get-bytevector-n (request-port r) nbytes)))
             (if (= (bytevector-length bv) nbytes)
                 bv
                 (bad-request "EOF while reading request body: ~a bytes of ~a"
                              (bytevector-length bv) nbytes))))))))

;; Install the procedure above as read-request-body in (web request), so
;; that request bodies sent with chunked transfer encoding are exposed as a
;; port.
(module-set! (resolve-module '(web request))
             'read-request-body
             fixed/read-request-body)

(define (http-agent-messaging-start-server port host secret-key-base
                                           build-coordinator
                                           chunked-request-channel)
  "Run the HTTP server that agents talk to on HOST and PORT.  Each request
is logged to the current output port and dispatched to CONTROLLER.  If
binding fails because the address is in use, an explanatory message is
written to the current error port."
  (define update-base-datastore-metrics!
    (base-datastore-metrics-updater build-coordinator))

  (call-with-error-handling
   (lambda ()
     (run-server
      (lambda (request body)
        (display
         (format #f "~4a ~a\n"
                 (request-method request)
                 (uri-path (request-uri request))))
        (apply values
               (controller request
                           (cons (request-method request)
                                 (split-and-decode-uri-path
                                  (uri-path (request-uri request))))
                           body
                           secret-key-base
                           build-coordinator
                           chunked-request-channel
                           update-base-datastore-metrics!)))
      #:host host
      #:port port))
   #:on-error 'backtrace
   #:post-error (lambda (key . args)
                  (when (eq? key 'system-error)
                    (match args
                      (("bind" "~A" ("Address already in use") _)
                       (simple-format
                        (current-error-port)
                        "\n error: guix-build-coordinator could not start, as it could not bind to port ~A Check if it's already running, or whether another process is using that port. Also, the port used can be changed by passing the --port option.\n"
                        port))
                      ;; Any other system error has already been reported
                      ;; via #:on-error; don't raise a match error from
                      ;; inside the error handler.
                      (_ #f))))))

(define* (render-json json #:key (extra-headers '())
                      (code 200))
  "Return a response/body list that serialises JSON with status CODE."
  (list (build-response
         #:code code
         #:headers (append extra-headers
                           '((content-type . (application/json))
                             (vary . (accept)))))
        (lambda (port)
          (scm->json json port))))

(define* (render-text text #:key (extra-headers '())
                      (code 200))
  "Return a response/body list that displays TEXT as text/plain with status
CODE."
  (list (build-response
         #:code code
         #:headers (append extra-headers
                           '((content-type . (text/plain))
                             (vary . (accept)))))
        (lambda (port)
          (display text port))))

(define (no-content)
  "Return a 204 response with an empty body."
  (list (build-response #:code 204)
        ""))

(define (base-datastore-metrics-updater build-coordinator)
  "Create gauge metrics in the metrics registry of BUILD-COORDINATOR and
return a thunk that, when called, sets them from counts read from the
datastore."
  (define namespace
    "guixbuildcoordinator")

  (define datastore
    (build-coordinator-datastore build-coordinator))

  (define registry
    (build-coordinator-metrics-registry build-coordinator))

  (let ((builds-total
         (make-gauge-metric registry
                            (string-append namespace "_builds_total")
                            #:labels '(system)))
        (allocated-builds-total
         (make-gauge-metric registry
                            (string-append namespace "_allocated_builds_total")
                            #:labels '(agent_id)))
        (build-results-total
         (make-gauge-metric registry
                            (string-append namespace "_build_results_total")
                            #:labels '(agent_id result)))
        (setup-failures-total
         (make-gauge-metric registry
                            (string-append namespace "_setup_failures_total")
                            #:labels '(agent_id reason)))
        (build-allocation-plan-total
         (make-gauge-metric registry
                            (string-append namespace "_build_allocation_plan_total")
                            #:labels '(agent_id)))
        (unprocessed-hook-events-total
         (make-gauge-metric registry
                            (string-append namespace "_unprocessed_hook_events_total")
                            #:labels '(event))))

    ;; Set METRIC to 0 for every known agent, so agents that no longer
    ;; appear in a count are reported as 0 rather than a stale value.
    (define (zero-metric-for-agents metric)
      (for-each (lambda (agent-details)
                  (metric-set metric
                              0
                              #:label-values
                              `((agent_id . ,(assq-ref agent-details 'uuid)))))
                (datastore-list-agents datastore)))

    (lambda ()
      (for-each (match-lambda
                  ((system . count)
                   (metric-set builds-total
                               count
                               #:label-values
                               `((system . ,system)))))
                (datastore-count-builds datastore))
      (zero-metric-for-agents allocated-builds-total)
      (for-each (match-lambda
                  ((agent-id . count)
                   (metric-set allocated-builds-total
                               count
                               #:label-values
                               `((agent_id . ,agent-id)))))
                (datastore-count-allocated-builds datastore))
      (for-each (match-lambda
                  (((agent-id result) . count)
                   (metric-set build-results-total
                               count
                               #:label-values
                               `((agent_id . ,agent-id)
                                 (result . ,result)))))
                (datastore-count-build-results datastore))
      (for-each (match-lambda
                  (((agent-id reason) . count)
                   (metric-set setup-failures-total
                               count
                               #:label-values
                               `((agent_id . ,agent-id)
                                 (reason . ,reason)))))
                (datastore-count-setup-failures datastore))
      (zero-metric-for-agents build-allocation-plan-total)
      (for-each (match-lambda
                  ((agent-id . count)
                   (metric-set build-allocation-plan-total
                               count
                               #:label-values
                               `((agent_id . ,agent-id)))))
                (datastore-count-build-allocation-plan-entries datastore))
      ;; Zero the gauge for every configured hook event first, so events
      ;; with nothing outstanding report 0.
      (for-each (match-lambda
                  ((event . _)
                   (metric-set unprocessed-hook-events-total
                               0
                               #:label-values
                               `((event . ,event)))))
                (build-coordinator-hooks build-coordinator))
      (for-each (lambda (event-count)
                  (metric-set unprocessed-hook-events-total
                              (assq-ref event-count 'count)
                              #:label-values
                              `((event . ,(assq-ref event-count 'event)))))
                (datastore-count-unprocessed-hook-events datastore)))))

(define (controller request
                    method-and-path-components
                    body
                    secret-key-base
                    build-coordinator
                    chunked-request-channel
                    update-base-datastore-metrics!)
  "Handle REQUEST, dispatching on METHOD-AND-PATH-COMPONENTS (the request
method followed by the decoded path components).  Return a list of a
response and a body (a string or a procedure writing to a port).  Errors
are logged and turned into a 500 JSON response."
  (define (authenticated? uuid request)
    ;; Check the HTTP Basic credentials in REQUEST against UUID.  A missing
    ;; or non-Basic Authorization header means not authenticated, rather
    ;; than a match error (which would surface as a 500 response).
    (match (assq-ref (request-headers request) 'authorization)
      (('basic . authorization-base64)
       (let* ((authorization
               (utf8->string (base64-decode authorization-base64)))
              ;; Only the first colon separates the user id from the
              ;; password (RFC 7617), so the password may contain colons.
              (separator (string-index authorization #\:)))
         (and separator
              (string? uuid)
              (string=? (substring authorization 0 separator) uuid)
              (datastore-agent-password-exists?
               datastore
               uuid
               (substring authorization (+ separator 1))))))
      (_ #f)))

  (define datastore
    (build-coordinator-datastore build-coordinator))

  (define (controller-thunk)
    (match method-and-path-components
      (('GET "agent" uuid)
       (let ((agent (datastore-find-agent datastore uuid)))
         (if agent
             (render-json
              `((agent . ,uuid)
                ,@agent))
             (render-json
              (simple-format #f "no agent found with id: ~A"
                             uuid)
              #:code 404))))
      (('PUT "agent" uuid)
       (if (authenticated? uuid request)
           (begin
             ;; TODO Update status
             (render-json
              (agent-details datastore uuid)))
           (render-json
            '(("error" . "access denied"))
            #:code 403)))
      (('POST "agent" uuid "fetch-builds")
       (if (authenticated? uuid request)
           (let* ((json-body (json-string->scm (utf8->string body)))
                  (count (assoc-ref json-body "count"))
                  (systems (assoc-ref json-body "systems"))
                  (builds (fetch-builds build-coordinator uuid
                                        (vector->list systems)
                                        count)))
             (render-json
              `((builds . ,(list->vector builds)))))
           (render-json
            '(("error" . "access denied"))
            #:code 403)))
      (('PUT "build" uuid)
       (let ((agent-id-for-build
              (datastore-agent-for-build datastore uuid)))
         (if (authenticated? agent-id-for-build request)
             (begin
               (handle-build-result build-coordinator
                                    agent-id-for-build
                                    uuid
                                    (json-string->scm (utf8->string body)))
               ;; Trigger build allocation, as the result of this build
               ;; could change the allocation
               (trigger-build-allocation build-coordinator)
               (render-json
                "message received"))
             (render-json
              '(("error" . "access denied"))
              #:code 403))))
      (('POST "build" uuid "report-build-start")
       (let ((agent-id-for-build
              (datastore-agent-for-build datastore uuid)))
         (if (authenticated? agent-id-for-build request)
             (begin
               (handle-build-start-report datastore
                                          agent-id-for-build
                                          uuid)
               (render-json
                "message received"))
             (render-json
              '(("error" . "access denied"))
              #:code 403))))
      (('POST "build" uuid "report-setup-failure")
       (let ((agent-id-for-build
              (datastore-agent-for-build datastore uuid)))
         (if (authenticated? agent-id-for-build request)
             (begin
               (handle-setup-failure-report datastore
                                            agent-id-for-build
                                            uuid
                                            (json-string->scm (utf8->string body)))
               ;; Trigger build allocation, so that the allocator can handle
               ;; this setup failure
               (trigger-build-allocation build-coordinator)
               (render-json
                "message received"))
             (render-json
              '(("error" . "access denied"))
              #:code 403))))
      (('PUT "build" uuid "log" format)
       (let ((agent-id-for-build
              (datastore-agent-for-build datastore uuid)))
         (if (authenticated? agent-id-for-build request)
             (let ((output-file-name
                    (build-log-file-location datastore uuid format)))
               (mkdir-p (dirname output-file-name))
               ;; BODY is read as a port here (see fixed/read-request-body),
               ;; and copied to the log file on a worker thread.
               (if (call-with-worker-thread
                    chunked-request-channel
                    (lambda ()
                      (call-with-output-file output-file-name
                        (lambda (output-port)
                          (let loop ((bv (get-bytevector-some body)))
                            (unless (eof-object? bv)
                              (put-bytevector output-port bv)
                              (loop (get-bytevector-some body))))))
                      #t))
                   (no-content)
                   (render-json
                    "error"
                    #:code 500)))
             (render-json
              '(("error" . "access denied"))
              #:code 403))))
      (('PUT "build" uuid "output" output-name)
       (let ((agent-id-for-build
              (datastore-agent-for-build datastore uuid)))
         (if (authenticated? agent-id-for-build request)
             (let* ((output-file-name
                     (build-output-file-location datastore uuid output-name))
                    (tmp-output-file-name
                     (string-append output-file-name ".tmp")))
               (mkdir-p (dirname output-file-name))
               (when (file-exists? tmp-output-file-name)
                 (delete-file tmp-output-file-name))
               ;; Write to a temporary file and only rename it into place
               ;; once the whole body has been received.
               (if (call-with-worker-thread
                    chunked-request-channel
                    (lambda ()
                      (call-with-output-file tmp-output-file-name
                        (lambda (output-port)
                          (let ((start-time (current-time time-utc)))
                            (let loop ((bv (get-bytevector-some body))
                                       (bytes-read 0)
                                       (last-progress-update-bytes-read 0))
                              (if (eof-object? bv)
                                  (let* ((end-time (current-time time-utc))
                                         (elapsed (time-difference end-time
                                                                   start-time))
                                         (seconds-elapsed
                                          (+ (time-second elapsed)
                                             (/ (time-nanosecond elapsed)
                                                1e9))))
                                    (display
                                     (simple-format
                                      #f
                                      "receiving ~A\n took ~A seconds\n data transfered: ~AMB\n speed (MB/s): ~A\n"
                                      (basename output-file-name)
                                      seconds-elapsed
                                      (rationalize
                                       (exact->inexact (/ bytes-read 1000000))
                                       0.1)
                                      (rationalize
                                       (/ (/ bytes-read 1000000) seconds-elapsed)
                                       0.1))))
                                  (begin
                                    (put-bytevector output-port bv)
                                    (loop (get-bytevector-some body)
                                          (+ bytes-read
                                             (bytevector-length bv))
                                          (if (> (- bytes-read
                                                    last-progress-update-bytes-read)
                                                 50000000) ; ~50MB
                                              (begin
                                                (display
                                                 (simple-format
                                                  #f
                                                  "receiving ~A\n ~AMB read so far...\n"
                                                  (basename output-file-name)
                                                  (rationalize
                                                   (exact->inexact
                                                    (/ bytes-read 1000000))
                                                   0.1)))
                                                bytes-read)
                                              last-progress-update-bytes-read))))))))
                      (rename-file tmp-output-file-name
                                   output-file-name)
                      #t))
                   (no-content)
                   (render-json
                    "error"
                    #:code 500)))
             (render-json
              '(("error" . "access denied"))
              #:code 403))))
      (('GET "metrics")
       (update-base-datastore-metrics!)
       (list (build-response
              #:code 200
              #:headers '((content-type . (text/plain))
                          (vary . (accept))))
             (lambda (port)
               (write-metrics (build-coordinator-metrics-registry
                               build-coordinator)
                              port))))
      (_
       (render-json
        "not-found"
        #:code 404))))

  (call-with-error-handling
   controller-thunk
   #:on-error 'backtrace
   #:post-error (lambda args
                  (match method-and-path-components
                    ((method path-components ...)
                     (simple-format
                      (current-error-port)
                      "error: when processing: /~A ~A\n"
                      method (string-join path-components "/"))))

                  (render-json
                   `((error . ,(simple-format #f "~A" args)))
                   #:code 500))))

(define (coordinator-uri-for-path base-uri-string agent-path)
  "Return a URI for AGENT-PATH on the coordinator at BASE-URI-STRING.  Any
path prefix in BASE-URI-STRING is kept, and it is joined to AGENT-PATH
with exactly one slash, whether or not the prefix ends with a slash or
AGENT-PATH starts with one."
  (let* ((base-uri (string->uri base-uri-string))
         (scheme (uri-scheme base-uri))
         (host (uri-host base-uri))
         (port (uri-port base-uri))
         (path (uri-path base-uri)))
    (build-uri scheme
               #:host host
               #:port port
               #:path (string-append
                       (string-trim-right path #\/)
                       "/"
                       (string-trim agent-path #\/)))))

(define (with-request-mutex thunk)
  "Call THUNK with the mutex of this monitor form held, so that requests
made through this procedure from different threads do not overlap."
  (monitor (thunk)))

(define* (coordinator-handle-failed-request method path response body
                                            #:key first-request-failed?)
  "Log that the METHOD request to PATH failed, then decode and return BODY:
the parsed JSON when RESPONSE declares an application/json content type
(whatever its parameters), otherwise BODY as a UTF-8 string.  Return #f if
BODY cannot be decoded.  FIRST-REQUEST-FAILED? is accepted but unused."
  (simple-format
   (current-error-port)
   "error: coordinator-http-request: ~A ~A: ~A\n"
   method path (response-code response))

  (catch #t
    (lambda ()
      (match (response-content-type response)
        (('application/json . _)
         (json-string->scm (utf8->string body)))
        (_
         (utf8->string body))))
    (lambda (key . args)
      (simple-format
       (current-error-port)
       "error decoding body ~A ~A\n"
       key args)
      #f)))

(define* (coordinator-http-request coordinator-uri
                                   agent-uuid
                                   password
                                   path
                                   #:key method body (headers '())
                                   succeed-on-access-denied-retry?)
  "Make a METHOD request to PATH on the coordinator at COORDINATOR-URI,
authenticating as AGENT-UUID with PASSWORD and sending BODY encoded as
JSON.  Return the decoded JSON response and the response object.  Status
codes >= 400 raise an exception, and the request is retried through
retry-on-error.  When SUCCEED-ON-ACCESS-DENIED-RETRY? is true, an access
denied response after an earlier failed attempt is treated as success.
NOTE(review): presumably because the earlier attempt may already have
been processed by the coordinator; confirm."
  (define auth-value
    (string-append
     "Basic "
     (base64-encode
      (string->utf8
       (string-append agent-uuid ":" password)))))

  (define uri
    (coordinator-uri-for-path coordinator-uri path))

  (define first-request-failed? #f)

  (define (make-request)
    (let-values (((response body)
                  (with-request-mutex
                   (lambda ()
                     (http-request uri
                                   #:method method
                                   #:body (scm->json-string body)
                                   #:decode-body? #f
                                   #:headers
                                   `((Authorization . ,auth-value)
                                     ,@headers))))))
      (if (>= (response-code response) 400)
          (let ((body (coordinator-handle-failed-request method
                                                         path
                                                         response
                                                         body)))
            (if (and first-request-failed?
                     succeed-on-access-denied-retry?
                     (equal? body
                             '(("error" . "access denied"))))
                (begin
                  (simple-format
                   (current-error-port)
                   "warning: treating access denied response as success\n")
                  (values body response))
                (begin
                  (set! first-request-failed? #t)
                  (raise-exception
                   (make-exception-with-message body)))))
          (values (json-string->scm (utf8->string body))
                  response))))

  (retry-on-error make-request
                  #:times 9
                  #:delay 10))

(define (submit-status coordinator-uri agent-uuid password
                       status)
  "Send STATUS for AGENT-UUID to the coordinator."
  (coordinator-http-request
   coordinator-uri
   agent-uuid
   password
   (string-append "/agent/" agent-uuid)
   #:method 'PUT ; TODO Should be PATCH
   #:body `((status . ,status))))

(define (submit-output coordinator-uri agent-uuid password
                       build-id output-name file)
  "Upload the store item FILE, lzip-compressed, as OUTPUT-NAME of BUILD-ID.
Items with a nar size under 5MB are compressed while streaming; larger
ones are compressed to a temporary file first, which is removed whether
or not the upload succeeds.  HTTP error responses raise an exception so
that the upload is retried."
  (define auth-value
    (string-append
     "Basic "
     (base64-encode
      (string->utf8
       (string-append agent-uuid ":" password)))))

  (define uri
    (coordinator-uri-for-path coordinator-uri
                              (string-append "/build/" build-id
                                             "/output/" output-name)))

  (define path-info
    (with-store store
      (query-path-info store file)))

  (define (check-response response body)
    ;; Raise an exception (so retry-on-error will retry) when the
    ;; coordinator replies with an HTTP error status; otherwise return
    ;; RESPONSE and BODY.
    (if (>= (response-code response) 400)
        (raise-exception
         (make-exception-with-message
          (coordinator-handle-failed-request 'PUT
                                             (uri-path uri)
                                             response
                                             body)))
        (values response body)))

  ;; For small outputs, compress while sending the data, but for bigger store
  ;; items, do all the compression up front to hopefully reduce the time to
  ;; send them.
  (if (< (path-info-nar-size path-info)
         5000000)                       ; 5MB
      (retry-on-error
       (lambda ()
         (with-request-mutex
          (lambda ()
            (let-values (((response body)
                          (call-with-streaming-http-request
                           uri
                           (lambda (port)
                             (call-with-lzip-output-port port
                               (lambda (port)
                                 (write-file file port))
                               #:level 9))
                           #:headers `((Authorization . ,auth-value)))))
              (check-response response body)))))
       #:times 3
       #:delay 30)
      (let* ((directory (or (getenv "TMPDIR") "/tmp"))
             (template (string-append directory
                                      "/guix-build-coordinator-file.XXXXXX"))
             ;; mkstemp! replaces the XXXXXX in TEMPLATE in place, so from
             ;; here on TEMPLATE names the newly created file.
             (out (mkstemp! template)))
        (with-exception-handler
         (lambda (exn)
           ;; Don't leave the (possibly large) temporary file behind if
           ;; compressing or sending fails; then re-raise.
           (close-port out)
           (when (file-exists? template)
             (delete-file template))
           (raise-exception exn))
         (lambda ()
           (simple-format #t "compressing ~A -> ~A prior to sending\n"
                          file template)
           (call-with-lzip-output-port out
             (lambda (port)
               (write-file file port))
             #:level 9)
           (close-port out)

           (simple-format #t "finished compressing ~A, now sending\n" file)
           (retry-on-error
            (lambda ()
              (with-request-mutex
               (lambda ()
                 (call-with-input-file template
                   (lambda (file-port)
                     (let-values (((response body)
                                   (call-with-streaming-http-request
                                    uri
                                    (lambda (port)
                                      (with-time-logging
                                          (simple-format #f "sending ~A" file)
                                        (dump-port file-port port
                                                   #:buffer-size (expt 2 20))))
                                    #:headers `((Authorization . ,auth-value)))))
                       (check-response response body)))))))
            #:times 9
            #:delay (+ 60 (random 120))))
         #:unwind? #t)
        (delete-file template))))

(define (submit-log-file coordinator-uri agent-uuid password
                         build-id file)
  "Upload the compressed build log FILE for BUILD-ID.  The format is taken
from the file suffix (.bz2 or .gz); any other suffix is an error.  Return
#t once the coordinator accepts the upload."
  (define auth-value
    (string-append
     "Basic "
     (base64-encode
      (string->utf8
       (string-append agent-uuid ":" password)))))

  (define format
    (cond
     ((string-suffix? ".bz2" file) "bzip2")
     ((string-suffix? ".gz" file) "gzip")
     (else
      (error "unsupported log format for" file))))

  (define uri
    (coordinator-uri-for-path coordinator-uri
                              (string-append "/build/" build-id
                                             "/log/" format)))

  (retry-on-error
   (lambda ()
     (with-request-mutex
      (lambda ()
        (let-values (((response body)
                      (call-with-streaming-http-request
                       uri
                       (lambda (request-port)
                         (call-with-input-file file
                           (lambda (file-port)
                             (dump-port file-port request-port))
                           #:binary #t))
                       #:headers `((Authorization . ,auth-value)))))
          (if (>= (response-code response) 400)
              (raise-exception
               (make-exception-with-message
                (coordinator-handle-failed-request 'PUT
                                                   (uri-path uri)
                                                   response
                                                   body)))
              (begin
                (simple-format #t "~A: successfully uploaded log file (~A)\n"
                               build-id
                               (response-code response))
                #t))))))
   #:times 9
   #:delay (+ 30 (random 60))))

(define (submit-build-result coordinator-uri agent-uuid password
                             build-id result)
  "Send RESULT for BUILD-ID to the coordinator."
  (coordinator-http-request
   coordinator-uri
   agent-uuid
   password
   (string-append "/build/" build-id)
   #:method 'PUT ; TODO Should be PATCH
   #:body result))

(define (report-build-start coordinator-uri agent-uuid password
                            build-id)
  "Tell the coordinator that BUILD-ID has started."
  (coordinator-http-request
   coordinator-uri
   agent-uuid
   password
   (string-append "/build/" build-id "/report-build-start")
   #:method 'POST))

(define (report-setup-failure coordinator-uri agent-uuid password
                              build-id report)
  "Send the setup failure REPORT for BUILD-ID to the coordinator."
  (coordinator-http-request
   coordinator-uri
   agent-uuid
   password
   (string-append "/build/" build-id "/report-setup-failure")
   #:method 'POST
   #:body report
   #:succeed-on-access-denied-retry? #t))

(define* (fetch-builds-for-agent coordinator-uri agent-uuid password
                                 systems
                                 #:key (count 1))
  "Ask the coordinator for up to COUNT builds for SYSTEMS, returning the
builds from the response as a list."
  (vector->list
   (assoc-ref (coordinator-http-request
               coordinator-uri
               agent-uuid
               password
               (string-append "/agent/" agent-uuid "/fetch-builds")
               #:body `((count . ,count)
                        (systems . ,(list->vector systems)))
               #:method 'POST)
              "builds")))