diff options
author | Christopher Baines <mail@cbaines.net> | 2023-07-24 10:13:38 +0100 |
---|---|---|
committer | Christopher Baines <mail@cbaines.net> | 2023-07-24 13:59:18 +0100 |
commit | cafb37751c4f6d4560780d3a7062780c6a3507fb (patch) | |
tree | 65306adb1e7eef84eec86ce55d520e967ec523ce | |
parent | cae04bb5eb4e07205f45e808ac48f1581be02d78 (diff) | |
download | bffe-cafb37751c4f6d4560780d3a7062780c6a3507fb.tar bffe-cafb37751c4f6d4560780d3a7062780c6a3507fb.tar.gz |
Submit builds faster
Using more parallelism through fibers.
-rw-r--r-- | bffe.scm | 27 | ||||
-rw-r--r-- | bffe/manage-builds.scm | 603 | ||||
-rw-r--r-- | bffe/server.scm | 100 |
3 files changed, 466 insertions, 264 deletions
@@ -22,6 +22,12 @@ #:use-module (prometheus) #:use-module (logging logger) #:use-module (logging port-log) + #:use-module (fibers) + #:use-module (fibers scheduler) + #:use-module (fibers channels) + #:use-module (fibers conditions) + #:use-module ((guix-build-coordinator utils fibers) + #:select (call-with-sigint)) #:use-module (bffe server) #:use-module (bffe manage-builds) #:export (run-bffe-service)) @@ -51,10 +57,19 @@ (open-log! lgr) (set-default-logger! lgr) - (for-each start-submit-builds-thread - build) + (let ((finished? (make-condition))) + (call-with-sigint + (lambda () + (run-fibers + (lambda () + (for-each start-submit-builds-fibers + build) - (apply start-bffe-web-server - `(,@web-server-args - #:pid-file ,pid-file - #:metrics-registry ,metrics-registry)))) + (apply start-bffe-web-server + `(,@web-server-args + #:pid-file ,pid-file + #:metrics-registry ,metrics-registry)) + (wait finished?)) + #:hz 10 + #:parallelism 4)) + finished?)))) diff --git a/bffe/manage-builds.scm b/bffe/manage-builds.scm index 9ab5a3d..c72d40d 100644 --- a/bffe/manage-builds.scm +++ b/bffe/manage-builds.scm @@ -20,18 +20,23 @@ #:use-module (srfi srfi-1) #:use-module (srfi srfi-71) #:use-module (ice-9 match) - #:use-module (ice-9 threads) + #:use-module (ice-9 exceptions) #:use-module (rnrs bytevectors) #:use-module (ice-9 textual-ports) #:use-module (logging logger) #:use-module (logging port-log) + #:use-module (zlib) #:use-module (json) + #:use-module (fibers) + #:use-module (fibers channels) #:use-module (guix records) #:use-module (web uri) #:use-module (web client) + #:use-module (web response) #:use-module (guix-build-coordinator client-communication) #:use-module ((guix-build-coordinator utils) #:select (create-work-queue retry-on-error)) + #:use-module (bffe server) #:export (build-from-guix-data-service build-from-guix-data-service-data-service-url build-from-guix-data-service-branches @@ -41,7 +46,7 @@ build-from-guix-data-service-build-priority build-from-guix-data-service-data-service-build-server-id - start-submit-builds-thread)) + start-submit-builds-fibers)) (define-record-type* <build-from-guix-data-service> build-from-guix-data-service make-build-from-guix-data-service @@ -60,9 +65,16 @@ build-from-guix-data-service-data-service-build-server-id (default #f))) +(define-exception-type &guix-data-service-error &error + make-guix-data-service-error + guix-data-service-error? + (response-body guix-data-service-error-response-body) + (response-code guix-data-service-error-response-code)) + (define* (guix-data-service-request guix-data-service path - #:optional (query-parameters '())) + #:optional (query-parameters '()) + #:key (retry-times 0) (retry-delay 15)) (define uri (string->uri (string-append guix-data-service @@ -78,18 +90,132 @@ query-parameters) "&")))))) - (retry-on-error - (lambda () - (let ((response - body - (http-get uri))) - (json-string->scm (utf8->string body)))) - #:times 6 - #:delay (+ 15 (random 30)))) + (define (make-request) + (let ((response + body + (http-get* uri + #:headers + '((accept-encoding . ((1 . "gzip")))) + #:streaming? #t))) + (cond + ((eq? (response-code response) 404) #f) + ((>= (response-code response) 400) + (let ((json-body + (with-exception-handler + (lambda (exn) + 'error-decoding-body) + (lambda () + (match (response-content-encoding response) + (('gzip) + (let ((zlib-input + (make-zlib-input-port body + #:format 'gzip))) + (json->scm zlib-input))) + (_ + (json->scm body)))) + #:unwind? #t))) + (raise-exception + (make-guix-data-service-error json-body + (response-code response))))) + (else + (let ((json-body + (match (response-content-encoding response) + (('gzip) + (let ((zlib-input + (make-zlib-input-port body + #:format 'gzip))) + (json->scm zlib-input))) + (_ + (json->scm body))))) + (values json-body + response)))))) + + (if (= 0 retry-times) + (make-request) + (retry-on-error + make-request + #:times retry-times + #:delay retry-delay + #:ignore (lambda (exn) + (and (guix-data-service-error? exn) + (< (guix-data-service-error-response-code exn) + 500)))))) + +(define* (fiberize proc #:key (parallelism 1)) + (let ((channel (make-channel))) + (for-each + (lambda _ + (spawn-fiber + (lambda () + (while #t + (let ((reply-channel args (car+cdr + (get-message channel)))) + (put-message + reply-channel + (with-exception-handler + (lambda (exn) + (cons 'exception exn)) + (lambda () + (with-throw-handler #t + (lambda () + (call-with-values + (lambda () + (apply proc args)) + (lambda vals + (cons 'result vals)))) + (lambda _ + (backtrace)))) + #:unwind? #t))))) + #:parallel? #t)) + (iota parallelism)) + + (lambda args + (let ((reply-channel (make-channel))) + (put-message channel (cons reply-channel args)) + (match (get-message reply-channel) + (('result . vals) (apply values vals)) + (('exception . exn) (raise-exception exn))))))) + +(define (fibers-map proc . lists) + (let ((channels + (apply + map + (lambda args + (let ((channel (make-channel))) + (spawn-fiber + (lambda () + (put-message + channel + (with-exception-handler + (lambda (exn) + (cons 'exception exn)) + (lambda () + (with-throw-handler #t + (lambda () + (call-with-values + (lambda () + (apply proc args)) + (lambda val + (cons 'result val)))) + (lambda _ + (backtrace)))) + #:unwind? #t)))) + channel)) + lists))) + (map (lambda (channel) + (match (get-message channel) + (('result . val) val) + (('exception . exn) (raise-exception exn)))) + channels))) + +(define (fibers-for-each proc . lists) + (apply fibers-map proc lists) + *unspecified*) (define (all-repository-ids guix-data-service) (let ((data (guix-data-service-request guix-data-service - "/repositories.json"))) + "/repositories.json" + #:retry-times 3))) (map (lambda (entry) (assoc-ref entry "id")) (vector->list @@ -100,25 +226,21 @@ (string-append "/repository/" (number->string repository-id) - ".json")))) + ".json") + #:retry-times 3))) (map (lambda (entry) (assoc-ref entry "name")) (vector->list (assoc-ref data "branches"))))) -(define (channel-instance-derivations-for-commit guix-data-service - commit - system) +(define* (channel-instance-derivations-for-commit + guix-data-service + commit) (let ((data (guix-data-service-request guix-data-service - (string-append "/revision/" commit "/channel-instances.json")))) - (filter-map (lambda (entry) - (if (string=? system - (assoc-ref entry "system")) - (assoc-ref entry "derivation") - #f)) - (vector->list - (assoc-ref data "channel_instances"))))) + (string-append "/revision/" commit "/channel-instances.json") + #:retry-times 3))) + (assoc-ref data "channel_instances"))) (define* (package-derivations-for-commit guix-data-service commit @@ -134,120 +256,218 @@ `((no_build_from_build_server . ,guix-data-service-build-server-id)) '()) - (all_results . "on"))))) + (all_results . "on")) + #:retry-times 3))) (map (lambda (entry) (assoc-ref entry "derivation")) (vector->list (assoc-ref data "derivations"))))) -(define* (submit-build coordinator guix-data-service derivation - #:key (priority 0) (log-prefix "") - (tags '())) - (retry-on-error - (lambda () - (let ((response - (send-submit-build-request - coordinator - derivation - (list guix-data-service) - #f - priority - #t - #t - #t - tags))) - (let ((no-build-submitted-response - (assoc-ref response "no-build-submitted"))) - (if no-build-submitted-response - (log-msg 'DEBUG log-prefix "skipped: " no-build-submitted-response) - (log-msg 'DEBUG log-prefix "build submitted as " - (assoc-ref response "build-submitted")))))) - ;; The TTL Guix uses for transient failures fetching substitutes is 10 - ;; minutes, so we need to retry for longer than that - #:times 30 - #:delay 30)) - -(define* (submit-builds-for-revision submit-build/async - coordinator - guix-data-service - commit - systems-and-targets - priority-for-derivation - #:key (submit-builds-for-channel-instances? - #t) - guix-data-service-build-server-id - branch) - (log-msg 'INFO "looking at revision " commit) - (par-for-each - (match-lambda - ((system . target) - (when (string=? target "none") - (when submit-builds-for-channel-instances? - (log-msg 'INFO "requesting channel instance derivations for " - system " (" commit ")") - (for-each (lambda (derivation) - (submit-build/async - coordinator - guix-data-service - derivation - #:priority - (priority-for-derivation 'channel-instance - system - target) - #:log-prefix - (simple-format #f "channel instance (~A): ~A: " - system - derivation) - #:tags `(((key . category) - (value . channel-instance)) - ((key . revision) - (value . ,commit)) - ,@(if branch - `(((key . branch) - (value . ,branch))) - '())))) - (channel-instance-derivations-for-commit guix-data-service - commit - system)))) - - (let ((unprocessed-package-derivations - ;; Only request derivations in one thread at a time, just - ;; in cause doing this in parallel could lead to timeouts - (monitor - (log-msg 'INFO "requesting package derivations for " - system "=>" target " (" commit ")") - (package-derivations-for-commit guix-data-service - commit - #:system system - #:target target - #:guix-data-service-build-server-id - guix-data-service-build-server-id)))) - - (log-msg 'INFO "submitting package builds for " - system "=>" target " (" commit ")") - (for-each (lambda (derivation) - (submit-build coordinator - guix-data-service - derivation - #:priority - (priority-for-derivation 'package - system - target) - #:log-prefix - (simple-format #f "package (~A=>~A): ~A: " - system target derivation) - #:tags `(((key . category) - (value . package)) - ((key . revision) - (value . ,commit)) - ,@(if branch - `(((key . branch) - (value . ,branch))) - '())))) - unprocessed-package-derivations)))) - systems-and-targets)) - -(define* (start-submit-builds-thread specification +(define* (submit-channel-instance-builds-for-revision + submit-build/fiberized + coordinator + guix-data-service + commit + systems + priority-for-derivation + #:key + guix-data-service-build-server-id + branch) + (log-msg 'INFO "requesting channel instance derivations (" commit ")") + (let* ((all-channel-instance-derivations + (channel-instance-derivations-for-commit + guix-data-service + commit)) + (channel-instance-derivations-to-submit + (filter + (lambda (channel-instance-derivation) + (and (member (assoc-ref channel-instance-derivation "system") + systems) + (or (not guix-data-service-build-server-id) + (not + (find + (lambda (build) + (and (= (assoc-ref build "build_server_id") + guix-data-service-build-server-id) + (not (string=? (assoc-ref build "status") + "canceled")))) + (vector->list + (assoc-ref channel-instance-derivation + "builds"))))))) + (vector->list all-channel-instance-derivations)))) + + (unless (null? channel-instance-derivations-to-submit) + (log-msg 'INFO "submitting " (length channel-instance-derivations-to-submit) + " channel instance builds for (" commit ")") + (fibers-for-each + (lambda (channel-instance-derivation) + (let ((derivation + (assoc-ref channel-instance-derivation "derivation")) + (system + (assoc-ref channel-instance-derivation "system"))) + (submit-build/fiberized + coordinator + guix-data-service + derivation + #:priority + (priority-for-derivation 'channel-instance + system + "none") + #:log-prefix + (simple-format #f "channel instance (~A): ~A: " + system + derivation) + #:tags `(((key . category) + (value . channel-instance)) + ((key . revision) + (value . ,commit)) + ,@(if branch + `(((key . branch) + (value . ,branch))) + '()))))) + channel-instance-derivations-to-submit)))) + +(define* (submit-package-builds-for-revision + submit-build/fiberized + coordinator + guix-data-service + commit + system + target + priority-for-derivation + #:key + guix-data-service-build-server-id + branch) + (define log-suffix + (if (string=? target "none") + (simple-format #f "~A (~A)" + system commit) + (simple-format #f "~A=>~A (~A)" + system target commit))) + + (log-msg 'INFO "requesting package derivations for " log-suffix) + (let ((unprocessed-package-derivations + (package-derivations-for-commit guix-data-service + commit + #:system system + #:target target + #:guix-data-service-build-server-id + guix-data-service-build-server-id))) + + (unless (null? unprocessed-package-derivations) + (log-msg 'INFO "submitting " (length unprocessed-package-derivations) + " package builds for " log-suffix) + (fibers-for-each + (lambda (derivation) + (submit-build/fiberized + coordinator + guix-data-service + derivation + #:priority + (priority-for-derivation 'package + system + target) + #:log-prefix + (if (string=? target "none") + (simple-format #f "package (~A): ~A: " + system derivation) + (simple-format #f "package (~A=>~A): ~A: " + system target derivation)) + #:tags `(((key . category) + (value . package)) + ((key . revision) + (value . ,commit)) + ,@(if branch + `(((key . branch) + (value . ,branch))) + '())))) + unprocessed-package-derivations)))) + +(define (submit-builds-pass + submit-build/fiberized + guix-data-service + specification + processed-commits-hash + record-revision-as-processed + systems-and-targets) + (fibers-for-each + (lambda (repository-id) + (fibers-for-each + (lambda (branch) + (let* ((branch-revisions + (guix-data-service-request guix-data-service + (string-append + "/repository/" + (number->string repository-id) + "/branch/" + branch + ".json") + #:retry-times 3)) + (unseen-revisions + (filter-map (lambda (entry) + (let ((commit (assoc-ref entry "commit-hash"))) + (and (not (hash-ref processed-commits-hash + commit)) + (assoc-ref entry "data_available") + commit))) + (vector->list + (assoc-ref branch-revisions + "revisions"))))) + (log-msg 'DEBUG (length unseen-revisions) + " unseen revisions") + (for-each + (lambda (commit) + (log-msg 'INFO "looking at revision " commit) + + (when (build-from-guix-data-service-submit-builds-for-channel-instances? + specification) + (submit-channel-instance-builds-for-revision + submit-build/fiberized + (build-from-guix-data-service-build-coordinator-url + specification) + guix-data-service + commit + (map car systems-and-targets) + (build-from-guix-data-service-build-priority + specification) + #:guix-data-service-build-server-id + (build-from-guix-data-service-data-service-build-server-id + specification) + #:branch + branch)) + + (fibers-for-each + (match-lambda + ((system . target) + (submit-package-builds-for-revision + submit-build/fiberized + (build-from-guix-data-service-build-coordinator-url + specification) + guix-data-service + commit + system + target + (build-from-guix-data-service-build-priority + specification) + #:guix-data-service-build-server-id + (build-from-guix-data-service-data-service-build-server-id + specification) + #:branch branch))) + systems-and-targets) + (record-revision-as-processed commit)) + (reverse unseen-revisions)))) + (let ((all-branches (all-repository-branches guix-data-service + repository-id)) + (specified-branches + (build-from-guix-data-service-branches + specification))) + (or specified-branches + all-branches)))) + (all-repository-ids guix-data-service)) + + (sleep 60)) + +(define* (start-submit-builds-fibers specification #:key processed-commits-file) (define processed-commits-hash @@ -257,10 +477,36 @@ (hash-set! processed-commits-hash commit #t) (when processed-commits-file - (monitor - (let ((port (open-file processed-commits-file "a"))) - (simple-format port "~A\n" commit) - (close-port port))))) + (let ((port (open-file processed-commits-file "a"))) + (simple-format port "~A\n" commit) + (close-port port)))) + +(define* (submit-build coordinator guix-data-service derivation + #:key (priority 0) (log-prefix "") + (tags '())) + (retry-on-error + (lambda () + (let ((response + (send-submit-build-request + coordinator + derivation + (list guix-data-service) + #f + priority + #t + #t + #t + tags))) + (let ((no-build-submitted-response + (assoc-ref response "no-build-submitted"))) + (if no-build-submitted-response + (log-msg 'DEBUG log-prefix "skipped: " no-build-submitted-response) + (log-msg 'DEBUG log-prefix "build submitted as " + (assoc-ref response "build-submitted")))))) + ;; The TTL Guix uses for transient failures fetching substitutes is 10 + ;; minutes, so we need to retry for longer than that + #:times 30 + #:delay 30)) (let* ((systems-and-targets (append @@ -286,14 +532,11 @@ #t))) commits))))) - (let ((submit-build/async - count-jobs - count-threads - list-jobs - (create-work-queue 4 - submit-build - #:name "submit-builds"))) - (call-with-new-thread + (let ((submit-build/fiberized + (fiberize submit-build + #:parallelism 4))) + + (spawn-fiber (lambda () (while #t (with-exception-handler @@ -301,65 +544,17 @@ (with-exception-handler (const #t) (lambda () - (log-msg 'ERROR "exception in submit builds thread: " exn)) + (log-msg 'ERROR "exception in submit builds fiber: " exn)) #:unwind? #t)) (lambda () - (for-each - (lambda (repository-id) - (for-each - (lambda (branch) - (let* ((branch-revisions - (guix-data-service-request guix-data-service - (string-append - "/repository/" - (number->string repository-id) - "/branch/" - branch - ".json"))) - (unseen-revisions - (filter-map (lambda (entry) - (let ((commit (assoc-ref entry "commit-hash"))) - (and (not (hash-ref processed-commits-hash - commit)) - (assoc-ref entry "data_available") - commit))) - (vector->list - (assoc-ref branch-revisions - "revisions"))))) - (log-msg 'DEBUG (length unseen-revisions) - " unseen revisions") - (for-each - (lambda (commit) - (submit-builds-for-revision - submit-build/async - (build-from-guix-data-service-build-coordinator-url - specification) - guix-data-service - commit - systems-and-targets - (build-from-guix-data-service-build-priority - specification) - #:submit-builds-for-channel-instances? - (build-from-guix-data-service-submit-builds-for-channel-instances? - specification) - #:guix-data-service-build-server-id - (build-from-guix-data-service-data-service-build-server-id - specification) - #:branch branch) - (record-revision-as-processed commit)) - (reverse unseen-revisions)))) - (let ((all-branches (all-repository-branches guix-data-service - repository-id)) - (specified-branches - (build-from-guix-data-service-branches - specification))) - (or specified-branches - all-branches)))) - (all-repository-ids guix-data-service)) - - (while (not (= (count-jobs) 0)) - ;; Wait until all the builds have been submitted - (sleep 5)) - - (sleep 60)) + (with-throw-handler #t + (lambda () + (submit-builds-pass submit-build/fiberized + guix-data-service + specification + processed-commits-hash + record-revision-as-processed + systems-and-targets)) + (lambda _ + (backtrace)))) #:unwind? #t))))))) diff --git a/bffe/server.scm b/bffe/server.scm index 79376d6..ff36a44 100644 --- a/bffe/server.scm +++ b/bffe/server.scm @@ -50,7 +50,7 @@ call-with-delay-logging retry-on-error)) #:use-module ((guix-build-coordinator utils fibers) - #:select (run-server/patched call-with-sigint)) + #:select (run-server/patched)) #:use-module (guix-build-coordinator client-communication) #:use-module (bffe config) #:use-module (bffe view util) @@ -58,7 +58,9 @@ #:use-module (bffe view build) #:use-module (bffe view agent) #:use-module (bffe view activity) - #:export (start-bffe-web-server)) + #:export (http-get* + + start-bffe-web-server)) ;; TODO Work around this causing problems with backtraces ;; https://github.com/wingo/fibers/issues/76 @@ -600,55 +602,45 @@ pid-file metrics-registry) - (let ((finished? (make-condition))) - (call-with-sigint - (lambda () - (run-fibers - (lambda () - (let* ((state-channel - (make-state-channel - event-source)) - (initial-state-id - (retry-on-error - (lambda () - (assoc-ref - (get-state state-channel) - "state_id")) - #:times 6 - #:delay 10))) - (simple-format #t "Starting from state ~A\n" - initial-state-id) - - (let* ((events-channel - get-state-id - (make-events-channel - event-source - initial-state-id - #:metrics-registry metrics-registry)) - (controller - (apply make-controller assets-directory - metrics-registry - events-channel state-channel - event-source - controller-args))) - - ;; Wait until the events channel catches up - (while (< (get-state-id) initial-state-id) - (sleep 1)) - - (when pid-file - (call-with-output-file pid-file - (lambda (port) - (simple-format port "~A\n" (getpid))))) - - (simple-format #t "Starting the server\n") - (run-server/patched (lambda (request body) - (apply values - (handler request body controller))) - #:host host - #:port port))) - - (wait finished?)) - #:hz 10 - #:parallelism 4)) - finished?))) + (let* ((state-channel + (make-state-channel + event-source)) + (initial-state-id + (retry-on-error + (lambda () + (assoc-ref + (get-state state-channel) + "state_id")) + #:times 6 + #:delay 10))) + (simple-format #t "Starting from state ~A\n" + initial-state-id) + + (let* ((events-channel + get-state-id + (make-events-channel + event-source + initial-state-id + #:metrics-registry metrics-registry)) + (controller + (apply make-controller assets-directory + metrics-registry + events-channel state-channel + event-source + controller-args))) + + ;; Wait until the events channel catches up + (while (< (get-state-id) initial-state-id) + (sleep 1)) + + (when pid-file + (call-with-output-file pid-file + (lambda (port) + (simple-format port "~A\n" (getpid))))) + + (simple-format #t "Starting the server\n") + (run-server/patched (lambda (request body) + (apply values + (handler request body controller))) + #:host host + #:port port)))) |