;;; Build Farm Front-End
;;;
;;; Copyright © 2023 Christopher Baines
;;;
;;; This program is free software: you can redistribute it and/or
;;; modify it under the terms of the GNU Affero General Public License
;;; as published by the Free Software Foundation, either version 3 of
;;; the License, or (at your option) any later version.
;;;
;;; This program is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;;; Affero General Public License for more details.
;;;
;;; You should have received a copy of the GNU Affero General Public
;;; License along with this program.  If not, see
;;; <https://www.gnu.org/licenses/>.

(define-module (bffe server)
  #:use-module (srfi srfi-1)
  #:use-module (srfi srfi-9)
  #:use-module (srfi srfi-9 gnu)
  #:use-module (srfi srfi-11)
  #:use-module (srfi srfi-19)
  #:use-module (srfi srfi-71)
  #:use-module (ice-9 vlist)
  #:use-module (ice-9 threads)
  #:use-module (ice-9 match)
  #:use-module (ice-9 atomic)
  #:use-module (ice-9 textual-ports)
  #:use-module (rnrs bytevectors)
  #:use-module (web http)
  #:use-module (web client)
  #:use-module (web request)
  #:use-module (web response)
  #:use-module (web uri)
  #:use-module (json)
  #:use-module (prometheus)
  #:use-module (system repl error-handling)
  #:use-module (fibers)
  #:use-module (fibers scheduler)
  #:use-module (fibers channels)
  #:use-module (fibers conditions)
  #:use-module (fibers web server)
  #:use-module ((guix store) #:select (%store-prefix))
  #:use-module ((guix build utils) #:select (dump-port))
  #:use-module (guix-data-service web util)
  #:use-module ((guix-build-coordinator utils)
                #:select (with-time-logging
                          call-with-delay-logging))
  #:use-module ((guix-build-coordinator utils fibers)
                #:select (run-server/patched
                          retry-on-error))
  #:use-module (guix-build-coordinator client-communication)
  #:use-module (bffe config)
  #:use-module (bffe view util)
  #:use-module (bffe view home)
  #:use-module (bffe view build)
  #:use-module (bffe view agent)
  #:use-module (bffe view activity)
  #:export (http-get*
            start-bffe-web-server))

;; TODO Work around this causing problems with backtraces
;; https://github.com/wingo/fibers/issues/76
(set-record-type-printer!
 (@@ (fibers scheduler) <scheduler>)
 (lambda (scheduler port)
   (display "#<scheduler>" port)))

