aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--guix-data-service/jobs.scm16
-rw-r--r--guix-data-service/jobs/load-new-guix-revision.scm777
-rw-r--r--scripts/guix-data-service-process-job.in5
-rw-r--r--scripts/guix-data-service-process-jobs.in14
4 files changed, 490 insertions, 322 deletions
diff --git a/guix-data-service/jobs.scm b/guix-data-service/jobs.scm
index e2294e9..8217d52 100644
--- a/guix-data-service/jobs.scm
+++ b/guix-data-service/jobs.scm
@@ -125,7 +125,8 @@ guix-data-service: error: missing log line: ~A
(define* (process-jobs conn #:key max-processes
latest-branch-revision-max-processes
- skip-system-tests?)
+ skip-system-tests?
+ per-job-parallelism)
(define (fetch-new-jobs)
(fetch-unlocked-jobs conn))
@@ -133,11 +134,14 @@ guix-data-service: error: missing log line: ~A
(let ((log-port (start-thread-for-process-output job-id)))
(spawn
"guix-data-service-process-job"
- (cons* "guix-data-service-process-job"
- job-id
- (if skip-system-tests?
- '("--skip-system-tests")
- '()))
+ `("guix-data-service-process-job"
+ ,job-id
+ ,@(if skip-system-tests?
+ '("--skip-system-tests")
+ '())
+ ,@(if per-job-parallelism
+ (list (simple-format #f "--parallelism=~A" per-job-parallelism))
+ '()))
#:output log-port
#:error log-port)))
diff --git a/guix-data-service/jobs/load-new-guix-revision.scm b/guix-data-service/jobs/load-new-guix-revision.scm
index a3c6527..a07baa2 100644
--- a/guix-data-service/jobs/load-new-guix-revision.scm
+++ b/guix-data-service/jobs/load-new-guix-revision.scm
@@ -24,10 +24,13 @@
#:use-module (ice-9 threads)
#:use-module (ice-9 textual-ports)
#:use-module (ice-9 hash-table)
+ #:use-module (ice-9 suspendable-ports)
+ #:use-module ((ice-9 ports internal) #:select (port-poll))
#:use-module (rnrs exceptions)
#:use-module (json)
#:use-module (squee)
#:use-module (fibers)
+ #:use-module (fibers channels)
#:use-module (guix monads)
#:use-module (guix store)
#:use-module (guix channels)
@@ -185,7 +188,7 @@
(let ((system-test-data
(with-time-logging "getting system tests"
- (inferior-eval-with-store inf store extract))))
+ (inferior-eval-with-store/non-blocking inf store extract))))
(for-each (lambda (derivation-file-names-by-system)
(for-each (lambda (derivation-file-name)
@@ -342,9 +345,11 @@
#:unwind? #t)))
gds-inferior-packages))))
+ (ensure-gds-inferior-packages-defined! inf)
+
(with-time-logging (simple-format #f "getting ~A lint warnings"
checker-name)
- (inferior-eval-with-store
+ (inferior-eval-with-store/non-blocking
inf
store
lint-warnings-for-checker)))
@@ -588,12 +593,13 @@
(with-time-logging
(simple-format #f "getting derivations for ~A" (cons system target))
- (inferior-eval-with-store
+ (inferior-eval-with-store/non-blocking
inf
store
proc)))
-(define (sort-and-deduplicate-inferior-packages packages)
+(define (sort-and-deduplicate-inferior-packages packages
+ pkg-to-replacement-hash-table)
(pair-fold
(lambda (pair result)
(if (null? (cdr pair))
@@ -604,8 +610,8 @@
(b-name (inferior-package-name b))
(a-version (inferior-package-version a))
(b-version (inferior-package-version b))
- (a-replacement (inferior-package-replacement a))
- (b-replacement (inferior-package-replacement b)))
+ (a-replacement (hashq-ref pkg-to-replacement-hash-table a))
+ (b-replacement (hashq-ref pkg-to-replacement-hash-table b)))
(if (and (string=? a-name b-name)
(string=? a-version b-version)
(eq? a-replacement b-replacement))
@@ -638,8 +644,24 @@
b-name)))))))
(define (inferior-packages-plus-replacements inf)
- (let* ((packages (inferior-packages inf))
- (replacements (filter-map inferior-package-replacement packages))
+ (let* ((packages
+ ;; The use of force in (guix inferior) introduces a continuation
+ ;; barrier
+ (call-with-temporary-thread
+ (lambda ()
+ (inferior-packages inf))))
+ (replacements (map inferior-package-replacement packages))
+ (pkg-to-replacement-hash-table
+ (let ((ht (make-hash-table)))
+ (for-each
+ (lambda (pkg replacement)
+ (when replacement
+ (hashq-set! ht
+ pkg
+ replacement)))
+ packages
+ replacements)
+ ht))
(non-exported-replacements
(let ((package-id-hash-table (make-hash-table)))
(for-each (lambda (pkg)
@@ -648,28 +670,30 @@
#t))
packages)
- (filter (lambda (pkg)
- (eq? #f
- (hash-ref package-id-hash-table
- (inferior-package-id pkg))))
- replacements)))
+ (filter
+ (lambda (pkg)
+ (and pkg
+ (eq? #f
+ (hash-ref package-id-hash-table
+ (inferior-package-id pkg)))))
+ replacements)))
(deduplicated-packages
;; This isn't perfect, sometimes there can be two packages with the
;; same name and version, but different derivations. Guix will warn
;; about this case though, generally this means only one of the
;; packages should be exported.
- (sort-and-deduplicate-inferior-packages
- (append! packages non-exported-replacements)))
+ (call-with-temporary-thread
+ (lambda ()
+ ;; TODO Sort introduces a continuation barrier
+ (sort-and-deduplicate-inferior-packages
+ (append! packages non-exported-replacements)
+ pkg-to-replacement-hash-table))))
(deduplicated-packages-length
(length deduplicated-packages)))
(inferior-eval
- `(use-modules (srfi srfi-43))
- inf)
-
- (inferior-eval
`(define gds-inferior-packages
(make-vector ,deduplicated-packages-length))
inf)
@@ -685,9 +709,14 @@
(list ,@(map inferior-package-id deduplicated-packages)))
inf)
- (list->vector deduplicated-packages)))
+ (values (list->vector deduplicated-packages)
+ pkg-to-replacement-hash-table)))
+
+(define (ensure-gds-inferior-packages-defined! inf)
+ (unless (inferior-eval '(defined? 'gds-inferior-packages) inf)
+ (inferior-packages-plus-replacements inf)))
-(define* (all-inferior-packages-data inf packages)
+(define* (all-inferior-packages-data inf packages pkg-to-replacement-hash-table)
(define inferior-package-id->packages-index-hash-table
(let ((hash-table (make-hash-table)))
(vector-for-each
@@ -717,7 +746,7 @@
(package-replacement-data
(vector-map
(lambda (_ pkg)
- (let ((replacement (inferior-package-replacement pkg)))
+ (let ((replacement (hashq-ref pkg-to-replacement-hash-table pkg)))
(if replacement
;; I'm not sure if replacements can themselves be
;; replaced, but I do know for sure that there are
@@ -726,8 +755,16 @@
;; example).
;;
;; So this might be #f in these cases
- (hash-ref inferior-package-id->packages-index-hash-table
- (inferior-package-id pkg))
+ (let ((index
+ (hash-ref inferior-package-id->packages-index-hash-table
+ (inferior-package-id replacement))))
+ (unless index
+ (simple-format
+ (current-error-port)
+ "warning: replacement for ~A (~A) is unknown\n"
+ pkg
+ replacement))
+ index)
#f)))
packages)))
@@ -743,13 +780,14 @@
(let* ((names (assq-ref inferior-packages-data 'names))
(versions (assq-ref inferior-packages-data 'versions))
(package-license-set-ids
- (inferior-packages->license-set-ids
- conn
- (inferior-packages->license-id-lists
- conn
- ;; TODO Don't needlessly convert
- (vector->list
- (assq-ref inferior-packages-data 'license-data)))))
+ (with-time-logging "inserting package license sets"
+ (inferior-packages->license-set-ids
+ conn
+ (inferior-packages->license-id-lists
+ conn
+ ;; TODO Don't needlessly convert
+ (vector->list
+ (assq-ref inferior-packages-data 'license-data))))))
(all-package-metadata-ids
new-package-metadata-ids
(with-time-logging "inserting package metadata entries"
@@ -898,15 +936,80 @@
(build-derivations store (list derivation)))
(derivation->output-path derivation)))
-(define (channel->source-and-derivation-file-names-by-system conn store channel
- fetch-with-authentication?)
+(define (non-blocking-port port)
+ "Make PORT non-blocking and return it."
+ (let ((flags (fcntl port F_GETFL)))
+ (when (zero? (logand O_NONBLOCK flags))
+ (fcntl port F_SETFL (logior O_NONBLOCK flags)))
+ port))
+
+(define (ensure-non-blocking-store-connection store)
+ (match (store-connection-socket store)
+ ((? file-port? port)
+ (non-blocking-port port))
+ (_ #f)))
+
+(define (call-with-temporary-blocking-store store proc)
+ (let* ((port (store-connection-socket store))
+ (flags (fcntl port F_GETFL)))
+ (unless (zero? (logand O_NONBLOCK flags))
+ (fcntl port F_SETFL (logxor O_NONBLOCK flags)))
+ (call-with-values
+ (lambda ()
+ (proc store))
+ (lambda vals
+ (fcntl port F_SETFL (logior O_NONBLOCK flags))
+ (apply values vals)))))
+
+(define (make-inferior-non-blocking! inferior)
+ (non-blocking-port
+ ((@@ (guix inferior) inferior-socket) inferior)))
+
+(define (call-with-temporary-thread thunk)
+ (let ((channel (make-channel)))
+ (call-with-new-thread
+ (lambda ()
+ (parameterize
+ ((current-read-waiter (lambda (port) (port-poll port "r")))
+ (current-write-waiter (lambda (port) (port-poll port "w"))))
+
+ (with-exception-handler
+ (lambda (exn)
+ (put-message channel `(exception ,exn)))
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (call-with-values thunk
+ (lambda values
+ (put-message channel `(values ,@values)))))
+ (lambda _
+ (backtrace))))
+ #:unwind? #t))))
+
+ (match (get-message channel)
+ (('values . results)
+ (apply values results))
+ (('exception . args)
+ (apply throw args)))))
+
+(define (inferior-eval-with-store/non-blocking inferior store proc)
+ (call-with-temporary-thread
+ (lambda ()
+ (inferior-eval-with-store inferior store proc))))
+
+(define* (channel->source-and-derivation-file-names-by-system
+ conn store channel
+ fetch-with-authentication?
+ #:key parallelism)
+
(define use-container? (defined?
'open-inferior/container
(resolve-module '(guix inferior))))
- (define (inferior-code channel-instance systems)
+ (define (inferior-code channel-instance system)
`(lambda (store)
- (let* ((instances
+ (let* ((system ,system)
+ (instances
(list
(channel-instance
(channel (name ',(channel-name channel))
@@ -915,61 +1018,57 @@
(commit ,(channel-commit channel)))
,(channel-instance-commit channel-instance)
,(channel-instance-checkout channel-instance)))))
- (map
- (lambda (system)
- (simple-format
- (current-error-port)
- "guix-data-service: computing the derivation-file-name for ~A\n"
- system)
-
- (let ((manifest
- (catch #t
- (lambda ()
- ((channel-instances->manifest instances #:system system) store))
- (lambda (key . args)
- (simple-format
- (current-error-port)
- "error: while computing manifest entry derivation for ~A\n"
- system)
- (simple-format
- (current-error-port)
- "error ~A: ~A\n" key args)
- #f))))
- (define (add-tmp-root-and-return-drv drv)
- (add-temp-root store drv)
- drv)
-
- `(,system
- .
- ((manifest-entry-item
- . ,(and manifest
+ (simple-format
+ (current-error-port)
+ "guix-data-service: computing the derivation-file-name for ~A\n"
+ system)
+
+ (let ((manifest
+ (catch #t
+ (lambda ()
+ ((channel-instances->manifest instances #:system system) store))
+ (lambda (key . args)
+ (simple-format
+ (current-error-port)
+ "error: while computing manifest entry derivation for ~A\n"
+ system)
+ (simple-format
+ (current-error-port)
+ "error ~A: ~A\n" key args)
+ #f))))
+ (define (add-tmp-root-and-return-drv drv)
+ (add-temp-root store drv)
+ drv)
+
+ `((manifest-entry-item
+ . ,(and manifest
+ (add-tmp-root-and-return-drv
+ (derivation-file-name
+ (manifest-entry-item
+ (first
+ (manifest-entries manifest)))))))
+ (profile
+ . ,(catch #t
+ (lambda ()
+ (and manifest
(add-tmp-root-and-return-drv
(derivation-file-name
- (manifest-entry-item
- (first
- (manifest-entries manifest)))))))
- (profile
- . ,(catch #t
- (lambda ()
- (and manifest
- (add-tmp-root-and-return-drv
- (derivation-file-name
- (parameterize ((%current-system system))
- (run-with-store store
- (profile-derivation
- manifest
- #:hooks %channel-profile-hooks)))))))
- (lambda (key . args)
- (simple-format
- (current-error-port)
- "error: while computing profile derivation for ~A\n"
- system)
- (simple-format
- (current-error-port)
- "error ~A: ~A\n" key args)
- #f)))))))
- (list ,@systems)))))
+ (parameterize ((%current-system system))
+ (run-with-store store
+ (profile-derivation
+ manifest
+ #:hooks %channel-profile-hooks)))))))
+ (lambda (key . args)
+ (simple-format
+ (current-error-port)
+ "error: while computing profile derivation for ~A\n"
+ system)
+ (simple-format
+ (current-error-port)
+ "error ~A: ~A\n" key args)
+ #f))))))))
+ (define (start-inferior inferior-store)
(let ((inferior
(if use-container?
(open-inferior/container
@@ -985,85 +1084,99 @@
(open-inferior (guix-store-path store)
#:error-port (current-error-port))))))
- (define (start-inferior-and-return-derivation-file-names)
- ;; /etc is only missing if open-inferior/container has been used
- (when use-container?
- (inferior-eval
- '(begin
- ;; Create /etc/pass, as %known-shorthand-profiles in (guix
- ;; profiles) tries to read from this file. Because the environment
- ;; is cleaned in build-self.scm, xdg-directory in (guix utils)
- ;; falls back to accessing /etc/passwd.
- (mkdir "/etc")
- (call-with-output-file "/etc/passwd"
- (lambda (port)
- (display "root:x:0:0::/root:/bin/bash" port))))
- inferior))
-
- (let ((channel-instance
- ;; Obtain a session level lock here, to avoid conflicts with
- ;; other jobs over the Git repository.
- (with-advisory-session-lock/log-time
- conn
- 'latest-channel-instances
- (lambda ()
- (first
- (latest-channel-instances store
- (list channel)
- #:authenticate?
- fetch-with-authentication?))))))
- (inferior-eval '(use-modules (srfi srfi-1)
- (ice-9 history)
- (guix channels)
- (guix grafts)
- (guix profiles))
- inferior)
- (inferior-eval '(%graft? #f)
- inferior)
- (inferior-eval '(disable-value-history!)
- inferior)
- (inferior-eval '(define channel-instance
- (@@ (guix channels) channel-instance))
- inferior)
-
- (let* ((systems
- (inferior-eval '(@ (guix packages) %supported-systems)
- inferior))
- (result
- (inferior-eval-with-store
- inferior
- store
- (inferior-code channel-instance systems))))
-
- (close-inferior inferior)
-
- (cons
- (channel-instance-checkout channel-instance)
- result))))
-
- (catch
- #t
- (lambda ()
- (with-throw-handler #t
- start-inferior-and-return-derivation-file-names
- (lambda (key . parameters)
- (display (backtrace) (current-error-port))
- (display "\n" (current-error-port))
- (simple-format (current-error-port)
- "error: channel->derivation-file-names-by-system: ~A: ~A\n"
- key parameters))))
- (lambda args
- (close-inferior inferior)
- #f))))
-
-(define (channel->source-and-derivations-by-system conn store channel
- fetch-with-authentication?)
+ ;; /etc is only missing if open-inferior/container has been used
+ (when use-container?
+ (inferior-eval
+ '(begin
+ ;; Create /etc/pass, as %known-shorthand-profiles in (guix
+ ;; profiles) tries to read from this file. Because the environment
+ ;; is cleaned in build-self.scm, xdg-directory in (guix utils)
+ ;; falls back to accessing /etc/passwd.
+ (mkdir "/etc")
+ (call-with-output-file "/etc/passwd"
+ (lambda (port)
+ (display "root:x:0:0::/root:/bin/bash" port))))
+ inferior))
+
+ (inferior-eval '(use-modules (srfi srfi-1)
+ (ice-9 history)
+ (guix channels)
+ (guix grafts)
+ (guix profiles))
+ inferior)
+ (inferior-eval '(%graft? #f)
+ inferior)
+ (inferior-eval '(disable-value-history!)
+ inferior)
+ (inferior-eval '(define channel-instance
+ (@@ (guix channels) channel-instance))
+ inferior)
+
+ inferior))
+
+ (let* ((channel-instance
+ ;; Obtain a session level lock here, to avoid conflicts with
+ ;; other jobs over the Git repository.
+ (with-advisory-session-lock/log-time
+ conn
+ 'latest-channel-instances
+ (lambda ()
+ ;; TODO (guix serialization) uses dynamic-wind
+ (call-with-temporary-thread
+ (lambda ()
+ (first
+ (latest-channel-instances store
+ (list channel)
+ #:authenticate?
+ fetch-with-authentication?)))))))
+ (inferior-and-store-pool
+ (make-resource-pool
+ (lambda ()
+ (let* ((inferior-store (open-connection))
+ (inferior (start-inferior inferior-store)))
+ (ensure-non-blocking-store-connection inferior-store)
+ (make-inferior-non-blocking! inferior)
+ (cons inferior inferior-store)))
+ parallelism
+ #:min-size 0
+ #:idle-seconds 10
+ #:destructor (match-lambda
+ ((inferior . store)
+ (close-inferior inferior)
+ (close-connection store)))))
+ (systems
+ (with-resource-from-pool inferior-and-store-pool res
+ (match res
+ ((inferior . inferior-store)
+ (inferior-eval '(@ (guix packages) %supported-systems)
+ inferior)))))
+ (result
+ (par-map&
+ (lambda (system)
+ (with-resource-from-pool inferior-and-store-pool res
+ (match res
+ ((inferior . inferior-store)
+ (cons system
+ (inferior-eval-with-store/non-blocking
+ inferior
+ inferior-store
+ (inferior-code channel-instance system)))))))
+ systems)))
+
+ (cons
+ (channel-instance-checkout channel-instance)
+ result)))
+
+(define* (channel->source-and-derivations-by-system conn store channel
+ fetch-with-authentication?
+ #:key parallelism)
(match (with-time-logging "computing the channel derivation"
(channel->source-and-derivation-file-names-by-system
conn
store
channel
- fetch-with-authentication?))
+ fetch-with-authentication?
+ #:parallelism parallelism))
((source . derivation-file-names-by-system)
(for-each
(match-lambda
@@ -1148,17 +1261,9 @@
output)))
-(define (start-inferior-for-data-extration store store-path)
- (let* ((guix-locpath (getenv "GUIX_LOCPATH"))
- (inf (let ((guix-locpath
- ;; Augment the GUIX_LOCPATH to include glibc-locales from
- ;; the Guix at store-path, this should mean that the
- ;; inferior Guix works, even if it's build using a different
- ;; glibc version
- (string-append
- (glibc-locales-for-guix-store-path store store-path)
- "/lib/locale"
- ":" guix-locpath)))
+(define (start-inferior-for-data-extration store store-path guix-locpath)
+ (let* ((original-guix-locpath (getenv "GUIX_LOCPATH"))
+ (inf (begin
;; Unset the GUILE_LOAD_PATH and GUILE_LOAD_COMPILED_PATH to
;; avoid the values for these being used in the
;; inferior. Even though the inferior %load-path and
@@ -1185,7 +1290,7 @@
(simple-format #t "debug: using open-inferior\n")
(open-inferior store-path
#:error-port (current-error-port)))))))
- (setenv "GUIX_LOCPATH" guix-locpath) ; restore GUIX_LOCPATH
+ (setenv "GUIX_LOCPATH" original-guix-locpath) ; restore GUIX_LOCPATH
(when (eq? inf #f)
(error "error: inferior is #f"))
@@ -1202,6 +1307,7 @@
(inferior-eval '(use-modules (srfi srfi-1)
(srfi srfi-34)
+ (srfi srfi-43)
(ice-9 history)
(guix grafts)
(guix derivations)
@@ -1221,144 +1327,187 @@
(inferior-eval '(when (defined? '%graft?) (%graft? #f)) inf)
+ ;; TODO Have Guix make this easier
+ ((@@ (guix inferior) ensure-store-bridge!) inf)
+ (non-blocking-port ((@@ (guix inferior) inferior-bridge-socket) inf))
+
inf))
(define* (extract-information-from conn store guix-revision-id commit
guix-source store-path
- #:key skip-system-tests?)
- (simple-format #t "debug: extract-information-from: ~A\n" store-path)
+ #:key skip-system-tests?
+ parallelism)
+
+ (define guix-locpath
+ ;; Augment the GUIX_LOCPATH to include glibc-locales from
+ ;; the Guix at store-path, this should mean that the
+ ;; inferior Guix works, even if it's build using a different
+ ;; glibc version
+ (string-append
+ (glibc-locales-for-guix-store-path store store-path)
+ "/lib/locale"
+ ":" (getenv "GUIX_LOCPATH")))
+
+ (define inf-and-store-pool
+ (make-resource-pool
+ (lambda ()
+ (let* ((inferior-store (open-connection))
+ (inferior (start-inferior-for-data-extration inferior-store
+ store-path
+ guix-locpath)))
+ (ensure-non-blocking-store-connection inferior-store)
+ (make-inferior-non-blocking! inferior)
+ (cons inferior inferior-store)))
+ parallelism
+ #:min-size 0
+ #:idle-seconds 10
+ #:destructor (match-lambda
+ ((inferior . store)
+ (close-inferior inferior)
+ (close-connection store)))))
- (let ((inf (start-inferior-for-data-extration store store-path)))
- (catch
- #t
- (lambda ()
- (let* ((packages
- (with-time-logging "fetching inferior packages"
- (inferior-packages-plus-replacements inf)))
- (inferior-lint-checkers-data
- (inferior-lint-checkers inf))
- (inferior-lint-warnings-data
- (and inferior-lint-checkers-data
- (with-time-logging "fetching inferior lint warnings"
- (map
- (match-lambda
- ((checker-name _ network-dependent?)
- (and (and (not network-dependent?)
- ;; Running the derivation linter is
- ;; currently infeasible
- (not (eq? checker-name 'derivation)))
- (inferior-lint-warnings inf
- store
- checker-name))))
- inferior-lint-checkers-data))))
- (inferior-system-target-pairs
- (inferior-fetch-system-target-pairs inf))
- (inferior-packages-system-and-target-to-derivations-alist
- (with-time-logging "getting inferior derivations"
- (map
- (match-lambda
- ((system . target)
- (cons (cons system target)
- (inferior-package-derivations store
- inf
- system
- target))))
- inferior-system-target-pairs)))
- (inferior-system-tests
- (if skip-system-tests?
- (begin
- (simple-format #t "debug: skipping system tests\n")
- '())
- (with-time-logging "getting inferior system tests"
- (all-inferior-system-tests inf store
- guix-source commit))))
- (packages-data
- (with-time-logging "getting all inferior package data"
- (all-inferior-packages-data inf packages))))
+ (simple-format #t "debug: extract-information-from: ~A\n" store-path)
+ (letpar& ((inferior-lint-checkers-and-warnings-data
+ (with-resource-from-pool inf-and-store-pool res
+ (match res
+ ((inferior . inferior-store)
+ (let ((inferior-lint-checkers-data
+ (inferior-lint-checkers inferior)))
+ (cons
+ inferior-lint-checkers-data
+ (if inferior-lint-checkers-data
+ (with-time-logging "fetching inferior lint warnings"
+ (map
+ (match-lambda
+ ((checker-name _ network-dependent?)
+ (and (and (not network-dependent?)
+ ;; Running the derivation linter is
+ ;; currently infeasible
+ (not (eq? checker-name 'derivation)))
+ (inferior-lint-warnings inferior
+ inferior-store
+ checker-name))))
+ inferior-lint-checkers-data))
+ #f)))))))
+ (inferior-packages-system-and-target-to-derivations-alist
+ (with-time-logging "getting inferior derivations"
+ (par-map&
+ (match-lambda
+ ((system . target)
+ (with-resource-from-pool inf-and-store-pool res
+ (match res
+ ((inferior . inferior-store)
+ (ensure-gds-inferior-packages-defined! inferior)
+
+ (cons (cons system target)
+ (inferior-package-derivations inferior-store
+ inferior
+ system
+ target)))))))
+ (with-resource-from-pool inf-and-store-pool res
+ (match res
+ ((inferior . inferior-store)
+ (inferior-fetch-system-target-pairs inferior)))))))
+ (inferior-system-tests
+ (if skip-system-tests?
+ (begin
+ (simple-format #t "debug: skipping system tests\n")
+ '())
+ (with-resource-from-pool inf-and-store-pool res
+ (match res
+ ((inferior . inferior-store)
+ (with-time-logging "getting inferior system tests"
+ (all-inferior-system-tests inferior inferior-store
+ guix-source commit)))))))
+ (packages-data
+ (with-time-logging "getting all inferior package data"
+ (with-resource-from-pool inf-and-store-pool res
+ (match res
+ ((inferior . inferior-store)
+ (with-time-logging "fetching inferior packages"
+ (let ((packages
+ pkg-to-replacement-hash-table
+ (inferior-packages-plus-replacements inferior)))
+ (all-inferior-packages-data inferior
+ packages
+ pkg-to-replacement-hash-table)))))))))
+
+ (destroy-resource-pool inf-and-store-pool)
+
+ (simple-format
+ #t "debug: finished loading information from inferior\n")
+
+ (with-time-logging
+ "acquiring advisory transaction lock: load-new-guix-revision-inserts"
+ ;; Wait until this is the only transaction inserting data, to
+ ;; avoid any concurrency issues
+ (obtain-advisory-transaction-lock conn
+ 'load-new-guix-revision-inserts))
+ (with-time-logging
+ "inserting data"
+ (let* ((package-ids
+ (insert-packages conn packages-data)))
+ (when inferior-lint-warnings
+ (let* ((lint-checker-ids
+ (lint-checkers->lint-checker-ids
+ conn
+ (map (match-lambda
+ ((name descriptions-by-locale network-dependent)
+ (list
+ name
+ network-dependent
+ (lint-checker-description-data->lint-checker-description-set-id
+ conn descriptions-by-locale))))
+ (car inferior-lint-checkers-and-warnings-data))))
+ (lint-warning-ids
+ (insert-lint-warnings
+ conn
+ package-ids
+ lint-checker-ids
+ (cdr inferior-lint-checkers-and-warnings-data))))
+ (insert-guix-revision-lint-checkers conn
+ guix-revision-id
+ lint-checker-ids)
+
+ (chunk-for-each!
+ (lambda (lint-warning-ids-chunk)
+ (insert-guix-revision-lint-warnings conn
+ guix-revision-id
+ lint-warning-ids-chunk))
+ 5000
+ lint-warning-ids)))
+
+ (when inferior-system-tests
+ (insert-system-tests-for-guix-revision conn
+ guix-revision-id
+ inferior-system-tests))
+
+ (let* ((package-derivation-ids
+ (with-time-logging "inferior-data->package-derivation-ids"
+ (inferior-data->package-derivation-ids
+ conn
+ inf
+ package-ids
+ inferior-packages-system-and-target-to-derivations-alist)))
+ (ids-count
+ (length package-derivation-ids)))
+ (chunk-for-each! (lambda (package-derivation-ids-chunk)
+ (insert-guix-revision-package-derivations
+ conn
+ guix-revision-id
+ package-derivation-ids-chunk))
+ 2000
+ package-derivation-ids)
(simple-format
- #t "debug: finished loading information from inferior\n")
- (close-inferior inf)
-
- (with-time-logging
- "acquiring advisory transaction lock: load-new-guix-revision-inserts"
- ;; Wait until this is the only transaction inserting data, to
- ;; avoid any concurrency issues
- (obtain-advisory-transaction-lock conn
- 'load-new-guix-revision-inserts))
- (with-time-logging
- "inserting data"
- (let* ((package-ids
- (insert-packages conn packages-data)))
- (when inferior-lint-warnings
- (let* ((lint-checker-ids
- (lint-checkers->lint-checker-ids
- conn
- (map (match-lambda
- ((name descriptions-by-locale network-dependent)
- (list
- name
- network-dependent
- (lint-checker-description-data->lint-checker-description-set-id
- conn descriptions-by-locale))))
- inferior-lint-checkers-data)))
- (lint-warning-ids
- (insert-lint-warnings
- conn
- package-ids
- lint-checker-ids
- inferior-lint-warnings-data)))
- (insert-guix-revision-lint-checkers conn
- guix-revision-id
- lint-checker-ids)
-
- (chunk-for-each!
- (lambda (lint-warning-ids-chunk)
- (insert-guix-revision-lint-warnings conn
- guix-revision-id
- lint-warning-ids-chunk))
- 5000
- lint-warning-ids)))
-
- (when inferior-system-tests
- (insert-system-tests-for-guix-revision conn
- guix-revision-id
- inferior-system-tests))
-
- (let* ((package-derivation-ids
- (with-time-logging "inferior-data->package-derivation-ids"
- (inferior-data->package-derivation-ids
- conn
- inf
- package-ids
- inferior-packages-system-and-target-to-derivations-alist)))
- (ids-count
- (length package-derivation-ids)))
- (chunk-for-each! (lambda (package-derivation-ids-chunk)
- (insert-guix-revision-package-derivations
- conn
- guix-revision-id
- package-derivation-ids-chunk))
- 2000
- package-derivation-ids)
- (simple-format
- #t "Successfully loaded ~A package/derivation pairs\n"
- ids-count))
+ #t "Successfully loaded ~A package/derivation pairs\n"
+ ids-count))
- (with-time-logging
- "insert-guix-revision-package-derivation-distribution-counts"
- (insert-guix-revision-package-derivation-distribution-counts
- conn
- guix-revision-id)))))
- #t)
- (lambda (key . args)
- (simple-format (current-error-port)
- "Failed extracting information from commit: ~A\n\n" commit)
- (simple-format (current-error-port)
- " ~A ~A\n\n" key args)
- #f)
- (lambda (key . args)
- (display-backtrace (make-stack #t) (current-error-port))))))
+ (with-time-logging
+ "insert-guix-revision-package-derivation-distribution-counts"
+ (insert-guix-revision-package-derivation-distribution-counts
+ conn
+ guix-revision-id))))))
(prevent-inlining-for-tests extract-information-from)
@@ -1409,7 +1558,7 @@
(prevent-inlining-for-tests load-channel-instances)
(define* (load-new-guix-revision conn store git-repository-id commit
- #:key skip-system-tests?)
+ #:key skip-system-tests? parallelism)
(let* ((git-repository-fields
(select-git-repository conn git-repository-id))
(git-repository-url
@@ -1421,10 +1570,12 @@
(url git-repository-url)
(commit commit)))
(source-and-channel-derivations-by-system
- (channel->source-and-derivations-by-system conn
- store
- channel-for-commit
- fetch-with-authentication?))
+ (channel->source-and-derivations-by-system
+ conn
+ store
+ channel-for-commit
+ fetch-with-authentication?
+ #:parallelism parallelism))
(guix-source
(car source-and-channel-derivations-by-system))
(channel-derivations-by-system
@@ -1442,7 +1593,8 @@
guix-revision-id
commit guix-source store-item
#:skip-system-tests?
- skip-system-tests?)
+ skip-system-tests?
+ #:parallelism parallelism)
(if (defined? 'channel-news-for-commit
(resolve-module '(guix channels)))
@@ -1817,13 +1969,15 @@ SKIP LOCKED")
(define (with-store-connection f)
(with-store store
+ (ensure-non-blocking-store-connection store)
(set-build-options store #:fallback? #t)
(f store)))
(prevent-inlining-for-tests with-store-connection)
-(define* (process-load-new-guix-revision-job id #:key skip-system-tests?)
+(define* (process-load-new-guix-revision-job id #:key skip-system-tests?
+ parallelism)
(with-postgresql-connection
(simple-format #f "load-new-guix-revision ~A" id)
(lambda (conn)
@@ -1860,7 +2014,8 @@ SKIP LOCKED")
store
git-repository-id
commit
- #:skip-system-tests? #t))))
+ #:skip-system-tests? #t
+ #:parallelism parallelism))))
(lambda (key . args)
(simple-format (current-error-port)
"error: load-new-guix-revision: ~A ~A\n"
diff --git a/scripts/guix-data-service-process-job.in b/scripts/guix-data-service-process-job.in
index fd7ab3f..633d8db 100644
--- a/scripts/guix-data-service-process-job.in
+++ b/scripts/guix-data-service-process-job.in
@@ -78,7 +78,8 @@
(lambda ()
(process-load-new-guix-revision-job
job
- #:skip-system-tests? (assq-ref opts 'skip-system-tests)))
+ #:skip-system-tests? (assq-ref opts 'skip-system-tests)
+ #:parallelism (assq-ref opts 'parallelism)))
#:hz 0
- #:parallelism (assq-ref opts 'parallelism)
+ #:parallelism 1
#:drain? #t)))))
diff --git a/scripts/guix-data-service-process-jobs.in b/scripts/guix-data-service-process-jobs.in
index 6ad1ec9..da4f614 100644
--- a/scripts/guix-data-service-process-jobs.in
+++ b/scripts/guix-data-service-process-jobs.in
@@ -44,11 +44,17 @@
result)))
(option '("skip-system-tests") #f #f
(lambda (opt name _ result)
- (alist-cons 'skip-system-tests #t result)))))
+ (alist-cons 'skip-system-tests #t result)))
+ (option '("per-job-parallelism") #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'per-job-parallelism
+ (string->number arg)
+ result)))))
(define %default-options
;; Alist of default option values
- `((max-processes . ,default-max-processes)))
+ `((max-processes . ,default-max-processes)
+ (per-job-parallelism . 1)))
(define (parse-options args)
(args-fold
@@ -77,4 +83,6 @@
(or (assq-ref opts 'latest-branch-revision-max-processes)
(* 2 (assq-ref opts 'max-processes)))
#:skip-system-tests?
- (assq-ref opts 'skip-system-tests)))))
+ (assq-ref opts 'skip-system-tests)
+ #:per-job-parallelism
+ (assq-ref opts 'per-job-parallelism)))))