aboutsummaryrefslogtreecommitdiff
path: root/guix-data-service
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2024-01-18 14:44:04 +0000
committerChristopher Baines <mail@cbaines.net>2024-01-18 15:34:40 +0000
commitc1d2f3a1b7ba20ad3f56840a160ee1ec08cd3a64 (patch)
treea8ac56ddf1ff543a380938d7c9c3173c854fce6b /guix-data-service
parent6842a432d668f3e42de0b416813f359beef9ae6f (diff)
downloaddata-service-c1d2f3a1b7ba20ad3f56840a160ee1ec08cd3a64.tar
data-service-c1d2f3a1b7ba20ad3f56840a160ee1ec08cd3a64.tar.gz
Add meaningful parallelism to processing jobs
Make parallel use of inferiors when computing channel instance derivations, and when extracting information about a revision. This should allow for some horizontal scalability, reducing the impact of additional systems for which derivations need computing. This commit also fixes an apparent issue with package replacements, as previously the wrong id was used, and this hid some issues around deduplication.
Diffstat (limited to 'guix-data-service')
-rw-r--r--guix-data-service/jobs.scm16
-rw-r--r--guix-data-service/jobs/load-new-guix-revision.scm777
2 files changed, 476 insertions, 317 deletions
diff --git a/guix-data-service/jobs.scm b/guix-data-service/jobs.scm
index e2294e9..8217d52 100644
--- a/guix-data-service/jobs.scm
+++ b/guix-data-service/jobs.scm
@@ -125,7 +125,8 @@ guix-data-service: error: missing log line: ~A
(define* (process-jobs conn #:key max-processes
latest-branch-revision-max-processes
- skip-system-tests?)
+ skip-system-tests?
+ per-job-parallelism)
(define (fetch-new-jobs)
(fetch-unlocked-jobs conn))
@@ -133,11 +134,14 @@ guix-data-service: error: missing log line: ~A
(let ((log-port (start-thread-for-process-output job-id)))
(spawn
"guix-data-service-process-job"
- (cons* "guix-data-service-process-job"
- job-id
- (if skip-system-tests?
- '("--skip-system-tests")
- '()))
+ `("guix-data-service-process-job"
+ ,job-id
+ ,@(if skip-system-tests?
+ '("--skip-system-tests")
+ '())
+ ,@(if per-job-parallelism
+ (list (simple-format #f "--parallelism=~A" per-job-parallelism))
+ '()))
#:output log-port
#:error log-port)))
diff --git a/guix-data-service/jobs/load-new-guix-revision.scm b/guix-data-service/jobs/load-new-guix-revision.scm
index a3c6527..a07baa2 100644
--- a/guix-data-service/jobs/load-new-guix-revision.scm
+++ b/guix-data-service/jobs/load-new-guix-revision.scm
@@ -24,10 +24,13 @@
#:use-module (ice-9 threads)
#:use-module (ice-9 textual-ports)
#:use-module (ice-9 hash-table)
+ #:use-module (ice-9 suspendable-ports)
+ #:use-module ((ice-9 ports internal) #:select (port-poll))
#:use-module (rnrs exceptions)
#:use-module (json)
#:use-module (squee)
#:use-module (fibers)
+ #:use-module (fibers channels)
#:use-module (guix monads)
#:use-module (guix store)
#:use-module (guix channels)
@@ -185,7 +188,7 @@
(let ((system-test-data
(with-time-logging "getting system tests"
- (inferior-eval-with-store inf store extract))))
+ (inferior-eval-with-store/non-blocking inf store extract))))
(for-each (lambda (derivation-file-names-by-system)
(for-each (lambda (derivation-file-name)
@@ -342,9 +345,11 @@
#:unwind? #t)))
gds-inferior-packages))))
+ (ensure-gds-inferior-packages-defined! inf)
+
(with-time-logging (simple-format #f "getting ~A lint warnings"
checker-name)
- (inferior-eval-with-store
+ (inferior-eval-with-store/non-blocking
inf
store
lint-warnings-for-checker)))
@@ -588,12 +593,13 @@
(with-time-logging
(simple-format #f "getting derivations for ~A" (cons system target))
- (inferior-eval-with-store
+ (inferior-eval-with-store/non-blocking
inf
store
proc)))
-(define (sort-and-deduplicate-inferior-packages packages)
+(define (sort-and-deduplicate-inferior-packages packages
+ pkg-to-replacement-hash-table)
(pair-fold
(lambda (pair result)
(if (null? (cdr pair))
@@ -604,8 +610,8 @@
(b-name (inferior-package-name b))
(a-version (inferior-package-version a))
(b-version (inferior-package-version b))
- (a-replacement (inferior-package-replacement a))
- (b-replacement (inferior-package-replacement b)))
+ (a-replacement (hashq-ref pkg-to-replacement-hash-table a))
+ (b-replacement (hashq-ref pkg-to-replacement-hash-table b)))
(if (and (string=? a-name b-name)
(string=? a-version b-version)
(eq? a-replacement b-replacement))
@@ -638,8 +644,24 @@
b-name)))))))
(define (inferior-packages-plus-replacements inf)
- (let* ((packages (inferior-packages inf))
- (replacements (filter-map inferior-package-replacement packages))
+ (let* ((packages
+ ;; The use of force in (guix inferior) introduces a continuation
+ ;; barrier
+ (call-with-temporary-thread
+ (lambda ()
+ (inferior-packages inf))))
+ (replacements (map inferior-package-replacement packages))
+ (pkg-to-replacement-hash-table
+ (let ((ht (make-hash-table)))
+ (for-each
+ (lambda (pkg replacement)
+ (when replacement
+ (hashq-set! ht
+ pkg
+ replacement)))
+ packages
+ replacements)
+ ht))
(non-exported-replacements
(let ((package-id-hash-table (make-hash-table)))
(for-each (lambda (pkg)
@@ -648,28 +670,30 @@
#t))
packages)
- (filter (lambda (pkg)
- (eq? #f
- (hash-ref package-id-hash-table
- (inferior-package-id pkg))))
- replacements)))
+ (filter
+ (lambda (pkg)
+ (and pkg
+ (eq? #f
+ (hash-ref package-id-hash-table
+ (inferior-package-id pkg)))))
+ replacements)))
(deduplicated-packages
;; This isn't perfect, sometimes there can be two packages with the
;; same name and version, but different derivations. Guix will warn
;; about this case though, generally this means only one of the
;; packages should be exported.
- (sort-and-deduplicate-inferior-packages
- (append! packages non-exported-replacements)))
+ (call-with-temporary-thread
+ (lambda ()
+ ;; TODO Sort introduces a continuation barrier
+ (sort-and-deduplicate-inferior-packages
+ (append! packages non-exported-replacements)
+ pkg-to-replacement-hash-table))))
(deduplicated-packages-length
(length deduplicated-packages)))
(inferior-eval
- `(use-modules (srfi srfi-43))
- inf)
-
- (inferior-eval
`(define gds-inferior-packages
(make-vector ,deduplicated-packages-length))
inf)
@@ -685,9 +709,14 @@
(list ,@(map inferior-package-id deduplicated-packages)))
inf)
- (list->vector deduplicated-packages)))
+ (values (list->vector deduplicated-packages)
+ pkg-to-replacement-hash-table)))
+
+(define (ensure-gds-inferior-packages-defined! inf)
+ (unless (inferior-eval '(defined? 'gds-inferior-packages) inf)
+ (inferior-packages-plus-replacements inf)))
-(define* (all-inferior-packages-data inf packages)
+(define* (all-inferior-packages-data inf packages pkg-to-replacement-hash-table)
(define inferior-package-id->packages-index-hash-table
(let ((hash-table (make-hash-table)))
(vector-for-each
@@ -717,7 +746,7 @@
(package-replacement-data
(vector-map
(lambda (_ pkg)
- (let ((replacement (inferior-package-replacement pkg)))
+ (let ((replacement (hashq-ref pkg-to-replacement-hash-table pkg)))
(if replacement
;; I'm not sure if replacements can themselves be
;; replaced, but I do know for sure that there are
@@ -726,8 +755,16 @@
;; example).
;;
;; So this might be #f in these cases
- (hash-ref inferior-package-id->packages-index-hash-table
- (inferior-package-id pkg))
+ (let ((index
+ (hash-ref inferior-package-id->packages-index-hash-table
+ (inferior-package-id replacement))))
+ (unless index
+ (simple-format
+ (current-error-port)
+ "warning: replacement for ~A (~A) is unknown\n"
+ pkg
+ replacement))
+ index)
#f)))
packages)))
@@ -743,13 +780,14 @@
(let* ((names (assq-ref inferior-packages-data 'names))
(versions (assq-ref inferior-packages-data 'versions))
(package-license-set-ids
- (inferior-packages->license-set-ids
- conn
- (inferior-packages->license-id-lists
- conn
- ;; TODO Don't needlessly convert
- (vector->list
- (assq-ref inferior-packages-data 'license-data)))))
+ (with-time-logging "inserting package license sets"
+ (inferior-packages->license-set-ids
+ conn
+ (inferior-packages->license-id-lists
+ conn
+ ;; TODO Don't needlessly convert
+ (vector->list
+ (assq-ref inferior-packages-data 'license-data))))))
(all-package-metadata-ids
new-package-metadata-ids
(with-time-logging "inserting package metadata entries"
@@ -898,15 +936,80 @@
(build-derivations store (list derivation)))
(derivation->output-path derivation)))
-(define (channel->source-and-derivation-file-names-by-system conn store channel
- fetch-with-authentication?)
+(define (non-blocking-port port)
+ "Make PORT non-blocking and return it."
+ (let ((flags (fcntl port F_GETFL)))
+ (when (zero? (logand O_NONBLOCK flags))
+ (fcntl port F_SETFL (logior O_NONBLOCK flags)))
+ port))
+
+(define (ensure-non-blocking-store-connection store)
+ (match (store-connection-socket store)
+ ((? file-port? port)
+ (non-blocking-port port))
+ (_ #f)))
+
+(define (call-with-temporary-blocking-store store proc)
+ (let* ((port (store-connection-socket store))
+ (flags (fcntl port F_GETFL)))
+ (unless (zero? (logand O_NONBLOCK flags))
+ (fcntl port F_SETFL (logxor O_NONBLOCK flags)))
+ (call-with-values
+ (lambda ()
+ (proc store))
+ (lambda vals
+ (fcntl port F_SETFL (logior O_NONBLOCK flags))
+ (apply values vals)))))
+
+(define (make-inferior-non-blocking! inferior)
+ (non-blocking-port
+ ((@@ (guix inferior) inferior-socket) inferior)))
+
+(define (call-with-temporary-thread thunk)
+ (let ((channel (make-channel)))
+ (call-with-new-thread
+ (lambda ()
+ (parameterize
+ ((current-read-waiter (lambda (port) (port-poll port "r")))
+ (current-write-waiter (lambda (port) (port-poll port "w"))))
+
+ (with-exception-handler
+ (lambda (exn)
+ (put-message channel `(exception ,exn)))
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (call-with-values thunk
+ (lambda values
+ (put-message channel `(values ,@values)))))
+ (lambda _
+ (backtrace))))
+ #:unwind? #t))))
+
+ (match (get-message channel)
+ (('values . results)
+ (apply values results))
+ (('exception . args)
+ (apply throw args)))))
+
+(define (inferior-eval-with-store/non-blocking inferior store proc)
+ (call-with-temporary-thread
+ (lambda ()
+ (inferior-eval-with-store inferior store proc))))
+
+(define* (channel->source-and-derivation-file-names-by-system
+ conn store channel
+ fetch-with-authentication?
+ #:key parallelism)
+
(define use-container? (defined?
'open-inferior/container
(resolve-module '(guix inferior))))
- (define (inferior-code channel-instance systems)
+ (define (inferior-code channel-instance system)
`(lambda (store)
- (let* ((instances
+ (let* ((system ,system)
+ (instances
(list
(channel-instance
(channel (name ',(channel-name channel))
@@ -915,61 +1018,57 @@
(commit ,(channel-commit channel)))
,(channel-instance-commit channel-instance)
,(channel-instance-checkout channel-instance)))))
- (map
- (lambda (system)
- (simple-format
- (current-error-port)
- "guix-data-service: computing the derivation-file-name for ~A\n"
- system)
-
- (let ((manifest
- (catch #t
- (lambda ()
- ((channel-instances->manifest instances #:system system) store))
- (lambda (key . args)
- (simple-format
- (current-error-port)
- "error: while computing manifest entry derivation for ~A\n"
- system)
- (simple-format
- (current-error-port)
- "error ~A: ~A\n" key args)
- #f))))
- (define (add-tmp-root-and-return-drv drv)
- (add-temp-root store drv)
- drv)
-
- `(,system
- .
- ((manifest-entry-item
- . ,(and manifest
+ (simple-format
+ (current-error-port)
+ "guix-data-service: computing the derivation-file-name for ~A\n"
+ system)
+
+ (let ((manifest
+ (catch #t
+ (lambda ()
+ ((channel-instances->manifest instances #:system system) store))
+ (lambda (key . args)
+ (simple-format
+ (current-error-port)
+ "error: while computing manifest entry derivation for ~A\n"
+ system)
+ (simple-format
+ (current-error-port)
+ "error ~A: ~A\n" key args)
+ #f))))
+ (define (add-tmp-root-and-return-drv drv)
+ (add-temp-root store drv)
+ drv)
+
+ `((manifest-entry-item
+ . ,(and manifest
+ (add-tmp-root-and-return-drv
+ (derivation-file-name
+ (manifest-entry-item
+ (first
+ (manifest-entries manifest)))))))
+ (profile
+ . ,(catch #t
+ (lambda ()
+ (and manifest
(add-tmp-root-and-return-drv
(derivation-file-name
- (manifest-entry-item
- (first
- (manifest-entries manifest)))))))
- (profile
- . ,(catch #t
- (lambda ()
- (and manifest
- (add-tmp-root-and-return-drv
- (derivation-file-name
- (parameterize ((%current-system system))
- (run-with-store store
- (profile-derivation
- manifest
- #:hooks %channel-profile-hooks)))))))
- (lambda (key . args)
- (simple-format
- (current-error-port)
- "error: while computing profile derivation for ~A\n"
- system)
- (simple-format
- (current-error-port)
- "error ~A: ~A\n" key args)
- #f)))))))
- (list ,@systems)))))
+ (parameterize ((%current-system system))
+ (run-with-store store
+ (profile-derivation
+ manifest
+ #:hooks %channel-profile-hooks)))))))
+ (lambda (key . args)
+ (simple-format
+ (current-error-port)
+ "error: while computing profile derivation for ~A\n"
+ system)
+ (simple-format
+ (current-error-port)
+ "error ~A: ~A\n" key args)
+ #f))))))))
+ (define (start-inferior inferior-store)
(let ((inferior
(if use-container?
(open-inferior/container
@@ -985,85 +1084,99 @@
(open-inferior (guix-store-path store)
#:error-port (current-error-port))))))
- (define (start-inferior-and-return-derivation-file-names)
- ;; /etc is only missing if open-inferior/container has been used
- (when use-container?
- (inferior-eval
- '(begin
- ;; Create /etc/pass, as %known-shorthand-profiles in (guix
- ;; profiles) tries to read from this file. Because the environment
- ;; is cleaned in build-self.scm, xdg-directory in (guix utils)
- ;; falls back to accessing /etc/passwd.
- (mkdir "/etc")
- (call-with-output-file "/etc/passwd"
- (lambda (port)
- (display "root:x:0:0::/root:/bin/bash" port))))
- inferior))
-
- (let ((channel-instance
- ;; Obtain a session level lock here, to avoid conflicts with
- ;; other jobs over the Git repository.
- (with-advisory-session-lock/log-time
- conn
- 'latest-channel-instances
- (lambda ()
- (first
- (latest-channel-instances store
- (list channel)
- #:authenticate?
- fetch-with-authentication?))))))
- (inferior-eval '(use-modules (srfi srfi-1)
- (ice-9 history)
- (guix channels)
- (guix grafts)
- (guix profiles))
- inferior)
- (inferior-eval '(%graft? #f)
- inferior)
- (inferior-eval '(disable-value-history!)
- inferior)
- (inferior-eval '(define channel-instance
- (@@ (guix channels) channel-instance))
- inferior)
-
- (let* ((systems
- (inferior-eval '(@ (guix packages) %supported-systems)
- inferior))
- (result
- (inferior-eval-with-store
- inferior
- store
- (inferior-code channel-instance systems))))
-
- (close-inferior inferior)
-
- (cons
- (channel-instance-checkout channel-instance)
- result))))
-
- (catch
- #t
- (lambda ()
- (with-throw-handler #t
- start-inferior-and-return-derivation-file-names
- (lambda (key . parameters)
- (display (backtrace) (current-error-port))
- (display "\n" (current-error-port))
- (simple-format (current-error-port)
- "error: channel->derivation-file-names-by-system: ~A: ~A\n"
- key parameters))))
- (lambda args
- (close-inferior inferior)
- #f))))
-
-(define (channel->source-and-derivations-by-system conn store channel
- fetch-with-authentication?)
+ ;; /etc is only missing if open-inferior/container has been used
+ (when use-container?
+ (inferior-eval
+ '(begin
+ ;; Create /etc/pass, as %known-shorthand-profiles in (guix
+ ;; profiles) tries to read from this file. Because the environment
+ ;; is cleaned in build-self.scm, xdg-directory in (guix utils)
+ ;; falls back to accessing /etc/passwd.
+ (mkdir "/etc")
+ (call-with-output-file "/etc/passwd"
+ (lambda (port)
+ (display "root:x:0:0::/root:/bin/bash" port))))
+ inferior))
+
+ (inferior-eval '(use-modules (srfi srfi-1)
+ (ice-9 history)
+ (guix channels)
+ (guix grafts)
+ (guix profiles))
+ inferior)
+ (inferior-eval '(%graft? #f)
+ inferior)
+ (inferior-eval '(disable-value-history!)
+ inferior)
+ (inferior-eval '(define channel-instance
+ (@@ (guix channels) channel-instance))
+ inferior)
+
+ inferior))
+
+ (let* ((channel-instance
+ ;; Obtain a session level lock here, to avoid conflicts with
+ ;; other jobs over the Git repository.
+ (with-advisory-session-lock/log-time
+ conn
+ 'latest-channel-instances
+ (lambda ()
+ ;; TODO (guix serialization) uses dynamic-wind
+ (call-with-temporary-thread
+ (lambda ()
+ (first
+ (latest-channel-instances store
+ (list channel)
+ #:authenticate?
+ fetch-with-authentication?)))))))
+ (inferior-and-store-pool
+ (make-resource-pool
+ (lambda ()
+ (let* ((inferior-store (open-connection))
+ (inferior (start-inferior inferior-store)))
+ (ensure-non-blocking-store-connection inferior-store)
+ (make-inferior-non-blocking! inferior)
+ (cons inferior inferior-store)))
+ parallelism
+ #:min-size 0
+ #:idle-seconds 10
+ #:destructor (match-lambda
+ ((inferior . store)
+ (close-inferior inferior)
+ (close-connection store)))))
+ (systems
+ (with-resource-from-pool inferior-and-store-pool res
+ (match res
+ ((inferior . inferior-store)
+ (inferior-eval '(@ (guix packages) %supported-systems)
+ inferior)))))
+ (result
+ (par-map&
+ (lambda (system)
+ (with-resource-from-pool inferior-and-store-pool res
+ (match res
+ ((inferior . inferior-store)
+ (cons system
+ (inferior-eval-with-store/non-blocking
+ inferior
+ inferior-store
+ (inferior-code channel-instance system)))))))
+ systems)))
+
+ (cons
+ (channel-instance-checkout channel-instance)
+ result)))
+
+(define* (channel->source-and-derivations-by-system conn store channel
+ fetch-with-authentication?
+ #:key parallelism)
(match (with-time-logging "computing the channel derivation"
(channel->source-and-derivation-file-names-by-system
conn
store
channel
- fetch-with-authentication?))
+ fetch-with-authentication?
+ #:parallelism parallelism))
((source . derivation-file-names-by-system)
(for-each
(match-lambda
@@ -1148,17 +1261,9 @@
output)))
-(define (start-inferior-for-data-extration store store-path)
- (let* ((guix-locpath (getenv "GUIX_LOCPATH"))
- (inf (let ((guix-locpath
- ;; Augment the GUIX_LOCPATH to include glibc-locales from
- ;; the Guix at store-path, this should mean that the
- ;; inferior Guix works, even if it's build using a different
- ;; glibc version
- (string-append
- (glibc-locales-for-guix-store-path store store-path)
- "/lib/locale"
- ":" guix-locpath)))
+(define (start-inferior-for-data-extration store store-path guix-locpath)
+ (let* ((original-guix-locpath (getenv "GUIX_LOCPATH"))
+ (inf (begin
;; Unset the GUILE_LOAD_PATH and GUILE_LOAD_COMPILED_PATH to
;; avoid the values for these being used in the
;; inferior. Even though the inferior %load-path and
@@ -1185,7 +1290,7 @@
(simple-format #t "debug: using open-inferior\n")
(open-inferior store-path
#:error-port (current-error-port)))))))
- (setenv "GUIX_LOCPATH" guix-locpath) ; restore GUIX_LOCPATH
+ (setenv "GUIX_LOCPATH" original-guix-locpath) ; restore GUIX_LOCPATH
(when (eq? inf #f)
(error "error: inferior is #f"))
@@ -1202,6 +1307,7 @@
(inferior-eval '(use-modules (srfi srfi-1)
(srfi srfi-34)
+ (srfi srfi-43)
(ice-9 history)
(guix grafts)
(guix derivations)
@@ -1221,144 +1327,187 @@
(inferior-eval '(when (defined? '%graft?) (%graft? #f)) inf)
+ ;; TODO Have Guix make this easier
+ ((@@ (guix inferior) ensure-store-bridge!) inf)
+ (non-blocking-port ((@@ (guix inferior) inferior-bridge-socket) inf))
+
inf))
(define* (extract-information-from conn store guix-revision-id commit
guix-source store-path
- #:key skip-system-tests?)
- (simple-format #t "debug: extract-information-from: ~A\n" store-path)
+ #:key skip-system-tests?
+ parallelism)
+
+ (define guix-locpath
+ ;; Augment the GUIX_LOCPATH to include glibc-locales from
+ ;; the Guix at store-path, this should mean that the
+ ;; inferior Guix works, even if it's build using a different
+ ;; glibc version
+ (string-append
+ (glibc-locales-for-guix-store-path store store-path)
+ "/lib/locale"
+ ":" (getenv "GUIX_LOCPATH")))
+
+ (define inf-and-store-pool
+ (make-resource-pool
+ (lambda ()
+ (let* ((inferior-store (open-connection))
+ (inferior (start-inferior-for-data-extration inferior-store
+ store-path
+ guix-locpath)))
+ (ensure-non-blocking-store-connection inferior-store)
+ (make-inferior-non-blocking! inferior)
+ (cons inferior inferior-store)))
+ parallelism
+ #:min-size 0
+ #:idle-seconds 10
+ #:destructor (match-lambda
+ ((inferior . store)
+ (close-inferior inferior)
+ (close-connection store)))))
- (let ((inf (start-inferior-for-data-extration store store-path)))
- (catch
- #t
- (lambda ()
- (let* ((packages
- (with-time-logging "fetching inferior packages"
- (inferior-packages-plus-replacements inf)))
- (inferior-lint-checkers-data
- (inferior-lint-checkers inf))
- (inferior-lint-warnings-data
- (and inferior-lint-checkers-data
- (with-time-logging "fetching inferior lint warnings"
- (map
- (match-lambda
- ((checker-name _ network-dependent?)
- (and (and (not network-dependent?)
- ;; Running the derivation linter is
- ;; currently infeasible
- (not (eq? checker-name 'derivation)))
- (inferior-lint-warnings inf
- store
- checker-name))))
- inferior-lint-checkers-data))))
- (inferior-system-target-pairs
- (inferior-fetch-system-target-pairs inf))
- (inferior-packages-system-and-target-to-derivations-alist
- (with-time-logging "getting inferior derivations"
- (map
- (match-lambda
- ((system . target)
- (cons (cons system target)
- (inferior-package-derivations store
- inf
- system
- target))))
- inferior-system-target-pairs)))
- (inferior-system-tests
- (if skip-system-tests?
- (begin
- (simple-format #t "debug: skipping system tests\n")
- '())
- (with-time-logging "getting inferior system tests"
- (all-inferior-system-tests inf store
- guix-source commit))))
- (packages-data
- (with-time-logging "getting all inferior package data"
- (all-inferior-packages-data inf packages))))
+ (simple-format #t "debug: extract-information-from: ~A\n" store-path)
+ (letpar& ((inferior-lint-checkers-and-warnings-data
+ (with-resource-from-pool inf-and-store-pool res
+ (match res
+ ((inferior . inferior-store)
+ (let ((inferior-lint-checkers-data
+ (inferior-lint-checkers inferior)))
+ (cons
+ inferior-lint-checkers-data
+ (if inferior-lint-checkers-data
+ (with-time-logging "fetching inferior lint warnings"
+ (map
+ (match-lambda
+ ((checker-name _ network-dependent?)
+ (and (and (not network-dependent?)
+ ;; Running the derivation linter is
+ ;; currently infeasible
+ (not (eq? checker-name 'derivation)))
+ (inferior-lint-warnings inferior
+ inferior-store
+ checker-name))))
+ inferior-lint-checkers-data))
+ #f)))))))
+ (inferior-packages-system-and-target-to-derivations-alist
+ (with-time-logging "getting inferior derivations"
+ (par-map&
+ (match-lambda
+ ((system . target)
+ (with-resource-from-pool inf-and-store-pool res
+ (match res
+ ((inferior . inferior-store)
+ (ensure-gds-inferior-packages-defined! inferior)
+
+ (cons (cons system target)
+ (inferior-package-derivations inferior-store
+ inferior
+ system
+ target)))))))
+ (with-resource-from-pool inf-and-store-pool res
+ (match res
+ ((inferior . inferior-store)
+ (inferior-fetch-system-target-pairs inferior)))))))
+ (inferior-system-tests
+ (if skip-system-tests?
+ (begin
+ (simple-format #t "debug: skipping system tests\n")
+ '())
+ (with-resource-from-pool inf-and-store-pool res
+ (match res
+ ((inferior . inferior-store)
+ (with-time-logging "getting inferior system tests"
+ (all-inferior-system-tests inferior inferior-store
+ guix-source commit)))))))
+ (packages-data
+ (with-time-logging "getting all inferior package data"
+ (with-resource-from-pool inf-and-store-pool res
+ (match res
+ ((inferior . inferior-store)
+ (with-time-logging "fetching inferior packages"
+ (let ((packages
+ pkg-to-replacement-hash-table
+ (inferior-packages-plus-replacements inferior)))
+ (all-inferior-packages-data inferior
+ packages
+ pkg-to-replacement-hash-table)))))))))
+
+ (destroy-resource-pool inf-and-store-pool)
+
+ (simple-format
+ #t "debug: finished loading information from inferior\n")
+
+ (with-time-logging
+ "acquiring advisory transaction lock: load-new-guix-revision-inserts"
+ ;; Wait until this is the only transaction inserting data, to
+ ;; avoid any concurrency issues
+ (obtain-advisory-transaction-lock conn
+ 'load-new-guix-revision-inserts))
+ (with-time-logging
+ "inserting data"
+ (let* ((package-ids
+ (insert-packages conn packages-data)))
+ (when inferior-lint-warnings
+ (let* ((lint-checker-ids
+ (lint-checkers->lint-checker-ids
+ conn
+ (map (match-lambda
+ ((name descriptions-by-locale network-dependent)
+ (list
+ name
+ network-dependent
+ (lint-checker-description-data->lint-checker-description-set-id
+ conn descriptions-by-locale))))
+ (car inferior-lint-checkers-and-warnings-data))))
+ (lint-warning-ids
+ (insert-lint-warnings
+ conn
+ package-ids
+ lint-checker-ids
+ (cdr inferior-lint-checkers-and-warnings-data))))
+ (insert-guix-revision-lint-checkers conn
+ guix-revision-id
+ lint-checker-ids)
+
+ (chunk-for-each!
+ (lambda (lint-warning-ids-chunk)
+ (insert-guix-revision-lint-warnings conn
+ guix-revision-id
+ lint-warning-ids-chunk))
+ 5000
+ lint-warning-ids)))
+
+ (when inferior-system-tests
+ (insert-system-tests-for-guix-revision conn
+ guix-revision-id
+ inferior-system-tests))
+
+ (let* ((package-derivation-ids
+ (with-time-logging "inferior-data->package-derivation-ids"
+ (inferior-data->package-derivation-ids
+ conn
+ inf
+ package-ids
+ inferior-packages-system-and-target-to-derivations-alist)))
+ (ids-count
+ (length package-derivation-ids)))
+ (chunk-for-each! (lambda (package-derivation-ids-chunk)
+ (insert-guix-revision-package-derivations
+ conn
+ guix-revision-id
+ package-derivation-ids-chunk))
+ 2000
+ package-derivation-ids)
(simple-format
- #t "debug: finished loading information from inferior\n")
- (close-inferior inf)
-
- (with-time-logging
- "acquiring advisory transaction lock: load-new-guix-revision-inserts"
- ;; Wait until this is the only transaction inserting data, to
- ;; avoid any concurrency issues
- (obtain-advisory-transaction-lock conn
- 'load-new-guix-revision-inserts))
- (with-time-logging
- "inserting data"
- (let* ((package-ids
- (insert-packages conn packages-data)))
- (when inferior-lint-warnings
- (let* ((lint-checker-ids
- (lint-checkers->lint-checker-ids
- conn
- (map (match-lambda
- ((name descriptions-by-locale network-dependent)
- (list
- name
- network-dependent
- (lint-checker-description-data->lint-checker-description-set-id
- conn descriptions-by-locale))))
- inferior-lint-checkers-data)))
- (lint-warning-ids
- (insert-lint-warnings
- conn
- package-ids
- lint-checker-ids
- inferior-lint-warnings-data)))
- (insert-guix-revision-lint-checkers conn
- guix-revision-id
- lint-checker-ids)
-
- (chunk-for-each!
- (lambda (lint-warning-ids-chunk)
- (insert-guix-revision-lint-warnings conn
- guix-revision-id
- lint-warning-ids-chunk))
- 5000
- lint-warning-ids)))
-
- (when inferior-system-tests
- (insert-system-tests-for-guix-revision conn
- guix-revision-id
- inferior-system-tests))
-
- (let* ((package-derivation-ids
- (with-time-logging "inferior-data->package-derivation-ids"
- (inferior-data->package-derivation-ids
- conn
- inf
- package-ids
- inferior-packages-system-and-target-to-derivations-alist)))
- (ids-count
- (length package-derivation-ids)))
- (chunk-for-each! (lambda (package-derivation-ids-chunk)
- (insert-guix-revision-package-derivations
- conn
- guix-revision-id
- package-derivation-ids-chunk))
- 2000
- package-derivation-ids)
- (simple-format
- #t "Successfully loaded ~A package/derivation pairs\n"
- ids-count))
+ #t "Successfully loaded ~A package/derivation pairs\n"
+ ids-count))
- (with-time-logging
- "insert-guix-revision-package-derivation-distribution-counts"
- (insert-guix-revision-package-derivation-distribution-counts
- conn
- guix-revision-id)))))
- #t)
- (lambda (key . args)
- (simple-format (current-error-port)
- "Failed extracting information from commit: ~A\n\n" commit)
- (simple-format (current-error-port)
- " ~A ~A\n\n" key args)
- #f)
- (lambda (key . args)
- (display-backtrace (make-stack #t) (current-error-port))))))
+ (with-time-logging
+ "insert-guix-revision-package-derivation-distribution-counts"
+ (insert-guix-revision-package-derivation-distribution-counts
+ conn
+ guix-revision-id))))))
(prevent-inlining-for-tests extract-information-from)
@@ -1409,7 +1558,7 @@
(prevent-inlining-for-tests load-channel-instances)
(define* (load-new-guix-revision conn store git-repository-id commit
- #:key skip-system-tests?)
+ #:key skip-system-tests? parallelism)
(let* ((git-repository-fields
(select-git-repository conn git-repository-id))
(git-repository-url
@@ -1421,10 +1570,12 @@
(url git-repository-url)
(commit commit)))
(source-and-channel-derivations-by-system
- (channel->source-and-derivations-by-system conn
- store
- channel-for-commit
- fetch-with-authentication?))
+ (channel->source-and-derivations-by-system
+ conn
+ store
+ channel-for-commit
+ fetch-with-authentication?
+ #:parallelism parallelism))
(guix-source
(car source-and-channel-derivations-by-system))
(channel-derivations-by-system
@@ -1442,7 +1593,8 @@
guix-revision-id
commit guix-source store-item
#:skip-system-tests?
- skip-system-tests?)
+ skip-system-tests?
+ #:parallelism parallelism)
(if (defined? 'channel-news-for-commit
(resolve-module '(guix channels)))
@@ -1817,13 +1969,15 @@ SKIP LOCKED")
(define (with-store-connection f)
(with-store store
+ (ensure-non-blocking-store-connection store)
(set-build-options store #:fallback? #t)
(f store)))
(prevent-inlining-for-tests with-store-connection)
-(define* (process-load-new-guix-revision-job id #:key skip-system-tests?)
+(define* (process-load-new-guix-revision-job id #:key skip-system-tests?
+ parallelism)
(with-postgresql-connection
(simple-format #f "load-new-guix-revision ~A" id)
(lambda (conn)
@@ -1860,7 +2014,8 @@ SKIP LOCKED")
store
git-repository-id
commit
- #:skip-system-tests? #t))))
+ #:skip-system-tests? #t
+ #:parallelism parallelism))))
(lambda (key . args)
(simple-format (current-error-port)
"error: load-new-guix-revision: ~A ~A\n"