about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Christopher Baines <mail@cbaines.net> 2024-10-27 14:02:57 +0000
committer: Christopher Baines <mail@cbaines.net> 2024-10-27 14:39:52 +0000
commit: c650fc6e7a4445abb45c88c8df7267799bd3f591 (patch)
tree: 0c5d69608ef2afaece5a055e2752495e8b64ddb9
parent: 1e0407e9b6869dbe893378493001fbd925440b99 (diff)
download: data-service-c650fc6e7a4445abb45c88c8df7267799bd3f591.tar
          data-service-c650fc6e7a4445abb45c88c8df7267799bd3f591.tar.gz
Rework inserting derivations
To add more parallelism.
-rw-r--r-- guix-data-service/jobs/load-new-guix-revision.scm 397
1 files changed, 213 insertions, 184 deletions
diff --git a/guix-data-service/jobs/load-new-guix-revision.scm b/guix-data-service/jobs/load-new-guix-revision.scm
index 1a4c575..93b24b8 100644
--- a/guix-data-service/jobs/load-new-guix-revision.scm
+++ b/guix-data-service/jobs/load-new-guix-revision.scm
@@ -929,25 +929,25 @@
(define (update-derivation-ids-hash-table! conn
derivation-ids-hash-table
- file-names)
- (define file-names-count (vector-length file-names))
+ derivations)
+ (define derivations-count (length derivations))
(simple-format #t "debug: update-derivation-ids-hash-table!: ~A file-names\n"
- file-names-count)
+ derivations-count)
(let ((missing-file-names
- (vector-fold
- (lambda (_ result file-name)
- (if (and file-name
- (hash-ref derivation-ids-hash-table
- file-name))
+ (fold
+ (lambda (drv result)
+ (if (hash-ref derivation-ids-hash-table
+ (derivation-file-name drv))
result
- (cons file-name result)))
+ (cons (derivation-file-name drv)
+ result)))
'()
- file-names)))
+ derivations)))
(simple-format
#t "debug: update-derivation-ids-hash-table!: lookup ~A file-names, ~A not cached\n"
- file-names-count (length missing-file-names))
+ derivations-count (length missing-file-names))
(unless (null? missing-file-names)
(for-each
@@ -964,42 +964,26 @@
(define (insert-missing-derivations postgresql-connection-pool
utility-thread-channel
derivation-ids-hash-table
- derivations)
+ unfiltered-derivations)
(define (ensure-input-derivations-exist input-derivation-file-names)
(unless (null? input-derivation-file-names)
- (simple-format
- #t "debug: ensure-input-derivations-exist: processing ~A derivations\n"
- (length input-derivation-file-names))
-
- (with-resource-from-pool postgresql-connection-pool conn
- (update-derivation-ids-hash-table! conn
- derivation-ids-hash-table
- (list->vector
- input-derivation-file-names)))
- (simple-format
- #t
- "debug: ensure-input-derivations-exist: checking for missing input derivations\n")
- (let ((missing-derivations-filenames
- (remove (lambda (derivation-file-name)
- (hash-ref derivation-ids-hash-table
- derivation-file-name))
- input-derivation-file-names)))
-
- (unless (null? missing-derivations-filenames)
- (simple-format
- #f
- "debug: ensure-input-derivations-exist: inserting missing input derivations\n")
- ;; Ensure all the input derivations exist
- (insert-missing-derivations
- postgresql-connection-pool
+ ;; Ensure all the input derivations exist
+ (for-each
+ (lambda (chunk)
+ (simple-format
+ #t "debug: ensure-input-derivations-exist: processing ~A derivations\n"
+ (length chunk))
+
+ (insert-missing-derivations
+ postgresql-connection-pool
+ utility-thread-channel
+ derivation-ids-hash-table
+ (call-with-worker-thread
utility-thread-channel
- derivation-ids-hash-table
- (call-with-worker-thread
- utility-thread-channel
- (lambda ()
- (map read-derivation-from-file
- missing-derivations-filenames))))))))
+ (lambda ()
+ (map read-derivation-from-file chunk)))))
+ (chunk! input-derivation-file-names 1000))))
(define (insert-into-derivations conn drvs)
(string-append
@@ -1030,121 +1014,155 @@
(with-time-logging
(simple-format
#f "insert-missing-derivations: inserting ~A derivations"
- (length derivations))
- (let* ((chunks (chunk derivations 500))
- (derivation-ids
- (with-resource-from-pool postgresql-connection-pool conn
- (append-map!
- (lambda (chunk)
- (map (lambda (result)
- (string->number (car result)))
- (exec-query conn (insert-into-derivations conn chunk))))
- chunks))))
-
- (with-time-logging
- "insert-missing-derivations: updating hash table"
- (for-each (lambda (derivation derivation-id)
- (hash-set! derivation-ids-hash-table
- (derivation-file-name derivation)
- derivation-id))
- derivations
- derivation-ids))
-
- (with-time-logging
- "insert-missing-derivations: inserting sources"
- (for-each
- (lambda (derivation-id derivation)
- (let ((sources (derivation-sources derivation)))
- (unless (null? sources)
- (let ((sources-ids
- (with-resource-from-pool postgresql-connection-pool conn
- (insert-derivation-sources conn
- derivation-id
- sources))))
- (par-map&
- (lambda (id source-file)
- (match
- (with-resource-from-pool postgresql-connection-pool conn
- (exec-query
- conn
- "
-SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
- (list (number->string id))))
- (()
- (let ((nar-bytevector
- (call-with-worker-thread
- utility-thread-channel
- (lambda ()
- (call-with-values
- (lambda ()
- (open-bytevector-output-port))
- (lambda (port get-bytevector)
- (unless (file-exists? source-file)
- (raise-exception
- (make-missing-store-item-error
- source-file)))
- (write-file source-file port)
- (get-bytevector)))))))
- (letpar&
- ((compressed-nar-bytevector
- (call-with-worker-thread
- utility-thread-channel
- (lambda ()
- (call-with-values
- (lambda ()
- (open-bytevector-output-port))
- (lambda (port get-bytevector)
- (call-with-lzip-output-port port
- (lambda (port)
- (put-bytevector port nar-bytevector))
- #:level 9)
- (get-bytevector))))))
- (hash
- (call-with-worker-thread
- utility-thread-channel
- (lambda ()
- (bytevector->nix-base32-string
- (sha256 nar-bytevector)))))
- (uncompressed-size (bytevector-length nar-bytevector)))
-
+ (length unfiltered-derivations))
+ (let ((derivations
+ derivation-ids
+ (with-resource-from-pool postgresql-connection-pool conn
+ (update-derivation-ids-hash-table! conn
+ derivation-ids-hash-table
+ unfiltered-derivations)
+
+ (let ((derivations
+ ;; Do this while holding the PostgreSQL connection to
+ ;; avoid conflicts with other fibers
+ (filter-map (lambda (derivation)
+ (if (hash-ref derivation-ids-hash-table
+ (derivation-file-name
+ derivation))
+ #f
+ derivation))
+ unfiltered-derivations)))
+ (if (null? derivations)
+ (values '() '())
+ (let ((derivation-ids
+ (append-map!
+ (lambda (chunk)
+ (map (lambda (result)
+ (string->number (car result)))
+ (exec-query conn (insert-into-derivations conn chunk))))
+ (chunk derivations 500))))
+
+ ;; Do this while holding the connection so that other
+ ;; fibers don't also try inserting the same derivations
+ (with-time-logging
+ "insert-missing-derivations: updating hash table"
+ (for-each (lambda (derivation derivation-id)
+ (hash-set! derivation-ids-hash-table
+ (derivation-file-name derivation)
+ derivation-id))
+ derivations
+ derivation-ids))
+
+ (values derivations
+ derivation-ids)))))))
+
+ (unless (null? derivations)
+ (parallel-via-fibers
+ (with-time-logging
+ "insert-missing-derivations: inserting sources"
+ (fibers-for-each
+ (lambda (derivation-id derivation)
+ (let ((sources (derivation-sources derivation)))
+ (unless (null? sources)
+ (let ((sources-ids
+ (with-resource-from-pool postgresql-connection-pool conn
+ (insert-derivation-sources conn
+ derivation-id
+ sources))))
+ (par-map&
+ (lambda (id source-file)
+ (when
(with-resource-from-pool postgresql-connection-pool conn
- (insert-derivation-source-file-nar
- conn
- id
- hash
- compressed-nar-bytevector
- uncompressed-size)))))
- (_ #f)))
- sources-ids
- sources)))))
- derivation-ids
- derivations))
-
- (with-resource-from-pool postgresql-connection-pool conn
- (with-time-logging
- "insert-missing-derivations: inserting outputs"
- (for-each (lambda (derivation-id derivation)
- (insert-derivation-outputs conn
- derivation-id
- (derivation-outputs derivation)))
- derivation-ids
- derivations)))
+ (match
+ (exec-query
+ conn
+ "
+SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
+ (list (number->string id)))
+ (()
+ ;; Insert a placeholder to avoid other fibers
+ ;; working on this source file
+ (insert-placeholder-derivation-source-file-nar
+ conn
+ id)
+ #t)
+ (_ #f)))
+ (let ((nar-bytevector
+ (call-with-worker-thread
+ utility-thread-channel
+ (lambda ()
+ (call-with-values
+ (lambda ()
+ (open-bytevector-output-port))
+ (lambda (port get-bytevector)
+ (unless (file-exists? source-file)
+ (raise-exception
+ (make-missing-store-item-error
+ source-file)))
+ (write-file source-file port)
+ (let ((res (get-bytevector)))
+ (close-port port) ; maybe reduces memory?
+ res)))))))
+ (letpar&
+ ((compressed-nar-bytevector
+ (call-with-worker-thread
+ utility-thread-channel
+ (lambda ()
+ (call-with-values
+ (lambda ()
+ (open-bytevector-output-port))
+ (lambda (port get-bytevector)
+ (call-with-lzip-output-port port
+ (lambda (port)
+ (put-bytevector port nar-bytevector))
+ #:level 9)
+ (let ((res (get-bytevector)))
+ (close-port port) ; maybe reduces memory?
+ res))))))
+ (hash
+ (call-with-worker-thread
+ utility-thread-channel
+ (lambda ()
+ (bytevector->nix-base32-string
+ (sha256 nar-bytevector)))))
+ (uncompressed-size (bytevector-length nar-bytevector)))
+ (with-resource-from-pool postgresql-connection-pool conn
+ (update-derivation-source-file-nar
+ conn
+ id
+ hash
+ compressed-nar-bytevector
+ uncompressed-size))))))
+ sources-ids
+ sources)))))
+ derivation-ids
+ derivations))
- (with-time-logging
- "insert-missing-derivations: ensure-input-derivations-exist"
- (ensure-input-derivations-exist (deduplicate-strings
- (map derivation-input-path
- (append-map derivation-inputs
- derivations)))))
+ (with-resource-from-pool postgresql-connection-pool conn
+ (with-time-logging
+ "insert-missing-derivations: inserting outputs"
+ (for-each (lambda (derivation-id derivation)
+ (insert-derivation-outputs conn
+ derivation-id
+ (derivation-outputs derivation)))
+ derivation-ids
+ derivations)))
+
+ (with-time-logging
+ "insert-missing-derivations: ensure-input-derivations-exist"
+ (ensure-input-derivations-exist (deduplicate-strings
+ (map derivation-input-path
+ (append-map derivation-inputs
+ derivations))))))
- (with-resource-from-pool postgresql-connection-pool conn
- (with-time-logging
- (simple-format
- #f "insert-missing-derivations: inserting inputs for ~A derivations"
- (length derivations))
- (insert-derivation-inputs conn
- derivation-ids
- derivations))))))
+ (with-resource-from-pool postgresql-connection-pool conn
+ (with-time-logging
+ (simple-format
+ #f "insert-missing-derivations: inserting inputs for ~A derivations"
+ (length derivations))
+ (insert-derivation-inputs conn
+ derivation-ids
+ derivations)))))))
(define (derivation-file-names->derivation-ids postgresql-connection-pool
utility-thread-channel
@@ -1160,11 +1178,6 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
#t "debug: derivation-file-names->derivation-ids: processing ~A derivations\n"
derivations-count)
- (with-resource-from-pool postgresql-connection-pool conn
- (update-derivation-ids-hash-table! conn
- derivation-ids-hash-table
- derivation-file-names))
-
(let* ((missing-derivation-filenames
(deduplicate-strings
(vector-fold
@@ -1192,36 +1205,52 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
(chunk! missing-derivation-filenames 1000))))
(for-each
- (lambda (missing-derivation-filenames-chunk)
+ (lambda (missing-derivation-chunk-promise)
(let ((missing-derivations-chunk
- ;; Do the filter again, since processing the last chunk
- ;; might have inserted some of the derivations in this
- ;; chunk
- (remove! (lambda (derivation)
- (hash-ref derivation-ids-hash-table
- (derivation-file-name
- derivation)))
- (fibers-force
- missing-derivation-filenames-chunk))))
-
+ (fibers-force
+ missing-derivation-chunk-promise)))
(unless (null? missing-derivations-chunk)
(insert-missing-derivations postgresql-connection-pool
utility-thread-channel
derivation-ids-hash-table
missing-derivations-chunk))))
- missing-derivations-chunked-promises))
-
- (let ((all-ids
- (vector-map
- (lambda (_ derivation-file-name)
- (if derivation-file-name
- (or (hash-ref derivation-ids-hash-table
- derivation-file-name)
- (error "missing derivation id"))
- #f))
- derivation-file-names)))
-
- all-ids))))
+ missing-derivations-chunked-promises)
+
+ (let ((all-ids
+ (vector-map
+ (lambda (_ derivation-file-name)
+ (if derivation-file-name
+ (or (hash-ref derivation-ids-hash-table
+ derivation-file-name)
+ ;; If a derivation ID can't be found, update the
+ ;; hash table then check again
+ (with-resource-from-pool postgresql-connection-pool conn
+ (for-each
+ (lambda (missing-derivations-chunked-promise)
+ (update-derivation-ids-hash-table!
+ conn
+ derivation-ids-hash-table
+ (fibers-force missing-derivations-chunked-promise)))
+ missing-derivations-chunked-promises)
+ (or (hash-ref derivation-ids-hash-table
+ derivation-file-name)
+ (error
+ (simple-format #f "missing derivation id (~A)"
+ derivation-file-name)))))
+ #f))
+ derivation-file-names)))
+
+ (with-resource-from-pool postgresql-connection-pool conn
+ (simple-format
+ (current-error-port)
+ "guix-data-service: clearing the derivation-ids-hash-table\n")
+ (hash-clear! derivation-ids-hash-table))
+
+ ;; Just in case this helps clear memory
+ (for-each fibers-promise-reset
+ missing-derivations-chunked-promises)
+
+ all-ids)))))
(prevent-inlining-for-tests derivation-file-names->derivation-ids)