(define-module (guix-data-service jobs load-new-guix-revision)
  #:use-module (srfi srfi-1)
  #:use-module (ice-9 match)
  #:use-module (ice-9 hash-table)
  #:use-module (rnrs exceptions)
  #:use-module (json)
  #:use-module (squee)
  #:use-module (guix monads)
  #:use-module (guix store)
  #:use-module (guix channels)
  #:use-module (guix inferior)
  #:use-module (guix profiles)
  #:use-module (guix utils)
  #:use-module (guix progress)
  #:use-module (guix packages)
  #:use-module (guix derivations)
  #:use-module (guix build utils)
  #:use-module (guix-data-service config)
  #:use-module (guix-data-service database)
  #:use-module (guix-data-service model package)
  #:use-module (guix-data-service model git-repository)
  #:use-module (guix-data-service model guix-revision)
  #:use-module (guix-data-service model package-derivation)
  #:use-module (guix-data-service model guix-revision-package-derivation)
  #:use-module (guix-data-service model license-set)
  #:use-module (guix-data-service model lint-checker)
  #:use-module (guix-data-service model lint-warning)
  #:use-module (guix-data-service model lint-warning-message)
  #:use-module (guix-data-service model location)
  #:use-module (guix-data-service model package-metadata)
  #:use-module (guix-data-service model derivation)
  #:export (log-for-job
            count-log-parts
            combine-log-parts!
            fetch-unlocked-jobs
            process-load-new-guix-revision-job
            select-job-for-commit
            select-jobs-and-events
            select-unprocessed-jobs-and-events
            select-jobs-and-events-for-commit
            record-job-event
            enqueue-load-new-guix-revision-job
            most-recent-n-load-new-guix-revision-jobs))

;;;
;;; Job logging
;;;

