diff options
-rw-r--r-- | guix-data-service/jobs.scm | 16 | ||||
-rw-r--r-- | guix-data-service/jobs/load-new-guix-revision.scm | 777 | ||||
-rw-r--r-- | scripts/guix-data-service-process-job.in | 5 | ||||
-rw-r--r-- | scripts/guix-data-service-process-jobs.in | 14 |
4 files changed, 490 insertions, 322 deletions
diff --git a/guix-data-service/jobs.scm b/guix-data-service/jobs.scm index e2294e9..8217d52 100644 --- a/guix-data-service/jobs.scm +++ b/guix-data-service/jobs.scm @@ -125,7 +125,8 @@ guix-data-service: error: missing log line: ~A (define* (process-jobs conn #:key max-processes latest-branch-revision-max-processes - skip-system-tests?) + skip-system-tests? + per-job-parallelism) (define (fetch-new-jobs) (fetch-unlocked-jobs conn)) @@ -133,11 +134,14 @@ guix-data-service: error: missing log line: ~A (let ((log-port (start-thread-for-process-output job-id))) (spawn "guix-data-service-process-job" - (cons* "guix-data-service-process-job" - job-id - (if skip-system-tests? - '("--skip-system-tests") - '())) + `("guix-data-service-process-job" + ,job-id + ,@(if skip-system-tests? + '("--skip-system-tests") + '()) + ,@(if per-job-parallelism + (list (simple-format #f "--parallelism=~A" per-job-parallelism)) + '())) #:output log-port #:error log-port))) diff --git a/guix-data-service/jobs/load-new-guix-revision.scm b/guix-data-service/jobs/load-new-guix-revision.scm index a3c6527..a07baa2 100644 --- a/guix-data-service/jobs/load-new-guix-revision.scm +++ b/guix-data-service/jobs/load-new-guix-revision.scm @@ -24,10 +24,13 @@ #:use-module (ice-9 threads) #:use-module (ice-9 textual-ports) #:use-module (ice-9 hash-table) + #:use-module (ice-9 suspendable-ports) + #:use-module ((ice-9 ports internal) #:select (port-poll)) #:use-module (rnrs exceptions) #:use-module (json) #:use-module (squee) #:use-module (fibers) + #:use-module (fibers channels) #:use-module (guix monads) #:use-module (guix store) #:use-module (guix channels) @@ -185,7 +188,7 @@ (let ((system-test-data (with-time-logging "getting system tests" - (inferior-eval-with-store inf store extract)))) + (inferior-eval-with-store/non-blocking inf store extract)))) (for-each (lambda (derivation-file-names-by-system) (for-each (lambda (derivation-file-name) @@ -342,9 +345,11 @@ #:unwind? #t))) gds-inferior-packages)))) + (ensure-gds-inferior-packages-defined! inf) + (with-time-logging (simple-format #f "getting ~A lint warnings" checker-name) - (inferior-eval-with-store + (inferior-eval-with-store/non-blocking inf store lint-warnings-for-checker))) @@ -588,12 +593,13 @@ (with-time-logging (simple-format #f "getting derivations for ~A" (cons system target)) - (inferior-eval-with-store + (inferior-eval-with-store/non-blocking inf store proc))) -(define (sort-and-deduplicate-inferior-packages packages) +(define (sort-and-deduplicate-inferior-packages packages + pkg-to-replacement-hash-table) (pair-fold (lambda (pair result) (if (null? (cdr pair)) @@ -604,8 +610,8 @@ (b-name (inferior-package-name b)) (a-version (inferior-package-version a)) (b-version (inferior-package-version b)) - (a-replacement (inferior-package-replacement a)) - (b-replacement (inferior-package-replacement b))) + (a-replacement (hashq-ref pkg-to-replacement-hash-table a)) + (b-replacement (hashq-ref pkg-to-replacement-hash-table b))) (if (and (string=? a-name b-name) (string=? a-version b-version) (eq? a-replacement b-replacement)) @@ -638,8 +644,24 @@ b-name))))))) (define (inferior-packages-plus-replacements inf) - (let* ((packages (inferior-packages inf)) - (replacements (filter-map inferior-package-replacement packages)) + (let* ((packages + ;; The use of force in (guix inferior) introduces a continuation + ;; barrier + (call-with-temporary-thread + (lambda () + (inferior-packages inf)))) + (replacements (map inferior-package-replacement packages)) + (pkg-to-replacement-hash-table + (let ((ht (make-hash-table))) + (for-each + (lambda (pkg replacement) + (when replacement + (hashq-set! ht + pkg + replacement))) + packages + replacements) + ht)) (non-exported-replacements (let ((package-id-hash-table (make-hash-table))) (for-each (lambda (pkg) @@ -648,28 +670,30 @@ #t)) packages) - (filter (lambda (pkg) - (eq? #f - (hash-ref package-id-hash-table - (inferior-package-id pkg)))) - replacements))) + (filter + (lambda (pkg) + (and pkg + (eq? #f + (hash-ref package-id-hash-table + (inferior-package-id pkg))))) + replacements))) (deduplicated-packages ;; This isn't perfect, sometimes there can be two packages with the ;; same name and version, but different derivations. Guix will warn ;; about this case though, generally this means only one of the ;; packages should be exported. - (sort-and-deduplicate-inferior-packages - (append! packages non-exported-replacements))) + (call-with-temporary-thread + (lambda () + ;; TODO Sort introduces a continuation barrier + (sort-and-deduplicate-inferior-packages + (append! packages non-exported-replacements) + pkg-to-replacement-hash-table)))) (deduplicated-packages-length (length deduplicated-packages))) (inferior-eval - `(use-modules (srfi srfi-43)) - inf) - - (inferior-eval `(define gds-inferior-packages (make-vector ,deduplicated-packages-length)) inf) @@ -685,9 +709,14 @@ (list ,@(map inferior-package-id deduplicated-packages))) inf) - (list->vector deduplicated-packages))) + (values (list->vector deduplicated-packages) + pkg-to-replacement-hash-table))) + +(define (ensure-gds-inferior-packages-defined! inf) + (unless (inferior-eval '(defined? 'gds-inferior-packages) inf) + (inferior-packages-plus-replacements inf))) -(define* (all-inferior-packages-data inf packages) +(define* (all-inferior-packages-data inf packages pkg-to-replacement-hash-table) (define inferior-package-id->packages-index-hash-table (let ((hash-table (make-hash-table))) (vector-for-each @@ -717,7 +746,7 @@ (package-replacement-data (vector-map (lambda (_ pkg) - (let ((replacement (inferior-package-replacement pkg))) + (let ((replacement (hashq-ref pkg-to-replacement-hash-table pkg))) (if replacement ;; I'm not sure if replacements can themselves be ;; replaced, but I do know for sure that there are @@ -726,8 +755,16 @@ ;; example). ;; ;; So this might be #f in these cases - (hash-ref inferior-package-id->packages-index-hash-table - (inferior-package-id pkg)) + (let ((index + (hash-ref inferior-package-id->packages-index-hash-table + (inferior-package-id replacement)))) + (unless index + (simple-format + (current-error-port) + "warning: replacement for ~A (~A) is unknown\n" + pkg + replacement)) + index) #f))) packages))) @@ -743,13 +780,14 @@ (let* ((names (assq-ref inferior-packages-data 'names)) (versions (assq-ref inferior-packages-data 'versions)) (package-license-set-ids - (inferior-packages->license-set-ids - conn - (inferior-packages->license-id-lists - conn - ;; TODO Don't needlessly convert - (vector->list - (assq-ref inferior-packages-data 'license-data))))) + (with-time-logging "inserting package license sets" + (inferior-packages->license-set-ids + conn + (inferior-packages->license-id-lists + conn + ;; TODO Don't needlessly convert + (vector->list + (assq-ref inferior-packages-data 'license-data)))))) (all-package-metadata-ids new-package-metadata-ids (with-time-logging "inserting package metadata entries" @@ -898,15 +936,80 @@ (build-derivations store (list derivation))) (derivation->output-path derivation))) -(define (channel->source-and-derivation-file-names-by-system conn store channel - fetch-with-authentication?) +(define (non-blocking-port port) + "Make PORT non-blocking and return it." + (let ((flags (fcntl port F_GETFL))) + (when (zero? (logand O_NONBLOCK flags)) + (fcntl port F_SETFL (logior O_NONBLOCK flags))) + port)) + +(define (ensure-non-blocking-store-connection store) + (match (store-connection-socket store) + ((? file-port? port) + (non-blocking-port port)) + (_ #f))) + +(define (call-with-temporary-blocking-store store proc) + (let* ((port (store-connection-socket store)) + (flags (fcntl port F_GETFL))) + (unless (zero? (logand O_NONBLOCK flags)) + (fcntl port F_SETFL (logxor O_NONBLOCK flags))) + (call-with-values + (lambda () + (proc store)) + (lambda vals + (fcntl port F_SETFL (logior O_NONBLOCK flags)) + (apply values vals))))) + +(define (make-inferior-non-blocking! inferior) + (non-blocking-port + ((@@ (guix inferior) inferior-socket) inferior))) + +(define (call-with-temporary-thread thunk) + (let ((channel (make-channel))) + (call-with-new-thread + (lambda () + (parameterize + ((current-read-waiter (lambda (port) (port-poll port "r"))) + (current-write-waiter (lambda (port) (port-poll port "w")))) + + (with-exception-handler + (lambda (exn) + (put-message channel `(exception ,exn))) + (lambda () + (with-throw-handler #t + (lambda () + (call-with-values thunk + (lambda values + (put-message channel `(values ,@values))))) + (lambda _ + (backtrace)))) + #:unwind? #t)))) + + (match (get-message channel) + (('values . results) + (apply values results)) + (('exception . args) + (apply throw args))))) + +(define (inferior-eval-with-store/non-blocking inferior store proc) + (call-with-temporary-thread + (lambda () + (inferior-eval-with-store inferior store proc)))) + +(define* (channel->source-and-derivation-file-names-by-system + conn store channel + fetch-with-authentication? + #:key parallelism) + (define use-container? (defined? 'open-inferior/container (resolve-module '(guix inferior)))) - (define (inferior-code channel-instance systems) + (define (inferior-code channel-instance system) `(lambda (store) - (let* ((instances + (let* ((system ,system) + (instances (list (channel-instance (channel (name ',(channel-name channel)) @@ -915,61 +1018,57 @@ (commit ,(channel-commit channel))) ,(channel-instance-commit channel-instance) ,(channel-instance-checkout channel-instance))))) - (map - (lambda (system) - (simple-format - (current-error-port) - "guix-data-service: computing the derivation-file-name for ~A\n" - system) - - (let ((manifest - (catch #t - (lambda () - ((channel-instances->manifest instances #:system system) store)) - (lambda (key . args) - (simple-format - (current-error-port) - "error: while computing manifest entry derivation for ~A\n" - system) - (simple-format - (current-error-port) - "error ~A: ~A\n" key args) - #f)))) - (define (add-tmp-root-and-return-drv drv) - (add-temp-root store drv) - drv) - - `(,system - . - ((manifest-entry-item - . ,(and manifest + (simple-format + (current-error-port) + "guix-data-service: computing the derivation-file-name for ~A\n" + system) + + (let ((manifest + (catch #t + (lambda () + ((channel-instances->manifest instances #:system system) store)) + (lambda (key . args) + (simple-format + (current-error-port) + "error: while computing manifest entry derivation for ~A\n" + system) + (simple-format + (current-error-port) + "error ~A: ~A\n" key args) + #f)))) + (define (add-tmp-root-and-return-drv drv) + (add-temp-root store drv) + drv) + + `((manifest-entry-item + . ,(and manifest + (add-tmp-root-and-return-drv + (derivation-file-name + (manifest-entry-item + (first + (manifest-entries manifest))))))) + (profile + . ,(catch #t + (lambda () + (and manifest (add-tmp-root-and-return-drv (derivation-file-name - (manifest-entry-item - (first - (manifest-entries manifest))))))) - (profile - . ,(catch #t - (lambda () - (and manifest - (add-tmp-root-and-return-drv - (derivation-file-name - (parameterize ((%current-system system)) - (run-with-store store - (profile-derivation - manifest - #:hooks %channel-profile-hooks))))))) - (lambda (key . args) - (simple-format - (current-error-port) - "error: while computing profile derivation for ~A\n" - system) - (simple-format - (current-error-port) - "error ~A: ~A\n" key args) - #f))))))) - (list ,@systems))))) + (parameterize ((%current-system system)) + (run-with-store store + (profile-derivation + manifest + #:hooks %channel-profile-hooks))))))) + (lambda (key . args) + (simple-format + (current-error-port) + "error: while computing profile derivation for ~A\n" + system) + (simple-format + (current-error-port) + "error ~A: ~A\n" key args) + #f)))))))) + (define (start-inferior inferior-store) (let ((inferior (if use-container? (open-inferior/container @@ -985,85 +1084,99 @@ (open-inferior (guix-store-path store) #:error-port (current-error-port)))))) - (define (start-inferior-and-return-derivation-file-names) - ;; /etc is only missing if open-inferior/container has been used - (when use-container? - (inferior-eval - '(begin - ;; Create /etc/pass, as %known-shorthand-profiles in (guix - ;; profiles) tries to read from this file. Because the environment - ;; is cleaned in build-self.scm, xdg-directory in (guix utils) - ;; falls back to accessing /etc/passwd. - (mkdir "/etc") - (call-with-output-file "/etc/passwd" - (lambda (port) - (display "root:x:0:0::/root:/bin/bash" port)))) - inferior)) - - (let ((channel-instance - ;; Obtain a session level lock here, to avoid conflicts with - ;; other jobs over the Git repository. - (with-advisory-session-lock/log-time - conn - 'latest-channel-instances - (lambda () - (first - (latest-channel-instances store - (list channel) - #:authenticate? - fetch-with-authentication?)))))) - (inferior-eval '(use-modules (srfi srfi-1) - (ice-9 history) - (guix channels) - (guix grafts) - (guix profiles)) - inferior) - (inferior-eval '(%graft? #f) - inferior) - (inferior-eval '(disable-value-history!) - inferior) - (inferior-eval '(define channel-instance - (@@ (guix channels) channel-instance)) - inferior) - - (let* ((systems - (inferior-eval '(@ (guix packages) %supported-systems) - inferior)) - (result - (inferior-eval-with-store - inferior - store - (inferior-code channel-instance systems)))) - - (close-inferior inferior) - - (cons - (channel-instance-checkout channel-instance) - result)))) - - (catch - #t - (lambda () - (with-throw-handler #t - start-inferior-and-return-derivation-file-names - (lambda (key . parameters) - (display (backtrace) (current-error-port)) - (display "\n" (current-error-port)) - (simple-format (current-error-port) - "error: channel->derivation-file-names-by-system: ~A: ~A\n" - key parameters)))) - (lambda args - (close-inferior inferior) - #f)))) - -(define (channel->source-and-derivations-by-system conn store channel - fetch-with-authentication?) + ;; /etc is only missing if open-inferior/container has been used + (when use-container? + (inferior-eval + '(begin + ;; Create /etc/pass, as %known-shorthand-profiles in (guix + ;; profiles) tries to read from this file. Because the environment + ;; is cleaned in build-self.scm, xdg-directory in (guix utils) + ;; falls back to accessing /etc/passwd. + (mkdir "/etc") + (call-with-output-file "/etc/passwd" + (lambda (port) + (display "root:x:0:0::/root:/bin/bash" port)))) + inferior)) + + (inferior-eval '(use-modules (srfi srfi-1) + (ice-9 history) + (guix channels) + (guix grafts) + (guix profiles)) + inferior) + (inferior-eval '(%graft? #f) + inferior) + (inferior-eval '(disable-value-history!) + inferior) + (inferior-eval '(define channel-instance + (@@ (guix channels) channel-instance)) + inferior) + + inferior)) + + (let* ((channel-instance + ;; Obtain a session level lock here, to avoid conflicts with + ;; other jobs over the Git repository. + (with-advisory-session-lock/log-time + conn + 'latest-channel-instances + (lambda () + ;; TODO (guix serialization) uses dynamic-wind + (call-with-temporary-thread + (lambda () + (first + (latest-channel-instances store + (list channel) + #:authenticate? + fetch-with-authentication?))))))) + (inferior-and-store-pool + (make-resource-pool + (lambda () + (let* ((inferior-store (open-connection)) + (inferior (start-inferior inferior-store))) + (ensure-non-blocking-store-connection inferior-store) + (make-inferior-non-blocking! inferior) + (cons inferior inferior-store))) + parallelism + #:min-size 0 + #:idle-seconds 10 + #:destructor (match-lambda + ((inferior . store) + (close-inferior inferior) + (close-connection store))))) + (systems + (with-resource-from-pool inferior-and-store-pool res + (match res + ((inferior . inferior-store) + (inferior-eval '(@ (guix packages) %supported-systems) + inferior))))) + (result + (par-map& + (lambda (system) + (with-resource-from-pool inferior-and-store-pool res + (match res + ((inferior . inferior-store) + (cons system + (inferior-eval-with-store/non-blocking + inferior + inferior-store + (inferior-code channel-instance system))))))) + systems))) + + (cons + (channel-instance-checkout channel-instance) + result))) + +(define* (channel->source-and-derivations-by-system conn store channel + fetch-with-authentication? + #:key parallelism) (match (with-time-logging "computing the channel derivation" (channel->source-and-derivation-file-names-by-system conn store channel - fetch-with-authentication?)) + fetch-with-authentication? + #:parallelism parallelism)) ((source . derivation-file-names-by-system) (for-each (match-lambda @@ -1148,17 +1261,9 @@ output))) -(define (start-inferior-for-data-extration store store-path) - (let* ((guix-locpath (getenv "GUIX_LOCPATH")) - (inf (let ((guix-locpath - ;; Augment the GUIX_LOCPATH to include glibc-locales from - ;; the Guix at store-path, this should mean that the - ;; inferior Guix works, even if it's build using a different - ;; glibc version - (string-append - (glibc-locales-for-guix-store-path store store-path) - "/lib/locale" - ":" guix-locpath))) +(define (start-inferior-for-data-extration store store-path guix-locpath) + (let* ((original-guix-locpath (getenv "GUIX_LOCPATH")) + (inf (begin ;; Unset the GUILE_LOAD_PATH and GUILE_LOAD_COMPILED_PATH to ;; avoid the values for these being used in the ;; inferior. Even though the inferior %load-path and @@ -1185,7 +1290,7 @@ (simple-format #t "debug: using open-inferior\n") (open-inferior store-path #:error-port (current-error-port))))))) - (setenv "GUIX_LOCPATH" guix-locpath) ; restore GUIX_LOCPATH + (setenv "GUIX_LOCPATH" original-guix-locpath) ; restore GUIX_LOCPATH (when (eq? inf #f) (error "error: inferior is #f")) @@ -1202,6 +1307,7 @@ (inferior-eval '(use-modules (srfi srfi-1) (srfi srfi-34) + (srfi srfi-43) (ice-9 history) (guix grafts) (guix derivations) @@ -1221,144 +1327,187 @@ (inferior-eval '(when (defined? '%graft?) (%graft? #f)) inf) + ;; TODO Have Guix make this easier + ((@@ (guix inferior) ensure-store-bridge!) inf) + (non-blocking-port ((@@ (guix inferior) inferior-bridge-socket) inf)) + inf)) (define* (extract-information-from conn store guix-revision-id commit guix-source store-path - #:key skip-system-tests?) - (simple-format #t "debug: extract-information-from: ~A\n" store-path) + #:key skip-system-tests? + parallelism) + + (define guix-locpath + ;; Augment the GUIX_LOCPATH to include glibc-locales from + ;; the Guix at store-path, this should mean that the + ;; inferior Guix works, even if it's build using a different + ;; glibc version + (string-append + (glibc-locales-for-guix-store-path store store-path) + "/lib/locale" + ":" (getenv "GUIX_LOCPATH"))) + + (define inf-and-store-pool + (make-resource-pool + (lambda () + (let* ((inferior-store (open-connection)) + (inferior (start-inferior-for-data-extration inferior-store + store-path + guix-locpath))) + (ensure-non-blocking-store-connection inferior-store) + (make-inferior-non-blocking! inferior) + (cons inferior inferior-store))) + parallelism + #:min-size 0 + #:idle-seconds 10 + #:destructor (match-lambda + ((inferior . store) + (close-inferior inferior) + (close-connection store))))) - (let ((inf (start-inferior-for-data-extration store store-path))) - (catch - #t - (lambda () - (let* ((packages - (with-time-logging "fetching inferior packages" - (inferior-packages-plus-replacements inf))) - (inferior-lint-checkers-data - (inferior-lint-checkers inf)) - (inferior-lint-warnings-data - (and inferior-lint-checkers-data - (with-time-logging "fetching inferior lint warnings" - (map - (match-lambda - ((checker-name _ network-dependent?) - (and (and (not network-dependent?) - ;; Running the derivation linter is - ;; currently infeasible - (not (eq? checker-name 'derivation))) - (inferior-lint-warnings inf - store - checker-name)))) - inferior-lint-checkers-data)))) - (inferior-system-target-pairs - (inferior-fetch-system-target-pairs inf)) - (inferior-packages-system-and-target-to-derivations-alist - (with-time-logging "getting inferior derivations" - (map - (match-lambda - ((system . target) - (cons (cons system target) - (inferior-package-derivations store - inf - system - target)))) - inferior-system-target-pairs))) - (inferior-system-tests - (if skip-system-tests? - (begin - (simple-format #t "debug: skipping system tests\n") - '()) - (with-time-logging "getting inferior system tests" - (all-inferior-system-tests inf store - guix-source commit)))) - (packages-data - (with-time-logging "getting all inferior package data" - (all-inferior-packages-data inf packages)))) + (simple-format #t "debug: extract-information-from: ~A\n" store-path) + (letpar& ((inferior-lint-checkers-and-warnings-data + (with-resource-from-pool inf-and-store-pool res + (match res + ((inferior . inferior-store) + (let ((inferior-lint-checkers-data + (inferior-lint-checkers inferior))) + (cons + inferior-lint-checkers-data + (if inferior-lint-checkers-data + (with-time-logging "fetching inferior lint warnings" + (map + (match-lambda + ((checker-name _ network-dependent?) + (and (and (not network-dependent?) + ;; Running the derivation linter is + ;; currently infeasible + (not (eq? checker-name 'derivation))) + (inferior-lint-warnings inferior + inferior-store + checker-name)))) + inferior-lint-checkers-data)) + #f))))))) + (inferior-packages-system-and-target-to-derivations-alist + (with-time-logging "getting inferior derivations" + (par-map& + (match-lambda + ((system . target) + (with-resource-from-pool inf-and-store-pool res + (match res + ((inferior . inferior-store) + (ensure-gds-inferior-packages-defined! inferior) + + (cons (cons system target) + (inferior-package-derivations inferior-store + inferior + system + target))))))) + (with-resource-from-pool inf-and-store-pool res + (match res + ((inferior . inferior-store) + (inferior-fetch-system-target-pairs inferior))))))) + (inferior-system-tests + (if skip-system-tests? + (begin + (simple-format #t "debug: skipping system tests\n") + '()) + (with-resource-from-pool inf-and-store-pool res + (match res + ((inferior . inferior-store) + (with-time-logging "getting inferior system tests" + (all-inferior-system-tests inferior inferior-store + guix-source commit))))))) + (packages-data + (with-time-logging "getting all inferior package data" + (with-resource-from-pool inf-and-store-pool res + (match res + ((inferior . inferior-store) + (with-time-logging "fetching inferior packages" + (let ((packages + pkg-to-replacement-hash-table + (inferior-packages-plus-replacements inferior))) + (all-inferior-packages-data inferior + packages + pkg-to-replacement-hash-table))))))))) + + (destroy-resource-pool inf-and-store-pool) + + (simple-format + #t "debug: finished loading information from inferior\n") + + (with-time-logging + "acquiring advisory transaction lock: load-new-guix-revision-inserts" + ;; Wait until this is the only transaction inserting data, to + ;; avoid any concurrency issues + (obtain-advisory-transaction-lock conn + 'load-new-guix-revision-inserts)) + (with-time-logging + "inserting data" + (let* ((package-ids + (insert-packages conn packages-data))) + (when inferior-lint-warnings + (let* ((lint-checker-ids + (lint-checkers->lint-checker-ids + conn + (map (match-lambda + ((name descriptions-by-locale network-dependent) + (list + name + network-dependent + (lint-checker-description-data->lint-checker-description-set-id + conn descriptions-by-locale)))) + (car inferior-lint-checkers-and-warnings-data)))) + (lint-warning-ids + (insert-lint-warnings + conn + package-ids + lint-checker-ids + (cdr inferior-lint-checkers-and-warnings-data)))) + (insert-guix-revision-lint-checkers conn + guix-revision-id + lint-checker-ids) + + (chunk-for-each! + (lambda (lint-warning-ids-chunk) + (insert-guix-revision-lint-warnings conn + guix-revision-id + lint-warning-ids-chunk)) + 5000 + lint-warning-ids))) + + (when inferior-system-tests + (insert-system-tests-for-guix-revision conn + guix-revision-id + inferior-system-tests)) + + (let* ((package-derivation-ids + (with-time-logging "inferior-data->package-derivation-ids" + (inferior-data->package-derivation-ids + conn + inf + package-ids + inferior-packages-system-and-target-to-derivations-alist))) + (ids-count + (length package-derivation-ids))) + (chunk-for-each! (lambda (package-derivation-ids-chunk) + (insert-guix-revision-package-derivations + conn + guix-revision-id + package-derivation-ids-chunk)) + 2000 + package-derivation-ids) (simple-format - #t "debug: finished loading information from inferior\n") - (close-inferior inf) - - (with-time-logging - "acquiring advisory transaction lock: load-new-guix-revision-inserts" - ;; Wait until this is the only transaction inserting data, to - ;; avoid any concurrency issues - (obtain-advisory-transaction-lock conn - 'load-new-guix-revision-inserts)) - (with-time-logging - "inserting data" - (let* ((package-ids - (insert-packages conn packages-data))) - (when inferior-lint-warnings - (let* ((lint-checker-ids - (lint-checkers->lint-checker-ids - conn - (map (match-lambda - ((name descriptions-by-locale network-dependent) - (list - name - network-dependent - (lint-checker-description-data->lint-checker-description-set-id - conn descriptions-by-locale)))) - inferior-lint-checkers-data))) - (lint-warning-ids - (insert-lint-warnings - conn - package-ids - lint-checker-ids - inferior-lint-warnings-data))) - (insert-guix-revision-lint-checkers conn - guix-revision-id - lint-checker-ids) - - (chunk-for-each! - (lambda (lint-warning-ids-chunk) - (insert-guix-revision-lint-warnings conn - guix-revision-id - lint-warning-ids-chunk)) - 5000 - lint-warning-ids))) - - (when inferior-system-tests - (insert-system-tests-for-guix-revision conn - guix-revision-id - inferior-system-tests)) - - (let* ((package-derivation-ids - (with-time-logging "inferior-data->package-derivation-ids" - (inferior-data->package-derivation-ids - conn - inf - package-ids - inferior-packages-system-and-target-to-derivations-alist))) - (ids-count - (length package-derivation-ids))) - (chunk-for-each! (lambda (package-derivation-ids-chunk) - (insert-guix-revision-package-derivations - conn - guix-revision-id - package-derivation-ids-chunk)) - 2000 - package-derivation-ids) - (simple-format - #t "Successfully loaded ~A package/derivation pairs\n" - ids-count)) + #t "Successfully loaded ~A package/derivation pairs\n" + ids-count)) - (with-time-logging - "insert-guix-revision-package-derivation-distribution-counts" - (insert-guix-revision-package-derivation-distribution-counts - conn - guix-revision-id))))) - #t) - (lambda (key . args) - (simple-format (current-error-port) - "Failed extracting information from commit: ~A\n\n" commit) - (simple-format (current-error-port) - " ~A ~A\n\n" key args) - #f) - (lambda (key . args) - (display-backtrace (make-stack #t) (current-error-port)))))) + (with-time-logging + "insert-guix-revision-package-derivation-distribution-counts" + (insert-guix-revision-package-derivation-distribution-counts + conn + guix-revision-id)))))) (prevent-inlining-for-tests extract-information-from) @@ -1409,7 +1558,7 @@ (prevent-inlining-for-tests load-channel-instances) (define* (load-new-guix-revision conn store git-repository-id commit - #:key skip-system-tests?) + #:key skip-system-tests? parallelism) (let* ((git-repository-fields (select-git-repository conn git-repository-id)) (git-repository-url @@ -1421,10 +1570,12 @@ (url git-repository-url) (commit commit))) (source-and-channel-derivations-by-system - (channel->source-and-derivations-by-system conn - store - channel-for-commit - fetch-with-authentication?)) + (channel->source-and-derivations-by-system + conn + store + channel-for-commit + fetch-with-authentication? + #:parallelism parallelism)) (guix-source (car source-and-channel-derivations-by-system)) (channel-derivations-by-system @@ -1442,7 +1593,8 @@ guix-revision-id commit guix-source store-item #:skip-system-tests? - skip-system-tests?) + skip-system-tests? + #:parallelism parallelism) (if (defined? 'channel-news-for-commit (resolve-module '(guix channels))) @@ -1817,13 +1969,15 @@ SKIP LOCKED") (define (with-store-connection f) (with-store store + (ensure-non-blocking-store-connection store) (set-build-options store #:fallback? #t) (f store))) (prevent-inlining-for-tests with-store-connection) -(define* (process-load-new-guix-revision-job id #:key skip-system-tests?) +(define* (process-load-new-guix-revision-job id #:key skip-system-tests? + parallelism) (with-postgresql-connection (simple-format #f "load-new-guix-revision ~A" id) (lambda (conn) @@ -1860,7 +2014,8 @@ SKIP LOCKED") store git-repository-id commit - #:skip-system-tests? #t)))) + #:skip-system-tests? #t + #:parallelism parallelism)))) (lambda (key . args) (simple-format (current-error-port) "error: load-new-guix-revision: ~A ~A\n" diff --git a/scripts/guix-data-service-process-job.in b/scripts/guix-data-service-process-job.in index fd7ab3f..633d8db 100644 --- a/scripts/guix-data-service-process-job.in +++ b/scripts/guix-data-service-process-job.in @@ -78,7 +78,8 @@ (lambda () (process-load-new-guix-revision-job job - #:skip-system-tests? (assq-ref opts 'skip-system-tests))) + #:skip-system-tests? (assq-ref opts 'skip-system-tests) + #:parallelism (assq-ref opts 'parallelism))) #:hz 0 - #:parallelism (assq-ref opts 'parallelism) + #:parallelism 1 #:drain? #t))))) diff --git a/scripts/guix-data-service-process-jobs.in b/scripts/guix-data-service-process-jobs.in index 6ad1ec9..da4f614 100644 --- a/scripts/guix-data-service-process-jobs.in +++ b/scripts/guix-data-service-process-jobs.in @@ -44,11 +44,17 @@ result))) (option '("skip-system-tests") #f #f (lambda (opt name _ result) - (alist-cons 'skip-system-tests #t result))))) + (alist-cons 'skip-system-tests #t result))) + (option '("per-job-parallelism") #t #f + (lambda (opt name arg result) + (alist-cons 'per-job-parallelism + (string->number arg) + result))))) (define %default-options ;; Alist of default option values - `((max-processes . ,default-max-processes))) + `((max-processes . ,default-max-processes) + (per-job-parallelism . 1))) (define (parse-options args) (args-fold @@ -77,4 +83,6 @@ (or (assq-ref opts 'latest-branch-revision-max-processes) (* 2 (assq-ref opts 'max-processes))) #:skip-system-tests? - (assq-ref opts 'skip-system-tests))))) + (assq-ref opts 'skip-system-tests) + #:per-job-parallelism + (assq-ref opts 'per-job-parallelism))))) |