diff options
author | Christopher Baines <mail@cbaines.net> | 2024-10-17 17:10:25 +0200 |
---|---|---|
committer | Christopher Baines <mail@cbaines.net> | 2024-10-27 14:39:50 +0000 |
commit | b6551842d10e0a36cc9dd38d8beabe37b89459c7 (patch) | |
tree | c686b565ec3e054c846ef1d121792ce117d3335b | |
parent | f1071cbd4d4a5a9a2026fe32e20777cc628f4686 (diff) | |
download | data-service-b6551842d10e0a36cc9dd38d8beabe37b89459c7.tar data-service-b6551842d10e0a36cc9dd38d8beabe37b89459c7.tar.gz |
Increase parallelism when loading revisions
-rw-r--r-- | guix-data-service/jobs/load-new-guix-revision.scm | 442 |
1 files changed, 240 insertions, 202 deletions
diff --git a/guix-data-service/jobs/load-new-guix-revision.scm b/guix-data-service/jobs/load-new-guix-revision.scm index f4968de..f52b8e1 100644 --- a/guix-data-service/jobs/load-new-guix-revision.scm +++ b/guix-data-service/jobs/load-new-guix-revision.scm @@ -107,7 +107,7 @@ missing-store-item-error? (item missing-store-item-error-item)) -(define (retry-on-missing-store-item thunk) +(define* (retry-on-missing-store-item thunk #:key on-exception) (with-exception-handler (lambda (exn) (if (missing-store-item-error? exn) @@ -116,6 +116,7 @@ "missing store item ~A, retrying ~A\n" (missing-store-item-error-item exn) thunk) + (when on-exception (on-exception)) (retry-on-missing-store-item thunk)) (raise-exception exn))) thunk @@ -1691,7 +1692,8 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" inf)))) -(define* (extract-information-from db-conn guix-revision-id commit +(define* (extract-information-from db-conn guix-revision-id-promise + commit guix-source store-item guix-derivation utility-thread-channel @@ -1885,9 +1887,10 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" (let ((package-ids (fibers-force package-ids-promise))) (with-resource-from-pool postgresql-connection-pool conn - (insert-guix-revision-lint-checkers conn - guix-revision-id - lint-checker-ids) + (insert-guix-revision-lint-checkers + conn + (fibers-force guix-revision-id-promise) + lint-checker-ids) (let ((lint-warning-ids (insert-lint-warnings @@ -1897,9 +1900,10 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" lint-warnings-data))) (chunk-for-each! (lambda (lint-warning-ids-chunk) - (insert-guix-revision-lint-warnings conn - guix-revision-id - lint-warning-ids-chunk)) + (insert-guix-revision-lint-warnings + conn + (fibers-force guix-revision-id-promise) + lint-warning-ids-chunk)) 5000 lint-warning-ids))))))) @@ -1913,62 +1917,66 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" (define chunk-size 1000) - (define (process-system-and-target system target) + (define (get-derivations system target) + (let ((derivations-vector (make-vector packages-count))) + (with-time-logging + (simple-format #f "getting derivations for ~A" (cons system target)) + (let loop ((start-index 0)) + (let* ((count + (if (>= (+ start-index chunk-size) packages-count) + (- packages-count start-index) + chunk-size)) + (chunk + (call-with-inferior + (lambda (inferior inferior-store) + (ensure-gds-inferior-packages-defined! inferior) + + (inferior-package-derivations + inferior-store + inferior + system + target + start-index + count))))) + (vector-copy! derivations-vector + start-index + chunk) + (unless (>= (+ start-index chunk-size) packages-count) + (loop (+ start-index chunk-size)))))) + derivations-vector)) + + (define (process-system-and-target system target get-derivations) (with-time-logging (simple-format #f "processing derivations for ~A" (cons system target)) - (let ((derivations-vector (make-vector packages-count))) - (with-time-logging - (simple-format #f "getting derivations for ~A" (cons system target)) - (let loop ((start-index 0)) - (let* ((count - (if (>= (+ start-index chunk-size) packages-count) - (- packages-count start-index) - chunk-size)) - (chunk - (call-with-inferior - (lambda (inferior inferior-store) - (ensure-gds-inferior-packages-defined! inferior) - - (inferior-package-derivations - inferior-store - inferior - system - target - start-index - count))))) - (vector-copy! derivations-vector - start-index - chunk) - (unless (>= (+ start-index chunk-size) packages-count) - (loop (+ start-index chunk-size)))))) - - (let* ((derivation-ids + (let* ((derivations-vector (get-derivations system target)) + (derivation-ids + (with-time-logging + (simple-format #f "derivation-file-names->derivation-ids (~A ~A)" + system target) + (derivation-file-names->derivation-ids/fiberized + derivations-vector))) + (guix-revision-id + (fibers-force guix-revision-id-promise)) + (package-ids (fibers-force package-ids-promise)) + (package-derivation-ids + (with-resource-from-pool postgresql-connection-pool conn (with-time-logging - (simple-format #f "derivation-file-names->derivation-ids (~A ~A)" + (simple-format #f "insert-package-derivations (~A ~A)" system target) - (derivation-file-names->derivation-ids/fiberized - derivations-vector)))) - - (let* ((package-ids (fibers-force package-ids-promise)) - (package-derivation-ids - (with-resource-from-pool postgresql-connection-pool conn - (with-time-logging - (simple-format #f "insert-package-derivations (~A ~A)" - system target) - (insert-package-derivations conn - system - (or target "") - package-ids - derivation-ids))))) - (chunk-for-each! - (lambda (package-derivation-ids-chunk) - (with-resource-from-pool postgresql-connection-pool conn - (insert-guix-revision-package-derivations - conn - guix-revision-id - package-derivation-ids-chunk))) - 2000 - package-derivation-ids))))) + (insert-package-derivations conn + system + (or target "") + package-ids + derivation-ids))))) + (chunk-for-each! + (lambda (package-derivation-ids-chunk) + (with-resource-from-pool postgresql-connection-pool conn + (insert-guix-revision-package-derivations + conn + guix-revision-id + package-derivation-ids-chunk))) + 2000 + package-derivation-ids))) (with-resource-from-pool postgresql-connection-pool conn (with-time-logging @@ -1977,23 +1985,24 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" system target) (insert-guix-revision-package-derivation-distribution-counts conn - guix-revision-id + (fibers-force guix-revision-id-promise) (number->string (system->system-id conn system)) (or target ""))))) - (let ((process-system-and-target/fiberized - (fiberize process-system-and-target + (let ((get-derivations/fiberized + (fiberize get-derivations #:parallelism parallelism))) (par-map& (match-lambda ((system . target) (retry-on-missing-store-item (lambda () - (process-system-and-target/fiberized system target))))) - (call-with-inferior - (lambda (inferior inferior-store) - (inferior-fetch-system-target-pairs inferior)))))) + (process-system-and-target system target + get-derivations/fiberized))))) + (call-with-inferior + (lambda (inferior inferior-store) + (inferior-fetch-system-target-pairs inferior)))))) (define (extract-and-store-system-tests) (if skip-system-tests? @@ -2027,7 +2036,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" (with-resource-from-pool postgresql-connection-pool conn (insert-system-tests-for-guix-revision conn - guix-revision-id + (fibers-force guix-revision-id-promise) data-with-derivation-ids))))))) (with-time-logging @@ -2124,34 +2133,48 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" (channel-for-commit (channel (name 'guix) (url git-repository-url) - (commit commit))) - (guix-source - channel-derivations-by-system - guix-revision-id - (retry-on-missing-store-item - (lambda () - (let ((guix-source - channel-derivations-by-system - (channel->source-and-derivations-by-system - conn - channel-for-commit - fetch-with-authentication? - #:parallelism parallelism))) - (let ((guix-revision-id - (load-channel-instances utility-thread-channel - git-repository-id commit - channel-derivations-by-system))) - (values guix-source - channel-derivations-by-system - guix-revision-id))))))) - (let ((store-item - guix-derivation - (channel-derivations-by-system->guix-store-item - channel-derivations-by-system))) + (commit commit)))) + + (define channel-derivations-by-system-promise + (fibers-delay + (lambda () + (channel->source-and-derivations-by-system + conn + channel-for-commit + fetch-with-authentication? + #:parallelism parallelism)))) + + (define guix-revision-id-promise + (fibers-delay + (lambda () + (retry-on-missing-store-item + (lambda () + (let ((guix-source + channel-derivations-by-system + (fibers-force channel-derivations-by-system-promise))) + (load-channel-instances utility-thread-channel + git-repository-id commit + channel-derivations-by-system))) + #:on-exception + (lambda () + (fibers-promise-reset channel-derivations-by-system-promise)))))) + + ;; Prompt getting the guix-revision-id as soon as possible + (spawn-fiber + (lambda () + (fibers-force guix-revision-id-promise))) + + (let* ((guix-source + channel-derivations-by-system + (fibers-force channel-derivations-by-system-promise)) + (store-item + guix-derivation + (channel-derivations-by-system->guix-store-item + channel-derivations-by-system))) (if store-item (and (extract-information-from conn - guix-revision-id + guix-revision-id-promise commit guix-source store-item guix-derivation utility-thread-channel @@ -2166,21 +2189,22 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" (with-time-logging "inserting channel news entries" (insert-channel-news-entries-for-guix-revision conn - guix-revision-id + (fibers-force guix-revision-id-promise) (channel-news-for-commit channel-for-commit commit))) (begin (simple-format #t "debug: importing channel news not supported\n") #t)) - (update-package-derivations-table conn - git-repository-id - guix-revision-id - commit) + (update-package-derivations-table + conn + git-repository-id + (fibers-force guix-revision-id-promise) + commit) (with-time-logging "updating builds.derivation_output_details_set_id" (update-builds-derivation-output-details-set-id conn - (string->number guix-revision-id)))) + (string->number (fibers-force guix-revision-id-promise))))) (begin (simple-format #t "Failed to generate store item for ~A\n" commit) @@ -2572,109 +2596,123 @@ SKIP LOCKED") (define* (process-load-new-guix-revision-job id #:key skip-system-tests? extra-inferior-environment-variables parallelism) - (with-postgresql-connection - (simple-format #f "load-new-guix-revision ~A" id) - (lambda (conn) - ;; Fix the hash encoding of derivation_output_details. This'll only run - ;; once on any given database, but is kept here just to make sure any - ;; instances have the data updated. - (fix-derivation-output-details-hash-encoding conn) + (define result + (with-postgresql-connection + (simple-format #f "load-new-guix-revision ~A" id) + (lambda (conn) + ;; Fix the hash encoding of derivation_output_details. This'll only run + ;; once on any given database, but is kept here just to make sure any + ;; instances have the data updated. + (fix-derivation-output-details-hash-encoding conn) + + (exec-query conn "BEGIN") + + (spawn-fiber + (lambda () + (while #t + (sleep 30) + + (let ((stats (gc-stats))) + (simple-format + (current-error-port) + "process-job heap: ~a MiB used (~a MiB heap)~%" + (round + (/ (- (assoc-ref stats 'heap-size) + (assoc-ref stats 'heap-free-size)) + (expt 2. 20))) + (round + (/ (assoc-ref stats 'heap-size) + (expt 2. 20)))))))) - (exec-query conn "BEGIN") + (match (select-job-for-update conn id) + (((id commit source git-repository-id)) - (spawn-fiber - (lambda () - (while #t - (sleep 30) + ;; With a separate connection, outside of the transaction so the event + ;; gets persisted regardless. + (with-postgresql-connection + (simple-format #f "load-new-guix-revision ~A start-event" id) + (lambda (start-event-conn) + (record-job-event start-event-conn id "start"))) - (let ((stats (gc-stats))) - (simple-format - (current-error-port) - "process-job heap: ~a MiB used (~a MiB heap)~%" - (round - (/ (- (assoc-ref stats 'heap-size) - (assoc-ref stats 'heap-free-size)) - (expt 2. 20))) - (round - (/ (assoc-ref stats 'heap-size) - (expt 2. 20)))))))) - - (match (select-job-for-update conn id) - (((id commit source git-repository-id)) - - ;; With a separate connection, outside of the transaction so the event - ;; gets persisted regardless. - (with-postgresql-connection - (simple-format #f "load-new-guix-revision ~A start-event" id) - (lambda (start-event-conn) - (record-job-event start-event-conn id "start"))) - - (simple-format #t "Processing job ~A (commit: ~A, source: ~A)\n\n" - id commit source) - - (if (eq? - (with-time-logging (string-append "processing revision " commit) - (with-exception-handler - (const #f) - (lambda () - (with-throw-handler #t - (lambda () - (load-new-guix-revision - conn - git-repository-id - commit - #:skip-system-tests? #t - #:extra-inferior-environment-variables - extra-inferior-environment-variables - #:parallelism parallelism)) - (lambda (key . args) - (simple-format (current-error-port) - "error: load-new-guix-revision: ~A ~A\n" - key args) - (backtrace)))) - #:unwind? #t)) - #t) - (begin - (record-job-succeeded conn id) - (record-job-event conn id "success") - (exec-query conn "COMMIT") - - (with-time-logging - "vacuuming package derivations by guix revision range table" - (vacuum-package-derivations-table conn)) - - (with-time-logging - "vacuum-derivation-inputs-table" - (vacuum-derivation-inputs-table conn)) - - (match (exec-query - conn - "SELECT reltuples::bigint FROM pg_class WHERE relname = 'derivation_inputs'") - (((rows)) - ;; Don't attempt counting distinct values if there are too - ;; many rows, as that is far to slow and could use up all the - ;; disk space. - (when (< (string->number rows) - 1000000000) - (with-time-logging - "update-derivation-inputs-statistics" - (update-derivation-inputs-statistics conn))))) - - (with-time-logging - "vacuum-derivation-outputs-table" - (vacuum-derivation-outputs-table conn)) - - (with-time-logging - "update-derivation-outputs-statistics" - (update-derivation-outputs-statistics conn)) - - #t) - (begin - (exec-query conn "ROLLBACK") - (record-job-event conn id "failure") - - #f))) - (() - (exec-query conn "ROLLBACK") - (simple-format #t "job ~A not found to be processed\n" - id)))))) + (simple-format #t "Processing job ~A (commit: ~A, source: ~A)\n\n" + id commit source) + + (if (eq? + (with-time-logging (string-append "processing revision " commit) + (with-exception-handler + (const #f) + (lambda () + (with-throw-handler #t + (lambda () + (load-new-guix-revision + conn + git-repository-id + commit + #:skip-system-tests? #t + #:extra-inferior-environment-variables + extra-inferior-environment-variables + #:parallelism parallelism)) + (lambda (key . args) + (simple-format (current-error-port) + "error: load-new-guix-revision: ~A ~A\n" + key args) + (backtrace)))) + #:unwind? #t)) + #t) + (begin + (record-job-succeeded conn id) + (record-job-event conn id "success") + (exec-query conn "COMMIT") + + #t) + (begin + (exec-query conn "ROLLBACK") + (record-job-event conn id "failure") + + #f))) + (() + (exec-query conn "ROLLBACK") + (simple-format #t "job ~A not found to be processed\n" + id)))))) + + (when result + (parallel-via-fibers + (with-postgresql-connection + (simple-format #f "post load-new-guix-revision ~A" id) + (lambda (conn) + (with-time-logging + "vacuuming package derivations by guix revision range table" + (vacuum-package-derivations-table conn)))) + + (with-postgresql-connection + (simple-format #f "post load-new-guix-revision ~A" id) + (lambda (conn) + (with-time-logging + "vacuum-derivation-inputs-table" + (vacuum-derivation-inputs-table conn)) + + (match (exec-query + conn + "SELECT reltuples::bigint FROM pg_class WHERE relname = 'derivation_inputs'") + (((rows)) + ;; Don't attempt counting distinct values if there are too + ;; many rows, as that is far to slow and could use up all the + ;; disk space. + (when (< (string->number rows) + 1000000000) + (with-time-logging + "update-derivation-inputs-statistics" + (update-derivation-inputs-statistics conn))))))) + + (with-postgresql-connection + (simple-format #f "post load-new-guix-revision ~A" id) + (lambda (conn) + (with-time-logging + "vacuum-derivation-outputs-table" + (vacuum-derivation-outputs-table conn)) + + (with-time-logging + "update-derivation-outputs-statistics" + (update-derivation-outputs-statistics conn)))))) + + result) |