;; Return a write-only soft port for JOB-ID.  Output is accumulated in a
;; buffer; whenever a written string contains a newline, the buffered text is
;; inserted as a new row in load_new_guix_revision_job_log_parts through CONN
;; and echoed to the output port that was current when this procedure was
;; called.  Any existing log parts for JOB-ID are deleted first.
(define (log-port job-id conn)
  (define output-port
    (current-output-port))

  (define id 0)
  (define buffer "")

  (define (insert job_id s)
    (exec-query
     conn
     (string-append
      "INSERT INTO load_new_guix_revision_job_log_parts (id, job_id, contents) "
      "VALUES ($1, $2, $3)")
     (list (number->string id) job_id s)))

  (define (log-string s)
    (if (string-contains s "\n")
        (let ((output (string-append buffer s)))
          (set! id (+ 1 id))            ; increment id
          (set! buffer "")              ; clear the buffer
          (insert job-id output)
          (display output output-port))
        (set! buffer (string-append buffer s))))

  ;; TODO, this is useful when re-running jobs, but I'm not sure that should
  ;; be a thing, jobs should probably be only attempted once.
  (exec-query
   conn
   "DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1"
   (list job-id))

  (let ((port
         (make-soft-port
          (vector (lambda (c)
                    (set! buffer (string-append buffer (string c))))
                  log-string
                  (lambda ()
                    (force-output output-port))
                  #f                    ; fetch one character
                  (lambda ()
                    ;; close port
                    #f)
                  #f)                   ; number of characters that can be read
          "w")))
    (setvbuf port 'line)
    port))

;; The error port to hand to inferiors opened with open-inferior.  It is
;; rebound in process-load-new-guix-revision-job to the error port that was
;; current before output was redirected to the job log.
(define real-error-port
  (make-parameter (current-error-port)))

;; Return the HTML-escaped log text for JOB-ID, read from the combined log if
;; one with non-NULL contents exists, otherwise from the aggregated log parts.
;;
;; With both START-CHARACTER and CHARACTER-LIMIT, select
;; substr(contents, START-CHARACTER, CHARACTER-LIMIT); with only
;; CHARACTER-LIMIT, select the last CHARACTER-LIMIT characters.  When
;; CHARACTER-LIMIT is omitted no length restriction is applied (previously #f
;; was interpolated into the SQL, producing an invalid query).
;;
;; NOTE(review): START-CHARACTER and CHARACTER-LIMIT are interpolated into the
;; SQL text rather than passed as parameters, so callers should only pass
;; numbers here -- confirm against callers.
(define* (log-for-job conn job-id
                      #:key
                      character-limit
                      start-character)
  (define (sql-html-escape s)
    (string-append
     "replace("
     (string-append
      "replace("
      (string-append
       "replace("
       s
       ",'&','&amp;')")
      ",'<','&lt;')")
     ",'>','&gt;')"))

  (define (get-characters s)
    (cond
     ((and start-character character-limit)
      (simple-format #f "substr(~A, ~A, ~A)"
                     s start-character character-limit))
     (start-character
      (simple-format #f "substr(~A, ~A)" s start-character))
     (character-limit
      (simple-format #f "right(~A, ~A)" s character-limit))
     (else s)))

  (define log-query
    (string-append
     "SELECT "
     (sql-html-escape (get-characters "contents"))
     " FROM load_new_guix_revision_job_logs"
     " WHERE job_id = $1 AND contents IS NOT NULL"))

  (define parts-query
    (string-append
     "SELECT "
     (sql-html-escape
      (get-characters "STRING_AGG(contents, '' ORDER BY id ASC)"))
     " FROM load_new_guix_revision_job_log_parts WHERE job_id = $1"))

  (match (exec-query conn log-query (list job-id))
    (((contents)) contents)
    (()
     (match (exec-query conn parts-query (list job-id))
       (((contents)) contents)))))

;; Replace any load_new_guix_revision_job_logs row for JOB-ID with a row whose
;; contents is NULL.
(define (insert-empty-log-entry conn job-id)
  (exec-query
   conn
   "DELETE FROM load_new_guix_revision_job_logs WHERE job_id = $1"
   (list job-id))
  (exec-query
   conn
   "INSERT INTO load_new_guix_revision_job_logs (job_id, contents) VALUES ($1, NULL)"
   (list job-id)))

;; Return the number of log part rows stored for JOB-ID.
(define (count-log-parts conn job-id)
  (match (exec-query
          conn
          "
SELECT COUNT(*)
FROM load_new_guix_revision_job_log_parts
WHERE job_id = $1"
          (list job-id))
    (((id))
     (string->number id))))

;; In a single transaction, store the concatenation of the log parts for
;; JOB-ID (ordered by part id) as the contents of its
;; load_new_guix_revision_job_logs row, then delete the parts.
(define (combine-log-parts! conn job-id)
  (with-postgresql-transaction
   conn
   (lambda (conn)
     (exec-query
      conn
      (string-append
       "UPDATE load_new_guix_revision_job_logs SET contents = "
       "("
       "SELECT STRING_AGG(contents, '' ORDER BY id ASC) FROM "
       "load_new_guix_revision_job_log_parts WHERE job_id = $1 "
       "GROUP BY job_id"
       ")"
       "WHERE job_id = $1")
      (list job-id))
     (exec-query
      conn
      "DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1"
      (list job-id)))))

;;;
;;; Timing helpers
;;;

;; inferior-package-id is not exported by (guix inferior).
(define inferior-package-id
  (@@ (guix inferior) inferior-package-id))

;; Call the thunk F, printing debug lines before and after that name ACTION
;; and report the difference in (current-time).  Return the result of F.
(define (log-time action f)
  (simple-format #t "debug: Starting ~A\n" action)
  (let* ((start-time (current-time))
         (result (f))
         (time-taken (- (current-time) start-time)))
    (simple-format #t "debug: Finished ~A, took ~A seconds\n"
                   action time-taken)
    result))

;; Print a "Starting" debug line for ACTION and return a pair of ACTION and
;; the current time, suitable for passing to record-end-time.
(define (record-start-time action)
  (simple-format #t "debug: Starting ~A\n" action)
  (cons action
        (current-time)))

;; Given a pair returned by record-start-time, print the "Finished" debug line
;; with the time taken.
(define record-end-time
  (match-lambda
    ((action . start-time)
     (let ((time-taken (- (current-time) start-time)))
       (simple-format #t "debug: Finished ~A, took ~A seconds\n"
                      action time-taken)))))

;;;
;;; Extracting data from the inferior
;;;

;; Return #f if the inferior INF cannot load the (guix lint) module.
;; Otherwise return a list with one entry per lint checker: the car is
;; (name description network-dependent?) and the cdr is '() for network
;; dependent checkers, or else a list of (package-id . warnings) pairs for
;; packages with at least one warning.  Each warning is a list of location
;; data (file line column) and an alist of messages by locale.
(define (all-inferior-lint-warnings inf store)
  (define locales
    '("cs_CZ.utf8"
      "da_DK.utf8"
      "de_DE.utf8"
      "eo_EO.utf8"
      "es_ES.utf8"
      "fr_FR.utf8"
      "hu_HU.utf8"
      "pl_PL.utf8"
      "pt_BR.utf8"
      ;;"sr_SR.utf8"
      "sv_SE.utf8"
      "vi_VN.utf8"
      "zh_CN.utf8"))

  ;; Build the code that is evaluated in the inferior for one checker.
  (define (lint-warnings-for-checker checker-name)
    `(lambda (store)
       (let* ((checker (find (lambda (checker)
                               (eq? (lint-checker-name checker)
                                    ',checker-name))
                             %local-checkers))
              (check (lint-checker-check checker)))
         (filter
          (match-lambda
            ((package-id . warnings)
             (not (null? warnings))))
          (hash-map->list
           (lambda (package-id package)
             (cons
              package-id
              (map
               (lambda (lint-warning)
                 (list
                  (match (lint-warning-location lint-warning)
                    (($ <location> file line column)
                     (list (if (string-prefix? "/gnu/store/" file)
                               ;; Convert a string like
                               ;; /gnu/store/53xh0mpigin2rffg31s52x5dc08y0qmr-guix-module-union/share/guile/site/2.2/gnu/packages/xdisorg.scm
                               ;;
                               ;; This happens when the checker uses
                               ;; package-field-location.
                               (string-join (drop (string-split file #\/) 8)
                                            "/")
                               file)
                           line
                           column)))
                  (let* ((source-locale "en_US.utf8")
                         (source-message
                          (begin
                            (setlocale LC_MESSAGES source-locale)
                            (lint-warning-message lint-warning)))
                         (messages-by-locale
                          (filter-map
                           (lambda (locale)
                             (catch 'system-error
                               (lambda ()
                                 (setlocale LC_MESSAGES locale))
                               (lambda (key . args)
                                 (error
                                  (simple-format
                                   #f
                                   "error changing locale to ~A: ~A ~A"
                                   locale key args))))
                             (let ((message
                                    (lint-warning-message lint-warning)))
                               (setlocale LC_MESSAGES source-locale)
                               ;; Only keep messages that were actually
                               ;; translated.
                               (if (string=? message source-message)
                                   #f
                                   (cons locale message))))
                           (list ,@locales))))
                    (cons (cons source-locale source-message)
                          messages-by-locale))))
               (check package))))
           %package-table)))))

  (and
   (catch
     'misc-error
     (lambda ()
       (inferior-eval '(use-modules (guix lint)) inf)
       #t)
     (lambda (key . args)
       (simple-format
        (current-error-port)
        "warning: failed to load the (guix lint) module: ~A ~A\n"
        key args)
       #f))
   (let ((checkers
          (inferior-eval
           '(begin
              (map (lambda (checker)
                     (list (lint-checker-name checker)
                           (lint-checker-description checker)
                           (if (memq checker %network-dependent-checkers)
                               #t
                               #f)))
                   %all-checkers))
           inf)))
     (map
      (match-lambda
        ((name description network-dependent?)
         (cons
          (list name description network-dependent?)
          (if network-dependent?
              '()
              (log-time
               (simple-format #f "getting ~A lint warnings" name)
               (lambda ()
                 (inferior-eval-with-store
                  inf
                  store
                  (lint-warnings-for-checker name))))))))
      checkers))))

;; Return a list of (inferior-package-id system target derivation-file-name)
;; lists for PACKAGES, computed in the inferior INF.  Native builds
;; (system = target) are computed for every system in the inferior's
;; %supported-systems, followed by cross builds for each system to every other
;; system.  Errors for individual packages are logged and skipped, except for
;; a system-error from fport_write, which exits the inferior.
(define (all-inferior-package-derivations store inf packages)
  (define inferior-%supported-systems
    (inferior-eval '(@ (guix packages) %supported-systems) inf))

  (define supported-system-pairs
    (map (lambda (system)
           (cons system system))
         inferior-%supported-systems))

  (define supported-system-cross-build-pairs
    (map (lambda (system)
           (filter-map (lambda (target)
                         (and (not (string=? system target))
                              (cons system target)))
                       inferior-%supported-systems))
         inferior-%supported-systems))

  ;; Build the code that is evaluated in the inferior for one group of
  ;; (system . target) pairs.
  (define (proc packages system-target-pairs)
    `(lambda (store)
       (append-map
        (lambda (inferior-package-id)
          (let ((package (hashv-ref %package-table inferior-package-id)))
            (catch
              #t
              (lambda ()
                (let ((supported-systems
                       (catch
                         #t
                         (lambda ()
                           (package-transitive-supported-systems package))
                         (lambda (key . args)
                           (simple-format
                            (current-error-port)
                            "error: while processing ~A, unable to compute transitive supported systems\n"
                            (package-name package))
                           (simple-format
                            (current-error-port)
                            "error ~A: ~A\n" key args)
                           #f))))
                  (if supported-systems
                      (append-map
                       (lambda (system)
                         (filter-map
                          (lambda (target)
                            (catch
                              'misc-error
                              (lambda ()
                                (guard (c ((package-cross-build-system-error? c)
                                           #f))
                                  (list inferior-package-id
                                        system
                                        target
                                        (derivation-file-name
                                         (if (string=? system target)
                                             (package-derivation store
                                                                 package
                                                                 system)
                                             (package-cross-derivation store
                                                                       package
                                                                       target
                                                                       system))))))
                              (lambda args
                                ;; misc-error #f ~A ~S (No
                                ;; cross-compilation for
                                ;; clojure-build-system yet:
                                #f)))
                          (lset-intersection
                           string=?
                           supported-systems
                           (list ,@(map cdr system-target-pairs)))))
                       (lset-intersection
                        string=?
                        supported-systems
                        (list ,@(map car system-target-pairs))))
                      '())))
              (lambda (key . args)
                (if (and (eq? key 'system-error)
                         (eq? (car args) 'fport_write))
                    (begin
                      (simple-format
                       (current-error-port)
                       "error: while processing ~A, exiting: ~A: ~A\n"
                       (package-name package)
                       key
                       args)
                      (exit 1))
                    (begin
                      (simple-format
                       (current-error-port)
                       "error: while processing ~A ignoring error: ~A: ~A\n"
                       (package-name package)
                       key
                       args)
                      '()))))))
        (list ,@(map inferior-package-id packages)))))

  (append-map
   (lambda (system-target-pairs)
     (format (current-error-port)
             "heap size: ~a MiB~%"
             (round
              (/ (assoc-ref (gc-stats) 'heap-size)
                 (expt 2. 20))))
     (log-time
      (simple-format #f "getting derivations for ~A" system-target-pairs)
      (lambda ()
        ;; Older inferiors may not support this call; ignore the resulting
        ;; match-error.
        (catch
          'match-error
          (lambda ()
            (inferior-eval '(invalidate-derivation-caches!) inf))
          (lambda (key . args)
            (simple-format
             (current-error-port)
             "warning: ignoring match-error from calling inferior invalidate-derivation-caches!\n")))
        (inferior-eval-with-store
         inf
         store
         (proc packages system-target-pairs)))))
   (append (map list supported-system-pairs)
           supported-system-cross-build-pairs)))

;; Return PACKAGES with entries sharing both name and version collapsed to a
;; single entry, logging a warning for each one dropped.  The packages are
;; sorted by name then version so that duplicates are adjacent.
;;
;; NOTE(review): the comparison procedure passed to sort was damaged in the
;; text this file was recovered from, and has been reconstructed as ordering
;; by name and then version with string<? -- confirm against version control.
(define (deduplicate-inferior-packages packages)
  (pair-fold
   (lambda (pair result)
     (if (null? (cdr pair))
         (cons (first pair) result)
         (let* ((a (first pair))
                (b (second pair))
                (a-name (inferior-package-name a))
                (b-name (inferior-package-name b))
                (a-version (inferior-package-version a))
                (b-version (inferior-package-version b)))
           (if (and (string=? a-name b-name)
                    (string=? a-version b-version))
               (begin
                 (simple-format
                  (current-error-port)
                  "warning: ignoring duplicate package: ~A (~A)\n"
                  a-name
                  a-version)
                 result)
               (cons a result)))))
   '()
   (sort packages
         (lambda (a b)
           (let ((a-name (inferior-package-name a))
                 (b-name (inferior-package-name b)))
             (if (string=? a-name b-name)
                 (string<? (inferior-package-version a)
                           (inferior-package-version b))
                 (string<? a-name
                           b-name)))))))

;; Store the license sets and metadata for PACKAGES, then return the database
;; package ids as returned by inferior-packages->package-ids.
;;
;; NOTE(review): the head of this definition (name, parameters and the first
;; log-time label) was damaged in the text this file was recovered from.  The
;; name and parameters follow the call in extract-information-from; the
;; "fetching inferior package license metadata" label is a reconstruction --
;; confirm against version control.
(define (insert-packages conn inf packages)
  (let* ((package-license-set-ids
          (log-time "fetching inferior package license metadata"
                    (lambda ()
                      (inferior-packages->license-set-ids conn inf
                                                          packages))))
         (packages-metadata-ids
          (log-time "fetching inferior package metadata"
                    (lambda ()
                      (inferior-packages->package-metadata-ids
                       conn packages package-license-set-ids)))))

    (log-time "getting package-ids"
              (lambda ()
                (inferior-packages->package-ids
                 conn
                 (zip (map inferior-package-name packages)
                      (map inferior-package-version packages)
                      packages-metadata-ids))))))

;; Convert LINT-WARNINGS-DATA (as returned by all-inferior-lint-warnings) to
;; rows of (lint-checker-id package-id location-id message-set-id) and pass
;; them to lint-warnings-data->lint-warning-ids.  Duplicate warnings for the
;; same package are skipped with a logged warning, keeping the first.
(define (insert-lint-warnings conn inferior-package-id->package-database-id
                              lint-checker-ids
                              lint-warnings-data)
  (lint-warnings-data->lint-warning-ids
   conn
   (append-map
    (lambda (lint-checker-id warnings-by-package-id)
      (append-map
       (match-lambda
         ((package-id . warnings)
          (map
           (match-lambda
             ((location-data messages-by-locale)
              (let ((location-id
                     (location->location-id
                      conn
                      (apply location location-data)))
                    (lint-warning-message-set-id
                     (lint-warning-message-data->lint-warning-message-set-id
                      conn
                      messages-by-locale)))
                (list lint-checker-id
                      (inferior-package-id->package-database-id package-id)
                      location-id
                      lint-warning-message-set-id))))
           (fold (lambda (location-and-messages result)
                   (if (member location-and-messages result)
                       (begin
                         (apply
                          simple-format
                          (current-error-port)
                          "warning: skipping duplicate lint warning ~A ~A\n"
                          location-and-messages)
                         result)
                       (append result
                               (list location-and-messages))))
                 '()
                 warnings))))
       warnings-by-package-id))
    lint-checker-ids
    (map cdr lint-warnings-data))))

;; Insert the derivations named in INFERIOR-DATA-4-TUPLES, and return the
;; result of insert-package-derivations for the corresponding
;; (package-database-id system target) lists.  INF is accepted but not used.
(define (inferior-data->package-derivation-ids
         conn inf
         inferior-package-id->package-database-id
         inferior-data-4-tuples)
  (let ((derivation-ids
         (derivation-file-names->derivation-ids
          conn
          (map fourth inferior-data-4-tuples)))
        (flat-package-ids-systems-and-targets
         (map
          (match-lambda
            ((inferior-package-id system target derivation-file-name)
             (list (inferior-package-id->package-database-id
                    inferior-package-id)
                   system
                   target)))
          inferior-data-4-tuples)))

    (insert-package-derivations conn
                                flat-package-ids-systems-and-targets
                                derivation-ids)))

;;;
;;; Building Guix for a channel
;;;

;; Return the store path of a Guix to use for opening inferiors, caching it
;; between calls for as long as the path exists.  If the configured guix is
;; an existing file under /gnu/store/, its grandparent directory is used;
;; otherwise the guix package is built with STORE.
(define guix-store-path
  (let ((store-path #f))
    (lambda (store)
      (if (and store-path
               (file-exists? store-path))
          store-path
          (let ((config-guix (%config 'guix)))
            (if (and (file-exists? config-guix)
                     (string-prefix? "/gnu/store/" config-guix))
                (begin
                  (set! store-path
                        (dirname
                         (dirname
                          (%config 'guix))))
                  store-path)
                (begin
                  (invalidate-derivation-caches!)
                  (hash-clear! (@@ (guix packages) %derivation-cache))
                  (let* ((guix-package (@ (gnu packages package-management)
                                          guix))
                         (derivation (package-derivation store guix-package)))
                    (log-time
                     "building the guix derivation"
                     (lambda ()
                       (build-derivations store (list derivation))))

                    (let ((new-store-path
                           (derivation->output-path derivation)))
                      (set! store-path new-store-path)
                      (simple-format (current-error-port)
                                     "debug: guix-store-path: ~A\n"
                                     new-store-path)
                      new-store-path)))))))))

;; Build the nss-certs package with STORE and return its output path.
(define (nss-certs-store-path store)
  (let* ((nss-certs-package (@ (gnu packages certs)
                               nss-certs))
         (derivation (package-derivation store nss-certs-package)))
    (log-time
     "building the nss-certs derivation"
     (lambda ()
       (build-derivations store (list derivation))))
    (derivation->output-path derivation)))

;; Compute, in an inferior, the profile derivation for CHANNEL and return its
;; file name.  On any error the error is logged, the inferior is closed and #f
;; is returned.
(define (channel->derivation-file-name store channel)
  (define use-container?
    (defined?
      'open-inferior/container
      (resolve-module '(guix inferior))))

  (let ((inferior
         (if use-container?
             (open-inferior/container
              store
              (guix-store-path store)
              #:extra-shared-directories
              '("/gnu/store")
              #:extra-environment-variables
              (list (string-append
                     "SSL_CERT_DIR=" (nss-certs-store-path store))))
             (begin
               (simple-format #t "debug: using open-inferior\n")
               (open-inferior (guix-store-path store)
                              #:error-port (real-error-port))))))

    (catch
      #t
      (lambda ()
        (with-throw-handler #t
          (lambda ()
            ;; /etc is only missing if open-inferior/container has been used
            (when use-container?
              (inferior-eval
               '(begin
                  ;; Create /etc/passwd, as %known-shorthand-profiles in (guix
                  ;; profiles) tries to read from this file. Because the
                  ;; environment is cleaned in build-self.scm, xdg-directory
                  ;; in (guix utils) falls back to accessing /etc/passwd.
                  (mkdir "/etc")
                  (call-with-output-file "/etc/passwd"
                    (lambda (port)
                      (display "root:x:0:0::/root:/bin/bash" port))))
               inferior))

            (let ((channel-instance
                   (first
                    (latest-channel-instances store
                                              (list channel)))))
              (inferior-eval '(use-modules (guix channels)
                                           (guix profiles))
                             inferior)
              (inferior-eval '(define channel-instance
                                (@@ (guix channels) channel-instance))
                             inferior)

              (let ((file-name
                     (inferior-eval-with-store
                      inferior
                      store
                      `(lambda (store)
                         (let ((instances
                                (list
                                 (channel-instance
                                  (channel (name ',(channel-name channel))
                                           (url ,(channel-url channel))
                                           (branch ,(channel-branch channel))
                                           (commit ,(channel-commit channel)))
                                  ,(channel-instance-commit channel-instance)
                                  ,(channel-instance-checkout channel-instance)))))
                           (run-with-store store
                             (mlet* %store-monad
                                 ((manifest (channel-instances->manifest
                                             instances))
                                  (derv (profile-derivation manifest)))
                               (mbegin %store-monad
                                 (return (derivation-file-name derv))))))))))

                (close-inferior inferior)

                file-name)))
          (lambda (key . parameters)
            ;; Runs before the stack is unwound, so the backtrace still shows
            ;; where the error happened.
            (display (backtrace) (current-error-port))
            (display "\n" (current-error-port))
            (simple-format (current-error-port)
                           "error: channel->derivation-file-name: ~A: ~A\n"
                           key parameters))))
      (lambda args
        (close-inferior inferior)
        #f))))

;; Compute and build the profile derivation for CHANNEL, returning its output
;; path.
(define (channel->manifest-store-item conn store channel)
  (let* ((manifest-store-item-derivation-file-name
          (log-time
           "computing the channel derivation"
           (lambda ()
             ;; Obtain a session level lock here, to avoid conflicts with
             ;; other jobs over the Git repository.
             (with-advisory-session-lock
              conn
              'channel->manifest-store-item
              (lambda ()
                (channel->derivation-file-name store channel))))))
         (derivation
          (read-derivation-from-file manifest-store-item-derivation-file-name)))
    (simple-format (current-error-port)
                   "debug: channel dervation: ~A\n"
                   manifest-store-item-derivation-file-name)
    (log-time
     "building the channel derivation"
     (lambda ()
       (build-derivations store (list derivation))))
    (derivation->output-path derivation)))

;; Return the directory that the bin symlink in the built profile for CHANNEL
;; points into, or #f (after logging) if anything fails.
(define (channel->guix-store-item conn store channel)
  (catch
    #t
    (lambda ()
      (dirname
       (readlink
        (string-append (channel->manifest-store-item conn
                                                     store
                                                     channel)
                       "/bin"))))
    (lambda args
      (simple-format #t "guix-data-service: load-new-guix-revision: error: ~A\n"
                     args)
      #f)))

;; Open an inferior for the Guix at STORE-PATH, build its glibc-locales
;; package with grafts disabled, and return the output path.
(define (glibc-locales-for-guix-store-path store store-path)
  (let ((inf (if (defined?
                   'open-inferior/container
                   (resolve-module '(guix inferior)))
                 (open-inferior/container store store-path
                                          #:extra-shared-directories
                                          '("/gnu/store"))
                 (begin
                   (simple-format #t "debug: using open-inferior\n")
                   (open-inferior store-path
                                  #:error-port (real-error-port))))))
    (inferior-eval '(use-modules (srfi srfi-1)
                                 (srfi srfi-34)
                                 (guix grafts)
                                 (guix derivations))
                   inf)
    (inferior-eval '(when (defined? '%graft?) (%graft? #f)) inf)

    (let* ((inferior-glibc-locales
            (first
             (lookup-inferior-packages inf "glibc-locales")))
           (derivation (inferior-package-derivation store
                                                    inferior-glibc-locales))
           (output (derivation->output-path derivation)))
      (close-inferior inf)
      (log-time
       "building the glibc-locales derivation"
       (lambda ()
         (build-derivations store (list derivation))))

      output)))

;;;
;;; Loading a revision
;;;

;; Open an inferior for the Guix at STORE-PATH, extract the packages, lint
;; warnings and derivations, and insert them along with a new guix revision
;; for COMMIT.  Return #t on success; on error, log the failure and a
;; backtrace and return #f.
(define (extract-information-from conn git-repository-id commit store-path)
  (simple-format
   #t "debug: extract-information-from: ~A\n" store-path)
  (with-store store
    (set-build-options store
                       #:fallback? #t)
    (let* ((guix-locpath (getenv "GUIX_LOCPATH"))
           (inf (let ((guix-locpath
                       ;; GUIX_LOCPATH may be unset, in which case getenv
                       ;; returns #f and there is nothing to append.
                       (string-append
                        (glibc-locales-for-guix-store-path store store-path)
                        "/lib/locale"
                        (if guix-locpath
                            (string-append ":" guix-locpath)
                            ""))))
                  ;; Unset the GUILE_LOAD_PATH and GUILE_LOAD_COMPILED_PATH to
                  ;; avoid the values for these being used in the
                  ;; inferior. Even though the inferior %load-path and
                  ;; %load-compiled-path has the inferior modules first, this
                  ;; can cause issues when there are modules present outside
                  ;; of the inferior Guix which aren't present in the inferior
                  ;; Guix (like the new (guix lint) module
                  (unsetenv "GUILE_LOAD_PATH")
                  (unsetenv "GUILE_LOAD_COMPILED_PATH")
                  ;; Augment the GUIX_LOCPATH to include glibc-locales from
                  ;; the Guix at store-path, this should mean that the
                  ;; inferior Guix works, even if it's build using a different
                  ;; glibc version
                  (setenv "GUIX_LOCPATH" guix-locpath)
                  (simple-format (current-error-port)
                                 "debug: set GUIX_LOCPATH to ~A\n"
                                 guix-locpath)
                  (if (defined?
                        'open-inferior/container
                        (resolve-module '(guix inferior)))
                      (open-inferior/container store store-path
                                               #:extra-shared-directories
                                               '("/gnu/store"))
                      (begin
                        (simple-format #t "debug: using open-inferior\n")
                        (open-inferior store-path
                                       #:error-port (real-error-port)))))))
      ;; Restore GUIX_LOCPATH; a value of #f removes the variable again.
      (setenv "GUIX_LOCPATH" guix-locpath)

      (when (eq? inf #f)
        (error "error: inferior is #f"))

      ;; Normalise the locale for the inferior process
      (catch
        'system-error
        (lambda ()
          (inferior-eval '(setlocale LC_ALL "en_US.utf8") inf))
        (lambda (key . args)
          (simple-format (current-error-port)
                         "warning: failed to set locale to en_US.utf8: ~A ~A\n"
                         key args)
          (display "trying to setlocale to en_US.UTF-8 instead\n"
                   (current-error-port))
          (with-exception-handler
            (lambda (key . args)
              (simple-format
               (current-error-port)
               "warning: failed to set locale to en_US.UTF-8: ~A ~A\n"
               key args))
            (lambda ()
              (inferior-eval '(setlocale LC_ALL "en_US.UTF-8") inf)))))

      (inferior-eval '(use-modules (srfi srfi-1)
                                   (srfi srfi-34)
                                   (guix grafts)
                                   (guix derivations))
                     inf)
      (inferior-eval '(when (defined? '%graft?) (%graft? #f)) inf)

      (catch
        #t
        (lambda ()
          (let* ((packages
                  (log-time
                   "fetching inferior packages"
                   (lambda ()
                     (deduplicate-inferior-packages
                      (inferior-packages inf)))))
                 (inferior-lint-warnings
                  (log-time
                   "fetching inferior lint warnings"
                   (lambda ()
                     (all-inferior-lint-warnings inf store))))
                 (inferior-data-4-tuples
                  (log-time
                   "getting inferior derivations"
                   (lambda ()
                     (all-inferior-package-derivations store inf packages)))))

            ;; Wait until this is the only transaction inserting data, to
            ;; avoid any concurrency issues
            (obtain-advisory-transaction-lock conn
                                              'load-new-guix-revision-inserts)

            (let* ((package-ids
                    (insert-packages conn inf packages))
                   (inferior-package-id->package-database-id
                    (let ((lookup-table
                           (alist->hashq-table
                            (map (lambda (package package-id)
                                   (cons (inferior-package-id package)
                                         package-id))
                                 packages
                                 package-ids))))
                      (lambda (inferior-id)
                        (hashq-ref lookup-table inferior-id)))))

              (simple-format
               #t "debug: finished loading information from inferior\n")
              (close-inferior inf)

              (let ((guix-revision-id
                     (insert-guix-revision conn git-repository-id
                                           commit store-path)))

                ;; inferior-lint-warnings is #f when the inferior has no
                ;; (guix lint) module.
                (when inferior-lint-warnings
                  (let* ((lint-checker-ids
                          (lint-checkers->lint-checker-ids
                           conn
                           (map car inferior-lint-warnings)))
                         (lint-warning-ids
                          (insert-lint-warnings
                           conn
                           inferior-package-id->package-database-id
                           lint-checker-ids
                           inferior-lint-warnings)))
                    (insert-guix-revision-lint-checkers conn
                                                        guix-revision-id
                                                        lint-checker-ids)

                    (insert-guix-revision-lint-warnings conn
                                                        guix-revision-id
                                                        lint-warning-ids)))

                (let ((package-derivation-ids
                       (inferior-data->package-derivation-ids
                        conn inf inferior-package-id->package-database-id
                        inferior-data-4-tuples)))
                  (insert-guix-revision-package-derivations
                   conn
                   guix-revision-id
                   package-derivation-ids)

                  (simple-format
                   #t "Successfully loaded ~A package/derivation pairs\n"
                   (length package-derivation-ids))))))
          #t)
        (lambda (key . args)
          (simple-format (current-error-port)
                         "Failed extracting information from commit: ~A\n\n"
                         commit)
          (simple-format (current-error-port)
                         " ~A ~A\n\n" key args)
          ;; Don't leave the inferior process running after a failure.  It
          ;; may already have been closed, so ignore errors from closing it.
          (false-if-exception (close-inferior inf))
          #f)
        (lambda (key . args)
          (display-backtrace (make-stack #t) (current-error-port)))))))

;; Return the Guix store item for COMMIT of the repository identified by
;; GIT-REPOSITORY-ID, or #f if it could not be generated.
(define (store-item-for-git-repository-id-and-commit
         conn git-repository-id commit)
  (with-store store
    (set-build-options store
                       #:fallback? #t)

    (channel->guix-store-item
     conn
     store
     (channel (name 'guix)
              (url (git-repository-id->url
                    conn
                    git-repository-id))
              (commit commit)))))

;; For each branch of GIT-REPOSITORY-ID that has COMMIT, delete and regenerate
;; the rows in package_versions_by_guix_revision_range.  Always return #t.
(define (update-package-versions-table conn git-repository-id commit)
  ;; Lock the table to wait for other transactions to commit before updating
  ;; the table
  (exec-query
   conn
   "
LOCK TABLE ONLY package_versions_by_guix_revision_range
  IN SHARE ROW EXCLUSIVE MODE")

  (for-each
   (match-lambda
     ((branch-name)
      (log-time
       (simple-format #f "deleting package version entries for ~A"
                      branch-name)
       (lambda ()
         (exec-query
          conn
          "
DELETE FROM package_versions_by_guix_revision_range
WHERE git_repository_id = $1 AND branch_name = $2"
          (list git-repository-id
                branch-name))))
      (log-time
       (simple-format #f "inserting package version entries for ~A"
                      branch-name)
       (lambda ()
         (exec-query
          conn
          "
INSERT INTO package_versions_by_guix_revision_range
SELECT DISTINCT
       $1::integer AS git_repository_id,
       $2 AS branch_name,
       packages.name AS package_name,
       packages.version AS package_version,
       first_value(guix_revisions.id)
         OVER package_version AS first_guix_revision_id,
       last_value(guix_revisions.id)
         OVER package_version AS last_guix_revision_id
FROM packages
INNER JOIN (
  SELECT DISTINCT package_derivations.package_id,
                  guix_revision_package_derivations.revision_id
  FROM package_derivations
  INNER JOIN guix_revision_package_derivations
    ON package_derivations.id = guix_revision_package_derivations.package_derivation_id
) AS revision_packages ON packages.id = revision_packages.package_id
INNER JOIN guix_revisions
  ON revision_packages.revision_id = guix_revisions.id
INNER JOIN git_branches
  ON guix_revisions.commit = git_branches.commit
WHERE git_branches.name = $2
WINDOW package_version AS (
  PARTITION BY packages.name, packages.version
  ORDER BY git_branches.datetime
  RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
)
ORDER BY packages.name, packages.version"
          (list git-repository-id
                branch-name))))))
   (exec-query
    conn
    "SELECT name FROM git_branches WHERE commit = $1 AND git_repository_id = $2"
    (list commit git-repository-id)))

  #t)

;; Generate the store item for COMMIT, load the information from it and
;; update the package versions table.  Return #f if any step fails.
(define (load-new-guix-revision conn git-repository-id commit)
  (let ((store-item
         (store-item-for-git-repository-id-and-commit
          conn git-repository-id commit)))
    (if store-item
        (and
         (extract-information-from conn git-repository-id
                                   commit store-item)
         (update-package-versions-table conn git-repository-id commit))
        (begin
          (simple-format #t "Failed to generate store item for ~A\n"
                         commit)
          #f))))

;;;
;;; Job queue
;;;

;; Insert a job for COMMIT.  Return the inserted row (a list containing the
;; id), or #f if no row was inserted because of a conflict.
(define (enqueue-load-new-guix-revision-job conn git-repository-id commit source)
  (define query
    "
INSERT INTO load_new_guix_revision_jobs (git_repository_id, commit, source)
VALUES ($1, $2, $3)
ON CONFLICT DO NOTHING
RETURNING id;")

  (match (exec-query conn
                     query
                     (list (number->string git-repository-id)
                           commit
                           source))
    ((result) result)
    (() #f)))

;; Return the rows (id commit source git_repository_id) of the jobs for
;; COMMIT.
(define (select-job-for-commit conn commit)
  (let ((result
         (exec-query
          conn
          (string-append
           "SELECT id, commit, source, git_repository_id "
           "FROM load_new_guix_revision_jobs WHERE commit = $1")
          (list commit))))
    result))

;; Return all jobs, newest id first, as lists of
;; (id commit source git-repository-id created-at succeeded-at events
;; log-exists?), where events is the parsed JSON array of events, or #() if
;; there are none.
(define (select-jobs-and-events conn)
  (define query
    "
SELECT
  load_new_guix_revision_jobs.id,
  load_new_guix_revision_jobs.commit,
  load_new_guix_revision_jobs.source,
  load_new_guix_revision_jobs.git_repository_id,
  load_new_guix_revision_jobs.created_at,
  load_new_guix_revision_jobs.succeeded_at,
  (
    SELECT JSON_AGG(
      json_build_object('event', event, 'occurred_at', occurred_at) ORDER BY occurred_at ASC
    )
    FROM load_new_guix_revision_job_events
    WHERE job_id = load_new_guix_revision_jobs.id
  ),
  EXISTS (
    SELECT 1 FROM load_new_guix_revision_job_logs WHERE job_id = load_new_guix_revision_jobs.id
  ) AS log_exists
FROM load_new_guix_revision_jobs
ORDER BY load_new_guix_revision_jobs.id DESC")

  (map
   (match-lambda
     ((id commit source git-repository-id created-at succeeded-at
          events-json log-exists?)
      (list id commit source git-repository-id created-at succeeded-at
            (if (string-null? events-json)
                #()
                (json-string->scm events-json))
            (string=? log-exists? "t"))))
   (exec-query conn query)))

;; Return the jobs that have not succeeded and have at least as many 'retry'
;; events as 'failure' events, as lists of
;; (id commit source git-repository-id created-at events log-exists?
;; latest-branch-commit?).  Jobs for the latest commit of a branch come first,
;; then by descending id.
(define (select-unprocessed-jobs-and-events conn)
  (define query
    "
SELECT
  load_new_guix_revision_jobs.id,
  load_new_guix_revision_jobs.commit,
  load_new_guix_revision_jobs.source,
  load_new_guix_revision_jobs.git_repository_id,
  load_new_guix_revision_jobs.created_at,
  (
    SELECT JSON_AGG(
      json_build_object('event', event, 'occurred_at', occurred_at) ORDER BY occurred_at ASC
    )
    FROM load_new_guix_revision_job_events
    WHERE job_id = load_new_guix_revision_jobs.id
  ),
  EXISTS (
    SELECT 1 FROM load_new_guix_revision_job_logs WHERE job_id = load_new_guix_revision_jobs.id
  ) AS log_exists,
  commit IN (
    SELECT commit FROM (
      SELECT DISTINCT ON (name) name, git_branches.commit
      FROM git_branches
      WHERE
        git_branches.git_repository_id = load_new_guix_revision_jobs.git_repository_id AND
        git_branches.commit IS NOT NULL
      ORDER BY name, datetime DESC
    ) branches_and_latest_commits
  ) AS latest_branch_commit
FROM load_new_guix_revision_jobs
WHERE
  succeeded_at IS NULL AND
  (
    SELECT COUNT(*)
    FROM load_new_guix_revision_job_events
    WHERE job_id = load_new_guix_revision_jobs.id AND event = 'retry'
  ) >= (
    SELECT COUNT(*)
    FROM load_new_guix_revision_job_events
    WHERE job_id = load_new_guix_revision_jobs.id AND event = 'failure'
  )
ORDER BY latest_branch_commit DESC, id DESC")

  (map
   (match-lambda
     ((id commit source git-repository-id created-at
          events-json log-exists? latest-branch-commit)
      (list id commit source git-repository-id created-at
            (if (string-null? events-json)
                #()
                (json-string->scm events-json))
            (string=? log-exists? "t")
            (string=? latest-branch-commit "t"))))
   (exec-query conn query)))

;; Like select-jobs-and-events, but only for jobs for COMMIT.  COMMIT itself
;; is placed in the commit position of each returned list.
(define (select-jobs-and-events-for-commit conn commit)
  (define query
    "
SELECT
  load_new_guix_revision_jobs.id,
  load_new_guix_revision_jobs.source,
  load_new_guix_revision_jobs.git_repository_id,
  load_new_guix_revision_jobs.created_at,
  load_new_guix_revision_jobs.succeeded_at,
  (
    SELECT JSON_AGG(
      json_build_object('event', event, 'occurred_at', occurred_at) ORDER BY occurred_at ASC
    )
    FROM load_new_guix_revision_job_events
    WHERE job_id = load_new_guix_revision_jobs.id
  ),
  EXISTS (
    SELECT 1 FROM load_new_guix_revision_job_logs WHERE job_id = load_new_guix_revision_jobs.id
  ) AS log_exists
FROM load_new_guix_revision_jobs
WHERE commit = $1
ORDER BY load_new_guix_revision_jobs.id DESC")

  (map
   (match-lambda
     ((id source git-repository-id created-at succeeded-at
          events-json log-exists?)
      (list id commit source git-repository-id created-at succeeded-at
            (if (string-null? events-json)
                #()
                (json-string->scm events-json))
            (string=? log-exists? "t"))))
   (exec-query conn query (list commit))))

;; Return up to N job rows (id commit source git_repository_id).
;;
;; NOTE(review): the query orders by id ASC, so this returns the N jobs with
;; the lowest ids, which does not match the procedure name -- confirm whether
;; DESC was intended before changing, as callers may rely on the order.
(define (most-recent-n-load-new-guix-revision-jobs conn n)
  (let ((result
         (exec-query
          conn
          (string-append
           "SELECT id, commit, source, git_repository_id "
           "FROM load_new_guix_revision_jobs ORDER BY id ASC LIMIT $1")
          (list (number->string n)))))
    result))

;; Select and lock the job ID if it has not succeeded.  Returns no rows if
;; the job does not exist, has succeeded, or is locked by another
;; transaction.
(define (select-job-for-update conn id)
  (exec-query
   conn
   (string-append
    "SELECT id, commit, source, git_repository_id "
    "FROM load_new_guix_revision_jobs "
    "WHERE id = $1 AND succeeded_at IS NULL "
    "FOR NO KEY UPDATE SKIP LOCKED")
   (list id)))

;; Insert EVENT (a string such as "start", "success" or "failure") for
;; JOB-ID.
(define (record-job-event conn job-id event)
  (exec-query
   conn
   (string-append
    "INSERT INTO load_new_guix_revision_job_events (job_id, event) "
    "VALUES ($1, $2)")
   (list job-id event)))

;; Set succeeded_at for the job ID to the current clock time.
(define (record-job-succeeded conn id)
  (exec-query
   conn
   (string-append
    "UPDATE load_new_guix_revision_jobs "
    "SET succeeded_at = clock_timestamp() "
    "WHERE id = $1 ")
   (list id)))

;; Return (id latest-branch-commit?) lists for the unprocessed jobs which are
;; not locked by another transaction, locking the selected rows.  The
;; selection criteria and ordering match select-unprocessed-jobs-and-events.
(define (fetch-unlocked-jobs conn)
  (define query
    "
SELECT
  id,
  commit IN (
    SELECT commit FROM (
      SELECT DISTINCT ON (name) name, git_branches.commit
      FROM git_branches
      WHERE
        git_branches.git_repository_id = load_new_guix_revision_jobs.git_repository_id AND
        git_branches.commit IS NOT NULL
      ORDER BY name, datetime DESC
    ) branches_and_latest_commits
  ) AS latest_branch_commit
FROM load_new_guix_revision_jobs
WHERE
  succeeded_at IS NULL AND
  (
    SELECT COUNT(*)
    FROM load_new_guix_revision_job_events
    WHERE job_id = load_new_guix_revision_jobs.id AND event = 'retry'
  ) >= (
    SELECT COUNT(*)
    FROM load_new_guix_revision_job_events
    WHERE job_id = load_new_guix_revision_jobs.id AND event = 'failure'
  )
ORDER BY latest_branch_commit DESC, id DESC
FOR NO KEY UPDATE OF load_new_guix_revision_jobs
SKIP LOCKED")

  (map
   (match-lambda
     ((id priority)
      (list id
            (string=? priority "t"))))
   (exec-query conn query)))

;; Process the job ID inside a transaction.  Output and error output are
;; redirected to the job log while the revision is loaded.  If the revision
;; already exists or loading returns #t, record success, commit and return
;; #t; otherwise roll back, record a failure event and return #f.  If the job
;; cannot be selected for update, print a message instead.
(define (process-load-new-guix-revision-job id)
  (with-postgresql-connection
   (simple-format #f "load-new-guix-revision ~A" id)
   (lambda (conn)
     (exec-query conn "BEGIN")

     (match (select-job-for-update conn id)
       (((id commit source git-repository-id))

        ;; With a separate connection, outside of the transaction so the event
        ;; gets persisted regardless.
        (with-postgresql-connection
         (simple-format #f "load-new-guix-revision ~A start-event" id)
         (lambda (start-event-conn)
           (record-job-event start-event-conn id "start")))

        (simple-format #t "Processing job ~A (commit: ~A, source: ~A)\n\n"
                       id commit source)

        (if (or
             (guix-revision-exists? conn git-repository-id commit)
             (eq?
              (log-time
               (string-append "loading revision " commit)
               (lambda ()
                 (let* ((previous-output-port (current-output-port))
                        (previous-error-port (current-error-port))
                        (result
                         (with-postgresql-connection
                          (simple-format #f "load-new-guix-revision ~A logging" id)
                          (lambda (logging-conn)
                            (insert-empty-log-entry logging-conn id)
                            (let ((logging-port
                                   (log-port id logging-conn)))
                              (set-current-output-port logging-port)
                              (set-current-error-port logging-port)
                              (let ((result
                                     (parameterize ((current-build-output-port
                                                     logging-port)
                                                    (real-error-port
                                                     previous-error-port))
                                       (load-new-guix-revision
                                        conn
                                        git-repository-id
                                        commit))))
                                (combine-log-parts! logging-conn id)

                                ;; This can happen with GC, so do it explicitly
                                (close-port logging-port)

                                result))))))
                   (set-current-output-port previous-output-port)
                   (set-current-error-port previous-error-port)
                   result)))
              #t))
            (begin
              (record-job-succeeded conn id)
              (record-job-event conn id "success")
              (exec-query conn "COMMIT")
              #t)
            (begin
              (exec-query conn "ROLLBACK")
              ;; Recorded after the rollback so that the event is kept.
              (record-job-event conn id "failure")
              #f)))
       (()
        (simple-format #t "job ~A not found to be processed\n"
                       id))))))