;; Open a connection to URI, wrapping it in TLS when URI uses the https
;; scheme.  Returns two values: the port to send the request over (the TLS
;; port for https, otherwise the socket itself) and the raw socket.
;;
;; Guile/guile-gnutls don't handle the handshake happening on a non
;; blocking socket, so the socket is only switched to non-blocking mode
;; (when NON-BLOCKING? is true, the default) after any TLS wrapping is done.
(define* (open-socket-for-uri* uri
                               #:key (verify-certificate? #t)
                               (non-blocking? #t))
  (define tls-wrap
    (@@ (web client) tls-wrap))

  (define https?
    (eq? 'https (uri-scheme uri)))

  ;; The plain TCP connection is made to the https port (443 by default)
  (define plain-uri
    (if https?
        (build-uri 'http
                   #:userinfo (uri-userinfo uri)
                   #:host (uri-host uri)
                   #:port (or (uri-port uri) 443)
                   #:path (uri-path uri)
                   #:query (uri-query uri)
                   #:fragment (uri-fragment uri))
        uri))

  (let* ((socket (open-socket-for-uri plain-uri))
         (port (if https?
                   (tls-wrap socket (uri-host uri)
                             #:verify-certificate? verify-certificate?)
                   socket)))
    (when non-blocking?
      (let ((flags (fcntl socket F_GETFL)))
        (fcntl socket F_SETFL (logior O_NONBLOCK flags))))
    (values port socket)))

(define* (http-get* uri
                    #:key
                    (method 'GET)
                    (body #f)
                    (verify-certificate? #t)
                    (port #f)
                    (version '(1 . 1))
                    (keep-alive? #f)
                    (headers '())
                    (decode-body? #t)
                    ;; Default to streaming? #t since read-response-body calls
                    ;; get-bytevector-all, which is implemented in C and
                    ;; therefore can't be suspended
                    (streaming? #t))
  "Send an HTTP request for URI (a GET unless METHOD says otherwise) and
return the response and body, as http-request does.  When PORT is #f (the
default), a new connection is opened with open-socket-for-uri*, which
switches the socket to non-blocking mode after any TLS wrapping; otherwise
the request is sent over PORT."
  (define (request-over connection)
    (http-request uri
                  #:method method
                  #:body body
                  #:verify-certificate? verify-certificate?
                  #:port connection
                  #:version version
                  #:keep-alive? keep-alive?
                  #:headers headers
                  #:decode-body? decode-body?
                  #:streaming? streaming?))

  (if port
      (request-over port)
      (let ((connection _raw-socket
                        (open-socket-for-uri*
                         uri
                         #:verify-certificate? verify-certificate?)))
        (request-over connection))))

(define* (http-get-json uri-string #:key (headers '()))
  "Fetch URI-STRING with http-get*, sending HEADERS, and return the body
parsed with json->scm.  The body port is closed once it has been parsed,
rather than being left open."
  (let ((response body (http-get* (string->uri uri-string)
                                  #:headers headers)))
    (let ((result (json->scm body)))
      ;; Best-effort cleanup: the data has already been read, so a failure
      ;; while closing shouldn't fail the caller.
      (false-if-exception (close-port body))
      result)))

(define (make-state-channel event-source)
  "Return a channel from which the state of EVENT-SOURCE (the JSON served
at EVENT-SOURCE/state) can be requested with get-state.

A fiber serves the requests: each message put on the channel is a reply
channel, which is sent either the state or (exception . EXN).  The state
is only refetched when it is more than 120 seconds old; when a refetch
fails, the previously fetched state is kept."
  (define max-state-age-seconds 120)

  (let ((channel (make-channel)))
    (spawn-fiber
     (lambda ()
       ;; Fetch the state and send it to REPLY-CHANNEL, returning it.  On
       ;; failure, send (exception . EXN) instead and return #f.
       (define (fetch-state reply-channel)
         (with-exception-handler
             (lambda (exn)
               (put-message reply-channel (cons 'exception exn))
               #f)
           (lambda ()
             (with-throw-handler #t
               (lambda ()
                 (let ((state (http-get-json
                               (string-append event-source "/state"))))
                   (put-message reply-channel state)
                   state))
               (lambda _
                 (backtrace))))
           #:unwind? #t))

       (let loop ((last-fetch-time #f)
                  (state #f))
         ;; let* so that the age is measured once a request has arrived,
         ;; not before waiting for one.
         (let* ((reply-channel (get-message channel))
                (state-age-seconds
                 (and last-fetch-time
                      (- (time-second (current-time)) last-fetch-time))))
           (if (and state-age-seconds
                    (<= state-age-seconds max-state-age-seconds))
               (begin
                 (put-message reply-channel state)
                 (loop last-fetch-time state))
               (let ((new-state (fetch-state reply-channel)))
                 (if new-state
                     (loop (time-second (current-time)) new-state)
                     (loop last-fetch-time state))))))))
    channel))

;; Request the current state from STATE-CHANNEL (see make-state-channel),
;; re-raising here any exception raised while fetching it.
(define (get-state state-channel)
  (let ((reply-channel (make-channel)))
    (put-message state-channel reply-channel)
    (match (get-message reply-channel)
      (('exception . exn)
       (raise-exception exn))
      (result
       result))))

(define* (make-events-channel event-source initial-state-id
                              #:key metrics-registry)
  "Follow the event stream at EVENT-SOURCE/events, starting a little
before INITIAL-STATE-ID, and distribute the events to listeners.

Returns two values: the submission channel, which accepts
(new-listener CALLBACK AFTER-STATE-ID FINISHED-CHANNEL) and
(remove-listener LISTENER-CHANNEL) messages, as well as the events
themselves as (ID EVENT-NAME DATA); and a thunk returning the id of the
most recently stored event.  The most recent 10000 events are kept in a
ring buffer so that listeners can start from an earlier event id."
  (let* ((submission-channel (make-channel))
         (listener-channels-box (make-atomic-box vlist-null))
         (buffer-size 10000)
         (event-buffer (make-vector buffer-size))
         ;; (STATE-ID . INDEX): the id of the most recently stored event and
         ;; its index in EVENT-BUFFER, with INDEX -1 before any is stored
         (current-state-id-and-event-buffer-index-box
          (make-atomic-box (cons 0 -1)))
         (events-channel-state-id-gauge
          (make-gauge-metric metrics-registry
                             "events_channel_state_id")))

    (define (get-state-id)
      (match (atomic-box-ref current-state-id-and-event-buffer-index-box)
        ((current-state-id . event-buffer-index)
         current-state-id)))

    ;; Spawn a fiber that calls CALLBACK for each event after
    ;; AFTER-STATE-ID, then waits for a notification on LISTENER-CHANNEL
    ;; before sending any newer events.  On an exception (including the
    ;; listener falling behind the ring buffer), the listener is removed and
    ;; LISTENING-FINISHED-CHANNEL is signalled.  A backtrace is printed
    ;; unless the exception is a broken pipe.
    (define (spawn-fiber-for-listener callback
                                      after-state-id
                                      submission-channel
                                      listener-channel
                                      listening-finished-channel)
      (spawn-fiber
       (lambda ()
         (let loop ((last-sent-state-id after-state-id))
           (let ((new-state-id
                  (with-exception-handler
                      (lambda (exn)
                        (simple-format (current-error-port)
                                       "exception listening for events: ~A\n"
                                       exn)
                        (put-message submission-channel
                                     (list 'remove-listener listener-channel))
                        (put-message listening-finished-channel #t)
                        #f)
                    (lambda ()
                      (with-throw-handler #t
                        (lambda ()
                          (match (atomic-box-ref
                                  current-state-id-and-event-buffer-index-box)
                            ((current-state-id . event-buffer-index)
                             ;; The events to send are the ids
                             ;; (LAST-SENT-STATE-ID, CURRENT-STATE-ID], held
                             ;; in the slots ending at EVENT-BUFFER-INDEX.
                             (let ((event-count-to-send
                                    (- current-state-id last-sent-state-id)))
                               (for-each
                                (lambda (index index-state-id)
                                  (match (vector-ref event-buffer index)
                                    ((event-state-id event-name data)
                                     ;; The slot was overwritten by a newer
                                     ;; event before it could be sent
                                     (when (not (= event-state-id
                                                   index-state-id))
                                       (error "listener behind"))
                                     (callback event-state-id
                                               event-name
                                               data))))
                                (map (lambda (i)
                                       (modulo i buffer-size))
                                     (iota event-count-to-send
                                           (- event-buffer-index
                                              (- event-count-to-send 1))))
                                (iota event-count-to-send
                                      (+ 1 last-sent-state-id))))
                             current-state-id)))
                        (lambda (key . args)
                          (if (and (eq? key 'system-error)
                                   (match args
                                     (("fport_write" "~A" ("Broken pipe")
                                       rest ...)
                                      #t)
                                     (_ #f)))
                              #f
                              (backtrace)))))
                    #:unwind? #t)))
             (unless (eq? #f new-state-id)
               (get-message listener-channel)
               (loop new-state-id)))))
       #:parallel? #t))

    ;; Store an event in the ring buffer and publish it as the current
    ;; state.  The slot is written before the atomic box is updated, so a
    ;; listener running in parallel that sees the new index also sees the
    ;; event stored at it.
    (define (store-event id event-name data)
      (match (atomic-box-ref current-state-id-and-event-buffer-index-box)
        ((current-state-id . event-buffer-index)
         (let ((new-event-index (modulo (+ 1 event-buffer-index)
                                        buffer-size)))
           (vector-set! event-buffer
                        new-event-index
                        (list id event-name data))
           (atomic-box-set! current-state-id-and-event-buffer-index-box
                            (cons id new-event-index))
           (metric-set events-channel-state-id-gauge id)))))

    ;; Follow the event stream, forwarding each event to the submission
    ;; fiber as (ID EVENT-NAME DATA).
    ;;
    ;; TODO: It would be good to replace this with a more general way of
    ;; processing the event stream; this assumes each event is an "id: "
    ;; line followed immediately by an "event: " line and a "data: " line.
    (spawn-fiber
     (lambda ()
       (let ((response remote-port
                       (http-get*
                        (string->uri (string-append event-source "/events"))
                        #:headers
                        `((last-event-id
                           . ,(number->string
                               (- initial-state-id
                                  ;; Get some earlier events as well, as
                                  ;; clients that are reconnecting might
                                  ;; want them
                                  100)))))))
         ;; Return the next line with PREFIX dropped, or #f once the stream
         ;; has ended.
         (define (read-field prefix)
           (let ((line (get-line remote-port)))
             (and (not (eof-object? line))
                  (string-drop line (string-length prefix)))))

         (define (report-end-of-stream)
           (simple-format (current-error-port)
                          "events stream from ~A ended\n"
                          event-source))

         (setvbuf remote-port 'none)

         (let loop ((line (get-line remote-port)))
           (cond
            ((eof-object? line)
             (report-end-of-stream))
            ((string-prefix? "id: " line)
             ;; let* so that the event and data lines are read in order
             (let* ((id (string->number (string-drop line 4)))
                    (event-name (read-field "event: "))
                    (data (and event-name (read-field "data: "))))
               (if data
                   (begin
                     (put-message submission-channel
                                  (list id
                                        event-name
                                        (json-string->scm data)))
                     (loop (get-line remote-port)))
                   (report-end-of-stream))))
            (else
             (loop (get-line remote-port)))))))
     #:parallel? #t)

    ;; Handle listener registration/removal and incoming events.  Each
    ;; stored event is announced to every listener from its own fiber.
    ;; NOTE(review): fibers channels are unbuffered, so a notification fiber
    ;; for a listener that is removed before reading it stays blocked in
    ;; put-message.
    (spawn-fiber
     (lambda ()
       (while #t
         (match (get-message submission-channel)
           (('new-listener callback requested-after-state-id
                           listening-finished-channel)
            (let ((listener-channel (make-channel))
                  (after-state-id
                   (match (atomic-box-ref
                           current-state-id-and-event-buffer-index-box)
                     ((current-state-id . event-buffer-index)
                      ;; Honour the requested starting point only if that
                      ;; event is still in the ring buffer; otherwise start
                      ;; from the current state.
                      (if (and requested-after-state-id
                               (<= requested-after-state-id current-state-id)
                               (match (vector-ref
                                       event-buffer
                                       (modulo (+ event-buffer-index
                                                  (- requested-after-state-id
                                                     current-state-id))
                                               buffer-size))
                                 ((? unspecified? _) #f)
                                 ((event-state-id event-name data)
                                  (= event-state-id
                                     requested-after-state-id))))
                          requested-after-state-id
                          current-state-id)))))
              (atomic-box-set! listener-channels-box
                               (vhash-consq listener-channel
                                            #t
                                            (atomic-box-ref
                                             listener-channels-box)))
              (spawn-fiber-for-listener callback
                                        after-state-id
                                        submission-channel
                                        listener-channel
                                        listening-finished-channel)))
           (('remove-listener listener-channel)
            (atomic-box-set! listener-channels-box
                             (vhash-delq listener-channel
                                         (atomic-box-ref
                                          listener-channels-box))))
           ((id event-name data)
            (store-event id event-name data)
            (vhash-fold (lambda (listener-channel val res)
                          (spawn-fiber
                           (lambda ()
                             (put-message listener-channel #t))
                           #:parallel? #t)
                          #f)
                        #f
                        (atomic-box-ref listener-channels-box)))))))

    (values submission-channel
            get-state-id)))

;; Register CALLBACK with EVENTS-CHANNEL (see make-events-channel).
;; CALLBACK is called as (CALLBACK ID EVENT-NAME DATA) for each event after
;; AFTER-STATE-ID, or after the current state when AFTER-STATE-ID is #f or
;; no longer buffered.
(define* (listen-for-events events-channel callback
                            #:key after-state-id)
  (let ((listening-finished-channel (make-channel)))
    (put-message events-channel
                 (list 'new-listener
                       callback
                       after-state-id
                       listening-finished-channel))

    ;; This is designed to be useful in the controller, so this procedure
    ;; shouldn't exit until callback has finished being called
    (get-message listening-finished-channel)))

(define (parse-query-string query)
  "Parse QUERY, the query component of a URI (or #f), into an alist of
URI-decoded (KEY . VALUE) strings.  A parameter without an \"=\" gets the
empty string as its value, and empty parameters are skipped."
  (if query
      (filter-map
       (lambda (parameter)
         (and (not (string-null? parameter))
              (match (string-index parameter #\=)
                (#f
                 (cons (uri-decode parameter) ""))
                (index
                 (cons (uri-decode (string-take parameter index))
                       (uri-decode (string-drop parameter (+ index 1))))))))
       (string-split query #\&))
      '()))

;; Return the request handler procedure for the front-end.  It is called
;; with the request, (METHOD . PATH-COMPONENTS), the acceptable MIME types
;; and the request body, and returns a (RESPONSE BODY) list.  Build and agent
;; pages are proxied from EVENT-SOURCE; the activity and state pages come
;; from STATE-CHANNEL and the event stream from EVENTS-CHANNEL.
(define* (make-controller assets-directory metrics-registry
                          events-channel state-channel event-source
                          #:key
                          title
                          (template-directory (%config 'template-dir))
                          (derivation-link-target (const #f))
                          (tag-link-target (const #f)))
  (define handle-static-assets
    (if (string-prefix? (%store-prefix) assets-directory)
        (static-asset-from-store-renderer assets-directory)
        (static-asset-from-directory-renderer assets-directory)))

  (define gc-metrics-updater!
    (get-gc-metrics-updater metrics-registry))

  (define home-template-content
    (let ((filename (string-append template-directory "/home.html")))
      (if (file-exists? filename)
          (call-with-input-file filename get-string-all)
          "

Temporary home page

")))

  ;; Fetch PATH from EVENT-SOURCE, asking for and parsing a JSON response.
  (define (fetch-event-source-json path)
    (http-get-json (string-append event-source path)
                   #:headers '((accept . ((application/json))))))

  (lambda (request method-and-path-components mime-types body)
    (match method-and-path-components
      (('GET)
       (render-html
        #:sxml (home title home-template-content)))
      (('GET "activity")
       (render-html
        #:sxml (activity title (get-state state-channel))))
      (('GET "state")
       (render-json (get-state state-channel)))
      (('GET "events")
       (let ((headers (request-headers request))
             (query-parameters
              (parse-query-string (uri-query (request-uri request)))))
         (list (build-response
                #:code 200
                #:headers '((content-type . (text/event-stream))))
               (lambda (port)
                 (listen-for-events
                  events-channel
                  (lambda (id event-name data)
                    (display
                     (simple-format #f "id: ~A\nevent: ~A\ndata: ~A\n\n"
                                    id
                                    event-name
                                    (scm->json-string data))
                     port)
                    (force-output port)
                    ;; TODO because the chunked output port doesn't call
                    ;; force-output on the underlying port, do that here. We
                    ;; want this event to be sent now, rather than when some
                    ;; buffer fills up.
                    (force-output (request-port request)))
                  #:after-state-id
                  (with-exception-handler
                      (lambda (exn)
                        (simple-format
                         (current-error-port)
                         "error: when processing Last-Event-ID header value: ~A\n"
                         (assoc-ref headers 'last-event-id))
                        #f)
                    (lambda ()
                      (or (and=> (assoc-ref headers 'last-event-id)
                                 string->number)
                          (and=> (assoc-ref query-parameters "last_event_id")
                                 string->number)))
                    #:unwind? #t))))))
      (('GET "build" uuid)
       ;; Path components are URI-encoded before being forwarded, so a
       ;; decoded "/" or "?" can't change which resource is requested.
       (let ((build-data
              (fetch-event-source-json
               (string-append "/build/" (uri-encode uuid)))))
         (case (most-appropriate-mime-type
                mime-types
                '(application/json text/html))
           ((application/json)
            (render-json build-data))
           (else
            (render-html
             #:sxml (build title
                           build-data
                           derivation-link-target
                           tag-link-target))))))
      (('GET "agent" agent-id)
       (let ((agent-data
              (fetch-event-source-json
               (string-append "/agent/" (uri-encode agent-id)))))
         (case (most-appropriate-mime-type
                mime-types
                '(application/json text/html))
           ((application/json)
            (render-json agent-data))
           (else
            (render-html
             #:sxml (agent title agent-data))))))
      (('GET "agent" agent-id "build-allocation-plan")
       (render-json
        (fetch-event-source-json
         (string-append "/agent/" (uri-encode agent-id)
                        "/build-allocation-plan"))))
      (('GET "assets" rest ...)
       (or (handle-static-assets (string-join rest "/")
                                 (request-headers request))
           (list (build-response #:code 404)
                 (string-append
                  "Resource not found: "
                  (uri->string (request-uri request))))))
      (('GET "bffe" "metrics")
       (gc-metrics-updater!)
       (list (build-response
              #:code 200
              #:headers '((content-type . (text/plain))
                          (vary . (accept))))
             (lambda (port)
               (write-metrics metrics-registry port))))
      ((method path ...)
       (render-html
        #:sxml (general-not-found
                "Page not found"
                "")
        #:code 404)))))

;; Log the method and path of REQUEST to the current output port, then pass
;; it to CONTROLLER as (METHOD . PATH-COMPONENTS) with the acceptable MIME
;; types.  Errors are reported with a backtrace and rendered as a 500 error
;; page.
(define (handler request body controller)
  (display
   (format #f "~a ~a\n"
           (request-method request)
           (uri-path (request-uri request))))

  (call-with-error-handling
   (lambda ()
     (let-values (((request-components mime-types)
                   (request->path-components-and-mime-type request)))
       (controller request
                   (cons (request-method request)
                         request-components)
                   mime-types
                   body)))
   #:on-error 'backtrace
   #:post-error (lambda args
                  (render-html #:sxml (error-page args)
                               #:code 500))))

;; Start the front-end web server on HOST:PORT, proxying to EVENT-SOURCE.
;; The initial state id is fetched (retrying up to 6 times, 10 seconds
;; apart), and the server only starts once the events channel has caught up
;; to it.  CONTROLLER-ARGS are extra arguments for make-controller.
(define* (start-bffe-web-server #:key
                                (port 8767)
                                (host "0.0.0.0")
                                (assets-directory (%config 'assets-dir))
                                (event-source "http://localhost:8746")
                                (controller-args '())
                                pid-file
                                metrics-registry)
  (when pid-file
    (call-with-output-file pid-file
      (lambda (port)
        (simple-format port "~A\n" (getpid)))))

  (let* ((state-channel
          (make-state-channel event-source))
         (initial-state-id
          (retry-on-error
           (lambda ()
             (assoc-ref (get-state state-channel) "state_id"))
           #:times 6
           #:delay 10)))

    (simple-format #t "Starting from state ~A\n" initial-state-id)

    (let* ((events-channel
            get-state-id
            (make-events-channel
             event-source
             initial-state-id
             #:metrics-registry metrics-registry))
           (controller
            (apply make-controller
                   assets-directory
                   metrics-registry
                   events-channel
                   state-channel
                   event-source
                   controller-args)))

      ;; Wait until the events channel catches up
      (while (< (get-state-id) initial-state-id)
        (sleep 1))

      (simple-format #t "Starting the server\n")
      (run-server/patched
       (lambda (request body)
         (apply values (handler request body controller)))
       #:host host
       #:port port))))