;;; Build Farm Front-End
;;;
;;; Copyright © 2023 Christopher Baines
;;;
;;; This program is free software: you can redistribute it and/or
;;; modify it under the terms of the GNU Affero General Public License
;;; as published by the Free Software Foundation, either version 3 of
;;; the License, or (at your option) any later version.
;;;
;;; This program is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;;; Affero General Public License for more details.
;;;
;;; You should have received a copy of the GNU Affero General Public
;;; License along with this program.  If not, see
;;; <http://www.gnu.org/licenses/>.

(define-module (bffe manage-builds)
  #:use-module (srfi srfi-1)
  #:use-module (srfi srfi-71)
  #:use-module (ice-9 match)
  #:use-module (ice-9 exceptions)
  #:use-module (ice-9 textual-ports)
  #:use-module (logging logger)
  #:use-module (prometheus)
  #:use-module (zlib)
  #:use-module (json)
  #:use-module (fibers)
  #:use-module (fibers channels)
  #:use-module (guix records)
  #:use-module (web uri)
  #:use-module (web response)
  #:use-module (guix-build-coordinator client-communication)
  #:use-module ((guix-build-coordinator utils fibers)
                #:select (retry-on-error))
  #:use-module (bffe server)
  #:export (build-from-guix-data-service

            build-from-guix-data-service-data-service-url
            build-from-guix-data-service-branches
            build-from-guix-data-service-systems
            build-from-guix-data-service-systems-and-targets
            build-from-guix-data-service-submit-builds-for-channel-instances?
            build-from-guix-data-service-build-priority
            build-from-guix-data-service-data-service-build-server-id

            start-submit-builds-fibers))

