aboutsummaryrefslogtreecommitdiff
path: root/guix-data-service/jobs
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2024-10-31 16:56:30 +0000
committerChristopher Baines <mail@cbaines.net>2024-10-31 20:04:16 +0000
commite67edf54bc18e552e5b80d12cd25a6ac4c58f789 (patch)
tree86fc05afc086247d38a18408a84b574dbb0fc916 /guix-data-service/jobs
parentaf93bdcf5ed2fafb85267ece1ed4e86f1883a0b2 (diff)
downloaddata-service-e67edf54bc18e552e5b80d12cd25a6ac4c58f789.tar
data-service-e67edf54bc18e552e5b80d12cd25a6ac4c58f789.tar.gz
Finish chasing the call-with-resource-pool bug
This took a while to find as process-job would just get stuck, and this wasn't directly related to any particular change, just that more fibers increased the chance of hitting it. This commit includes lots of the things I changed while debugging.
Diffstat (limited to 'guix-data-service/jobs')
-rw-r--r--guix-data-service/jobs/load-new-guix-revision.scm635
1 files changed, 345 insertions, 290 deletions
diff --git a/guix-data-service/jobs/load-new-guix-revision.scm b/guix-data-service/jobs/load-new-guix-revision.scm
index da08566..8852139 100644
--- a/guix-data-service/jobs/load-new-guix-revision.scm
+++ b/guix-data-service/jobs/load-new-guix-revision.scm
@@ -35,7 +35,9 @@
#:use-module (squee)
#:use-module (gcrypt hash)
#:use-module (fibers)
+ #:use-module (fibers timers)
#:use-module (fibers channels)
+ #:use-module (fibers operations)
#:use-module (guix monads)
#:use-module (guix base32)
#:use-module (guix store)
@@ -117,7 +119,9 @@
(missing-store-item-error-item exn)
thunk)
(when on-exception (on-exception))
- (retry-on-missing-store-item thunk))
+ (retry-on-missing-store-item
+ thunk
+ #:on-exception on-exception))
(raise-exception exn)))
thunk
#:unwind? #t))
@@ -929,21 +933,19 @@
(define (update-derivation-ids-hash-table! conn
derivation-ids-hash-table
- derivations)
- (define derivations-count (length derivations))
+ derivation-file-names)
+ (define derivations-count (vector-length derivation-file-names))
- (simple-format #t "debug: update-derivation-ids-hash-table!: ~A file-names\n"
- derivations-count)
(let ((missing-file-names
- (fold
- (lambda (drv result)
+ (vector-fold
+ (lambda (_ result file-name)
(if (hash-ref derivation-ids-hash-table
- (derivation-file-name drv))
+ file-name)
result
- (cons (derivation-file-name drv)
+ (cons file-name
result)))
'()
- derivations)))
+ derivation-file-names)))
(simple-format
#t "debug: update-derivation-ids-hash-table!: lookup ~A file-names, ~A not cached\n"
@@ -961,29 +963,11 @@
(exec-query conn (select-existing-derivations chunk))))
(chunk! missing-file-names 1000)))))
-(define (insert-missing-derivations postgresql-connection-pool
- utility-thread-channel
- derivation-ids-hash-table
- unfiltered-derivations)
-
- (define (ensure-input-derivations-exist input-derivation-file-names)
- (unless (null? input-derivation-file-names)
- ;; Ensure all the input derivations exist
- (for-each
- (lambda (chunk)
- (simple-format
- #t "debug: ensure-input-derivations-exist: processing ~A derivations\n"
- (length chunk))
-
- (insert-missing-derivations
- postgresql-connection-pool
- utility-thread-channel
- derivation-ids-hash-table
- (call-with-worker-thread
- utility-thread-channel
- (lambda ()
- (map read-derivation-from-file chunk)))))
- (chunk! input-derivation-file-names 1000))))
+(define* (insert-missing-derivations postgresql-connection-pool
+ utility-thread-channel
+ derivation-ids-hash-table
+ unfiltered-derivations
+ #:key (log-tag "unspecified"))
(define (insert-into-derivations conn drvs)
(string-append
@@ -1011,163 +995,203 @@
" RETURNING id"
";"))
- (with-time-logging
- (simple-format
- #f "insert-missing-derivations: inserting ~A derivations"
- (length unfiltered-derivations))
- (let ((derivations
- derivation-ids
- (with-resource-from-pool postgresql-connection-pool conn
- (update-derivation-ids-hash-table! conn
- derivation-ids-hash-table
- unfiltered-derivations)
-
- (let ((derivations
- ;; Do this while holding the PostgreSQL connection to
- ;; avoid conflicts with other fibers
- (filter-map (lambda (derivation)
- (if (hash-ref derivation-ids-hash-table
- (derivation-file-name
- derivation))
- #f
- derivation))
- unfiltered-derivations)))
- (if (null? derivations)
- (values '() '())
- (let ((derivation-ids
- (append-map!
- (lambda (chunk)
- (map (lambda (result)
- (string->number (car result)))
- (exec-query conn (insert-into-derivations conn chunk))))
- (chunk derivations 500))))
-
- ;; Do this while holding the connection so that other
- ;; fibers don't also try inserting the same derivations
- (with-time-logging
- "insert-missing-derivations: updating hash table"
- (for-each (lambda (derivation derivation-id)
- (hash-set! derivation-ids-hash-table
- (derivation-file-name derivation)
- derivation-id))
- derivations
- derivation-ids))
-
- (values derivations
- derivation-ids)))))))
-
- (unless (null? derivations)
- (parallel-via-fibers
- (with-time-logging
- "insert-missing-derivations: inserting sources"
- (fibers-for-each
- (lambda (derivation-id derivation)
- (let ((sources (derivation-sources derivation)))
- (unless (null? sources)
- (let ((sources-ids
- (with-resource-from-pool postgresql-connection-pool conn
- (insert-derivation-sources conn
- derivation-id
- sources))))
- (par-map&
- (lambda (id source-file)
- (when
- (with-resource-from-pool postgresql-connection-pool conn
- (match
- (exec-query
- conn
- "
+ (define (insert-derivations)
+ (with-resource-from-pool postgresql-connection-pool conn
+ (update-derivation-ids-hash-table!
+ conn
+ derivation-ids-hash-table
+ (let ((file-names-vector
+ (make-vector (length unfiltered-derivations))))
+ (for-each
+ (lambda (i drv)
+ (vector-set! file-names-vector
+ i
+ (derivation-file-name drv)))
+ (iota (vector-length file-names-vector))
+ unfiltered-derivations)
+ file-names-vector))
+
+ (let ((derivations
+ ;; Do this while holding the PostgreSQL connection to
+ ;; avoid conflicts with other fibers
+ (delete-duplicates
+ (filter-map (lambda (derivation)
+ (if (hash-ref derivation-ids-hash-table
+ (derivation-file-name
+ derivation))
+ #f
+ derivation))
+ unfiltered-derivations))))
+ (if (null? derivations)
+ (values '() '())
+ (begin
+ (simple-format
+ (current-error-port)
+ "insert-missing-derivations: inserting ~A derivations (~A)\n"
+ (length unfiltered-derivations)
+ log-tag)
+ (let ((derivation-ids
+ (append-map!
+ (lambda (chunk)
+ (map (lambda (result)
+ (string->number (car result)))
+ (exec-query conn (insert-into-derivations conn chunk))))
+ (chunk derivations 500))))
+
+ ;; Do this while holding the connection so that other
+ ;; fibers don't also try inserting the same derivations
+ (with-time-logging
+ (string-append "insert-missing-derivations: updating hash table (" log-tag ")")
+ (for-each (lambda (derivation derivation-id)
+ (hash-set! derivation-ids-hash-table
+ (derivation-file-name derivation)
+ derivation-id))
+ derivations
+ derivation-ids))
+
+ (simple-format
+ (current-error-port)
+ "insert-missing-derivations: finished inserting ~A derivations (~A)\n"
+ (length unfiltered-derivations)
+ log-tag)
+
+ (values derivations
+ derivation-ids)))))))
+
+ (define (insert-sources derivations derivation-ids)
+ (with-time-logging
+ (string-append "insert-missing-derivations: inserting sources (" log-tag ")")
+ (fibers-for-each
+ (lambda (derivation-id derivation)
+ (let ((sources (derivation-sources derivation)))
+ (unless (null? sources)
+ (let ((sources-ids
+ (with-resource-from-pool postgresql-connection-pool conn
+ (insert-derivation-sources conn
+ derivation-id
+ sources))))
+ (fibers-for-each
+ (lambda (id source-file)
+ (when
+ (with-resource-from-pool postgresql-connection-pool conn
+ (match
+ (exec-query
+ conn
+ "
SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
- (list (number->string id)))
- (()
- ;; Insert a placeholder to avoid other fibers
- ;; working on this source file
- (insert-placeholder-derivation-source-file-nar
- conn
- id)
- #t)
- (_ #f)))
- (let ((nar-bytevector
- (call-with-worker-thread
- utility-thread-channel
- (lambda ()
- (call-with-values
- (lambda ()
- (open-bytevector-output-port))
- (lambda (port get-bytevector)
- (unless (file-exists? source-file)
- (raise-exception
- (make-missing-store-item-error
- source-file)))
- (write-file source-file port)
- (let ((res (get-bytevector)))
- (close-port port) ; maybe reduces memory?
- res)))))))
- (letpar&
- ((compressed-nar-bytevector
- (call-with-worker-thread
- utility-thread-channel
- (lambda ()
- (call-with-values
- (lambda ()
- (open-bytevector-output-port))
- (lambda (port get-bytevector)
- (call-with-lzip-output-port port
- (lambda (port)
- (put-bytevector port nar-bytevector))
- #:level 9)
- (let ((res (get-bytevector)))
- (close-port port) ; maybe reduces memory?
- res))))))
- (hash
- (call-with-worker-thread
- utility-thread-channel
+ (list (number->string id)))
+ (()
+ ;; Insert a placeholder to avoid other fibers
+ ;; working on this source file
+ (insert-placeholder-derivation-source-file-nar
+ conn
+ id)
+ #t)
+ (_ #f)))
+ ;; Use the utility-thread-channel to control concurrency here,
+ ;; to avoid using too much memory
+ (call-with-worker-thread
+ utility-thread-channel
+ (lambda ()
+ (let ((nar-bytevector
+ (call-with-values
(lambda ()
- (bytevector->nix-base32-string
- (sha256 nar-bytevector)))))
- (uncompressed-size (bytevector-length nar-bytevector)))
- (with-resource-from-pool postgresql-connection-pool conn
- (update-derivation-source-file-nar
- conn
- id
- hash
- compressed-nar-bytevector
- uncompressed-size))))))
- sources-ids
- sources)))))
- derivation-ids
- derivations))
-
+ (open-bytevector-output-port))
+ (lambda (port get-bytevector)
+ (unless (file-exists? source-file)
+ (raise-exception
+ (make-missing-store-item-error
+ source-file)))
+ (write-file source-file port)
+ (let ((res (get-bytevector)))
+ (close-port port) ; maybe reduces memory?
+ res)))))
+ (let ((compressed-nar-bytevector
+ (call-with-values
+ (lambda ()
+ (open-bytevector-output-port))
+ (lambda (port get-bytevector)
+ (call-with-lzip-output-port port
+ (lambda (port)
+ (put-bytevector port nar-bytevector))
+ #:level 9)
+ (let ((res (get-bytevector)))
+ (close-port port) ; maybe reduces memory?
+ res))))
+ (hash
+ (bytevector->nix-base32-string
+ (sha256 nar-bytevector)))
+ (uncompressed-size
+ (bytevector-length nar-bytevector)))
+ (with-resource-from-pool postgresql-connection-pool conn
+ (update-derivation-source-file-nar
+ conn
+ id
+ hash
+ compressed-nar-bytevector
+ uncompressed-size))))))))
+ sources-ids
+ sources)))))
+ derivation-ids
+ derivations)))
+
+ (let ((derivations
+ derivation-ids
+ (insert-derivations)))
+
+ (unless (null? derivations)
+ (parallel-via-fibers
+ (insert-sources derivations
+ derivation-ids)
+ (with-time-logging
+ (string-append "insert-missing-derivations: inserting outputs ("
+ log-tag ")")
(with-resource-from-pool postgresql-connection-pool conn
- (with-time-logging
- "insert-missing-derivations: inserting outputs"
- (for-each (lambda (derivation-id derivation)
- (insert-derivation-outputs conn
- derivation-id
- (derivation-outputs derivation)))
- derivation-ids
- derivations)))
-
- (with-time-logging
- "insert-missing-derivations: ensure-input-derivations-exist"
- (ensure-input-derivations-exist (deduplicate-strings
- (map derivation-input-path
- (append-map derivation-inputs
- derivations))))))
-
- (with-resource-from-pool postgresql-connection-pool conn
- (with-time-logging
- (simple-format
- #f "insert-missing-derivations: inserting inputs for ~A derivations"
- (length derivations))
- (insert-derivation-inputs conn
- derivation-ids
- derivations)))))))
-
-(define (derivation-file-names->derivation-ids postgresql-connection-pool
- utility-thread-channel
- derivation-ids-hash-table
- derivation-file-names)
+ (for-each (lambda (derivation-id derivation)
+ (insert-derivation-outputs conn
+ derivation-id
+ (derivation-outputs derivation)))
+ derivation-ids
+ derivations)))
+
+ (with-time-logging
+ (string-append
+ "insert-missing-derivations: ensure-input-derivations-exist ("
+ log-tag ")")
+ (let ((input-derivations
+ (map
+ derivation-input-derivation
+ (append-map derivation-inputs
+ derivations))))
+ (unless (null? input-derivations)
+ ;; Ensure all the input derivations exist
+ (for-each
+ (lambda (chunk)
+ (insert-missing-derivations
+ postgresql-connection-pool
+ utility-thread-channel
+ derivation-ids-hash-table
+ chunk
+ #:log-tag log-tag))
+ (chunk! input-derivations 1000))))))
+
+ (string-append "insert-missing-derivations: done parallel (" log-tag ")")
+ (with-resource-from-pool postgresql-connection-pool conn
+ (with-time-logging
+ (simple-format
+ #f "insert-missing-derivations: inserting inputs for ~A derivations (~A)"
+ (length derivations)
+ log-tag)
+ (insert-derivation-inputs conn
+ derivation-ids
+ derivations))))))
+
+(define* (derivation-file-names->derivation-ids postgresql-connection-pool
+ utility-thread-channel
+ read-derivations/fiberized
+ derivation-ids-hash-table
+ derivation-file-names
+ #:key (log-tag "unspecified"))
(define derivations-count
(vector-length derivation-file-names))
@@ -1175,8 +1199,9 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
#()
(begin
(simple-format
- #t "debug: derivation-file-names->derivation-ids: processing ~A derivations\n"
- derivations-count)
+ #t "debug: derivation-file-names->derivation-ids: processing ~A derivations (~A)\n"
+ derivations-count
+ log-tag)
(let* ((missing-derivation-filenames
(deduplicate-strings
@@ -1189,32 +1214,24 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
result
(cons derivation-file-name result))))
'()
- derivation-file-names)))
- (missing-derivations-chunked-promises
- (map
- (lambda (chunk)
- (fibers-delay
- (lambda ()
- (map (lambda (filename)
- (if (file-exists? filename)
- (read-derivation-from-file filename)
- (raise-exception
- (make-missing-store-item-error
- filename))))
- chunk))))
- (chunk! missing-derivation-filenames 1000))))
-
- (for-each
- (lambda (missing-derivation-chunk-promise)
- (let ((missing-derivations-chunk
- (fibers-force
- missing-derivation-chunk-promise)))
- (unless (null? missing-derivations-chunk)
+ derivation-file-names))))
+ (let ((chunks (chunk! missing-derivation-filenames 1000)))
+ (for-each
+ (lambda (i missing-derivation-file-names-chunk)
+ (let ((missing-derivations-chunk
+ (read-derivations/fiberized
+ missing-derivation-file-names-chunk)))
+ (simple-format
+ #t "debug: derivation-file-names->derivation-ids: processing chunk ~A (~A)\n"
+ i
+ log-tag)
(insert-missing-derivations postgresql-connection-pool
utility-thread-channel
derivation-ids-hash-table
- missing-derivations-chunk))))
- missing-derivations-chunked-promises)
+ missing-derivations-chunk
+ #:log-tag log-tag)))
+ (iota (length chunks))
+ chunks))
(let ((all-ids
(vector-map
@@ -1222,34 +1239,12 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
(if derivation-file-name
(or (hash-ref derivation-ids-hash-table
derivation-file-name)
- ;; If a derivation ID can't be found, update the
- ;; hash table then check again
- (with-resource-from-pool postgresql-connection-pool conn
- (for-each
- (lambda (missing-derivations-chunked-promise)
- (update-derivation-ids-hash-table!
- conn
- derivation-ids-hash-table
- (fibers-force missing-derivations-chunked-promise)))
- missing-derivations-chunked-promises)
- (or (hash-ref derivation-ids-hash-table
- derivation-file-name)
(error
(simple-format #f "missing derivation id (~A)"
- derivation-file-name)))))
+ derivation-file-name)))
#f))
derivation-file-names)))
- (with-resource-from-pool postgresql-connection-pool conn
- (simple-format
- (current-error-port)
- "guix-data-service: clearing the derivation-ids-hash-table\n")
- (hash-clear! derivation-ids-hash-table))
-
- ;; Just in case this helps clear memory
- (for-each fibers-promise-reset
- missing-derivations-chunked-promises)
-
all-ids)))))
(prevent-inlining-for-tests derivation-file-names->derivation-ids)
@@ -1489,6 +1484,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
(cons inferior inferior-store)))
parallelism
#:min-size 0
+ #:name "inferior"
#:idle-seconds 30
#:destructor (match-lambda
((inferior . store)
@@ -1501,7 +1497,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
(inferior-eval '(@ (guix packages) %supported-systems)
inferior)))))
(result
- (par-map&
+ (fibers-map
(lambda (system)
(with-resource-from-pool inferior-and-store-pool res
(match res
@@ -1725,6 +1721,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
guix-source store-item
guix-derivation
utility-thread-channel
+ read-derivations/fiberized
derivation-ids-hash-table
#:key skip-system-tests?
extra-inferior-environment-variables
@@ -1776,6 +1773,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
parallelism
#:min-size 0
#:idle-seconds 20
+ #:name "inferior"
#:destructor
(match-lambda
((inferior . store)
@@ -1853,6 +1851,8 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
'load-new-guix-revision-inserts))
db-conn)
1
+ #:name "postgres"
+ #:assume-reliable-waiters? #t
#:min-size 0))
(define package-ids-promise
@@ -1892,7 +1892,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
conn descriptions-by-locale))))
inferior-lint-checkers-data))))
(lint-warnings-data
- (par-map&
+ (fibers-map
(match-lambda
((checker-name _ network-dependent?)
(and (and (not network-dependent?)
@@ -1978,8 +1978,10 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
(derivation-file-names->derivation-ids
postgresql-connection-pool
utility-thread-channel
+ read-derivations/fiberized
derivation-ids-hash-table
- derivations-vector)))
+ derivations-vector
+ #:log-tag (simple-format #f "~A:~A" system target))))
(guix-revision-id
(fibers-force guix-revision-id-promise))
(package-ids (fibers-force package-ids-promise))
@@ -2013,75 +2015,93 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
(fibers-force guix-revision-id-promise)
(number->string
(system->system-id conn system))
- (or target "")))))
+ (or target ""))))
+
+ 'finished)
(let ((get-derivations/fiberized
(fiberize get-derivations
;; Limit concurrency here to keep focused on specific
;; systems until they've been fully processed
#:parallelism parallelism)))
- (par-map&
- (match-lambda
- ((system . target)
- (retry-on-missing-store-item
- (lambda ()
- (process-system-and-target system target
- get-derivations/fiberized)))))
- (call-with-inferior
- (lambda (inferior inferior-store)
- (inferior-fetch-system-target-pairs inferior))))))
+ (with-time-logging "extract-and-store-package-derivations"
+ (fibers-map-with-progress
+ (match-lambda
+ ((system . target)
+ (retry-on-missing-store-item
+ (lambda ()
+ (process-system-and-target system target
+ get-derivations/fiberized)))))
+ (list
+ (call-with-inferior
+ (lambda (inferior inferior-store)
+ (inferior-fetch-system-target-pairs inferior))))
+ #:report
+ (lambda (data)
+ (for-each
+ (match-lambda
+ ((result (system . target))
+ (simple-format #t "~A ~A: ~A\n"
+ system target result)))
+ data))))))
(define (extract-and-store-system-tests)
(if skip-system-tests?
(begin
(simple-format #t "debug: skipping system tests\n")
'())
- (let ((data-with-derivation-file-names
- (call-with-inferior
- (lambda (inferior inferior-store)
- (with-time-logging "getting inferior system tests"
- (all-inferior-system-tests
- inferior
- inferior-store
- guix-source
- commit))))))
- (when data-with-derivation-file-names
- (let ((data-with-derivation-ids
- (map (match-lambda
- ((name description derivation-file-names-by-system location-data)
- (list name
- description
- (let ((systems
- (map car derivation-file-names-by-system))
- (derivation-ids
- (derivation-file-names->derivation-ids
- postgresql-connection-pool
- utility-thread-channel
- derivation-ids-hash-table
- (list->vector
- (map cdr derivation-file-names-by-system)))))
- (map cons systems derivation-ids))
- location-data)))
- data-with-derivation-file-names)))
- (with-resource-from-pool postgresql-connection-pool conn
- (insert-system-tests-for-guix-revision
- conn
- (fibers-force guix-revision-id-promise)
- data-with-derivation-ids)))))))
+ (with-time-logging "extract-and-store-system-tests"
+ (let ((data-with-derivation-file-names
+ (call-with-inferior
+ (lambda (inferior inferior-store)
+ (with-time-logging "getting inferior system tests"
+ (all-inferior-system-tests
+ inferior
+ inferior-store
+ guix-source
+ commit))))))
+ (when data-with-derivation-file-names
+ (let ((data-with-derivation-ids
+ (map (match-lambda
+ ((name description derivation-file-names-by-system location-data)
+ (list name
+ description
+ (let ((systems
+ (map car derivation-file-names-by-system))
+ (derivation-ids
+ (derivation-file-names->derivation-ids
+ postgresql-connection-pool
+ utility-thread-channel
+ read-derivations/fiberized
+ derivation-ids-hash-table
+ (list->vector
+ (map cdr derivation-file-names-by-system)))))
+ (map cons systems derivation-ids))
+ location-data)))
+ data-with-derivation-file-names)))
+ (with-resource-from-pool postgresql-connection-pool conn
+ (insert-system-tests-for-guix-revision
+ conn
+ (fibers-force guix-revision-id-promise)
+ data-with-derivation-ids))))))))
(with-time-logging
(simple-format #f "extract-information-from: ~A\n" store-item)
(parallel-via-fibers
- (fibers-force package-ids-promise)
+ (begin
+ (fibers-force package-ids-promise)
+ #f)
(extract-and-store-package-derivations)
(retry-on-missing-store-item extract-and-store-system-tests)
- (extract-and-store-lint-checkers-and-warnings)))
+ (with-time-logging "extract-and-store-lint-checkers-and-warnings"
+ (extract-and-store-lint-checkers-and-warnings))))
#t)
(prevent-inlining-for-tests extract-information-from)
(define (load-channel-instances utility-thread-channel
+ read-derivations/fiberized
derivation-ids-hash-table
git-repository-id commit
channel-derivations-by-system)
@@ -2113,6 +2133,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
(make-resource-pool
(const channel-instances-conn)
1
+ #:name "postgres"
#:min-size 0)))
(unless existing-guix-revision-id
@@ -2130,6 +2151,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
(derivation-file-names->derivation-ids
postgresql-connection-pool
utility-thread-channel
+ read-derivations/fiberized
derivation-ids-hash-table
(list->vector (map cdr derivations-by-system)))))
@@ -2150,15 +2172,34 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
#:key skip-system-tests? parallelism
extra-inferior-environment-variables)
(define utility-thread-channel
- (make-worker-thread-channel
- (const '())
- #:parallelism parallelism))
+ ;; There might be high demand for this, so order the requests
+ (make-queueing-channel
+ (call-with-default-io-waiters
+ (lambda ()
+ (make-worker-thread-channel
+ (const '())
+ #:parallelism parallelism)))))
+
+ (define (read-derivations filenames)
+ (call-with-worker-thread
+ utility-thread-channel
+ (lambda ()
+ (map (lambda (filename)
+ (if (file-exists? filename)
+ (read-derivation-from-file filename)
+ (raise-exception
+ (make-missing-store-item-error
+ filename))))
+ filenames))))
+ (define read-derivations/fiberized
+ (fiberize read-derivations
+ ;; Don't do this in parallel as there's caching involved with
+ ;; read-derivation-from-file
+ #:parallelism 1))
(define derivation-ids-hash-table
(make-hash-table))
- (%worker-thread-default-timeout #f)
-
(let* ((git-repository-fields
(select-git-repository conn git-repository-id))
(git-repository-url
@@ -2188,6 +2229,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
channel-derivations-by-system
(fibers-force channel-derivations-by-system-promise)))
(load-channel-instances utility-thread-channel
+ read-derivations/fiberized
derivation-ids-hash-table
git-repository-id commit
channel-derivations-by-system)))
@@ -2214,6 +2256,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
commit guix-source store-item
guix-derivation
utility-thread-channel
+ read-derivations/fiberized
derivation-ids-hash-table
#:skip-system-tests?
skip-system-tests?
@@ -2633,6 +2676,9 @@ SKIP LOCKED")
(define* (process-load-new-guix-revision-job id #:key skip-system-tests?
extra-inferior-environment-variables
parallelism)
+ (define finished-channel
+ (make-channel))
+
(define result
(with-postgresql-connection
(simple-format #f "load-new-guix-revision ~A" id)
@@ -2642,12 +2688,20 @@ SKIP LOCKED")
;; instances have the data updated.
(fix-derivation-output-details-hash-encoding conn)
+ (%worker-thread-default-timeout #f)
+
+ (resource-pool-retry-checkout-timeout 120)
+
(exec-query conn "BEGIN")
(spawn-fiber
(lambda ()
- (while #t
- (sleep 30)
+ (while (perform-operation
+ (choice-operation
+ (wrap-operation (get-operation finished-channel)
+ (const #f))
+ (wrap-operation (sleep-operation 20)
+ (const #t))))
(let ((stats (gc-stats)))
(simple-format
@@ -2752,4 +2806,5 @@ SKIP LOCKED")
"update-derivation-outputs-statistics"
(update-derivation-outputs-statistics conn))))))
+ (put-message finished-channel #t)
result)