;;; Guix Data Service -- Information about Guix over time ;;; Copyright © 2019 Christopher Baines ;;; ;;; This program is free software: you can redistribute it and/or ;;; modify it under the terms of the GNU Affero General Public License ;;; as published by the Free Software Foundation, either version 3 of ;;; the License, or (at your option) any later version. ;;; ;;; This program is distributed in the hope that it will be useful, ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ;;; Affero General Public License for more details. ;;; ;;; You should have received a copy of the GNU Affero General Public ;;; License along with this program. If not, see ;;; . (define-module (guix-data-service jobs load-new-guix-revision) #:use-module (srfi srfi-1) #:use-module (srfi srfi-11) #:use-module (srfi srfi-43) #:use-module (srfi srfi-71) #:use-module (ice-9 match) #:use-module (ice-9 threads) #:use-module (ice-9 exceptions) #:use-module (ice-9 textual-ports) #:use-module (ice-9 hash-table) #:use-module (ice-9 suspendable-ports) #:use-module (ice-9 binary-ports) #:use-module ((ice-9 ports internal) #:select (port-poll)) #:use-module (rnrs bytevectors) #:use-module (rnrs exceptions) #:use-module (lzlib) #:use-module (json) #:use-module (squee) #:use-module (gcrypt hash) #:use-module (fibers) #:use-module (fibers timers) #:use-module (fibers channels) #:use-module (fibers operations) #:use-module (guix monads) #:use-module (guix base32) #:use-module (guix store) #:use-module (guix channels) #:use-module (guix inferior) #:use-module (guix profiles) #:use-module (guix utils) #:use-module (guix i18n) #:use-module (guix progress) #:use-module (guix packages) #:use-module (guix derivations) #:use-module (guix serialization) #:use-module (guix build utils) #:use-module ((guix build syscalls) #:select (set-thread-name free-disk-space)) #:use-module (guix-data-service config) #:use-module (guix-data-service database) #:use-module (guix-data-service utils) #:use-module (guix-data-service model utils) #:use-module (guix-data-service model build) #:use-module (guix-data-service model system) #:use-module (guix-data-service model channel-instance) #:use-module (guix-data-service model channel-news) #:use-module (guix-data-service model package) #:use-module (guix-data-service model package-derivation-by-guix-revision-range) #:use-module (guix-data-service model git-repository) #:use-module (guix-data-service model guix-revision) #:use-module (guix-data-service model package-derivation) #:use-module (guix-data-service model guix-revision-package-derivation) #:use-module (guix-data-service model license) #:use-module (guix-data-service model license-set) #:use-module (guix-data-service model lint-checker) #:use-module (guix-data-service model lint-warning) #:use-module (guix-data-service model lint-warning-message) #:use-module (guix-data-service model location) #:use-module (guix-data-service model package-metadata) #:use-module (guix-data-service model derivation) #:use-module (guix-data-service model system-test) #:export (fetch-unlocked-jobs process-load-new-guix-revision-job select-load-new-guix-revision-job-metrics select-job-for-commit select-jobs-and-events select-recent-job-events select-unprocessed-jobs-and-events select-jobs-and-events-for-commit guix-revision-loaded-successfully? record-job-event enqueue-load-new-guix-revision-job most-recent-n-load-new-guix-revision-jobs)) (define inferior-package-id (@@ (guix inferior) inferior-package-id)) (define (record-start-time action) (simple-format #t "debug: Starting ~A\n" action) (cons action (current-time))) (define record-end-time (match-lambda ((action . start-time) (let ((time-taken (- (current-time) start-time))) (simple-format #t "debug: Finished ~A, took ~A seconds\n" action time-taken))))) (define-exception-type &missing-store-item-error &error make-missing-store-item-error missing-store-item-error? (item missing-store-item-error-item)) (define* (retry-on-missing-store-item thunk #:key on-exception) (with-exception-handler (lambda (exn) (if (missing-store-item-error? exn) (begin (simple-format (current-error-port) "missing store item ~A, retrying ~A\n" (missing-store-item-error-item exn) thunk) (when on-exception (on-exception)) (retry-on-missing-store-item thunk #:on-exception on-exception)) (raise-exception exn))) thunk #:unwind? #t)) (define (inferior-guix-systems inf) ;; The order shouldn't matter here, but bugs in Guix can lead to different ;; results depending on the order, so sort the systems to try and provide ;; deterministic behaviour (sort (cond ((inferior-eval '(defined? 'systems (resolve-module '(guix platform))) inf) (remove (lambda (system) ;; There aren't currently bootstrap binaries for s390x-linux, so this ;; just leads to lots of errors (string=? system "s390x-linux")) (inferior-eval '((@ (guix platform) systems)) inf))) (else (inferior-eval '(@ (guix packages) %supported-systems) inf))) stringpackage ,guix-source #:commit ,guix-commit))) (map (lambda (system-test) (let ((stats (gc-stats))) (simple-format (current-error-port) "inferior heap: ~a MiB used (~a MiB heap)~%" (round (/ (- (assoc-ref stats 'heap-size) (assoc-ref stats 'heap-free-size)) (expt 2. 20))) (round (/ (assoc-ref stats 'heap-size) (expt 2. 20))))) (list (system-test-name system-test) (system-test-description system-test) (filter-map (lambda (system) (simple-format (current-error-port) "guix-data-service: computing derivation for ~A system test (on ~A)\n" (system-test-name system-test) system) (catch #t (lambda () (cons system (parameterize ((%current-system system)) (derivation-file-name (run-with-store store (mbegin %store-monad (system-test-value system-test))))))) (lambda (key . args) (simple-format (current-error-port) "guix-data-service: error computing derivation for system test ~A (~A): ~A: ~A\n" (system-test-name system-test) system key args) #f))) (list ,@inf-systems)) (match (system-test-location system-test) (($ file line column) (list file line column))))) (all-system-tests))))) (catch #t (lambda () (inferior-eval ;; For channel-source->package '(use-modules (gnu packages package-management)) inf) (let ((system-test-data (with-time-logging "getting system tests" (inferior-eval-with-store/non-blocking inf store extract)))) system-test-data)) (lambda (key . args) (display (backtrace) (current-error-port)) (display "\n" (current-error-port)) (simple-format (current-error-port) "error: all-inferior-system-tests: ~A: ~A\n" key args) #f))) (define locales '("cs_CZ.UTF-8" "da_DK.UTF-8" "de_DE.UTF-8" "eo_EO.UTF-8" "es_ES.UTF-8" "fr_FR.UTF-8" "hu_HU.UTF-8" "nl_NL.UTF-8" "pl_PL.UTF-8" "pt_BR.UTF-8" ;;"sr_SR.UTF-8" "sv_SE.UTF-8" "vi_VN.UTF-8" "zh_CN.UTF-8")) (define (inferior-lint-checkers inf) (and (or (inferior-eval '(and (resolve-module '(guix lint) #:ensure #f) (use-modules (guix lint)) #t) inf) (begin (simple-format (current-error-port) "warning: no (guix lint) module found\n") #f)) (inferior-eval `(begin (define (lint-descriptions-by-locale checker) (let* ((source-locale "en_US.UTF-8") (source-description (begin (setlocale LC_MESSAGES source-locale) (G_ (lint-checker-description checker)))) (descriptions-by-locale (filter-map (lambda (locale) (catch 'system-error (lambda () (setlocale LC_MESSAGES locale)) (lambda (key . args) (error (simple-format #f "error changing locale to ~A: ~A ~A" locale key args)))) (let ((description (G_ (lint-checker-description checker)))) (setlocale LC_MESSAGES source-locale) (if (string=? description source-description) #f (cons locale description)))) (list ,@locales)))) (cons (cons source-locale source-description) descriptions-by-locale))) (map (lambda (checker) (list (lint-checker-name checker) (lint-descriptions-by-locale checker) (if (memq checker %network-dependent-checkers) #t #f))) %all-checkers)) inf))) (define (inferior-lint-warnings inf store checker-name) (define lint-warnings-for-checker `(lambda (store) (let* ((checker-name (quote ,checker-name)) (checker (find (lambda (checker) (eq? (lint-checker-name checker) checker-name)) %local-checkers)) (check (lint-checker-check checker))) (define lint-checker-requires-store?-defined? (defined? 'lint-checker-requires-store? (resolve-module '(guix lint)))) (define (process-lint-warning lint-warning) (list (match (lint-warning-location lint-warning) (($ file line column) (list (if (string-prefix? "/gnu/store/" file) ;; Convert a string like ;; /gnu/store/53xh0mpigin2rffg31s52x5dc08y0qmr-guix-module-union/share/guile/site/2.2/gnu/packages/xdisorg.scm ;; ;; This happens when the checker uses ;; package-field-location. (string-join (drop (string-split file #\/) 8) "/") file) line column))) (let* ((source-locale "en_US.UTF-8") (source-message (begin (setlocale LC_MESSAGES source-locale) (lint-warning-message lint-warning))) (messages-by-locale (filter-map (lambda (locale) (catch 'system-error (lambda () (setlocale LC_MESSAGES locale)) (lambda (key . args) (error (simple-format #f "error changing locale to ~A: ~A ~A" locale key args)))) (let ((message (lint-warning-message lint-warning))) (setlocale LC_MESSAGES source-locale) (if (string=? message source-message) #f (cons locale message)))) (list ,@locales)))) (cons (cons source-locale source-message) messages-by-locale)))) (vector-map (lambda (_ package) (map process-lint-warning (with-exception-handler (lambda (exn) (simple-format (current-error-port) "exception checking ~A with ~A checker: ~A\n" package checker-name exn) (raise-exception exn)) (lambda () (if (and lint-checker-requires-store?-defined? (lint-checker-requires-store? checker)) (check package #:store store) (check package))) #:unwind? #t))) gds-inferior-packages)))) (ensure-gds-inferior-packages-defined! inf) (inferior-eval '(and (resolve-module '(guix lint) #:ensure #f) (use-modules (guix lint)) #t) inf) (with-time-logging (simple-format #f "getting ~A lint warnings" checker-name) (inferior-eval-with-store/non-blocking inf store lint-warnings-for-checker))) (define (inferior-fetch-system-target-pairs inf) (define inf-systems (inferior-guix-systems inf)) (define inf-targets (cond ((inferior-eval '(defined? 'targets (resolve-module '(guix platform))) inf) (sort (inferior-eval '((@ (guix platform) targets)) inf) stringbool (member system supported-systems)))) (target-supported? (or (not target) (let ((system-for-target (assoc-ref target-system-alist target))) (or (not system-for-target) (->bool (member system-for-target (package-supported-systems package) string=?))))))) (when (string=? (package-name package) "guix") (simple-format (current-error-port) "looking at guix package (supported systems: ~A, system supported: ~A, target supported: ~A\n" supported-systems system-supported? target-supported?)) (if system-supported? (if target-supported? (derivation-for-system-and-target package system target) #f) #f))) (lambda (key . args) (if (and (eq? key 'system-error) (eq? (car args) 'fport_write)) (begin (simple-format (current-error-port) "error: while processing ~A, exiting: ~A: ~A\n" (package-name package) key args) (exit 1)) (begin (simple-format (current-error-port) "error: while processing ~A ignoring error: ~A: ~A\n" (package-name package) key args) #f))))) vec) vec))) (inferior-eval '(when (defined? 'systems (resolve-module '(guix platform))) (use-modules (guix platform))) inf) (unless (inferior-eval '(defined? 'package-unsupported-target-error? (resolve-module '(guix packages))) inf) (inferior-eval '(define package-unsupported-target-error? (const #f)) inf) (inferior-eval '(define unsupported-cross-compilation-target-error? (const #f)) inf)) (inferior-eval-with-store/non-blocking inf store proc)) (define (sort-and-deduplicate-inferior-packages packages pkg-to-replacement-hash-table) (pair-fold (lambda (pair result) (if (null? (cdr pair)) (cons (first pair) result) (let* ((a (first pair)) (b (second pair)) (a-name (inferior-package-name a)) (b-name (inferior-package-name b)) (a-version (inferior-package-version a)) (b-version (inferior-package-version b)) (a-replacement (hashq-ref pkg-to-replacement-hash-table a)) (b-replacement (hashq-ref pkg-to-replacement-hash-table b))) (if (and (string=? a-name b-name) (string=? a-version b-version) (eq? a-replacement b-replacement)) (begin (simple-format (current-error-port) "warning: ignoring duplicate package: ~A (~A)\n" a-name a-version) result) (cons a result))))) '() (sort packages (lambda (a b) (let ((a-name (inferior-package-name a)) (b-name (inferior-package-name b))) (if (string=? a-name b-name) (let ((a-version (inferior-package-version a)) (b-version (inferior-package-version b))) (if (string=? a-version b-version) ;; The name and version are the same, so try and pick ;; the same package each time, by looking at the ;; location. (let ((a-location (inferior-package-location a)) (b-location (inferior-package-location b))) (> (location-line a-location) (location-line b-location))) (stringbool (package-replacement (hash-ref %package-table id)))) (list ,@(map inferior-package-id packages))) inf)))) (pkg-to-replacement-hash-table (let ((ht (make-hash-table))) (for-each (lambda (pkg replacement) (when replacement (hashq-set! ht pkg replacement))) packages replacements) ht)) (non-exported-replacements (let ((package-id-hash-table (make-hash-table))) (for-each (lambda (pkg) (hash-set! package-id-hash-table (inferior-package-id pkg) #t)) packages) (filter (lambda (pkg) (and pkg (eq? #f (hash-ref package-id-hash-table (inferior-package-id pkg))))) replacements))) (deduplicated-packages ;; This isn't perfect, sometimes there can be two packages with the ;; same name and version, but different derivations. Guix will warn ;; about this case though, generally this means only one of the ;; packages should be exported. (with-time-logging "deduplicating inferior packages" (call-with-temporary-thread (lambda () ;; TODO Sort introduces a continuation barrier (sort-and-deduplicate-inferior-packages (append! packages non-exported-replacements) pkg-to-replacement-hash-table))))) (deduplicated-packages-length (length deduplicated-packages))) (inferior-eval `(define gds-inferior-packages (vector ,@(map inferior-package-id deduplicated-packages))) inf) (inferior-eval '(begin (vector-map! (lambda (_ id) (or (hashv-ref %package-table id) (error "missing package id"))) gds-inferior-packages) #t) inf) (inferior-eval '(let ((stats (gc-stats))) (simple-format (current-error-port) "post gds-inferior-packages inferior heap: ~a MiB used (~a MiB heap)~%" (round (/ (- (assoc-ref stats 'heap-size) (assoc-ref stats 'heap-free-size)) (expt 2. 20))) (round (/ (assoc-ref stats 'heap-size) (expt 2. 20))))) inf) (values (list->vector deduplicated-packages) pkg-to-replacement-hash-table))) (define (ensure-gds-inferior-packages-defined! inf) (unless (inferior-eval '(defined? 'gds-inferior-packages) inf) (with-time-logging "ensuring gds-inferior-packages is defined in inferior" (inferior-packages-plus-replacements inf)))) (define* (all-inferior-packages-data inf packages pkg-to-replacement-hash-table) (define inferior-package-id->packages-index-hash-table (let ((hash-table (make-hash-table))) (vector-for-each (lambda (i pkg) (hash-set! hash-table (inferior-package-id pkg) i)) packages) hash-table)) (let* ((package-license-data (with-time-logging "fetching inferior package license metadata" (inferior-packages->license-data inf))) (package-metadata (with-time-logging "fetching inferior package metadata" (vector-map (lambda (_ package) (let ((translated-package-descriptions-and-synopsis (inferior-packages->translated-package-descriptions-and-synopsis inf package))) (list (non-empty-string-or-false (inferior-package-home-page package)) (inferior-package-location package) (car translated-package-descriptions-and-synopsis) (cdr translated-package-descriptions-and-synopsis)))) packages))) (package-replacement-data (vector-map (lambda (_ pkg) (let ((replacement (hashq-ref pkg-to-replacement-hash-table pkg))) (if replacement ;; I'm not sure if replacements can themselves be ;; replaced, but I do know for sure that there are ;; infinite chains of replacements (python(2)-urllib3 ;; in 7c4c781aa40c42d4cd10b8d9482199f3db345e1b for ;; example). ;; ;; So this might be #f in these cases (let ((index (hash-ref inferior-package-id->packages-index-hash-table (inferior-package-id replacement)))) (unless index (simple-format (current-error-port) "warning: replacement for ~A (~A) is unknown\n" pkg replacement)) index) #f))) packages))) `((names . ,(vector-map (lambda (_ pkg) (inferior-package-name pkg)) packages)) (versions . ,(vector-map (lambda (_ pkg) (inferior-package-version pkg)) packages)) (license-data . ,package-license-data) (metadata . ,package-metadata) (replacements . ,package-replacement-data)))) (define (insert-packages conn inferior-packages-data) (let* ((names (assq-ref inferior-packages-data 'names)) (versions (assq-ref inferior-packages-data 'versions)) (package-license-set-ids (with-time-logging "inserting package license sets" (inferior-packages->license-set-ids conn (inferior-packages->license-id-lists conn ;; TODO Don't needlessly convert (vector->list (assq-ref inferior-packages-data 'license-data)))))) (all-package-metadata-ids new-package-metadata-ids (with-time-logging "inserting package metadata entries" (inferior-packages->package-metadata-ids conn ;; TODO Don't needlessly convert (vector->list (assq-ref inferior-packages-data 'metadata)) package-license-set-ids))) (replacement-package-ids (vector-map (lambda (_ package-index-or-false) (if package-index-or-false (first (inferior-packages->package-ids conn (list (list (vector-ref names package-index-or-false) (vector-ref versions package-index-or-false) (list-ref all-package-metadata-ids package-index-or-false) (cons "integer" NULL))))) (cons "integer" NULL))) (assq-ref inferior-packages-data 'replacements)))) (unless (null? new-package-metadata-ids) (with-time-logging "inserting package metadata tsvector entries" (insert-package-metadata-tsvector-entries conn new-package-metadata-ids))) (with-time-logging "getting package-ids (without replacements)" (list->vector (inferior-packages->package-ids conn ;; TODO Do this more efficiently (zip (vector->list names) (vector->list versions) all-package-metadata-ids (vector->list replacement-package-ids))))))) (define (insert-lint-warnings conn package-ids lint-checker-ids lint-warnings-data) (lint-warnings-data->lint-warning-ids conn (append-map! (lambda (lint-checker-id warnings-per-package) (if warnings-per-package (vector-fold (lambda (_ result package-id warnings) (append! result (map (match-lambda ((location-data messages-by-locale) (let ((location-id (location->location-id conn (apply location location-data))) (lint-warning-message-set-id (lint-warning-message-data->lint-warning-message-set-id conn messages-by-locale))) (list lint-checker-id package-id location-id lint-warning-message-set-id)))) (fold (lambda (location-and-messages result) ;; TODO Sort to delete duplicates, rather than use member (if (member location-and-messages result) (begin (apply simple-format (current-error-port) "warning: skipping duplicate lint warning ~A ~A\n" location-and-messages) result) (append! result (list location-and-messages)))) '() warnings)))) '() package-ids warnings-per-package) '())) lint-checker-ids lint-warnings-data))) (define (update-derivation-ids-hash-table! conn derivation-ids-hash-table derivation-file-names) (define derivations-count (vector-length derivation-file-names)) (let ((missing-file-names (vector-fold (lambda (_ result file-name) (if (hash-ref derivation-ids-hash-table file-name) result (cons file-name result))) '() derivation-file-names))) (simple-format #t "debug: update-derivation-ids-hash-table!: lookup ~A file-names, ~A not cached\n" derivations-count (length missing-file-names)) (unless (null? missing-file-names) (for-each (lambda (chunk) (for-each (match-lambda ((id file-name) (hash-set! derivation-ids-hash-table file-name (string->number id)))) (exec-query conn (select-existing-derivations chunk)))) (chunk! missing-file-names 1000))))) (define* (insert-missing-derivations postgresql-connection-pool utility-thread-channel derivation-ids-hash-table unfiltered-derivations #:key (log-tag "unspecified")) (define (insert-into-derivations conn drvs) (string-append "INSERT INTO derivations " "(file_name, builder, args, env_vars, system_id) VALUES " (string-join (map (match-lambda (($ outputs inputs sources system builder args env-vars file-name) (simple-format #f "('~A', '~A', ARRAY[~A]::varchar[], ARRAY[~A], '~A')" file-name builder (string-join (map quote-string args) ",") (string-join (map (match-lambda ((key . value) (string-append "['" key '"', $$" value "$$ ]"))) env-vars) ",") (system->system-id conn system)))) drvs) ",") " RETURNING id" ";")) (define (insert-derivations) (with-resource-from-pool postgresql-connection-pool conn (update-derivation-ids-hash-table! conn derivation-ids-hash-table (let ((file-names-vector (make-vector (length unfiltered-derivations)))) (for-each (lambda (i drv) (vector-set! file-names-vector i (derivation-file-name drv))) (iota (vector-length file-names-vector)) unfiltered-derivations) file-names-vector)) (let ((derivations ;; Do this while holding the PostgreSQL connection to ;; avoid conflicts with other fibers (delete-duplicates (filter-map (lambda (derivation) (if (hash-ref derivation-ids-hash-table (derivation-file-name derivation)) #f derivation)) unfiltered-derivations)))) (if (null? derivations) (values '() '()) (begin (simple-format (current-error-port) "insert-missing-derivations: inserting ~A derivations (~A)\n" (length unfiltered-derivations) log-tag) (let ((derivation-ids (append-map! (lambda (chunk) (map (lambda (result) (string->number (car result))) (exec-query conn (insert-into-derivations conn chunk)))) (chunk derivations 500)))) ;; Do this while holding the connection so that other ;; fibers don't also try inserting the same derivations (with-time-logging (string-append "insert-missing-derivations: updating hash table (" log-tag ")") (for-each (lambda (derivation derivation-id) (hash-set! derivation-ids-hash-table (derivation-file-name derivation) derivation-id)) derivations derivation-ids)) (simple-format (current-error-port) "insert-missing-derivations: finished inserting ~A derivations (~A)\n" (length unfiltered-derivations) log-tag) (values derivations derivation-ids))))))) (define (insert-sources derivations derivation-ids) (with-time-logging (string-append "insert-missing-derivations: inserting sources (" log-tag ")") (fibers-for-each (lambda (derivation-id derivation) (let ((sources (derivation-sources derivation))) (unless (null? sources) (let ((sources-ids (with-resource-from-pool postgresql-connection-pool conn (insert-derivation-sources conn derivation-id sources)))) (fibers-for-each (lambda (id source-file) (when (with-resource-from-pool postgresql-connection-pool conn (match (exec-query conn " SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" (list (number->string id))) (() ;; Insert a placeholder to avoid other fibers ;; working on this source file (insert-placeholder-derivation-source-file-nar conn id) #t) (_ #f))) ;; Use the utility-thread-channel to control concurrency here, ;; to avoid using too much memory (call-with-worker-thread utility-thread-channel (lambda () (let ((nar-bytevector (call-with-values (lambda () (open-bytevector-output-port)) (lambda (port get-bytevector) (unless (file-exists? source-file) (raise-exception (make-missing-store-item-error source-file))) (write-file source-file port) (let ((res (get-bytevector))) (close-port port) ; maybe reduces memory? res))))) (let ((compressed-nar-bytevector (call-with-values (lambda () (open-bytevector-output-port)) (lambda (port get-bytevector) (call-with-lzip-output-port port (lambda (port) (put-bytevector port nar-bytevector)) #:level 9) (let ((res (get-bytevector))) (close-port port) ; maybe reduces memory? res)))) (hash (bytevector->nix-base32-string (sha256 nar-bytevector))) (uncompressed-size (bytevector-length nar-bytevector))) (with-resource-from-pool postgresql-connection-pool conn (update-derivation-source-file-nar conn id hash compressed-nar-bytevector uncompressed-size)))))))) sources-ids sources))))) derivation-ids derivations))) (let ((derivations derivation-ids (insert-derivations))) (unless (null? derivations) (parallel-via-fibers (insert-sources derivations derivation-ids) (with-time-logging (string-append "insert-missing-derivations: inserting outputs (" log-tag ")") (with-resource-from-pool postgresql-connection-pool conn (for-each (lambda (derivation-id derivation) (insert-derivation-outputs conn derivation-id (derivation-outputs derivation))) derivation-ids derivations))) (with-time-logging (string-append "insert-missing-derivations: ensure-input-derivations-exist (" log-tag ")") (let ((input-derivations (map derivation-input-derivation (append-map derivation-inputs derivations)))) (unless (null? input-derivations) ;; Ensure all the input derivations exist (for-each (lambda (chunk) (insert-missing-derivations postgresql-connection-pool utility-thread-channel derivation-ids-hash-table chunk #:log-tag log-tag)) (chunk! input-derivations 1000)))))) (string-append "insert-missing-derivations: done parallel (" log-tag ")") (with-resource-from-pool postgresql-connection-pool conn (with-time-logging (simple-format #f "insert-missing-derivations: inserting inputs for ~A derivations (~A)" (length derivations) log-tag) (insert-derivation-inputs conn derivation-ids derivations)))))) (define* (derivation-file-names->derivation-ids postgresql-connection-pool utility-thread-channel read-derivations/fiberized derivation-ids-hash-table derivation-file-names #:key (log-tag "unspecified")) (define derivations-count (vector-length derivation-file-names)) (if (= 0 derivations-count) #() (begin (simple-format #t "debug: derivation-file-names->derivation-ids: processing ~A derivations (~A)\n" derivations-count log-tag) (let* ((missing-derivation-filenames (deduplicate-strings (vector-fold (lambda (_ result derivation-file-name) (if (not derivation-file-name) result (if (hash-ref derivation-ids-hash-table derivation-file-name) result (cons derivation-file-name result)))) '() derivation-file-names)))) (let ((chunks (chunk! missing-derivation-filenames 1000))) (for-each (lambda (i missing-derivation-file-names-chunk) (let ((missing-derivations-chunk (read-derivations/fiberized missing-derivation-file-names-chunk))) (simple-format #t "debug: derivation-file-names->derivation-ids: processing chunk ~A (~A)\n" i log-tag) (insert-missing-derivations postgresql-connection-pool utility-thread-channel derivation-ids-hash-table missing-derivations-chunk #:log-tag log-tag))) (iota (length chunks)) chunks)) (let ((all-ids (vector-map (lambda (_ derivation-file-name) (if derivation-file-name (or (hash-ref derivation-ids-hash-table derivation-file-name) (error (simple-format #f "missing derivation id (~A)" derivation-file-name))) #f)) derivation-file-names))) all-ids))))) (prevent-inlining-for-tests derivation-file-names->derivation-ids) (define guix-store-path (let ((store-path #f)) (lambda (store) (if (and store-path (file-exists? store-path)) store-path (let ((config-guix (%config 'guix))) (if (and (file-exists? config-guix) (string-prefix? "/gnu/store/" config-guix)) (begin (set! store-path (dirname (dirname (%config 'guix)))) store-path) (begin (invalidate-derivation-caches!) (hash-clear! (@@ (guix packages) %derivation-cache)) (let* ((guix-package (@ (gnu packages package-management) guix)) (derivation (package-derivation store guix-package))) (with-time-logging "building the guix derivation" (build-derivations store (list derivation))) (let ((new-store-path (derivation->output-path derivation))) (set! store-path new-store-path) (simple-format (current-error-port) "debug: guix-store-path: ~A\n" new-store-path) new-store-path))))))))) (define (nss-certs-store-path store) (let* ((nss-certs-package (@ (gnu packages certs) nss-certs)) (derivation (package-derivation store nss-certs-package))) (with-time-logging "building the nss-certs derivation" (build-derivations store (list derivation))) (derivation->output-path derivation))) (define (non-blocking-port port) "Make PORT non-blocking and return it." (let ((flags (fcntl port F_GETFL))) (when (zero? (logand O_NONBLOCK flags)) (fcntl port F_SETFL (logior O_NONBLOCK flags))) port)) (define (ensure-non-blocking-store-connection store) (match (store-connection-socket store) ((? file-port? port) (non-blocking-port port)) (_ #f))) (define (call-with-temporary-blocking-store store proc) (let* ((port (store-connection-socket store)) (flags (fcntl port F_GETFL))) (unless (zero? (logand O_NONBLOCK flags)) (fcntl port F_SETFL (logxor O_NONBLOCK flags))) (call-with-values (lambda () (proc store)) (lambda vals (fcntl port F_SETFL (logior O_NONBLOCK flags)) (apply values vals))))) (define (make-inferior-non-blocking! inferior) (non-blocking-port ((@@ (guix inferior) inferior-socket) inferior))) (define (call-with-temporary-thread thunk) (let ((channel (make-channel))) (call-with-new-thread (lambda () (parameterize ((current-read-waiter (lambda (port) (port-poll port "r"))) (current-write-waiter (lambda (port) (port-poll port "w")))) (with-exception-handler (lambda (exn) (put-message channel `(exception . ,exn))) (lambda () (with-throw-handler #t (lambda () (call-with-values thunk (lambda values (put-message channel `(values ,@values))))) (lambda _ (backtrace)))) #:unwind? #t)))) (match (get-message channel) (('values . results) (apply values results)) (('exception . exn) (raise-exception exn))))) (define (inferior-eval-with-store/non-blocking inferior store proc) (call-with-temporary-thread (lambda () (inferior-eval-with-store inferior store proc)))) (define* (channel->source-and-derivation-file-names-by-system conn channel fetch-with-authentication? #:key parallelism) (define use-container? (defined? 'open-inferior/container (resolve-module '(guix inferior)))) (define (inferior-code channel-instance system) `(lambda (store) (let* ((system ,system) (instances (list (channel-instance (channel (name ',(channel-name channel)) (url ,(channel-url channel)) (branch ,(channel-branch channel)) (commit ,(channel-commit channel))) ,(channel-instance-commit channel-instance) ,(channel-instance-checkout channel-instance))))) (simple-format (current-error-port) "guix-data-service: computing the derivation-file-name for ~A\n" system) (let ((manifest (catch #t (lambda () ((channel-instances->manifest instances #:system system) store)) (lambda (key . args) (simple-format (current-error-port) "error: while computing manifest entry derivation for ~A\n" system) (simple-format (current-error-port) "error ~A: ~A\n" key args) #f)))) (define (add-tmp-root-and-return-drv drv) (add-temp-root store drv) drv) (simple-format (current-error-port) "computed the manifest for ~A\n" system) `((manifest-entry-item . ,(and manifest (add-tmp-root-and-return-drv (derivation-file-name (manifest-entry-item (first (manifest-entries manifest)))))))))))) (define (start-inferior inferior-store) (let ((inferior (if use-container? (open-inferior/container inferior-store (guix-store-path inferior-store) #:extra-shared-directories '("/gnu/store") #:extra-environment-variables (list (string-append "SSL_CERT_DIR=" (nss-certs-store-path inferior-store)))) (begin (simple-format #t "debug: using open-inferior\n") (open-inferior (guix-store-path inferior-store) #:error-port (current-error-port)))))) ;; /etc is only missing if open-inferior/container has been used (when use-container? (inferior-eval '(begin ;; Create /etc/pass, as %known-shorthand-profiles in (guix ;; profiles) tries to read from this file. Because the environment ;; is cleaned in build-self.scm, xdg-directory in (guix utils) ;; falls back to accessing /etc/passwd. (mkdir "/etc") (call-with-output-file "/etc/passwd" (lambda (port) (display "root:x:0:0::/root:/bin/bash" port)))) inferior)) (inferior-eval '(use-modules (srfi srfi-1) (ice-9 history) (guix channels) (guix grafts) (guix profiles)) inferior) (inferior-eval '(%graft? #f) inferior) (inferior-eval '(disable-value-history!) inferior) (inferior-eval '(define channel-instance (@@ (guix channels) channel-instance)) inferior) inferior)) (let* ((channel-instance ;; Obtain a session level lock here, to avoid conflicts with ;; other jobs over the Git repository. (with-advisory-session-lock/log-time conn 'latest-channel-instances (lambda () (with-store-connection (lambda (store) ;; TODO (guix serialization) uses dynamic-wind (call-with-temporary-thread (lambda () (first (latest-channel-instances store (list channel) #:authenticate? fetch-with-authentication?))))))))) (pool-store-connections '()) (inferior-and-store-pool (make-resource-pool (lambda () (let* ((inferior-store (open-store-connection)) (inferior (start-inferior inferior-store))) (ensure-non-blocking-store-connection inferior-store) (set-build-options inferior-store #:fallback? #t) (make-inferior-non-blocking! inferior) (call-with-blocked-asyncs (lambda () (set! pool-store-connections (cons inferior-store pool-store-connections)))) (cons inferior inferior-store))) parallelism #:min-size 0 #:name "inferior" #:idle-seconds 30 #:destructor (match-lambda ((inferior . store) (close-inferior inferior) (close-connection store))))) (systems (with-resource-from-pool inferior-and-store-pool res (match res ((inferior . inferior-store) (inferior-eval '(@ (guix packages) %supported-systems) inferior))))) (result (fibers-map (lambda (system) (with-resource-from-pool inferior-and-store-pool res (match res ((inferior . inferior-store) (with-exception-handler (lambda (exn) (if (inferior-protocol-error? exn) (begin (simple-format (current-error-port) "ignoring ~A for ~A\n" exn system) (cons system #f)) (raise-exception exn))) (lambda () (with-throw-handler #t (lambda () (cons system (inferior-eval-with-store/non-blocking inferior inferior-store (inferior-code channel-instance system)))) (lambda _ (simple-format (current-error-port) "failed to compute channel instance derivation for ~A\n" system)))) #:unwind? #t))))) systems))) (cons (channel-instance-checkout channel-instance) result))) (define* (channel->source-and-derivations-by-system conn channel fetch-with-authentication? #:key parallelism) (match (with-time-logging "computing the channel derivation" (channel->source-and-derivation-file-names-by-system conn channel fetch-with-authentication? #:parallelism parallelism)) ((source . derivation-file-names-by-system) (for-each (match-lambda ((system . derivation-file-name) (simple-format (current-error-port) "debug: ~A: channel dervation: ~A\n" system derivation-file-name))) derivation-file-names-by-system) (values source derivation-file-names-by-system)))) (prevent-inlining-for-tests channel->source-and-derivations-by-system) (define (channel-derivations-by-system->guix-store-item channel-derivations-by-system) (let ((derivation-file-name-for-current-system (assoc-ref (assoc-ref channel-derivations-by-system (%current-system)) 'manifest-entry-item))) (if derivation-file-name-for-current-system (let ((derivation-for-current-system (read-derivation-from-file derivation-file-name-for-current-system))) (with-time-logging "building the channel derivation" (with-store-connection (lambda (store) (build-derivations store (list derivation-for-current-system))))) (values (derivation->output-path derivation-for-current-system) derivation-file-name-for-current-system)) #f))) (prevent-inlining-for-tests channel-derivations-by-system->guix-store-item) (define (glibc-locales-for-guix-store-path store store-path) (let ((inf (if (defined? 'open-inferior/container (resolve-module '(guix inferior))) (open-inferior/container store store-path #:extra-shared-directories '("/gnu/store")) (begin (simple-format #t "debug: using open-inferior\n") (open-inferior store-path #:error-port (current-error-port)))))) (inferior-eval '(use-modules (srfi srfi-1) (srfi srfi-34) (guix grafts) (guix derivations)) inf) (inferior-eval '(when (defined? '%graft?) (%graft? #f)) inf) (let* ((derivation (or (and=> (inferior-eval-with-store/non-blocking inf store '(lambda (store) (and (defined? 'libc-locales-for-target (resolve-module '(gnu packages base))) (derivation-file-name (package-derivation store ((@ (gnu packages base) libc-locales-for-target))))))) read-derivation-from-file) (inferior-package-derivation store (first (lookup-inferior-packages inf "glibc-locales"))))) (output (derivation->output-path derivation))) (close-inferior inf) (with-time-logging "building the glibc-locales derivation" (build-derivations store (list derivation))) output))) (define (start-inferior-for-data-extration store store-path guix-locpath extra-inferior-environment-variables) (call-with-blocked-asyncs (lambda () (let* ((original-guix-locpath (getenv "GUIX_LOCPATH")) (original-extra-env-vars-values (map (match-lambda ((key . _) (getenv key))) extra-inferior-environment-variables)) (inf (begin ;; Unset the GUILE_LOAD_PATH and GUILE_LOAD_COMPILED_PATH to ;; avoid the values for these being used in the ;; inferior. Even though the inferior %load-path and ;; %load-compiled-path has the inferior modules first, this ;; can cause issues when there are modules present outside ;; of the inferior Guix which aren't present in the inferior ;; Guix (like the new (guix lint) module (unsetenv "GUILE_LOAD_PATH") (unsetenv "GUILE_LOAD_COMPILED_PATH") (simple-format (current-error-port) "debug: set GUIX_LOCPATH to ~A\n" guix-locpath) (for-each (match-lambda ((key . val) (simple-format (current-error-port) "debug: set ~A to ~A\n" key val) (setenv key val))) extra-inferior-environment-variables) (if (defined? 'open-inferior/container (resolve-module '(guix inferior))) (open-inferior/container store store-path #:extra-shared-directories '("/gnu/store") #:extra-environment-variables (list (string-append "GUIX_LOCPATH=" guix-locpath))) (begin (setenv "GUIX_LOCPATH" guix-locpath) (simple-format #t "debug: using open-inferior\n") (open-inferior store-path #:error-port (current-error-port))))))) (setenv "GUIX_LOCPATH" original-guix-locpath) ; restore GUIX_LOCPATH (for-each (lambda (key val) (setenv key val)) (map car extra-inferior-environment-variables) original-extra-env-vars-values) (when (eq? inf #f) (error "error: inferior is #f")) ;; Normalise the locale for the inferior process (with-exception-handler (lambda (key . args) (simple-format (current-error-port) "warning: failed to set locale to en_US.UTF-8: ~A ~A\n" key args)) (lambda () (inferior-eval '(setlocale LC_ALL "en_US.UTF-8") inf))) (inferior-eval '(use-modules (srfi srfi-1) (srfi srfi-34) (srfi srfi-43) (ice-9 history) (guix grafts) (guix derivations) (gnu tests)) inf) (inferior-eval '(disable-value-history!) inf) ;; For G_ and P_ (or (inferior-eval '(and (resolve-module '(guix i18n) #:ensure #f) (use-modules (guix i18n)) #t) inf) (inferior-eval '(use-modules (guix ui)) inf)) (inferior-eval '(when (defined? '%graft?) (%graft? #f)) inf) ;; Load the heap-profiler (for-each (lambda (exp) (inferior-eval exp inf)) (call-with-input-file (%search-load-path "guix-data-service/heap-profiler.scm") (lambda (port) (let loop ((result '())) (let ((exp (read port))) (if (eof-object? exp) (reverse! result) (loop (cons (if (eq? (car exp) 'define-module) `(use-modules ,@(let loop ((lst (cddr exp)) (result '())) (match lst ('() result) (('#:use-module mod rest ...) (loop rest (cons mod result))) (rest (loop (cdr lst) result))))) exp) result)))))))) ;; TODO Have Guix make this easier ((@@ (guix inferior) ensure-store-bridge!) inf) (non-blocking-port ((@@ (guix inferior) inferior-bridge-socket) inf)) inf)))) (define* (extract-information-from db-conn guix-revision-id-promise commit guix-source store-item guix-derivation utility-thread-channel read-derivations/fiberized derivation-ids-hash-table #:key skip-system-tests? extra-inferior-environment-variables parallelism) (define guix-locpath ;; Augment the GUIX_LOCPATH to include glibc-locales from ;; the Guix at store-path, this should mean that the ;; inferior Guix works, even if it's build using a different ;; glibc version (string-append (with-store-connection (lambda (store) (glibc-locales-for-guix-store-path store store-item))) "/lib/locale" ":" (getenv "GUIX_LOCPATH"))) (define inf-and-store-pool (make-resource-pool (lambda () (let* ((inferior-store (open-store-connection))) (unless (valid-path? inferior-store store-item) (simple-format #t "warning: store item missing (~A)\n" store-item) (unless (valid-path? inferior-store guix-derivation) (simple-format #t "warning: attempting to substitute guix derivation (~A)\n" guix-derivation) (fibers-force guix-revision-id-promise) (ensure-path inferior-store guix-derivation)) (simple-format #t "warning: building (~A)\n" guix-derivation) (build-derivations inferior-store (list (read-derivation-from-file guix-derivation)))) ;; Use this more to keep the store-path alive so long as there's a ;; inferior operating (add-temp-root inferior-store store-item) (let ((inferior (start-inferior-for-data-extration inferior-store store-item guix-locpath extra-inferior-environment-variables))) (ensure-non-blocking-store-connection inferior-store) (make-inferior-non-blocking! inferior) (simple-format #t "debug: started new inferior and store connection\n") (cons inferior inferior-store)))) parallelism #:min-size 0 #:idle-seconds 20 #:name "inferior" #:destructor (match-lambda ((inferior . store) (simple-format #t "debug: closing inferior and associated store connection\n") (close-connection store) (close-inferior inferior))))) (define (call-with-inferior proc) (define (check-wal-size) (define (get-wal-bytes) (catch #t (lambda () (stat:size (stat "/var/guix/db/db.sqlite-wal"))) (lambda _ 0))) (define threshold (max (* 4096 (expt 2 20)) (* 0.8 (- (free-disk-space "/var/guix/db/db.sqlite") (get-wal-bytes))))) (if (< (get-wal-bytes) threshold) #t (let loop ((wal-bytes (get-wal-bytes))) (if (> wal-bytes threshold) (let ((stats (resource-pool-stats inf-and-store-pool))) (simple-format #t "debug: guix-daemon WAL is large (~A), ~A inferiors, waiting\n" wal-bytes (assq-ref stats 'resources)) (sleep 30) (loop (get-wal-bytes))) (begin (simple-format #t "debug: guix-daemon WAL now ~A bytes, continuing\n" wal-bytes) #t))))) (let loop () (check-wal-size) (match (with-exception-handler (lambda (exn) (if (resource-pool-timeout-error? exn) 'retry (raise-exception exn))) (lambda () (call-with-resource-from-pool inf-and-store-pool (match-lambda ((inferior . inferior-store) (call-with-values (lambda () (proc inferior inferior-store)) (lambda vals (simple-format #t "debug: returning inferior to pool\n") (cons 'result vals))))) #:timeout 20)) #:unwind? #t) ('retry (loop)) (('result . vals) (apply values vals))))) (define postgresql-connection-pool (make-resource-pool (lambda () (with-time-logging "acquiring advisory transaction lock: load-new-guix-revision-inserts" ;; Wait until this is the only transaction inserting data, to ;; avoid any concurrency issues (obtain-advisory-transaction-lock db-conn 'load-new-guix-revision-inserts)) db-conn) 1 #:name "postgres" #:min-size 0)) (define package-ids-promise (fibers-delay (lambda () (let ((packages-data (call-with-inferior (lambda (inferior inferior-store) (with-time-logging "getting all inferior package data" (let ((packages pkg-to-replacement-hash-table (inferior-packages-plus-replacements inferior))) (all-inferior-packages-data inferior packages pkg-to-replacement-hash-table))))))) (with-resource-from-pool postgresql-connection-pool conn (insert-packages conn packages-data)))))) (define (extract-and-store-lint-checkers-and-warnings) (define inferior-lint-checkers-data (call-with-inferior (lambda (inferior inferior-store) (inferior-lint-checkers inferior)))) (when inferior-lint-checkers-data (letpar& ((lint-checker-ids (with-resource-from-pool postgresql-connection-pool conn (lint-checkers->lint-checker-ids conn (map (match-lambda ((name descriptions-by-locale network-dependent) (list name network-dependent (lint-checker-description-data->lint-checker-description-set-id conn descriptions-by-locale)))) inferior-lint-checkers-data)))) (lint-warnings-data (fibers-map (match-lambda ((checker-name _ network-dependent?) (and (and (not network-dependent?) ;; Running the derivation linter is ;; currently infeasible (not (eq? checker-name 'derivation))) (begin (call-with-inferior (lambda (inferior inferior-store) (inferior-lint-warnings inferior inferior-store checker-name))))))) inferior-lint-checkers-data))) (let ((package-ids (fibers-force package-ids-promise))) (with-resource-from-pool postgresql-connection-pool conn (insert-guix-revision-lint-checkers conn (fibers-force guix-revision-id-promise) lint-checker-ids) (let ((lint-warning-ids (insert-lint-warnings conn package-ids lint-checker-ids lint-warnings-data))) (chunk-for-each! (lambda (lint-warning-ids-chunk) (insert-guix-revision-lint-warnings conn (fibers-force guix-revision-id-promise) lint-warning-ids-chunk)) 5000 lint-warning-ids))))))) (define (extract-and-store-package-derivations) (define packages-count (call-with-inferior (lambda (inferior inferior-store) (ensure-gds-inferior-packages-defined! inferior) (inferior-eval '(vector-length gds-inferior-packages) inferior)))) (define chunk-size 1000) (define (inferior-cleanup inferior) (inferior-eval '(let ((stats (gc-stats))) (simple-format (current-error-port) "cleaning up inferior (heap: ~a MiB used (~a MiB heap))~%" (round (/ (- (assoc-ref stats 'heap-size) (assoc-ref stats 'heap-free-size)) (expt 2. 20))) (round (/ (assoc-ref stats 'heap-size) (expt 2. 20))))) inferior) (catch 'match-error (lambda () (inferior-eval '(invalidate-derivation-caches!) inferior)) (lambda (key . args) (simple-format (current-error-port) "warning: ignoring match-error from calling inferior invalidate-derivation-caches!\n"))) ;; Generating derivations populates the derivation cache (inferior-eval '(hash-clear! (@@ (guix derivations) %derivation-cache)) inferior) ;; Clean the cached store connections, as there are ;; caches associated with these that take up lots of ;; memory (inferior-eval '(when (defined? '%store-table) (hash-clear! %store-table)) inferior) (inferior-eval '(hash-for-each (lambda (key _) ((@ (guix memoization) invalidate-memoization!) key)) (@@ (guix memoization) %memoization-tables)) inferior) (inferior-eval '(gc) inferior) (inferior-eval '(let ((stats (gc-stats))) (simple-format (current-error-port) "finished cleaning up inferior (heap: ~a MiB used (~a MiB heap))~%" (round (/ (- (assoc-ref stats 'heap-size) (assoc-ref stats 'heap-free-size)) (expt 2. 20))) (round (/ (assoc-ref stats 'heap-size) (expt 2. 20))))) inferior)) (define (get-derivations system target) (let ((derivations-vector (make-vector packages-count))) (with-time-logging (simple-format #f "getting derivations for ~A" (cons system target)) (let loop ((start-index 0)) (let* ((last-chunk? (>= (+ start-index chunk-size) packages-count)) (count (if last-chunk? (- packages-count start-index) chunk-size)) (chunk (call-with-inferior (lambda (inferior inferior-store) (ensure-gds-inferior-packages-defined! inferior) (let ((result (inferior-package-derivations inferior-store inferior system target start-index count))) (when last-chunk? (inferior-cleanup inferior)) result))))) (vector-copy! derivations-vector start-index chunk) (unless last-chunk? (loop (+ start-index chunk-size)))))) derivations-vector)) (define (process-system-and-target system target get-derivations) (with-time-logging (simple-format #f "processing derivations for ~A" (cons system target)) (let* ((derivations-vector (get-derivations system target)) (derivation-ids (with-time-logging (simple-format #f "derivation-file-names->derivation-ids (~A ~A)" system target) (derivation-file-names->derivation-ids postgresql-connection-pool utility-thread-channel read-derivations/fiberized derivation-ids-hash-table derivations-vector #:log-tag (simple-format #f "~A:~A" system target)))) (guix-revision-id (fibers-force guix-revision-id-promise)) (package-ids (fibers-force package-ids-promise)) (package-derivation-ids (with-resource-from-pool postgresql-connection-pool conn (with-time-logging (simple-format #f "insert-package-derivations (~A ~A)" system target) (insert-package-derivations conn system (or target "") package-ids derivation-ids))))) (chunk-for-each! (lambda (package-derivation-ids-chunk) (with-resource-from-pool postgresql-connection-pool conn (insert-guix-revision-package-derivations conn guix-revision-id package-derivation-ids-chunk))) 2000 package-derivation-ids))) (with-resource-from-pool postgresql-connection-pool conn (with-time-logging (simple-format #f "insert-guix-revision-package-derivation-distribution-counts (~A ~A)" system target) (insert-guix-revision-package-derivation-distribution-counts conn (fibers-force guix-revision-id-promise) (number->string (system->system-id conn system)) (or target "")))) 'finished) (let ((get-derivations/fiberized (fiberize get-derivations ;; Limit concurrency here to keep focused on specific ;; systems until they've been fully processed #:parallelism parallelism))) (with-time-logging "extract-and-store-package-derivations" (fibers-map-with-progress (match-lambda ((system . target) (retry-on-missing-store-item (lambda () (process-system-and-target system target get-derivations/fiberized))))) (list (call-with-inferior (lambda (inferior inferior-store) (inferior-fetch-system-target-pairs inferior)))) #:report (lambda (data) (for-each (match-lambda ((result (system . target)) (simple-format #t "~A ~A: ~A\n" system target result))) data)))))) (define (extract-and-store-system-tests) (if skip-system-tests? (begin (simple-format #t "debug: skipping system tests\n") '()) (with-time-logging "extract-and-store-system-tests" (let ((data-with-derivation-file-names (call-with-inferior (lambda (inferior inferior-store) (with-time-logging "getting inferior system tests" (all-inferior-system-tests inferior inferior-store guix-source commit)))))) (when data-with-derivation-file-names (let ((data-with-derivation-ids (map (match-lambda ((name description derivation-file-names-by-system location-data) (list name description (let ((systems (map car derivation-file-names-by-system)) (derivation-ids (derivation-file-names->derivation-ids postgresql-connection-pool utility-thread-channel read-derivations/fiberized derivation-ids-hash-table (list->vector (map cdr derivation-file-names-by-system)) #:log-tag "channel-instances"))) (map cons systems derivation-ids)) location-data))) data-with-derivation-file-names))) (with-resource-from-pool postgresql-connection-pool conn (insert-system-tests-for-guix-revision conn (fibers-force guix-revision-id-promise) data-with-derivation-ids)))))))) (with-time-logging (simple-format #f "extract-information-from: ~A\n" store-item) (parallel-via-fibers (begin (fibers-force package-ids-promise) #f) (extract-and-store-package-derivations) (retry-on-missing-store-item extract-and-store-system-tests) (with-time-logging "extract-and-store-lint-checkers-and-warnings" (extract-and-store-lint-checkers-and-warnings)))) #t) (prevent-inlining-for-tests extract-information-from) (define (load-channel-instances utility-thread-channel read-derivations/fiberized derivation-ids-hash-table git-repository-id commit channel-derivations-by-system) ;; Load the channel instances in a different transaction, so that this can ;; commit prior to the outer transaction (with-postgresql-connection "load-new-guix-revision insert channel instances" (lambda (channel-instances-conn) (with-postgresql-transaction channel-instances-conn (lambda (channel-instances-conn) (with-time-logging "acquiring advisory transaction lock: load-new-guix-revision-inserts" ;; Wait until this is the only transaction inserting data, to avoid ;; any concurrency issues (obtain-advisory-transaction-lock channel-instances-conn 'load-new-guix-revision-inserts)) (let* ((existing-guix-revision-id (git-repository-id-and-commit->revision-id channel-instances-conn git-repository-id commit)) (guix-revision-id (or existing-guix-revision-id (insert-guix-revision channel-instances-conn git-repository-id commit))) (postgresql-connection-pool (make-resource-pool (const channel-instances-conn) 1 #:name "postgres" #:min-size 0))) (unless existing-guix-revision-id (let* ((derivations-by-system (filter-map (match-lambda ((system . derivations) (and=> (assoc-ref derivations 'manifest-entry-item) (lambda (drv) (cons system drv))))) channel-derivations-by-system)) (derivation-ids (derivation-file-names->derivation-ids postgresql-connection-pool utility-thread-channel read-derivations/fiberized derivation-ids-hash-table (list->vector (map cdr derivations-by-system))))) (insert-channel-instances channel-instances-conn guix-revision-id (map cons (map car derivations-by-system) (vector->list derivation-ids)))) (simple-format (current-error-port) "guix-data-service: saved the channel instance derivations to the database\n")) guix-revision-id)))))) (prevent-inlining-for-tests load-channel-instances) (define* (load-new-guix-revision conn git-repository-id commit #:key skip-system-tests? parallelism extra-inferior-environment-variables) (define utility-thread-channel ;; There might be high demand for this, so order the requests (make-queueing-channel (call-with-default-io-waiters (lambda () (make-worker-thread-channel (const '()) #:parallelism parallelism))))) (define (read-derivations filenames) (call-with-worker-thread utility-thread-channel (lambda () (map (lambda (filename) (if (file-exists? filename) (read-derivation-from-file filename) (raise-exception (make-missing-store-item-error filename)))) filenames)))) (define read-derivations/fiberized (fiberize read-derivations ;; Don't do this in parallel as there's caching involved with ;; read-derivation-from-file #:parallelism 1)) (define derivation-ids-hash-table (make-hash-table)) (let* ((git-repository-fields (select-git-repository conn git-repository-id)) (git-repository-url (second git-repository-fields)) (fetch-with-authentication? (fourth git-repository-fields)) (channel-for-commit (channel (name 'guix) (url git-repository-url) (commit commit)))) (define channel-derivations-by-system-promise (fibers-delay (lambda () (with-postgresql-connection "load-new-guix-revision channel->source-and-derivations-by-system" (lambda (channel-conn) (channel->source-and-derivations-by-system channel-conn channel-for-commit fetch-with-authentication? #:parallelism parallelism)))))) (define guix-revision-id-promise (fibers-delay (lambda () (retry-on-missing-store-item (lambda () (let ((guix-source channel-derivations-by-system (fibers-force channel-derivations-by-system-promise))) (load-channel-instances utility-thread-channel read-derivations/fiberized derivation-ids-hash-table git-repository-id commit channel-derivations-by-system))) #:on-exception (lambda () (fibers-promise-reset channel-derivations-by-system-promise)))))) ;; Prompt getting the guix-revision-id as soon as possible (spawn-fiber (lambda () (fibers-force guix-revision-id-promise))) (let* ((guix-source channel-derivations-by-system (fibers-force channel-derivations-by-system-promise)) (store-item guix-derivation (channel-derivations-by-system->guix-store-item channel-derivations-by-system))) (if store-item (and (extract-information-from conn guix-revision-id-promise commit guix-source store-item guix-derivation utility-thread-channel read-derivations/fiberized derivation-ids-hash-table #:skip-system-tests? skip-system-tests? #:extra-inferior-environment-variables extra-inferior-environment-variables #:parallelism parallelism) (if (defined? 'channel-news-for-commit (resolve-module '(guix channels))) (with-time-logging "inserting channel news entries" (insert-channel-news-entries-for-guix-revision conn (fibers-force guix-revision-id-promise) (channel-news-for-commit channel-for-commit commit))) (begin (simple-format #t "debug: importing channel news not supported\n") #t)) (update-package-derivations-table conn git-repository-id (fibers-force guix-revision-id-promise) commit) (with-time-logging "updating builds.derivation_output_details_set_id" (update-builds-derivation-output-details-set-id conn (string->number (fibers-force guix-revision-id-promise))))) (begin (simple-format #t "Failed to generate store item for ~A\n" commit) #f))))) (define (enqueue-load-new-guix-revision-job conn git-repository-id commit source) (define query " INSERT INTO load_new_guix_revision_jobs (git_repository_id, commit, source) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING RETURNING id;") (match (exec-query conn query (list (number->string git-repository-id) commit source)) ((result) result) (() #f))) (define (select-load-new-guix-revision-job-metrics conn) (define query " SELECT COALESCE(git_repositories.label, git_repositories.url) AS repository_label, CASE WHEN succeeded_at IS NOT NULL THEN 'succeeded' WHEN ( SELECT COUNT(*) FROM load_new_guix_revision_job_events WHERE job_id = load_new_guix_revision_jobs.id AND event = 'retry' ) >= ( SELECT COUNT(*) FROM load_new_guix_revision_job_events WHERE job_id = load_new_guix_revision_jobs.id AND event = 'failure' ) THEN 'queued' ELSE 'failed' END AS state, COUNT(*) FROM load_new_guix_revision_jobs INNER JOIN git_repositories ON load_new_guix_revision_jobs.git_repository_id = git_repositories.id GROUP BY 1, 2") (map (match-lambda ((label state count) (list label state (string->number count)))) (exec-query conn query))) (define (select-job-for-commit conn commit) (let ((result (exec-query conn " SELECT id, commit, source, git_repository_id, CASE WHEN succeeded_at IS NOT NULL THEN 'succeeded' WHEN ( SELECT COUNT(*) FROM load_new_guix_revision_job_events WHERE job_id = load_new_guix_revision_jobs.id AND event = 'retry' ) >= ( SELECT COUNT(*) FROM load_new_guix_revision_job_events WHERE job_id = load_new_guix_revision_jobs.id AND event = 'failure' ) THEN 'queued' ELSE 'failed' END AS state FROM load_new_guix_revision_jobs WHERE commit = $1" (list commit)))) (match result (() #f) (((id commit source git_repository_id state)) `((id . ,(string->number id)) (commit . ,commit) (source . ,source) (git_repository_id . ,(string->number git_repository_id)) (state . ,state)))))) (define* (select-recent-job-events conn #:key (limit 8)) (define query (string-append " SELECT load_new_guix_revision_jobs.id, load_new_guix_revision_jobs.commit, load_new_guix_revision_jobs.source, load_new_guix_revision_jobs.git_repository_id, load_new_guix_revision_job_events.event, load_new_guix_revision_job_events.occurred_at FROM load_new_guix_revision_jobs INNER JOIN load_new_guix_revision_job_events ON load_new_guix_revision_job_events.job_id = load_new_guix_revision_jobs.id ORDER BY load_new_guix_revision_job_events.occurred_at DESC LIMIT " (number->string limit))) (exec-query conn query)) (define (select-jobs-and-events conn before-id limit) (define query (string-append " SELECT load_new_guix_revision_jobs.id, load_new_guix_revision_jobs.commit, load_new_guix_revision_jobs.source, load_new_guix_revision_jobs.git_repository_id, load_new_guix_revision_jobs.created_at, load_new_guix_revision_jobs.succeeded_at, ( SELECT JSON_AGG( json_build_object('event', event, 'occurred_at', occurred_at) ORDER BY occurred_at ASC ) FROM load_new_guix_revision_job_events WHERE job_id = load_new_guix_revision_jobs.id ), EXISTS ( SELECT 1 FROM load_new_guix_revision_job_logs WHERE job_id = load_new_guix_revision_jobs.id ) AS log_exists FROM load_new_guix_revision_jobs " (if before-id (string-append "WHERE load_new_guix_revision_jobs.id < " (number->string before-id)) "") " ORDER BY load_new_guix_revision_jobs.id DESC " (if limit (string-append "LIMIT " (number->string limit)) ""))) (map (match-lambda ((id commit source git-repository-id created-at succeeded-at events-json log-exists?) (list id commit source git-repository-id created-at succeeded-at (if (or (eq? #f events-json) (string-null? events-json)) #() (json-string->scm events-json)) (string=? log-exists? "t")))) (exec-query conn query))) (define (select-unprocessed-jobs-and-events conn) (define query " SELECT load_new_guix_revision_jobs.id, load_new_guix_revision_jobs.commit, load_new_guix_revision_jobs.source, load_new_guix_revision_jobs.git_repository_id, load_new_guix_revision_jobs.created_at, ( SELECT JSON_AGG( json_build_object('event', event, 'occurred_at', occurred_at) ORDER BY occurred_at ASC ) FROM load_new_guix_revision_job_events WHERE job_id = load_new_guix_revision_jobs.id ), EXISTS ( SELECT 1 FROM load_new_guix_revision_job_logs WHERE job_id = load_new_guix_revision_jobs.id ) AS log_exists, commit IN ( SELECT commit FROM ( SELECT DISTINCT ON (name) name, git_commits.commit FROM git_branches INNER JOIN git_commits ON git_commits.git_branch_id = git_branches.id WHERE git_branches.git_repository_id = load_new_guix_revision_jobs.git_repository_id ORDER BY name, datetime DESC ) branches_and_latest_commits ) AS latest_branch_commit FROM load_new_guix_revision_jobs INNER JOIN git_repositories ON load_new_guix_revision_jobs.git_repository_id = git_repositories.id WHERE succeeded_at IS NULL AND ( SELECT COUNT(*) FROM load_new_guix_revision_job_events WHERE job_id = load_new_guix_revision_jobs.id AND event = 'retry' ) >= ( SELECT COUNT(*) FROM load_new_guix_revision_job_events WHERE job_id = load_new_guix_revision_jobs.id AND event = 'failure' ) ORDER BY latest_branch_commit DESC, git_repositories.job_priority DESC, id DESC") (map (match-lambda ((id commit source git-repository-id created-at events-json log-exists? latest-branch-commit) (list id commit source git-repository-id created-at (if (or (eq? #f events-json) (string-null? events-json)) #() (json-string->scm events-json)) (string=? log-exists? "t") (string=? latest-branch-commit "t")))) (exec-query conn query))) (define (select-jobs-and-events-for-commit conn commit) (define query " SELECT load_new_guix_revision_jobs.id, load_new_guix_revision_jobs.source, load_new_guix_revision_jobs.git_repository_id, load_new_guix_revision_jobs.created_at, load_new_guix_revision_jobs.succeeded_at, ( SELECT JSON_AGG( json_build_object('event', event, 'occurred_at', occurred_at) ORDER BY occurred_at ASC ) FROM load_new_guix_revision_job_events WHERE job_id = load_new_guix_revision_jobs.id ), EXISTS ( SELECT 1 FROM load_new_guix_revision_job_logs WHERE job_id = load_new_guix_revision_jobs.id ) AS log_exists FROM load_new_guix_revision_jobs WHERE commit = $1 ORDER BY load_new_guix_revision_jobs.id DESC") (map (match-lambda ((id source git-repository-id created-at succeeded-at events-json log-exists?) (list id commit source git-repository-id created-at succeeded-at (if (or (eq? #f events-json) (string-null? events-json)) #() (json-string->scm events-json)) (string=? log-exists? "t")))) (exec-query conn query (list commit)))) (define (guix-revision-loaded-successfully? conn commit) (define query " SELECT EXISTS( SELECT 1 FROM load_new_guix_revision_jobs INNER JOIN load_new_guix_revision_job_events ON job_id = load_new_guix_revision_jobs.id WHERE commit = $1 AND event = 'success' )") (let ((result (caar (exec-query conn query (list commit))))) (string=? result "t"))) (define (most-recent-n-load-new-guix-revision-jobs conn n) (let ((result (exec-query conn " SELECT id, commit, source, git_repository_id FROM load_new_guix_revision_jobs ORDER BY id ASC LIMIT $1" (list (number->string n))))) result)) (define (select-job-for-update conn id) (exec-query conn " SELECT id, commit, source, git_repository_id FROM load_new_guix_revision_jobs WHERE id = $1 AND succeeded_at IS NULL FOR NO KEY UPDATE SKIP LOCKED" (list id))) (define (record-job-event conn job-id event) (exec-query conn (string-append " INSERT INTO load_new_guix_revision_job_events (job_id, event) VALUES ($1, $2)") (list job-id event))) (define (record-job-succeeded conn id) (exec-query conn (string-append " UPDATE load_new_guix_revision_jobs SET succeeded_at = clock_timestamp() WHERE id = $1 ") (list id))) (define (fetch-unlocked-jobs conn) (define query " SELECT load_new_guix_revision_jobs.id, commit IN ( SELECT commit FROM ( SELECT DISTINCT ON (name) name, git_commits.commit FROM git_branches INNER JOIN git_commits ON git_commits.git_branch_id = git_branches.id WHERE git_branches.git_repository_id = load_new_guix_revision_jobs.git_repository_id ORDER BY name, datetime DESC ) branches_and_latest_commits ) AS latest_branch_commit FROM load_new_guix_revision_jobs INNER JOIN git_repositories ON load_new_guix_revision_jobs.git_repository_id = git_repositories.id WHERE succeeded_at IS NULL AND ( SELECT COUNT(*) FROM load_new_guix_revision_job_events WHERE job_id = load_new_guix_revision_jobs.id AND event = 'retry' ) >= ( SELECT COUNT(*) FROM load_new_guix_revision_job_events WHERE job_id = load_new_guix_revision_jobs.id AND event = 'failure' ) ORDER BY latest_branch_commit DESC, git_repositories.job_priority DESC, load_new_guix_revision_jobs.id DESC FOR NO KEY UPDATE OF load_new_guix_revision_jobs SKIP LOCKED") (map (match-lambda ((id priority) (list id (string=? priority "t")))) (exec-query conn query))) (define (open-store-connection) (let ((store (open-connection #:non-blocking? #t #:built-in-builders '("download")))) (set-build-options store #:fallback? #t) store)) (prevent-inlining-for-tests open-store-connection) (define* (with-store-connection proc) (let ((store (open-store-connection))) (define (thunk) (parameterize ((current-store-protocol-version (store-connection-version store))) (call-with-values (lambda () (proc store)) (lambda results (close-connection store) (apply values results))))) (with-exception-handler (lambda (exception) (close-connection store) (raise-exception exception)) thunk))) (prevent-inlining-for-tests with-store-connection) (define* (process-load-new-guix-revision-job id #:key skip-system-tests? extra-inferior-environment-variables parallelism) (define finished-channel (make-channel)) (define result (with-postgresql-connection (simple-format #f "load-new-guix-revision ~A" id) (lambda (conn) ;; Fix the hash encoding of derivation_output_details. This'll only run ;; once on any given database, but is kept here just to make sure any ;; instances have the data updated. (fix-derivation-output-details-hash-encoding conn) (%worker-thread-default-timeout #f) (resource-pool-retry-checkout-timeout 120) (exec-query conn "BEGIN") (spawn-fiber (lambda () (while (perform-operation (choice-operation (wrap-operation (get-operation finished-channel) (const #f)) (wrap-operation (sleep-operation 20) (const #t)))) (let ((stats (gc-stats))) (simple-format (current-error-port) "process-job heap: ~a MiB used (~a MiB heap)~%" (round (/ (- (assoc-ref stats 'heap-size) (assoc-ref stats 'heap-free-size)) (expt 2. 20))) (round (/ (assoc-ref stats 'heap-size) (expt 2. 20)))))))) (match (select-job-for-update conn id) (((id commit source git-repository-id)) ;; With a separate connection, outside of the transaction so the event ;; gets persisted regardless. (with-postgresql-connection (simple-format #f "load-new-guix-revision ~A start-event" id) (lambda (start-event-conn) (record-job-event start-event-conn id "start"))) (simple-format #t "Processing job ~A (commit: ~A, source: ~A)\n\n" id commit source) (if (eq? (with-time-logging (string-append "processing revision " commit) (with-exception-handler (const #f) (lambda () (with-throw-handler #t (lambda () (load-new-guix-revision conn git-repository-id commit #:skip-system-tests? #t #:extra-inferior-environment-variables extra-inferior-environment-variables #:parallelism parallelism)) (lambda (key . args) (simple-format (current-error-port) "error: load-new-guix-revision: ~A ~A\n" key args) (backtrace)))) #:unwind? #t)) #t) (begin (record-job-succeeded conn id) (record-job-event conn id "success") (exec-query conn "COMMIT") #t) (begin (exec-query conn "ROLLBACK") (record-job-event conn id "failure") #f))) (() (exec-query conn "ROLLBACK") (simple-format #t "job ~A not found to be processed\n" id)))))) (when result (parallel-via-fibers (with-postgresql-connection (simple-format #f "post load-new-guix-revision ~A" id) (lambda (conn) (with-time-logging "vacuuming package derivations by guix revision range table" (vacuum-package-derivations-table conn)))) (with-postgresql-connection (simple-format #f "post load-new-guix-revision ~A" id) (lambda (conn) (with-time-logging "vacuum-derivation-inputs-table" (vacuum-derivation-inputs-table conn)) (match (exec-query conn "SELECT reltuples::bigint FROM pg_class WHERE relname = 'derivation_inputs'") (((rows)) ;; Don't attempt counting distinct values if there are too ;; many rows, as that is far to slow and could use up all the ;; disk space. (when (< (string->number rows) 1000000000) (with-time-logging "update-derivation-inputs-statistics" (update-derivation-inputs-statistics conn))))))) (with-postgresql-connection (simple-format #f "post load-new-guix-revision ~A" id) (lambda (conn) (with-time-logging "vacuum-derivation-outputs-table" (vacuum-derivation-outputs-table conn)) (with-time-logging "update-derivation-outputs-statistics" (update-derivation-outputs-statistics conn)))))) (put-message finished-channel #t) result)