;; Specification describing which builds to submit, based on the data held
;; by a data service instance.
(define-record-type* <build-from-guix-data-service>
  build-from-guix-data-service make-build-from-guix-data-service
  build-from-guix-data-service?
  ;; Base URL that request paths are appended to.
  (data-service-url
   build-from-guix-data-service-data-service-url)
  ;; Passed as the coordinator argument when submitting builds.
  (build-coordinator-url
   build-from-guix-data-service-build-coordinator-url)
  ;; List of branch names, or #f to look at every branch of each repository.
  (branches
   build-from-guix-data-service-branches)
  ;; List of systems; each one is paired with the target "none".
  (systems
   build-from-guix-data-service-systems)
  ;; List of (system . target) pairs, used in addition to SYSTEMS.
  (systems-and-targets
   build-from-guix-data-service-systems-and-targets
   (default '()))
  (submit-builds-for-channel-instances?
   build-from-guix-data-service-submit-builds-for-channel-instances?
   (default #f))
  ;; Procedure called with a type symbol ('channel-instance or 'package), a
  ;; system and a target, returning the priority to submit the build with.
  ;; When #f, builds are submitted with priority 0.
  (build-priority
   build-from-guix-data-service-build-priority
   (default #f))
  ;; When set, derivations which already have a build from this build
  ;; server are not submitted again.
  (data-service-build-server-id
   build-from-guix-data-service-data-service-build-server-id
   (default #f))
  ;; Alist of query parameters used when requesting the revisions of a
  ;; branch.
  (revision-parameters
   build-from-guix-data-service-revision-parameters
   (default '()))
  ;; Number of fibers used to submit builds.
  (parallelism
   build-from-guix-data-service-parallelism
   (default 2)))

;; Raised when the data service responds with a status code of 400 or above
;; (other than 404).
(define-exception-type &guix-data-service-error &error
  make-guix-data-service-error
  guix-data-service-error?
  (response-body guix-data-service-error-response-body)
  (response-code guix-data-service-error-response-code))

(define* (guix-data-service-request guix-data-service path
                                    #:optional (query-parameters '())
                                    #:key (retry-times 0) (retry-delay 15))
  "Make a GET request to PATH on GUIX-DATA-SERVICE (a base URL string), with
QUERY-PARAMETERS, an alist, appended as the query string.

Return the decoded JSON body and the response as two values, or #f if the
response code is 404.  For any other response code of 400 or above, raise a
&guix-data-service-error containing the decoded body, or the symbol
'error-decoding-body if it could not be decoded.

When RETRY-TIMES is not 0, the request is retried through retry-on-error,
with RETRY-DELAY passed as the delay, except for data service errors with a
response code below 500."
  (define uri
    (string->uri
     (string-append
      guix-data-service
      path
      (if (null? query-parameters)
          ""
          (string-append
           "?"
           (string-join
            (map (match-lambda
                   ((key . value)
                    (simple-format #f "~A=~A" key value)))
                 query-parameters)
            "&"))))))

  ;; Decode the JSON in BODY, decompressing it first if RESPONSE says it is
  ;; gzip encoded.
  (define (read-json-body response body)
    (match (response-content-encoding response)
      (('gzip)
       (json->scm (make-zlib-input-port body #:format 'gzip)))
      (_
       (json->scm body))))

  (define (make-request)
    ;; NOTE(review): http-get* is not defined in this file, and the body port
    ;; it returns is never explicitly closed here (nor read in the 404
    ;; case) — confirm whether that leaks connections.
    (let ((response
           body
           (http-get* uri
                      #:headers '((accept-encoding . ((1 . "gzip"))))
                      #:streaming? #t)))
      (cond
       ((= (response-code response) 404)
        #f)
       ((>= (response-code response) 400)
        (let ((json-body
               ;; The body of an error response might not be JSON, so don't
               ;; let a decoding failure hide the response code
               (with-exception-handler
                   (lambda (exn)
                     'error-decoding-body)
                 (lambda ()
                   (read-json-body response body))
                 #:unwind? #t)))
          (raise-exception
           (make-guix-data-service-error json-body
                                         (response-code response)))))
       (else
        (values (read-json-body response body)
                response)))))

  (if (= 0 retry-times)
      (make-request)
      (retry-on-error
       make-request
       #:times retry-times
       #:delay retry-delay
       #:ignore (lambda (exn)
                  (and (guix-data-service-error? exn)
                       (< (guix-data-service-error-response-code exn)
                          500))))))

(define* (fiberize proc #:key (parallelism 1))
  "Spawn PARALLELISM fibers which apply PROC to the arguments they receive
over a shared channel, and return a procedure which sends its arguments to
those fibers and waits for the reply.  The returned procedure returns the
values PROC returned, or raises the exception that PROC raised."
  (let ((channel (make-channel)))
    (for-each
     (lambda _
       (spawn-fiber
        (lambda ()
          (while #t
            ;; Each message is a reply channel consed on to the arguments
            (let ((reply-channel args (car+cdr (get-message channel))))
              (put-message
               reply-channel
               (with-exception-handler
                   (lambda (exn)
                     (cons 'exception exn))
                 (lambda ()
                   ;; Print the backtrace before the stack is unwound
                   (with-throw-handler #t
                     (lambda ()
                       (call-with-values
                           (lambda ()
                             (apply proc args))
                         (lambda vals
                           (cons 'result vals))))
                     (lambda _
                       (backtrace))))
                 #:unwind? #t)))))
        #:parallel? #t))
     (iota parallelism))

    (lambda args
      (let ((reply-channel (make-channel)))
        (put-message channel (cons reply-channel args))
        (match (get-message reply-channel)
          (('result . vals) (apply values vals))
          (('exception . exn)
           (raise-exception exn)))))))

(define (fibers-map proc . lists)
  "Apply PROC to the elements of LISTS, like map, but with each application
running in its own fiber.  Return a list containing, for each application,
the list of values that PROC returned.  If an application raised an
exception, it is re-raised when its result is collected."
  (let ((channels
         (apply
          map
          (lambda args
            (let ((channel (make-channel)))
              (spawn-fiber
               (lambda ()
                 (put-message
                  channel
                  (with-exception-handler
                      (lambda (exn)
                        (cons 'exception exn))
                    (lambda ()
                      ;; Print the backtrace before the stack is unwound
                      (with-throw-handler #t
                        (lambda ()
                          (call-with-values
                              (lambda ()
                                (apply proc args))
                            (lambda val
                              (cons 'result val))))
                        (lambda _
                          (backtrace))))
                    #:unwind? #t))))
              channel))
          lists)))
    (map
     (match-lambda
       (('result . val) val)
       (('exception . exn)
        (raise-exception exn)))
     (map get-message channels))))

(define (fibers-for-each proc . lists)
  "Apply PROC to the elements of LISTS for its side effects, running the
applications in fibers, one batch at a time.  The return value is
unspecified."
  ;; Like split-at, but don't care about the order of the resulting lists, and
  ;; don't error if the list is shorter than i elements
  (define (split-at* lst i)
    (let lp ((l lst) (n i) (acc '()))
      (if (or (<= n 0) (null? l))
          (values (reverse! acc) l)
          (lp (cdr l) (- n 1) (cons (car l) acc)))))

  ;; As this can be called with lists with tens of thousands of items in them,
  ;; batch the work, so that at most batch-size fibers are spawned at a time
  (define batch-size 20)

  ;; Return two values, the first batch-size elements of each list, and the
  ;; remainder of each list
  (define (get-batch lists)
    (let ((split-lists
           (map (lambda (lst)
                  (let ((batch rest (split-at* lst batch-size)))
                    (cons batch rest)))
                lists)))
      (values (map car split-lists)
              (map cdr split-lists))))

  (let loop ((lists lists))
    (call-with-values
        (lambda ()
          (get-batch lists))
      (lambda (batch rest)
        (apply fibers-map proc batch)
        (unless (null? (car rest))
          (loop rest)))))

  *unspecified*)

(define (all-repository-ids guix-data-service)
  "Return the list of \"id\" values from the \"repositories\" entry of
/repositories.json on GUIX-DATA-SERVICE."
  (let ((data (guix-data-service-request guix-data-service
                                         "/repositories.json"
                                         #:retry-times 3)))
    (map (lambda (entry)
           (assoc-ref entry "id"))
         (vector->list
          (assoc-ref data "repositories")))))

(define (all-repository-branches guix-data-service repository-id)
  "Return the list of \"name\" values from the \"branches\" entry of the data
for the repository with the numeric REPOSITORY-ID on GUIX-DATA-SERVICE."
  (let ((data (guix-data-service-request
               guix-data-service
               (string-append "/repository/"
                              (number->string repository-id)
                              ".json")
               #:retry-times 3)))
    (map (lambda (entry)
           (assoc-ref entry "name"))
         (vector->list
          (assoc-ref data "branches")))))

(define* (channel-instance-derivations-for-commit guix-data-service commit)
  "Return the \"channel_instances\" entry of the channel instances data for
COMMIT on GUIX-DATA-SERVICE, or #f if it is not available."
  (let ((data (guix-data-service-request
               guix-data-service
               (string-append "/revision/" commit "/channel-instances.json")
               #:retry-times 3)))
    (assoc-ref data "channel_instances")))

(define* (package-derivations-for-commit guix-data-service commit
                                         #:key system target
                                         guix-data-service-build-server-id)
  "Return the list of \"derivation\" values from the package derivations data
for COMMIT, SYSTEM and TARGET on GUIX-DATA-SERVICE.  When
GUIX-DATA-SERVICE-BUILD-SERVER-ID is set, it is sent as the
no_build_from_build_server query parameter."
  (let ((data (guix-data-service-request
               guix-data-service
               (string-append "/revision/" commit "/package-derivations.json")
               `((system . ,system)
                 (target . ,target)
                 (field  . "(no-additional-fields)")
                 ,@(if guix-data-service-build-server-id
                       `((no_build_from_build_server
                          . ,guix-data-service-build-server-id))
                       '())
                 (all_results . "on"))
               #:retry-times 3)))
    (map (lambda (entry)
           (assoc-ref entry "derivation"))
         (vector->list
          (assoc-ref data "derivations")))))

(define (derivation-priority priority-for-derivation type system target)
  "Return the priority to submit a build of TYPE ('channel-instance or
'package) for SYSTEM and TARGET with.  PRIORITY-FOR-DERIVATION is normally a
procedure taking those three arguments; #f, which is the default for the
build-priority field of the specification, gives a priority of 0."
  (cond
   ((procedure? priority-for-derivation)
    (priority-for-derivation type system target))
   ;; No priority configured, so use the same default as the #:priority
   ;; argument of submit-build
   ((not priority-for-derivation) 0)
   ;; Anything else is used as the priority itself
   (else priority-for-derivation)))

(define* (submit-channel-instance-builds-for-revision
          submit-build/fiberized
          coordinator
          guix-data-service
          commit
          systems
          priority-for-derivation
          #:key
          guix-data-service-build-server-id
          branch)
  "Submit builds through SUBMIT-BUILD/FIBERIZED for the channel instance
derivations of COMMIT whose system is in SYSTEMS.  When
GUIX-DATA-SERVICE-BUILD-SERVER-ID is set, derivations which already have a
build from that build server which is not canceled are skipped.  Builds are
tagged with the category, the revision, and BRANCH if it is given."
  (log-msg 'INFO "requesting channel instance derivations (" commit ")")
  (let* ((all-channel-instance-derivations
          (channel-instance-derivations-for-commit guix-data-service
                                                   commit))
         (channel-instance-derivations-to-submit
          (filter
           (lambda (channel-instance-derivation)
             (and
              (member (assoc-ref channel-instance-derivation "system")
                      systems)
              (or
               (not guix-data-service-build-server-id)
               (not
                (find
                 (lambda (build)
                   (and
                    (= (assoc-ref build "build_server_id")
                       guix-data-service-build-server-id)
                    (not
                     (string=? (assoc-ref build "status")
                               "canceled"))))
                 (vector->list
                  (assoc-ref channel-instance-derivation "builds")))))))
           ;; There might not be any data for this revision
           (or (and=> all-channel-instance-derivations vector->list)
               '()))))
    (unless (null? channel-instance-derivations-to-submit)
      (log-msg 'INFO "submitting "
               (length channel-instance-derivations-to-submit)
               " channel instance builds for (" commit ")")
      (fibers-for-each
       (lambda (channel-instance-derivation)
         (let ((derivation
                (assoc-ref channel-instance-derivation "derivation"))
               (system
                (assoc-ref channel-instance-derivation "system")))
           (submit-build/fiberized
            coordinator
            guix-data-service
            derivation
            #:priority (derivation-priority priority-for-derivation
                                            'channel-instance
                                            system
                                            "none")
            #:log-prefix
            (simple-format #f "channel instance (~A): ~A: "
                           system derivation)
            #:tags
            `(((key   . category)
               (value . channel-instance))
              ((key   . revision)
               (value . ,commit))
              ,@(if branch
                    `(((key   . branch)
                       (value . ,branch)))
                    '())))))
       channel-instance-derivations-to-submit))))

(define* (submit-package-builds-for-revision submit-build/fiberized
                                             coordinator
                                             guix-data-service
                                             commit
                                             system
                                             target
                                             priority-for-derivation
                                             #:key
                                             guix-data-service-build-server-id
                                             branch)
  "Submit builds through SUBMIT-BUILD/FIBERIZED for the package derivations
of COMMIT for SYSTEM and TARGET, where a TARGET of \"none\" is used when there
is no target.  GUIX-DATA-SERVICE-BUILD-SERVER-ID is passed on when requesting
the derivations.  Builds are tagged with the category, the revision, and
BRANCH if it is given."
  (define log-suffix
    (if (string=? target "none")
        (simple-format #f "~A (~A)" system commit)
        (simple-format #f "~A=>~A (~A)" system target commit)))

  (log-msg 'INFO "requesting package derivations for " log-suffix)
  (let ((unprocessed-package-derivations
         (package-derivations-for-commit
          guix-data-service
          commit
          #:system system
          #:target target
          #:guix-data-service-build-server-id
          guix-data-service-build-server-id)))
    (unless (null? unprocessed-package-derivations)
      (log-msg 'INFO "submitting "
               (length unprocessed-package-derivations)
               " package builds for " log-suffix)
      (fibers-for-each
       (lambda (derivation)
         (submit-build/fiberized
          coordinator
          guix-data-service
          derivation
          #:priority (derivation-priority priority-for-derivation
                                          'package
                                          system
                                          target)
          #:log-prefix
          (if (string=? target "none")
              (simple-format #f "package (~A): ~A: "
                             system derivation)
              (simple-format #f "package (~A=>~A): ~A: "
                             system target derivation))
          #:tags
          `(((key   . category)
             (value . package))
            ((key   . revision)
             (value . ,commit))
            ,@(if branch
                  `(((key   . branch)
                     (value . ,branch)))
                  '()))))
       unprocessed-package-derivations)
      (log-msg 'INFO "finished submitting "
               (length unprocessed-package-derivations)
               " package builds for " log-suffix))))

(define (submit-builds-pass metrics-registry
                            submit-build/fiberized
                            guix-data-service
                            specification
                            processed-commits-hash
                            record-revision-as-processed
                            systems-and-targets)
  "Look at each relevant branch of each repository known to
GUIX-DATA-SERVICE, and submit builds for the revisions which have data
available and aren't in PROCESSED-COMMITS-HASH.  Once the builds for a
revision have been submitted, RECORD-REVISION-AS-PROCESSED is called with the
commit.  After everything has been looked at, sleep for 60 seconds."
  (define unseen-revisions-metric
    (or (metrics-registry-fetch-metric metrics-registry
                                       "unseen_revisions_total")
        (make-gauge-metric metrics-registry
                           "unseen_revisions_total")))

  (fibers-for-each
   (lambda (repository-id)
     (fibers-for-each
      (lambda (branch)
        (let* ((branch-revisions
                (guix-data-service-request
                 guix-data-service
                 (string-append "/repository/"
                                (number->string repository-id)
                                "/branch/" branch
                                ".json")
                 (build-from-guix-data-service-revision-parameters
                  specification)
                 #:retry-times 3))
               (unseen-revisions
                (filter-map
                 (lambda (entry)
                   (let ((commit (assoc-ref entry "commit-hash")))
                     (and (not (hash-ref processed-commits-hash commit))
                          (assoc-ref entry "data_available")
                          commit)))
                 (vector->list
                  (assoc-ref branch-revisions "revisions")))))

          (log-msg 'DEBUG (length unseen-revisions) " unseen revisions")
          (metric-set unseen-revisions-metric
                      (length unseen-revisions))

          (for-each
           (lambda (commit)
             (log-msg 'INFO "looking at revision " commit)

             (when (build-from-guix-data-service-submit-builds-for-channel-instances?
                    specification)
               (submit-channel-instance-builds-for-revision
                submit-build/fiberized
                (build-from-guix-data-service-build-coordinator-url
                 specification)
                guix-data-service
                commit
                (map car systems-and-targets)
                (build-from-guix-data-service-build-priority specification)
                #:guix-data-service-build-server-id
                (build-from-guix-data-service-data-service-build-server-id
                 specification)
                #:branch branch))

             (fibers-for-each
              (match-lambda
                ((system . target)
                 (submit-package-builds-for-revision
                  submit-build/fiberized
                  (build-from-guix-data-service-build-coordinator-url
                   specification)
                  guix-data-service
                  commit
                  system
                  target
                  (build-from-guix-data-service-build-priority specification)
                  #:guix-data-service-build-server-id
                  (build-from-guix-data-service-data-service-build-server-id
                   specification)
                  #:branch branch)))
              systems-and-targets)

             (log-msg 'INFO "finished looking at revision " commit)
             (metric-decrement unseen-revisions-metric)
             (record-revision-as-processed commit))
           ;; The unseen revisions are processed in the reverse of the order
           ;; in which the data service lists them
           (reverse unseen-revisions))))
      ;; Only ask the data service for the branches when the specification
      ;; doesn't list them, as the response wouldn't be used otherwise
      (or (build-from-guix-data-service-branches specification)
          (all-repository-branches guix-data-service repository-id))))
   (all-repository-ids guix-data-service))

  (sleep 60))

(define* (start-submit-builds-fibers metrics-registry
                                     specification
                                     #:key processed-commits-file)
  "Spawn the fibers which submit builds according to SPECIFICATION, a
<build-from-guix-data-service> record, looking for new revisions in a loop.

When PROCESSED-COMMITS-FILE is given, the commits listed in that file, one per
line, are treated as already processed, and each commit is appended to it
once it has been processed."
  (define processed-commits-hash
    (make-hash-table))

  (define (record-revision-as-processed commit)
    (hash-set! processed-commits-hash commit #t)
    (when processed-commits-file
      (let ((port (open-file processed-commits-file "a")))
        (simple-format port "~A\n" commit)
        (close-port port))))

  (define* (submit-build coordinator guix-data-service derivation
                         #:key (priority 0) (log-prefix "")
                         (tags '()))
    (retry-on-error
     (lambda ()
       (let ((response
              (send-submit-build-request
               coordinator
               derivation
               (list guix-data-service)
               #f
               priority
               #t
               #t
               #t
               tags)))
         (let ((no-build-submitted-response
                (assoc-ref response "no-build-submitted")))
           (if no-build-submitted-response
               (log-msg 'DEBUG log-prefix "skipped: "
                        no-build-submitted-response)
               (log-msg 'DEBUG log-prefix "build submitted as "
                        (assoc-ref response "build-submitted"))))))
     ;; The TTL Guix uses for transient failures fetching substitutes is 10
     ;; minutes, so we need to retry for longer than that
     #:times 30
     #:delay 30))

  (let* ((systems-and-targets
          (append
           ;; Plain systems are handled as pairs with the target "none"
           (map (lambda (system)
                  (cons system "none"))
                (build-from-guix-data-service-systems specification))
           (build-from-guix-data-service-systems-and-targets
            specification)))
         (guix-data-service
          (build-from-guix-data-service-data-service-url specification)))

    (when (and processed-commits-file
               (file-exists? processed-commits-file))
      (call-with-input-file processed-commits-file
        (lambda (port)
          (let ((commits
                 (string-split (get-string-all port) #\newline)))
            (for-each
             (lambda (commit)
               (unless (string-null? commit)
                 (simple-format #t "marking ~A as already processed\n"
                                commit)
                 (hash-set! processed-commits-hash commit #t)))
             commits)))))

    (let ((submit-build/fiberized
           (fiberize submit-build
                     #:parallelism
                     (build-from-guix-data-service-parallelism
                      specification))))
      (spawn-fiber
       (lambda ()
         (while #t
           (with-exception-handler
               (lambda (exn)
                 ;; Don't let a failure to log the exception stop the loop
                 (with-exception-handler
                     (const #t)
                   (lambda ()
                     (log-msg 'ERROR
                              "exception in submit builds fiber: "
                              exn))
                   #:unwind? #t)
                 ;; A pass only sleeps when it finishes without an
                 ;; exception, so wait here too, to avoid retrying in a
                 ;; tight loop if the failure is persistent
                 (sleep 60))
             (lambda ()
               ;; Print the backtrace before the stack is unwound
               (with-throw-handler #t
                 (lambda ()
                   (submit-builds-pass metrics-registry
                                       submit-build/fiberized
                                       guix-data-service
                                       specification
                                       processed-commits-hash
                                       record-revision-as-processed
                                       systems-and-targets))
                 (lambda _
                   (backtrace))))
             #:unwind? #t)))))))