;;; Guix Build Coordinator
;;;
;;; Copyright © 2020 Christopher Baines
;;;
;;; This file is part of the guix-build-coordinator.
;;;
;;; The Guix Build Coordinator is free software; you can redistribute
;;; it and/or modify it under the terms of the GNU General Public
;;; License as published by the Free Software Foundation; either
;;; version 3 of the License, or (at your option) any later version.
;;;
;;; The Guix Build Coordinator is distributed in the hope that it will
;;; be useful, but WITHOUT ANY WARRANTY; without even the implied
;;; warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
;;; See the GNU General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with the guix-data-service.  If not, see
;;; <http://www.gnu.org/licenses/>.

(define-module (guix-build-coordinator agent-messaging http server)
  #:use-module (srfi srfi-11)
  #:use-module (srfi srfi-19)
  #:use-module (ice-9 match)
  #:use-module (ice-9 ftw)
  #:use-module (ice-9 format)
  #:use-module (ice-9 exceptions)
  #:use-module (ice-9 textual-ports)
  #:use-module (ice-9 binary-ports)
  #:use-module (system repl error-handling)
  #:use-module (rnrs bytevectors)
  #:use-module (logging logger)
  #:use-module (json)
  #:use-module (web http)
  #:use-module (web client)
  #:use-module (web request)
  #:use-module (web response)
  #:use-module (web uri)
  #:use-module (lzlib)
  #:use-module (gcrypt base16)
  #:use-module (gcrypt hash)
  #:use-module (prometheus)
  #:use-module (guix base32)
  #:use-module (guix base64)
  #:use-module (guix build utils)
  #:use-module (guix-build-coordinator utils)
  #:use-module (guix-build-coordinator utils fibers)
  #:use-module (guix-build-coordinator datastore)
  #:use-module (guix-build-coordinator coordinator)
  #:export (http-agent-messaging-start-server))

;; Throw a 'bad-request exception carrying MESSAGE (a format string) and
;; ARGS (its arguments, passed as a single list).
(define (bad-request message . args)
  (throw 'bad-request message args))

(define (fixed/read-request-body r)
  "Read the request body from R.  When the request uses the chunked
transfer encoding, return a chunked input port wrapping the request port.
Otherwise return the body as a bytevector, or #f if the request has no
content length.  Throw 'bad-request if the port ends before the declared
number of bytes has been read."
  (cond
   ((member '(chunked) (request-transfer-encoding r))
    (make-chunked-input-port* (request-port r)
                              ;; closing the port is handled elsewhere
                              #:keep-alive? #t))
   (else
    (let ((nbytes (request-content-length r)))
      (and nbytes
           (let* ((read-result (get-bytevector-n (request-port r) nbytes))
                  ;; get-bytevector-n returns the EOF object rather than an
                  ;; empty bytevector when nothing could be read; treat that
                  ;; as zero bytes so the length check below reports it as a
                  ;; bad request instead of failing in bytevector-length.
                  (bv (if (eof-object? read-result)
                          (make-bytevector 0)
                          read-result)))
             (if (= (bytevector-length bv) nbytes)
                 bv
                 (bad-request "EOF while reading request body: ~a bytes of ~a"
                              (bytevector-length bv) nbytes))))))))

;; Replace the read-request-body binding in the (web request) module with
;; the version defined above.
(module-set! (resolve-module '(web request))
             'read-request-body
             fixed/read-request-body)

;; Start the HTTP server for agent messaging, listening on HOST and PORT.
;; Each request is logged and then handed to `controller', together with
;; SECRET-KEY-BASE, BUILD-COORDINATOR, CHUNKED-REQUEST-CHANNEL and a thunk
;; that refreshes the datastore and GC metrics.
(define (http-agent-messaging-start-server port host secret-key-base
                                           build-coordinator
                                           chunked-request-channel)
  (define gc-metrics-updater
    (get-gc-metrics-updater
     (build-coordinator-metrics-registry build-coordinator)))

  (define datastore-metrics-updater
    (base-datastore-metrics-updater build-coordinator))

  (define (update-managed-metrics!)
    (call-with-delay-logging datastore-metrics-updater)
    (call-with-delay-logging gc-metrics-updater))

  (call-with-error-handling
   (lambda ()
     (run-server/patched
      (lambda (request body)
        (log-msg (build-coordinator-logger build-coordinator)
                 'INFO
                 (format #f "~4a ~a\n"
                         (request-method request)
                         (uri-path (request-uri request))))
        ;; controller returns a list (response body); turn it into the
        ;; multiple values the server handler is expected to return.
        (apply values
               (controller request
                           (cons (request-method request)
                                 (split-and-decode-uri-path
                                  (uri-path (request-uri request))))
                           body
                           secret-key-base
                           build-coordinator
                           chunked-request-channel
                           update-managed-metrics!)))
      #:host host
      #:port port))
   #:on-error 'backtrace
   #:post-error (lambda (key . args)
                  (when (eq? key 'system-error)
                    (match args
                      (("bind" "~A" ("Address already in use") _)
                       (simple-format
                        (current-error-port)
                        "\n error: guix-build-coordinator could not start, as it could not bind to port ~A Check if it's already running, or whether another process is using that port. Also, the port used can be changed by passing the --port option.\n"
                        port))
                      ;; Any other system-error: nothing extra to print.
                      ;; Without this clause `match' itself would raise
                      ;; from within the error handler.
                      (_ #f))))))

;; Return a (response body-writer) list that serialises JSON as the body
;; with an application/json content type.  EXTRA-HEADERS are placed before
;; the default headers; CODE is the HTTP status code.
(define* (render-json json #:key (extra-headers '())
                      (code 200))
  (list (build-response
         #:code code
         #:headers (append extra-headers
                           '((content-type . (application/json))
                             (vary . (accept)))))
        (lambda (port)
          (scm->json json port))))

;; Return a (response body-writer) list that displays TEXT as the body
;; with a text/plain content type.  EXTRA-HEADERS are placed before the
;; default headers; CODE is the HTTP status code.
(define* (render-text text #:key (extra-headers '())
                      (code 200))
  (list (build-response
         #:code code
         #:headers (append extra-headers
                           '((content-type . (text/plain))
                             (vary . (accept)))))
        (lambda (port)
          (display text port))))

;; Return a 204 response with an empty body.
(define (no-content)
  (list (build-response #:code 204)
        ""))

;; Create the gauges for Guile run/real time, allocated builds per agent and
;; unprocessed hook events per event in BUILD-COORDINATOR's metrics
;; registry, and return a thunk that refreshes them (and calls
;; datastore-update-metrics!) from the datastore.
(define (base-datastore-metrics-updater build-coordinator)
  (define datastore
    (build-coordinator-datastore build-coordinator))

  (define registry
    (build-coordinator-metrics-registry build-coordinator))

  (let ((internal-real-time
         (make-gauge-metric registry "guile_internal_real_time"))
        (internal-run-time
         (make-gauge-metric registry "guile_internal_run_time"))
        (allocated-builds-total
         (make-gauge-metric registry
                            "allocated_builds_total"
                            #:labels '(agent_id)))
        (unprocessed-hook-events-total
         (make-gauge-metric registry
                            "unprocessed_hook_events_total"
                            #:labels '(event))))

    ;; Set METRIC to 0 for every agent listed by the datastore.
    (define (zero-metric-for-agents metric)
      (for-each (lambda (agent-details)
                  (metric-set metric
                              0
                              #:label-values
                              `((agent_id . ,(assq-ref agent-details 'uuid)))))
                (datastore-list-agents datastore)))

    (lambda ()
      (metric-set internal-real-time
                  (get-internal-real-time))
      (metric-set internal-run-time
                  (get-internal-run-time))

      ;; These are the db size metrics
      (datastore-update-metrics! datastore)

      ;; Zero every agent first, then overwrite with the actual counts.
      (zero-metric-for-agents allocated-builds-total)
      (for-each (match-lambda
                  ((agent-id . count)
                   (metric-set allocated-builds-total
                               count
                               #:label-values
                               `((agent_id . ,agent-id)))))
                (datastore-count-allocated-builds datastore))

      ;; Same approach for hook events: zero every known event, then
      ;; overwrite with the counts the datastore reports.
      (for-each (match-lambda
                  ((event . _)
                   (metric-set unprocessed-hook-events-total
                               0
                               #:label-values
                               `((event . ,event)))))
                (build-coordinator-hooks build-coordinator))
      (for-each (lambda (event-count)
                  (metric-set unprocessed-hook-events-total
                              (assq-ref event-count 'count)
                              #:label-values
                              `((event . ,(assq-ref event-count 'event)))))
                (datastore-count-unprocessed-hook-events datastore)))))

;; Handle a single request and return a (response body) list.
;;
;; METHOD-AND-PATH-COMPONENTS is the request method consed onto the decoded
;; path components; it selects the route.  BODY is whatever
;; read-request-body produced: a bytevector, or a port for chunked
;; requests.  Chunked bodies are consumed through CHUNKED-REQUEST-CHANNEL
;; via call-with-worker-thread.  UPDATE-MANAGED-METRICS! is called before
;; the metrics are written out.
;;
;; Exceptions raised while handling the request are turned into a JSON
;; error response: 400 for agent errors and prematurely ended chunked
;; input, 500 otherwise.
(define (controller request
                    method-and-path-components
                    body
                    secret-key-base
                    build-coordinator
                    chunked-request-channel
                    update-managed-metrics!)
  (define datastore
    (build-coordinator-datastore build-coordinator))

  (define logger
    (build-coordinator-logger build-coordinator))

  ;; The 403 response used whenever authentication fails.
  (define (render-access-denied)
    (render-json
     '(("error" . "access denied"))
     #:code 403))

  ;; Return true if REQUEST carries basic authorization credentials of the
  ;; form "uuid:password", where the uuid equals UUID and the datastore
  ;; knows that password for the agent.  Return #f when the header is
  ;; missing or uses another scheme, rather than raising a match error
  ;; (which would surface as a 500 instead of a 403).
  (define (authenticated? uuid request)
    (match (assq-ref (request-headers request) 'authorization)
      (('basic . (? string? authorization-base64))
       (let ((authorization
              (utf8->string (base64-decode authorization-base64))))
         (match (string-split authorization #\:)
           ((auth-uuid auth-password)
            (and (string? uuid)
                 (string=? auth-uuid uuid)
                 (datastore-agent-password-exists? datastore
                                                   uuid
                                                   auth-password)))
           (_ #f))))
      (_ #f)))

  ;; Return the nix-base32 encoded SHA256 hash of the lzip-decompressed
  ;; contents of FILE-NAME.  If hashing fails and the file exists, the file
  ;; is deleted and the exception is re-raised with the file size and MD5
  ;; hash attached as irritants.
  (define (compute-hash file-name)
    (bytevector->nix-base32-string
     (with-exception-handler
         (lambda (exn)
           (log-msg logger 'WARN
                    (simple-format #f "error computing hash: ~A" exn))

           (when (file-exists? file-name)
             (let ((md5-hash
                    (bytevector->base16-string
                     (file-hash (hash-algorithm md5) file-name)))
                   (file-bytes
                    (stat:size (stat file-name))))
               ;; I've seen exceptions happen here from lzip, so try
               ;; deleting the tmp file so that it's re-uploaded.
               (log-msg logger 'WARN
                        (simple-format #f "deleting ~A" file-name))
               (delete-file file-name)

               (raise-exception
                (make-exception
                 exn
                 (make-exception-with-irritants
                  `((file-bytes . ,file-bytes)
                    (md5-hash   . ,md5-hash)))))))

           (raise-exception exn))
       (lambda ()
         (call-with-input-file file-name
           (lambda (compressed-port)
             (call-with-lzip-input-port
              compressed-port
              port-sha256))))
       #:unwind? #t)))

  ;; Hash TMP-OUTPUT-FILE-NAME, then rename it to OUTPUT-FILE-NAME.
  ;; Return the hash.
  (define (hash-and-install tmp-output-file-name output-file-name)
    (let ((hash (compute-hash tmp-output-file-name)))
      (rename-file tmp-output-file-name output-file-name)
      hash))

  ;; Write HASH, followed by a newline, to OUTPUT-FILE-NAME with ".hash"
  ;; appended.
  (define (write-hash-file output-file-name hash)
    (call-with-output-file (string-append output-file-name ".hash")
      (lambda (port)
        (simple-format port "~A\n" hash))))

  ;; Copy the (port) request BODY to OUTPUT-PORT until end of file,
  ;; printing a progress message roughly every 50MB and a summary once
  ;; finished.  OUTPUT-FILE-NAME is only used in the messages.
  (define (receive-body-into-port output-port output-file-name)
    (let ((start-time (current-time time-utc)))
      (let loop ((bv (get-bytevector-some body))
                 (bytes-read 0)
                 (last-progress-update-bytes-read 0))
        (if (eof-object? bv)
            (let* ((end-time (current-time time-utc))
                   (elapsed (time-difference end-time start-time))
                   (seconds-elapsed
                    (+ (time-second elapsed)
                       (/ (time-nanosecond elapsed) 1e9))))
              (display
               (simple-format
                #f
                "received ~A took ~A seconds data transfered: ~AMB (~A bytes) speed (MB/s): ~A "
                (basename output-file-name)
                seconds-elapsed
                (format #f "~2,2f" (/ bytes-read 1000000))
                bytes-read
                (format #f "~2,2f" (/ (/ bytes-read 1000000)
                                      seconds-elapsed)))))
            (begin
              (put-bytevector output-port bv)
              (loop (get-bytevector-some body)
                    (+ bytes-read (bytevector-length bv))
                    (if (> (- bytes-read last-progress-update-bytes-read)
                           50000000)    ; ~50MB
                        (begin
                          (display
                           (simple-format
                            #f
                            "receiving ~A ~AMB read so far... "
                            (basename output-file-name)
                            (format #f "~2,2f" (/ bytes-read 1000000))))
                          bytes-read)
                        last-progress-update-bytes-read)))))))

  ;; Route the request based on the method and path components.
  (define (controller-thunk)
    (match method-and-path-components
      (('GET "agent" uuid)
       (let ((agent (datastore-find-agent datastore uuid)))
         (if agent
             (render-json
              `((agent . ,uuid)
                ,@agent))
             (render-json
              (simple-format #f "no agent found with id: ~A" uuid)
              #:code 404))))
      (('PUT "agent" uuid)
       (if (authenticated? uuid request)
           (begin
             ;; TODO Update status
             (render-json
              (agent-details datastore uuid)))
           (render-access-denied)))
      (('POST "agent" "fetch-session-credentials")
       (let* ((query-parameters (request-query-parameters request))
              (name (assq-ref query-parameters 'name))
              (token (assq-ref query-parameters 'token)))
         (if (and (string? name) (string? token))
             (if (datastore-dynamic-auth-token-exists? datastore token)
                 ;; Reuse the agent with this name and its first password
                 ;; if they exist, otherwise create them.
                 (let* ((agent-uuid
                         (or (datastore-find-agent-by-name datastore name)
                             (new-agent datastore #:name name)))
                        (password
                         (match (datastore-agent-list-passwords datastore
                                                                agent-uuid)
                           (()
                            (new-agent-password datastore
                                                #:agent agent-uuid))
                           ((password . rest) password))))
                   (render-json
                    `((id . ,agent-uuid)
                      (password . ,password))))
                 (render-json
                  '(("error" . "token not recognised"))
                  #:code 403))
             (render-access-denied))))
      (('POST "agent" uuid "fetch-builds")
       (if (authenticated? uuid request)
           (let* ((json-body (json-string->scm (utf8->string body)))
                  ;; count is deprecated, use target_count instead
                  (count (assoc-ref json-body "count"))
                  (target-count (assoc-ref json-body "target_count"))
                  (systems (assoc-ref json-body "systems"))
                  (builds (fetch-builds build-coordinator uuid
                                        (vector->list systems)
                                        target-count
                                        count)))
             (render-json
              `((builds . ,(list->vector builds)))))
           (render-access-denied)))
      (('PUT "build" uuid)
       (let ((agent-id-for-build
              (datastore-agent-for-build datastore uuid)))
         (if (authenticated? agent-id-for-build request)
             (begin
               (handle-build-result build-coordinator
                                    agent-id-for-build
                                    uuid
                                    (json-string->scm (utf8->string body)))
               (render-json "message received"))
             (render-access-denied))))
      (('POST "build" uuid "report-build-start")
       (let ((agent-id-for-build
              (datastore-agent-for-build datastore uuid)))
         (if (authenticated? agent-id-for-build request)
             (begin
               (handle-build-start-report build-coordinator
                                          agent-id-for-build
                                          uuid)
               (render-json "message received"))
             (render-access-denied))))
      (('POST "build" uuid "report-setup-failure")
       (let ((agent-id-for-build
              (datastore-agent-for-build datastore uuid)))
         (if (authenticated? agent-id-for-build request)
             (begin
               (handle-setup-failure-report
                build-coordinator
                agent-id-for-build
                uuid
                (json-string->scm (utf8->string body)))
               (render-json "message received"))
             (render-access-denied))))
      (('PUT "build" uuid "log" format)
       (let ((agent-id-for-build
              (datastore-agent-for-build datastore uuid)))
         (if (authenticated? agent-id-for-build request)
             (let* ((output-file-name
                     (build-log-file-destination uuid format))
                    (tmp-output-file-name
                     (string-append output-file-name ".tmp")))
               (mkdir-p (dirname output-file-name))
               (when (file-exists? tmp-output-file-name)
                 (delete-file tmp-output-file-name))
               ;; Remove everything else already in the log directory
               ;; before the new log is written.
               (for-each (lambda (file)
                           (simple-format #t "~A: removing stale log file ~A\n"
                                          uuid file)
                           (delete-file
                            (string-append (dirname output-file-name)
                                           "/"
                                           file)))
                         (scandir (dirname output-file-name)
                                  (lambda (file)
                                    (not (member file '("." ".."))))))
               (if (bytevector? body)
                   (call-with-output-file tmp-output-file-name
                     (lambda (output-port)
                       (put-bytevector output-port body)))
                   (call-with-worker-thread
                    chunked-request-channel
                    (lambda ()
                      (call-with-output-file tmp-output-file-name
                        (lambda (output-port)
                          (let loop ((bv (get-bytevector-some body)))
                            (unless (eof-object? bv)
                              (put-bytevector output-port bv)
                              (loop (get-bytevector-some body)))))))))
               (rename-file tmp-output-file-name
                            output-file-name)
               (no-content))
             (render-access-denied))))
      (('HEAD "build" uuid "output" output-name)
       (let ((agent-id-for-build
              (datastore-agent-for-build datastore uuid)))
         (if (authenticated? agent-id-for-build request)
             (let* ((output-file-name
                     (build-output-file-location datastore uuid output-name)))
               (if (file-exists? output-file-name)
                   (let ((bytes (stat:size (stat output-file-name))))
                     (list (build-response
                            #:code 200
                            #:headers `((content-length . ,bytes)))
                           #f))
                   (list (build-response #:code 404)
                         #f)))
             (list (build-response #:code 403)
                   #f))))
      (('PUT "build" uuid "output" output-name)
       (let ((agent-id-for-build
              (datastore-agent-for-build datastore uuid)))
         (if (authenticated? agent-id-for-build request)
             (let* ((output-file-name
                     (build-output-file-location datastore uuid output-name))
                    (tmp-output-file-name
                     (string-append output-file-name ".tmp")))
               (mkdir-p (dirname output-file-name))
               ;; A full upload starts from scratch: drop any previous
               ;; output and any partial upload.
               (when (file-exists? output-file-name)
                 (delete-file output-file-name))
               (when (file-exists? tmp-output-file-name)
                 (delete-file tmp-output-file-name))

               (let ((hash
                      (if (bytevector? body)
                          (begin
                            (call-with-output-file tmp-output-file-name
                              (lambda (output-port)
                                (put-bytevector output-port body)))
                            (hash-and-install tmp-output-file-name
                                              output-file-name))
                          (call-with-worker-thread
                           chunked-request-channel
                           (lambda ()
                             (call-with-output-file tmp-output-file-name
                               (lambda (output-port)
                                 (receive-body-into-port output-port
                                                         output-file-name)))
                             (hash-and-install tmp-output-file-name
                                               output-file-name))))))
                 (write-hash-file output-file-name hash)
                 (no-content)))
             (render-access-denied))))
      (('HEAD "build" uuid "output" output-name "partial")
       (let ((agent-id-for-build
              (datastore-agent-for-build datastore uuid)))
         (if (authenticated? agent-id-for-build request)
             (let* ((output-file-name
                     (build-output-file-location datastore uuid output-name))
                    (tmp-output-file-name
                     (string-append output-file-name ".tmp")))
               (if (file-exists? tmp-output-file-name)
                   (let ((bytes (stat:size (stat tmp-output-file-name))))
                     (list (build-response
                            #:code 200
                            #:headers `((content-length . ,bytes)))
                           #f))
                   (list (build-response #:code 404)
                         #f)))
             (list (build-response #:code 403)
                   #f))))
      (('POST "build" uuid "output" output-name "partial")
       (let ((agent-id-for-build
              (datastore-agent-for-build datastore uuid)))
         (if (authenticated? agent-id-for-build request)
             (let* ((output-file-name
                     (build-output-file-location datastore uuid output-name))
                    (tmp-output-file-name
                     (string-append output-file-name ".tmp")))
               ;; If the output file exists, delete it, as it's being
               ;; uploaded again
               (when (file-exists? output-file-name)
                 (delete-file output-file-name))

               ;; The tmp file is opened in append mode, so the body is
               ;; added after whatever was uploaded previously.
               (let ((hash
                      (if (bytevector? body)
                          (let ((output-port
                                 (open-file tmp-output-file-name "a")))
                            (put-bytevector output-port body)
                            (close-port output-port)
                            (hash-and-install tmp-output-file-name
                                              output-file-name))
                          (call-with-worker-thread
                           chunked-request-channel
                           (lambda ()
                             (let ((output-port
                                    (open-file tmp-output-file-name "a")))
                               (receive-body-into-port output-port
                                                       output-file-name)
                               (close-port output-port))
                             (hash-and-install tmp-output-file-name
                                               output-file-name))))))
                 (write-hash-file output-file-name hash)
                 (no-content)))
             (render-access-denied))))
      (('GET "metrics")
       (call-with-delay-logging update-managed-metrics!
                                #:threshold 0.5)
       (list (build-response
              #:code 200
              #:headers '((content-type . (text/plain))
                          (vary . (accept))))
             (lambda (port)
               (write-metrics (build-coordinator-metrics-registry
                               build-coordinator)
                              port))))
      (_
       (render-json
        "not-found"
        #:code 404))))

  (with-exception-handler
      (lambda (exn)
        (cond
         ((agent-error? exn)
          (render-json
           `((error . ,(agent-error-details exn)))
           #:code 400))
         ((chunked-input-ended-prematurely-error? exn)
          (render-json
           `((error . chunked-input-ended-prematurely))
           #:code 400))
         (else
          (render-json
           `((error . ,(simple-format #f "~A" exn)))
           #:code 500))))
    (lambda ()
      ;; The throw handler runs before the stack is unwound, so the
      ;; backtrace shows where the error happened.  Agent errors and
      ;; prematurely ended chunked input are not logged here.
      (with-throw-handler #t
        controller-thunk
        (lambda (key . args)
          (unless (and (eq? '%exception key)
                       (or (agent-error? (car args))
                           (chunked-input-ended-prematurely-error?
                            (car args))))
            (match method-and-path-components
              ((method path-components ...)
               (simple-format
                (current-error-port)
                "error: when processing: /~A ~A\n ~A ~A"
                method (string-join path-components "/")
                key args)))
            (backtrace)))))
    #:unwind? #t))