diff options
author | Christopher Baines <mail@cbaines.net> | 2024-10-27 14:02:57 +0000 |
---|---|---|
committer | Christopher Baines <mail@cbaines.net> | 2024-10-27 14:39:52 +0000 |
commit | c650fc6e7a4445abb45c88c8df7267799bd3f591 (patch) | |
tree | 0c5d69608ef2afaece5a055e2752495e8b64ddb9 | |
parent | 1e0407e9b6869dbe893378493001fbd925440b99 (diff) | |
download | data-service-c650fc6e7a4445abb45c88c8df7267799bd3f591.tar data-service-c650fc6e7a4445abb45c88c8df7267799bd3f591.tar.gz |
Rework inserting derivations
To add more parallelism.
-rw-r--r-- | guix-data-service/jobs/load-new-guix-revision.scm | 397 |
1 files changed, 213 insertions, 184 deletions
diff --git a/guix-data-service/jobs/load-new-guix-revision.scm b/guix-data-service/jobs/load-new-guix-revision.scm index 1a4c575..93b24b8 100644 --- a/guix-data-service/jobs/load-new-guix-revision.scm +++ b/guix-data-service/jobs/load-new-guix-revision.scm @@ -929,25 +929,25 @@ (define (update-derivation-ids-hash-table! conn derivation-ids-hash-table - file-names) - (define file-names-count (vector-length file-names)) + derivations) + (define derivations-count (length derivations)) (simple-format #t "debug: update-derivation-ids-hash-table!: ~A file-names\n" - file-names-count) + derivations-count) (let ((missing-file-names - (vector-fold - (lambda (_ result file-name) - (if (and file-name - (hash-ref derivation-ids-hash-table - file-name)) + (fold + (lambda (drv result) + (if (hash-ref derivation-ids-hash-table + (derivation-file-name drv)) result - (cons file-name result))) + (cons (derivation-file-name drv) + result))) '() - file-names))) + derivations))) (simple-format #t "debug: update-derivation-ids-hash-table!: lookup ~A file-names, ~A not cached\n" - file-names-count (length missing-file-names)) + derivations-count (length missing-file-names)) (unless (null? missing-file-names) (for-each @@ -964,42 +964,26 @@ (define (insert-missing-derivations postgresql-connection-pool utility-thread-channel derivation-ids-hash-table - derivations) + unfiltered-derivations) (define (ensure-input-derivations-exist input-derivation-file-names) (unless (null? input-derivation-file-names) - (simple-format - #t "debug: ensure-input-derivations-exist: processing ~A derivations\n" - (length input-derivation-file-names)) - - (with-resource-from-pool postgresql-connection-pool conn - (update-derivation-ids-hash-table! conn - derivation-ids-hash-table - (list->vector - input-derivation-file-names))) - (simple-format - #t - "debug: ensure-input-derivations-exist: checking for missing input derivations\n") - (let ((missing-derivations-filenames - (remove (lambda (derivation-file-name) - (hash-ref derivation-ids-hash-table - derivation-file-name)) - input-derivation-file-names))) - - (unless (null? missing-derivations-filenames) - (simple-format - #f - "debug: ensure-input-derivations-exist: inserting missing input derivations\n") - ;; Ensure all the input derivations exist - (insert-missing-derivations - postgresql-connection-pool + ;; Ensure all the input derivations exist + (for-each + (lambda (chunk) + (simple-format + #t "debug: ensure-input-derivations-exist: processing ~A derivations\n" + (length chunk)) + + (insert-missing-derivations + postgresql-connection-pool + utility-thread-channel + derivation-ids-hash-table + (call-with-worker-thread utility-thread-channel - derivation-ids-hash-table - (call-with-worker-thread - utility-thread-channel - (lambda () - (map read-derivation-from-file - missing-derivations-filenames)))))))) + (lambda () + (map read-derivation-from-file chunk))))) + (chunk! input-derivation-file-names 1000)))) (define (insert-into-derivations conn drvs) (string-append @@ -1030,121 +1014,155 @@ (with-time-logging (simple-format #f "insert-missing-derivations: inserting ~A derivations" - (length derivations)) - (let* ((chunks (chunk derivations 500)) - (derivation-ids - (with-resource-from-pool postgresql-connection-pool conn - (append-map! - (lambda (chunk) - (map (lambda (result) - (string->number (car result))) - (exec-query conn (insert-into-derivations conn chunk)))) - chunks)))) - - (with-time-logging - "insert-missing-derivations: updating hash table" - (for-each (lambda (derivation derivation-id) - (hash-set! derivation-ids-hash-table - (derivation-file-name derivation) - derivation-id)) - derivations - derivation-ids)) - - (with-time-logging - "insert-missing-derivations: inserting sources" - (for-each - (lambda (derivation-id derivation) - (let ((sources (derivation-sources derivation))) - (unless (null? sources) - (let ((sources-ids - (with-resource-from-pool postgresql-connection-pool conn - (insert-derivation-sources conn - derivation-id - sources)))) - (par-map& - (lambda (id source-file) - (match - (with-resource-from-pool postgresql-connection-pool conn - (exec-query - conn - " -SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" - (list (number->string id)))) - (() - (let ((nar-bytevector - (call-with-worker-thread - utility-thread-channel - (lambda () - (call-with-values - (lambda () - (open-bytevector-output-port)) - (lambda (port get-bytevector) - (unless (file-exists? source-file) - (raise-exception - (make-missing-store-item-error - source-file))) - (write-file source-file port) - (get-bytevector))))))) - (letpar& - ((compressed-nar-bytevector - (call-with-worker-thread - utility-thread-channel - (lambda () - (call-with-values - (lambda () - (open-bytevector-output-port)) - (lambda (port get-bytevector) - (call-with-lzip-output-port port - (lambda (port) - (put-bytevector port nar-bytevector)) - #:level 9) - (get-bytevector)))))) - (hash - (call-with-worker-thread - utility-thread-channel - (lambda () - (bytevector->nix-base32-string - (sha256 nar-bytevector))))) - (uncompressed-size (bytevector-length nar-bytevector))) - + (length unfiltered-derivations)) + (let ((derivations + derivation-ids + (with-resource-from-pool postgresql-connection-pool conn + (update-derivation-ids-hash-table! conn + derivation-ids-hash-table + unfiltered-derivations) + + (let ((derivations + ;; Do this while holding the PostgreSQL connection to + ;; avoid conflicts with other fibers + (filter-map (lambda (derivation) + (if (hash-ref derivation-ids-hash-table + (derivation-file-name + derivation)) + #f + derivation)) + unfiltered-derivations))) + (if (null? derivations) + (values '() '()) + (let ((derivation-ids + (append-map! + (lambda (chunk) + (map (lambda (result) + (string->number (car result))) + (exec-query conn (insert-into-derivations conn chunk)))) + (chunk derivations 500)))) + + ;; Do this while holding the connection so that other + ;; fibers don't also try inserting the same derivations + (with-time-logging + "insert-missing-derivations: updating hash table" + (for-each (lambda (derivation derivation-id) + (hash-set! derivation-ids-hash-table + (derivation-file-name derivation) + derivation-id)) + derivations + derivation-ids)) + + (values derivations + derivation-ids))))))) + + (unless (null? derivations) + (parallel-via-fibers + (with-time-logging + "insert-missing-derivations: inserting sources" + (fibers-for-each + (lambda (derivation-id derivation) + (let ((sources (derivation-sources derivation))) + (unless (null? sources) + (let ((sources-ids + (with-resource-from-pool postgresql-connection-pool conn + (insert-derivation-sources conn + derivation-id + sources)))) + (par-map& + (lambda (id source-file) + (when (with-resource-from-pool postgresql-connection-pool conn - (insert-derivation-source-file-nar - conn - id - hash - compressed-nar-bytevector - uncompressed-size))))) - (_ #f))) - sources-ids - sources))))) - derivation-ids - derivations)) - - (with-resource-from-pool postgresql-connection-pool conn - (with-time-logging - "insert-missing-derivations: inserting outputs" - (for-each (lambda (derivation-id derivation) - (insert-derivation-outputs conn - derivation-id - (derivation-outputs derivation))) - derivation-ids - derivations))) + (match + (exec-query + conn + " +SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" + (list (number->string id))) + (() + ;; Insert a placeholder to avoid other fibers + ;; working on this source file + (insert-placeholder-derivation-source-file-nar + conn + id) + #t) + (_ #f))) + (let ((nar-bytevector + (call-with-worker-thread + utility-thread-channel + (lambda () + (call-with-values + (lambda () + (open-bytevector-output-port)) + (lambda (port get-bytevector) + (unless (file-exists? source-file) + (raise-exception + (make-missing-store-item-error + source-file))) + (write-file source-file port) + (let ((res (get-bytevector))) + (close-port port) ; maybe reduces memory? + res))))))) + (letpar& + ((compressed-nar-bytevector + (call-with-worker-thread + utility-thread-channel + (lambda () + (call-with-values + (lambda () + (open-bytevector-output-port)) + (lambda (port get-bytevector) + (call-with-lzip-output-port port + (lambda (port) + (put-bytevector port nar-bytevector)) + #:level 9) + (let ((res (get-bytevector))) + (close-port port) ; maybe reduces memory? + res)))))) + (hash + (call-with-worker-thread + utility-thread-channel + (lambda () + (bytevector->nix-base32-string + (sha256 nar-bytevector))))) + (uncompressed-size (bytevector-length nar-bytevector))) + (with-resource-from-pool postgresql-connection-pool conn + (update-derivation-source-file-nar + conn + id + hash + compressed-nar-bytevector + uncompressed-size)))))) + sources-ids + sources))))) + derivation-ids + derivations)) - (with-time-logging - "insert-missing-derivations: ensure-input-derivations-exist" - (ensure-input-derivations-exist (deduplicate-strings - (map derivation-input-path - (append-map derivation-inputs - derivations))))) + (with-resource-from-pool postgresql-connection-pool conn + (with-time-logging + "insert-missing-derivations: inserting outputs" + (for-each (lambda (derivation-id derivation) + (insert-derivation-outputs conn + derivation-id + (derivation-outputs derivation))) + derivation-ids + derivations))) + + (with-time-logging + "insert-missing-derivations: ensure-input-derivations-exist" + (ensure-input-derivations-exist (deduplicate-strings + (map derivation-input-path + (append-map derivation-inputs + derivations)))))) - (with-resource-from-pool postgresql-connection-pool conn - (with-time-logging - (simple-format - #f "insert-missing-derivations: inserting inputs for ~A derivations" - (length derivations)) - (insert-derivation-inputs conn - derivation-ids - derivations)))))) + (with-resource-from-pool postgresql-connection-pool conn + (with-time-logging + (simple-format + #f "insert-missing-derivations: inserting inputs for ~A derivations" + (length derivations)) + (insert-derivation-inputs conn + derivation-ids + derivations))))))) (define (derivation-file-names->derivation-ids postgresql-connection-pool utility-thread-channel @@ -1160,11 +1178,6 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" #t "debug: derivation-file-names->derivation-ids: processing ~A derivations\n" derivations-count) - (with-resource-from-pool postgresql-connection-pool conn - (update-derivation-ids-hash-table! conn - derivation-ids-hash-table - derivation-file-names)) - (let* ((missing-derivation-filenames (deduplicate-strings (vector-fold @@ -1192,36 +1205,52 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" (chunk! missing-derivation-filenames 1000)))) (for-each - (lambda (missing-derivation-filenames-chunk) + (lambda (missing-derivation-chunk-promise) (let ((missing-derivations-chunk - ;; Do the filter again, since processing the last chunk - ;; might have inserted some of the derivations in this - ;; chunk - (remove! (lambda (derivation) - (hash-ref derivation-ids-hash-table - (derivation-file-name - derivation))) - (fibers-force - missing-derivation-filenames-chunk)))) - + (fibers-force + missing-derivation-chunk-promise))) (unless (null? missing-derivations-chunk) (insert-missing-derivations postgresql-connection-pool utility-thread-channel derivation-ids-hash-table missing-derivations-chunk)))) - missing-derivations-chunked-promises)) - - (let ((all-ids - (vector-map - (lambda (_ derivation-file-name) - (if derivation-file-name - (or (hash-ref derivation-ids-hash-table - derivation-file-name) - (error "missing derivation id")) - #f)) - derivation-file-names))) - - all-ids)))) + missing-derivations-chunked-promises) + + (let ((all-ids + (vector-map + (lambda (_ derivation-file-name) + (if derivation-file-name + (or (hash-ref derivation-ids-hash-table + derivation-file-name) + ;; If a derivation ID can't be found, update the + ;; hash table then check again + (with-resource-from-pool postgresql-connection-pool conn + (for-each + (lambda (missing-derivations-chunked-promise) + (update-derivation-ids-hash-table! + conn + derivation-ids-hash-table + (fibers-force missing-derivations-chunked-promise))) + missing-derivations-chunked-promises) + (or (hash-ref derivation-ids-hash-table + derivation-file-name) + (error + (simple-format #f "missing derivation id (~A)" + derivation-file-name))))) + #f)) + derivation-file-names))) + + (with-resource-from-pool postgresql-connection-pool conn + (simple-format + (current-error-port) + "guix-data-service: clearing the derivation-ids-hash-table\n") + (hash-clear! derivation-ids-hash-table)) + + ;; Just in case this helps clear memory + (for-each fibers-promise-reset + missing-derivations-chunked-promises) + + all-ids))))) (prevent-inlining-for-tests derivation-file-names->derivation-ids) |