diff options
author | Christopher Baines <mail@cbaines.net> | 2023-11-05 13:42:03 +0000 |
---|---|---|
committer | Christopher Baines <mail@cbaines.net> | 2023-11-05 13:46:20 +0000 |
commit | c3cb04cb8052492dae715012586f9e3d2d64579d (patch) | |
tree | 1f56c415ef46e68d60d623e0a0bea70c4bd74ceb | |
parent | f5acc60288e0ad9f0c1093f3d50af1347e4df1df (diff) | |
download | data-service-c3cb04cb8052492dae715012586f9e3d2d64579d.tar data-service-c3cb04cb8052492dae715012586f9e3d2d64579d.tar.gz |
Use fibers when processing new revisions
Just have one fiber at the moment, but this will enable using fibers for
parallelism in the future.
Fibers seemed to cause problems with the logging setup, which was a bit odd in
the first place. So move logging to the parent process, which is better anyway.
-rw-r--r-- | guix-data-service/jobs.scm | 241 | ||||
-rw-r--r-- | guix-data-service/jobs/load-new-guix-revision.scm | 503 | ||||
-rw-r--r-- | guix-data-service/model/package.scm | 11 | ||||
-rw-r--r-- | guix-data-service/web/jobs/controller.scm | 1 | ||||
-rw-r--r-- | scripts/guix-data-service-process-job.in | 26 | ||||
-rw-r--r-- | tests/jobs-load-new-guix-revision.scm | 52 |
6 files changed, 365 insertions, 469 deletions
diff --git a/guix-data-service/jobs.scm b/guix-data-service/jobs.scm index a0f59dc..b151367 100644 --- a/guix-data-service/jobs.scm +++ b/guix-data-service/jobs.scm @@ -20,11 +20,93 @@ #:use-module (ice-9 format) #:use-module (ice-9 threads) #:use-module (ice-9 atomic) + #:use-module (ice-9 textual-ports) + #:use-module (squee) + #:use-module (guix-data-service utils) + #:use-module (guix-data-service database) #:use-module (guix-data-service jobs load-new-guix-revision) - #:export (process-jobs + #:export (log-for-job + count-log-parts + combine-log-parts! + + process-jobs default-max-processes)) +(define (log-part-sequence-name job-id) + (simple-format #f "load_new_guix_revision_job_log_parts_id_seq_~A" job-id)) + +(define (start-thread-for-process-output job-id) + (define (insert conn job_id s) + (exec-query + conn + (string-append " +INSERT INTO load_new_guix_revision_job_log_parts (id, job_id, contents) +VALUES (nextval('" (log-part-sequence-name job_id) "'), $1, $2)") + (list job_id s))) + + + (match (pipe) + ((port-to-read-from . port-to-write-to) + + (setvbuf port-to-read-from 'line) + (setvbuf port-to-write-to 'line) + + (let ((flags (fcntl port-to-read-from F_GETFL))) + (fcntl port-to-read-from F_SETFL (logior O_NONBLOCK flags))) + (let ((flags (fcntl port-to-write-to F_GETFL))) + (fcntl port-to-write-to F_SETFL (logior O_NONBLOCK flags))) + + (call-with-new-thread + (lambda () + (with-postgresql-connection + (simple-format #f "~A job logging" job-id) + (lambda (logging-conn) + (exec-query + logging-conn + (simple-format #f "CREATE SEQUENCE IF NOT EXISTS ~A" + (log-part-sequence-name job-id))) + (exec-query + logging-conn + "DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1" + (list job-id)) + + (let loop ((line (get-line port-to-read-from))) + (let ((line-with-newline + (string-append line "\n"))) + (catch #t + (lambda () + (insert logging-conn job-id line-with-newline) + (display line-with-newline)) + (lambda (key . 
args) + (display + (simple-format + #f + " +error: ~A: ~A +error: could not insert log part: '~A'\n\n" + key args line)) + (catch #t + (lambda () + (insert + logging-conn + job-id + (simple-format + #f + " +guix-data-service: error: missing log line: ~A +\n" key))) + (lambda _ + #t))))) + (loop (get-line port-to-read-from))))))) + + port-to-write-to))) + +(define (cleanup-logging conn job-id) + (drop-log-parts-sequence conn job-id) + (with-time-logging "vacuuming log parts" + (vacuum-log-parts-table conn))) + (define* (process-jobs conn #:key max-processes latest-branch-revision-max-processes skip-system-tests?) @@ -32,29 +114,137 @@ (fetch-unlocked-jobs conn)) (define (process-job job-id) - (apply execlp - "guix-data-service-process-job" - "guix-data-service-process-job" - job-id - (if skip-system-tests? - '("--skip-system-tests") - '()))) + (let ((log-port (start-thread-for-process-output job-id))) + (spawn + "guix-data-service-process-job" + (cons* "guix-data-service-process-job" + job-id + (if skip-system-tests? + '("--skip-system-tests") + '())) + #:output log-port + #:error log-port))) + + (define (post-job job-id) + (when (> (count-log-parts conn job-id) + 0) + (combine-log-parts! conn job-id) + (cleanup-logging conn job-id))) (define (handle-job-failure job-id) (record-job-event conn job-id "failure") (display (simple-format #f "recording failure for job ~A\n" job-id) - (current-error-port)) - (when (> (count-log-parts conn job-id) - 0) - (combine-log-parts! 
conn job-id))) + (current-error-port))) (process-jobs-concurrently fetch-new-jobs process-job + post-job handle-job-failure #:max-processes max-processes #:priority-max-processes latest-branch-revision-max-processes)) + +(define* (log-for-job conn job-id + #:key + character-limit + start-character) + (define (sql-html-escape s) + (string-append + "replace(" + (string-append + "replace(" + (string-append + "replace(" + s + ",'&','&amp;')") + ",'<','&lt;')") + ",'>','&gt;')")) + + (define (get-characters s) + (if start-character + (simple-format #f "substr(~A, ~A, ~A)" + s start-character + character-limit) + (simple-format #f "right(~A, ~A)" s character-limit))) + + (define log-query + (string-append + "SELECT " + (sql-html-escape (get-characters "contents")) + " FROM load_new_guix_revision_job_logs" + " WHERE job_id = $1 AND contents IS NOT NULL")) + + (define parts-query + (string-append + "SELECT " + (sql-html-escape + (get-characters "STRING_AGG(contents, '' ORDER BY id ASC)")) + " FROM load_new_guix_revision_job_log_parts WHERE job_id = $1")) + + (match (exec-query conn log-query (list job-id)) + (((contents)) + contents) + (() + (match (exec-query conn parts-query (list job-id)) + (((contents)) + contents))))) + +(define (count-log-parts conn job-id) + (match (exec-query + conn + " +SELECT COUNT(*) +FROM load_new_guix_revision_job_log_parts +WHERE job_id = $1" + (list job-id)) + (((id)) + (string->number id)))) + +(define (combine-log-parts! 
conn job-id) + (with-postgresql-transaction + conn + (lambda (conn) + (exec-query + conn + (string-append + " +UPDATE load_new_guix_revision_job_logs SET contents = ( + SELECT STRING_AGG(contents, '' ORDER BY id ASC) + FROM load_new_guix_revision_job_log_parts + WHERE job_id = $1 + GROUP BY job_id +) +WHERE job_id = $1") + (list job-id)) + (exec-query + conn + "DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1" + (list job-id))))) + +(define (drop-log-parts-sequence conn job-id) + (with-postgresql-transaction + conn + (lambda (conn) + (exec-query conn + "SET LOCAL lock_timeout = '10s'") + (with-exception-handler + (lambda (exn) + (simple-format (current-error-port) + "error when dropping sequence: ~A" + exn)) + (lambda () + (exec-query conn + (string-append + "DROP SEQUENCE IF EXISTS " + (log-part-sequence-name job-id)))) + #:unwind? #t)))) + +(define (vacuum-log-parts-table conn) + (exec-query + conn + "VACUUM load_new_guix_revision_job_log_parts")) + (define default-max-processes (max (round (/ (current-processor-count) 4)) @@ -67,6 +257,7 @@ (define* (process-jobs-concurrently fetch-new-jobs process-job + post-job handle-job-failure #:key (max-processes default-max-processes) @@ -108,9 +299,10 @@ ;; No process to wait for #f) ((pid . status) - (unless (eq? status 0) - (match (hash-ref processes pid) - ((_ (id)) + (match (hash-ref processes pid) + ((_ (id)) + (post-job id) + (unless (eq? status 0) (simple-format (current-error-port) "pid ~A (job: ~A) failed with status ~A\n" pid id status) @@ -161,20 +353,6 @@ (kill pid SIGTERM))) processes)) - (define (fork-and-process-job job-args) - (match (primitive-fork) - (0 - (dynamic-wind - (const #t) - (lambda () - (apply process-job job-args)) - (lambda () - (primitive-exit 127)))) - (pid - (hashv-set! processes pid - (list (current-time) job-args)) - #t))) - (define exit? (make-atomic-box #f)) @@ -206,6 +384,9 @@ (if priority? 
priority-max-processes max-processes)) - (fork-and-process-job (list job-id)))))) + (let ((pid (process-job job-id))) + (peek "PID" pid) + (hashv-set! processes pid + (list (current-time) (list job-id)))))))) jobs))) (sleep 15))) diff --git a/guix-data-service/jobs/load-new-guix-revision.scm b/guix-data-service/jobs/load-new-guix-revision.scm index 796bfc5..705ec41 100644 --- a/guix-data-service/jobs/load-new-guix-revision.scm +++ b/guix-data-service/jobs/load-new-guix-revision.scm @@ -19,13 +19,16 @@ #:use-module (srfi srfi-1) #:use-module (srfi srfi-11) #:use-module (srfi srfi-43) + #:use-module (srfi srfi-71) #:use-module (ice-9 match) #:use-module (ice-9 threads) + #:use-module (ice-9 custom-ports) #:use-module (ice-9 textual-ports) #:use-module (ice-9 hash-table) #:use-module (rnrs exceptions) #:use-module (json) #:use-module (squee) + #:use-module (fibers) #:use-module (guix monads) #:use-module (guix store) #:use-module (guix channels) @@ -61,10 +64,7 @@ #:use-module (guix-data-service model package-metadata) #:use-module (guix-data-service model derivation) #:use-module (guix-data-service model system-test) - #:export (log-for-job - count-log-parts - combine-log-parts! - fetch-unlocked-jobs + #:export (fetch-unlocked-jobs process-load-new-guix-revision-job select-load-new-guix-revision-job-metrics select-job-for-commit @@ -77,259 +77,6 @@ enqueue-load-new-guix-revision-job most-recent-n-load-new-guix-revision-jobs)) -(define (log-part-sequence-name job-id) - (simple-format #f "load_new_guix_revision_job_log_parts_id_seq_~A" job-id)) - -(define* (log-port job-id conn - #:key - delete-existing-log-parts? 
- real-output-port) - (define output-port - (or real-output-port - (current-output-port))) - - (define buffer "") - - (define (insert job_id s) - (exec-query - conn - (string-append - " -INSERT INTO load_new_guix_revision_job_log_parts (id, job_id, contents) -VALUES (nextval('" (log-part-sequence-name job_id) "'), $1, $2)") - (list job_id s))) - - (define (log-string s) - (if (string-contains s "\n") - (let ((output (string-append buffer s))) - (set! buffer "") ; clear the buffer - (catch #t - (lambda () - (insert job-id output) - (display output output-port)) - (lambda (key . args) - (display - (simple-format - #f - " -error: ~A: ~A -error: could not insert log part: '~A'\n\n" - key args output) - output-port) - (catch #t - (lambda () - (insert - job-id - (simple-format - #f - " -guix-data-service: error: missing log line: ~A -\n" key))) - (lambda () - #t))))) - (set! buffer (string-append buffer s)))) - - (exec-query - conn - (simple-format #f "CREATE SEQUENCE IF NOT EXISTS ~A" - (log-part-sequence-name job-id))) - (when delete-existing-log-parts? - ;; TODO, this is useful when re-running jobs, but I'm not sure that should - ;; be a thing, jobs should probably be only attempted once. - (exec-query - conn - "DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1" - (list job-id))) - - (let ((port - (make-soft-port - (vector (lambda (c) - (set! buffer (string-append buffer (string c)))) - log-string - (lambda () - (force-output output-port)) - #f ; fetch one character - (lambda () - ;; close port - #f) - #f) ; number of characters that can be read - "w"))) - (setvbuf port 'line) - port)) - -(define (setup-port-for-inferior-error-output job-id real-output-port) - (define (insert conn job_id s) - (exec-query - conn - (string-append " -INSERT INTO load_new_guix_revision_job_log_parts (id, job_id, contents) -VALUES (nextval('" (log-part-sequence-name job_id) "'), $1, $2)") - (list job_id s))) - - (match (pipe) - ((port-to-read-from . 
port-to-write-to) - - (setvbuf port-to-read-from 'line) - (setvbuf port-to-write-to 'line) - (call-with-new-thread - (lambda () - (catch 'system-error - (lambda () - (set-thread-name "inferior logging")) - (const #t)) - - (with-postgresql-connection - (simple-format #f "~A inferior error logging" job-id) - (lambda (logging-conn) - (let loop ((line (get-line port-to-read-from))) - (let ((line-with-newline - (string-append line "\n"))) - (catch #t - (lambda () - (insert logging-conn job-id line-with-newline) - (display line-with-newline real-output-port)) - (lambda (key . args) - (display - (simple-format - #f - " -error: ~A: ~A -error: could not insert log part: '~A'\n\n" - key args line) - real-output-port) - (catch #t - (lambda () - (insert - logging-conn - job-id - (simple-format - #f - " -guix-data-service: error: missing log line: ~A -\n" key))) - (lambda () - #t))))) - (loop (get-line port-to-read-from))))))) - - port-to-write-to))) - -(define real-error-port - (make-parameter (current-error-port))) - -(define inferior-error-port - (make-parameter (current-error-port))) - -(define* (log-for-job conn job-id - #:key - character-limit - start-character) - (define (sql-html-escape s) - (string-append - "replace(" - (string-append - "replace(" - (string-append - "replace(" - s - ",'&','&amp;')") - ",'<','&lt;')") - ",'>','&gt;')")) - - (define (get-characters s) - (if start-character - (simple-format #f "substr(~A, ~A, ~A)" - s start-character - character-limit) - (simple-format #f "right(~A, ~A)" s character-limit))) - - (define log-query - (string-append - "SELECT " - (sql-html-escape (get-characters "contents")) - " FROM load_new_guix_revision_job_logs" - " WHERE job_id = $1 AND contents IS NOT NULL")) - - (define parts-query - (string-append - "SELECT " - (sql-html-escape - (get-characters "STRING_AGG(contents, '' ORDER BY id ASC)")) - " FROM load_new_guix_revision_job_log_parts WHERE job_id = $1")) - - (match (exec-query conn log-query (list job-id)) - (((contents)) - 
contents) - (() - (match (exec-query conn parts-query (list job-id)) - (((contents)) - contents))))) - -(define (insert-empty-log-entry conn job-id) - (exec-query - conn - "DELETE FROM load_new_guix_revision_job_logs WHERE job_id = $1" - (list job-id)) - (exec-query - conn - "INSERT INTO load_new_guix_revision_job_logs (job_id, contents) VALUES -($1, NULL)" - (list job-id))) - -(define (count-log-parts conn job-id) - (match (exec-query - conn - " -SELECT COUNT(*) -FROM load_new_guix_revision_job_log_parts -WHERE job_id = $1" - (list job-id)) - (((id)) - (string->number id)))) - -(define (combine-log-parts! conn job-id) - (with-postgresql-transaction - conn - (lambda (conn) - (exec-query - conn - (string-append - " -UPDATE load_new_guix_revision_job_logs SET contents = ( - SELECT STRING_AGG(contents, '' ORDER BY id ASC) - FROM load_new_guix_revision_job_log_parts - WHERE job_id = $1 - GROUP BY job_id -) -WHERE job_id = $1") - (list job-id)) - (exec-query - conn - "DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1" - (list job-id))))) - -(define (drop-log-parts-sequence conn job-id) - (with-postgresql-transaction - conn - (lambda (conn) - (exec-query conn - "SET LOCAL lock_timeout = '10s'") - (with-exception-handler - (lambda (exn) - (simple-format (current-error-port) - "error when dropping sequence: ~A" - exn)) - (lambda () - (exec-query conn - (string-append - "DROP SEQUENCE IF EXISTS " - (log-part-sequence-name job-id)))) - #:unwind? 
#t)))) - -(define (vacuum-log-parts-table conn) - (exec-query - conn - "VACUUM load_new_guix_revision_job_log_parts")) - (define inferior-package-id (@@ (guix inferior) inferior-package-id)) @@ -845,9 +592,12 @@ WHERE job_id = $1") (a-name (inferior-package-name a)) (b-name (inferior-package-name b)) (a-version (inferior-package-version a)) - (b-version (inferior-package-version b))) + (b-version (inferior-package-version b)) + (a-replacement (inferior-package-replacement a)) + (b-replacement (inferior-package-replacement b))) (if (and (string=? a-name b-name) - (string=? a-version b-version)) + (string=? a-version b-version) + (eq? a-replacement b-replacement)) (begin (simple-format (current-error-port) "warning: ignoring duplicate package: ~A (~A)\n" @@ -926,7 +676,17 @@ WHERE job_id = $1") (list->vector deduplicated-packages))) -(define* (all-inferior-packages-data inf packages #:key (process-replacements? #t)) +(define* (all-inferior-packages-data inf packages) + (define inferior-package-id->packages-index-hash-table + (let ((hash-table (make-hash-table))) + (vector-for-each + (lambda (i pkg) + (hash-set! hash-table + (inferior-package-id pkg) + i)) + packages) + hash-table)) + (let* ((package-license-data (with-time-logging "fetching inferior package license metadata" (inferior-packages->license-data inf))) @@ -944,27 +704,21 @@ WHERE job_id = $1") (cdr translated-package-descriptions-and-synopsis)))) packages))) (package-replacement-data - (if process-replacements? - (vector-map - (lambda (_ package) - (let ((replacement (inferior-package-replacement package))) - (if replacement - ;; I'm not sure if replacements can themselves be - ;; replaced, but I do know for sure that there are - ;; infinite chains of replacements (python(2)-urllib3 - ;; in 7c4c781aa40c42d4cd10b8d9482199f3db345e1b for - ;; example). 
- ;; - ;; This code currently just capures the first level - ;; of replacements - (first - (all-inferior-packages-data - inf - (vector replacement) - #:process-replacements? #f)) - #f))) - packages) - #f))) + (vector-map + (lambda (_ pkg) + (let ((replacement (inferior-package-replacement pkg))) + (if replacement + ;; I'm not sure if replacements can themselves be + ;; replaced, but I do know for sure that there are + ;; infinite chains of replacements (python(2)-urllib3 + ;; in 7c4c781aa40c42d4cd10b8d9482199f3db345e1b for + ;; example). + ;; + ;; So this might be #f in these cases + (hash-ref inferior-package-id->packages-index-hash-table + (inferior-package-id pkg)) + #f))) + packages))) `((names . ,(vector-map (lambda (_ pkg) (inferior-package-name pkg)) packages)) @@ -972,53 +726,57 @@ WHERE job_id = $1") packages)) (license-data . ,package-license-data) (metadata . ,package-metadata) - (replacemnets . ,package-replacement-data)))) + (replacements . ,package-replacement-data)))) (define (insert-packages conn inferior-packages-data) - (let*-values - (((package-license-set-ids) - (inferior-packages->license-set-ids - conn - (inferior-packages->license-id-lists - conn - ;; TODO Don't needlessly convert - (vector->list - (assq-ref inferior-packages-data 'license-data))))) - ((all-package-metadata-ids new-package-metadata-ids) - (with-time-logging "inserting package metadata entries" - (inferior-packages->package-metadata-ids + (let* ((names (assq-ref inferior-packages-data 'names)) + (versions (assq-ref inferior-packages-data 'versions)) + (package-license-set-ids + (inferior-packages->license-set-ids conn - ;; TODO Don't needlessly convert - (vector->list - (assq-ref inferior-packages-data 'metadata)) - package-license-set-ids))) - ((replacement-ids) - (or (and=> (assq-ref inferior-packages-data 'replacements) - (lambda (all-replacement-data) - (with-time-logging "inserting package replacements" - (vector-map - (lambda (_ replacement-data) - (if replacement-data - 
(first - (insert-packages conn (list replacement-data))) - (cons "integer" NULL))) - all-replacement-data)))) - (make-vector (length package-license-set-ids) - (cons "integer" NULL))))) + (inferior-packages->license-id-lists + conn + ;; TODO Don't needlessly convert + (vector->list + (assq-ref inferior-packages-data 'license-data))))) + (all-package-metadata-ids + new-package-metadata-ids + (with-time-logging "inserting package metadata entries" + (inferior-packages->package-metadata-ids + conn + ;; TODO Don't needlessly convert + (vector->list + (assq-ref inferior-packages-data 'metadata)) + package-license-set-ids))) + (replacement-package-ids + (vector-map + (lambda (_ package-index-or-false) + (if package-index-or-false + (first + (inferior-packages->package-ids + conn + (list (list (vector-ref names package-index-or-false) + (vector-ref versions package-index-or-false) + (list-ref all-package-metadata-ids + package-index-or-false) + (cons "integer" NULL))))) + (cons "integer" NULL))) + (assq-ref inferior-packages-data 'replacements)))) (unless (null? 
new-package-metadata-ids) - (with-time-logging "fetching package metadata tsvector entries" + (with-time-logging "inserting package metadata tsvector entries" (insert-package-metadata-tsvector-entries conn new-package-metadata-ids))) - (with-time-logging "getting package-ids" - (inferior-packages->package-ids - conn - ;; TODO Do this more efficiently - (zip (vector->list (assq-ref inferior-packages-data 'names)) - (vector->list (assq-ref inferior-packages-data 'versions)) - all-package-metadata-ids - (vector->list replacement-ids)))))) + (with-time-logging "getting package-ids (without replacements)" + (list->vector + (inferior-packages->package-ids + conn + ;; TODO Do this more efficiently + (zip (vector->list names) + (vector->list versions) + all-package-metadata-ids + (vector->list replacement-package-ids))))))) (define (insert-lint-warnings conn package-ids @@ -1082,11 +840,11 @@ WHERE job_id = $1") conn derivations-vector))) - (insert-package-derivations conn - (car system-and-target) - (or (cdr system-and-target) "") - package-ids - derivation-ids))) + (insert-package-derivations conn + (car system-and-target) + (or (cdr system-and-target) "") + package-ids + derivation-ids))) inferior-packages-system-and-target-to-derivations-alist)) (define guix-store-path @@ -1213,8 +971,7 @@ WHERE job_id = $1") "SSL_CERT_DIR=" (nss-certs-store-path store)))) (begin (simple-format #t "debug: using open-inferior\n") - (open-inferior (guix-store-path store) - #:error-port (inferior-error-port)))))) + (open-inferior (guix-store-path store)))))) (define (start-inferior-and-return-derivation-file-names) ;; /etc is only missing if open-inferior/container has been used @@ -1344,8 +1101,7 @@ WHERE job_id = $1") '("/gnu/store")) (begin (simple-format #t "debug: using open-inferior\n") - (open-inferior store-path - #:error-port (inferior-error-port)))))) + (open-inferior store-path))))) (inferior-eval '(use-modules (srfi srfi-1) (srfi srfi-34) (guix grafts) @@ -1400,8 +1156,7 @@ 
WHERE job_id = $1") (begin (setenv "GUIX_LOCPATH" guix-locpath) (simple-format #t "debug: using open-inferior\n") - (open-inferior store-path - #:error-port (inferior-error-port))))))) + (open-inferior store-path)))))) (setenv "GUIX_LOCPATH" guix-locpath) ; restore GUIX_LOCPATH (when (eq? inf #f) @@ -2040,44 +1795,6 @@ SKIP LOCKED") (prevent-inlining-for-tests with-store-connection) -(define (setup-logging id thunk) - (let* ((previous-output-port (current-output-port)) - (previous-error-port (current-error-port)) - (result - (with-postgresql-connection - (simple-format #f "load-new-guix-revision ~A logging" id) - (lambda (logging-conn) - (insert-empty-log-entry logging-conn id) - (let ((logging-port - (log-port id logging-conn - #:delete-existing-log-parts? #t))) - (set-current-output-port logging-port) - (set-current-error-port logging-port) - (let ((result - (parameterize ((current-build-output-port logging-port) - (real-error-port previous-error-port) - (inferior-error-port - (setup-port-for-inferior-error-output - id previous-error-port))) - (thunk)))) - (set-current-output-port previous-output-port) - (set-current-error-port previous-error-port) - - ;; This can happen with GC, so do it explicitly - (close-port logging-port) - - (combine-log-parts! logging-conn id) - - result)))))) - result)) - -(define (cleanup-logging id conn) - (drop-log-parts-sequence conn id) - (with-time-logging "vacuuming log parts" - (vacuum-log-parts-table conn))) - -(prevent-inlining-for-tests setup-logging) - (define* (process-load-new-guix-revision-job id #:key skip-system-tests?) (with-postgresql-connection (simple-format #f "load-new-guix-revision ~A" id) @@ -2104,28 +1821,24 @@ SKIP LOCKED") (if (eq? 
(with-time-logging (string-append "processing revision " commit) - (setup-logging - id - (lambda () - (with-exception-handler - (const #f) - (lambda () - (with-throw-handler #t - (lambda () - (with-store-connection - (lambda (store) - (load-new-guix-revision conn - store - git-repository-id - commit - #:skip-system-tests? - skip-system-tests?)))) - (lambda (key . args) - (simple-format (current-error-port) - "error: load-new-guix-revision: ~A ~A\n" - key args) - (backtrace)))) - #:unwind? #t)))) + (with-exception-handler + (const #f) + (lambda () + (with-throw-handler #t + (lambda () + (with-store-connection + (lambda (store) + (load-new-guix-revision conn + store + git-repository-id + commit + #:skip-system-tests? #t)))) + (lambda (key . args) + (simple-format (current-error-port) + "error: load-new-guix-revision: ~A ~A\n" + key args) + (backtrace)))) + #:unwind? #t)) #t) (begin (record-job-succeeded conn id) @@ -2133,10 +1846,6 @@ SKIP LOCKED") (exec-query conn "COMMIT") (with-time-logging - "cleanup logging" - (cleanup-logging id conn)) - - (with-time-logging "vacuuming package derivations by guix revision range table" (vacuum-package-derivations-table conn)) @@ -2170,10 +1879,6 @@ SKIP LOCKED") (exec-query conn "ROLLBACK") (record-job-event conn id "failure") - (with-time-logging - "cleanup logging" - (cleanup-logging id conn)) - #f))) (() (exec-query conn "ROLLBACK") diff --git a/guix-data-service/model/package.scm b/guix-data-service/model/package.scm index 263f46c..7ec2b09 100644 --- a/guix-data-service/model/package.scm +++ b/guix-data-service/model/package.scm @@ -264,12 +264,11 @@ INSERT INTO packages (name, version, package_metadata_id) VALUES " RETURNING id")) (define (inferior-packages->package-ids conn package-entries) - (list->vector - (insert-missing-data-and-return-all-ids - conn - "packages" - '(name version package_metadata_id replacement_package_id) - package-entries))) + (insert-missing-data-and-return-all-ids + conn + "packages" + '(name 
version package_metadata_id replacement_package_id) + package-entries)) (define (select-package-versions-for-revision conn commit diff --git a/guix-data-service/web/jobs/controller.scm b/guix-data-service/web/jobs/controller.scm index b8b494d..7e5084f 100644 --- a/guix-data-service/web/jobs/controller.scm +++ b/guix-data-service/web/jobs/controller.scm @@ -23,6 +23,7 @@ #:use-module (guix-data-service web controller) #:use-module (guix-data-service web query-parameters) #:use-module (guix-data-service web util) + #:use-module (guix-data-service jobs) #:use-module (guix-data-service jobs load-new-guix-revision) #:use-module (guix-data-service web jobs html) #:export (jobs-controller)) diff --git a/scripts/guix-data-service-process-job.in b/scripts/guix-data-service-process-job.in index c6d06c6..89e31b9 100644 --- a/scripts/guix-data-service-process-job.in +++ b/scripts/guix-data-service-process-job.in @@ -25,6 +25,8 @@ (use-modules (srfi srfi-1) (srfi srfi-37) (ice-9 match) + (ice-9 suspendable-ports) + (fibers) (guix-data-service database) (guix-data-service data-deletion) (guix-data-service model package-derivation-by-guix-revision-range) @@ -38,12 +40,21 @@ ;; Make stack traces more useful (setenv "COLUMNS" "256") +(install-suspendable-ports!) + (define %options (list (option '("skip-system-tests") #f #f (lambda (opt name _ result) - (alist-cons 'skip-system-tests #t result))))) + (alist-cons 'skip-system-tests #t result))) + (option '("parallelism") #t #f + (lambda (opt name arg result) + (alist-cons 'parallelism + (string->number arg) + (alist-delete 'parallelism + result)))))) -(define %default-options '()) +(define %default-options + '((parallelism . 1))) (define (parse-options args) (args-fold @@ -62,6 +73,11 @@ (let ((opts (parse-options (cdr (program-arguments))))) (match (assq-ref opts 'arguments) ((job) - (process-load-new-guix-revision-job - job - #:skip-system-tests? 
(assq-ref opts 'skip-system-tests))))) + (run-fibers + (lambda () + (process-load-new-guix-revision-job + job + #:skip-system-tests? (assq-ref opts 'skip-system-tests))) + #:hz 0 + #:parallelism (assq-ref opts 'parallelism) + #:drain? #t)))) diff --git a/tests/jobs-load-new-guix-revision.scm b/tests/jobs-load-new-guix-revision.scm index 0eaad3f..8213afb 100644 --- a/tests/jobs-load-new-guix-revision.scm +++ b/tests/jobs-load-new-guix-revision.scm @@ -48,43 +48,37 @@ (mock ((guix-data-service jobs load-new-guix-revision) - setup-logging - (lambda (conn thunk) - (thunk))) + channel-derivations-by-system->guix-store-item + (lambda (store channel-derivations-by-system) + "/gnu/store/test")) (mock ((guix-data-service jobs load-new-guix-revision) - channel-derivations-by-system->guix-store-item - (lambda (store channel-derivations-by-system) - "/gnu/store/test")) + extract-information-from + (lambda* (conn store guix-revision-id commit + guix-source store-path + #:key skip-system-tests?) + #t)) (mock - ((guix-data-service jobs load-new-guix-revision) - extract-information-from - (lambda* (conn store guix-revision-id commit - guix-source store-path - #:key skip-system-tests?) 
+ ((guix-data-service model channel-instance) + insert-channel-instances + (lambda (conn guix-revision-id derivations-by-system) #t)) (mock - ((guix-data-service model channel-instance) - insert-channel-instances - (lambda (conn guix-revision-id derivations-by-system) - #t)) - - (mock - ((guix channels) - channel-news-for-commit - (lambda (channel commit) - '())) - - (match (enqueue-load-new-guix-revision-job - conn - (git-repository-url->git-repository-id conn "test-url") - "test-commit" - "test-source") - ((id) - (process-load-new-guix-revision-job id))))))))))) + ((guix channels) + channel-news-for-commit + (lambda (channel commit) + '())) + + (match (enqueue-load-new-guix-revision-job + conn + (git-repository-url->git-repository-id conn "test-url") + "test-commit" + "test-source") + ((id) + (process-load-new-guix-revision-job id)))))))))) (exec-query conn "TRUNCATE guix_revisions CASCADE") (exec-query conn "TRUNCATE load_new_guix_revision_jobs CASCADE") |