aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--guix-data-service/jobs.scm241
-rw-r--r--guix-data-service/jobs/load-new-guix-revision.scm503
-rw-r--r--guix-data-service/model/package.scm11
-rw-r--r--guix-data-service/web/jobs/controller.scm1
-rw-r--r--scripts/guix-data-service-process-job.in26
-rw-r--r--tests/jobs-load-new-guix-revision.scm52
6 files changed, 365 insertions, 469 deletions
diff --git a/guix-data-service/jobs.scm b/guix-data-service/jobs.scm
index a0f59dc..b151367 100644
--- a/guix-data-service/jobs.scm
+++ b/guix-data-service/jobs.scm
@@ -20,11 +20,93 @@
#:use-module (ice-9 format)
#:use-module (ice-9 threads)
#:use-module (ice-9 atomic)
+ #:use-module (ice-9 textual-ports)
+ #:use-module (squee)
+ #:use-module (guix-data-service utils)
+ #:use-module (guix-data-service database)
#:use-module (guix-data-service jobs load-new-guix-revision)
- #:export (process-jobs
+ #:export (log-for-job
+ count-log-parts
+ combine-log-parts!
+
+ process-jobs
default-max-processes))
+(define (log-part-sequence-name job-id)
+ (simple-format #f "load_new_guix_revision_job_log_parts_id_seq_~A" job-id))
+
+(define (start-thread-for-process-output job-id)
+ (define (insert conn job_id s)
+ (exec-query
+ conn
+ (string-append "
+INSERT INTO load_new_guix_revision_job_log_parts (id, job_id, contents)
+VALUES (nextval('" (log-part-sequence-name job_id) "'), $1, $2)")
+ (list job_id s)))
+
+
+ (match (pipe)
+ ((port-to-read-from . port-to-write-to)
+
+ (setvbuf port-to-read-from 'line)
+ (setvbuf port-to-write-to 'line)
+
+ (let ((flags (fcntl port-to-read-from F_GETFL)))
+ (fcntl port-to-read-from F_SETFL (logior O_NONBLOCK flags)))
+ (let ((flags (fcntl port-to-write-to F_GETFL)))
+ (fcntl port-to-write-to F_SETFL (logior O_NONBLOCK flags)))
+
+ (call-with-new-thread
+ (lambda ()
+ (with-postgresql-connection
+ (simple-format #f "~A job logging" job-id)
+ (lambda (logging-conn)
+ (exec-query
+ logging-conn
+ (simple-format #f "CREATE SEQUENCE IF NOT EXISTS ~A"
+ (log-part-sequence-name job-id)))
+ (exec-query
+ logging-conn
+ "DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1"
+ (list job-id))
+
+ (let loop ((line (get-line port-to-read-from)))
+ (let ((line-with-newline
+ (string-append line "\n")))
+ (catch #t
+ (lambda ()
+ (insert logging-conn job-id line-with-newline)
+ (display line-with-newline))
+ (lambda (key . args)
+ (display
+ (simple-format
+ #f
+ "
+error: ~A: ~A
+error: could not insert log part: '~A'\n\n"
+ key args line))
+ (catch #t
+ (lambda ()
+ (insert
+ logging-conn
+ job-id
+ (simple-format
+ #f
+ "
+guix-data-service: error: missing log line: ~A
+\n" key)))
+ (lambda _
+ #t)))))
+ (loop (get-line port-to-read-from)))))))
+
+ port-to-write-to)))
+
+(define (cleanup-logging conn job-id)
+ (drop-log-parts-sequence conn job-id)
+ (with-time-logging "vacuuming log parts"
+ (vacuum-log-parts-table conn)))
+
(define* (process-jobs conn #:key max-processes
latest-branch-revision-max-processes
skip-system-tests?)
@@ -32,29 +114,137 @@
(fetch-unlocked-jobs conn))
(define (process-job job-id)
- (apply execlp
- "guix-data-service-process-job"
- "guix-data-service-process-job"
- job-id
- (if skip-system-tests?
- '("--skip-system-tests")
- '())))
+ (let ((log-port (start-thread-for-process-output job-id)))
+ (spawn
+ "guix-data-service-process-job"
+ (cons* "guix-data-service-process-job"
+ job-id
+ (if skip-system-tests?
+ '("--skip-system-tests")
+ '()))
+ #:output log-port
+ #:error log-port)))
+
+ (define (post-job job-id)
+ (when (> (count-log-parts conn job-id)
+ 0)
+ (combine-log-parts! conn job-id)
+ (cleanup-logging conn job-id)))
(define (handle-job-failure job-id)
(record-job-event conn job-id "failure")
(display (simple-format #f "recording failure for job ~A\n" job-id)
- (current-error-port))
- (when (> (count-log-parts conn job-id)
- 0)
- (combine-log-parts! conn job-id)))
+ (current-error-port)))
(process-jobs-concurrently fetch-new-jobs
process-job
+ post-job
handle-job-failure
#:max-processes max-processes
#:priority-max-processes
latest-branch-revision-max-processes))
+
+(define* (log-for-job conn job-id
+ #:key
+ character-limit
+ start-character)
+ (define (sql-html-escape s)
+ (string-append
+ "replace("
+ (string-append
+ "replace("
+ (string-append
+ "replace("
+ s
+ ",'&','&')")
+ ",'<','&lt;')")
+ ",'>','&gt;')"))
+
+ (define (get-characters s)
+ (if start-character
+ (simple-format #f "substr(~A, ~A, ~A)"
+ s start-character
+ character-limit)
+ (simple-format #f "right(~A, ~A)" s character-limit)))
+
+ (define log-query
+ (string-append
+ "SELECT "
+ (sql-html-escape (get-characters "contents"))
+ " FROM load_new_guix_revision_job_logs"
+ " WHERE job_id = $1 AND contents IS NOT NULL"))
+
+ (define parts-query
+ (string-append
+ "SELECT "
+ (sql-html-escape
+ (get-characters "STRING_AGG(contents, '' ORDER BY id ASC)"))
+ " FROM load_new_guix_revision_job_log_parts WHERE job_id = $1"))
+
+ (match (exec-query conn log-query (list job-id))
+ (((contents))
+ contents)
+ (()
+ (match (exec-query conn parts-query (list job-id))
+ (((contents))
+ contents)))))
+
+(define (count-log-parts conn job-id)
+ (match (exec-query
+ conn
+ "
+SELECT COUNT(*)
+FROM load_new_guix_revision_job_log_parts
+WHERE job_id = $1"
+ (list job-id))
+ (((id))
+ (string->number id))))
+
+(define (combine-log-parts! conn job-id)
+ (with-postgresql-transaction
+ conn
+ (lambda (conn)
+ (exec-query
+ conn
+ (string-append
+ "
+UPDATE load_new_guix_revision_job_logs SET contents = (
+ SELECT STRING_AGG(contents, '' ORDER BY id ASC)
+ FROM load_new_guix_revision_job_log_parts
+ WHERE job_id = $1
+ GROUP BY job_id
+)
+WHERE job_id = $1")
+ (list job-id))
+ (exec-query
+ conn
+ "DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1"
+ (list job-id)))))
+
+(define (drop-log-parts-sequence conn job-id)
+ (with-postgresql-transaction
+ conn
+ (lambda (conn)
+ (exec-query conn
+ "SET LOCAL lock_timeout = '10s'")
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format (current-error-port)
+ "error when dropping sequence: ~A"
+ exn))
+ (lambda ()
+ (exec-query conn
+ (string-append
+ "DROP SEQUENCE IF EXISTS "
+ (log-part-sequence-name job-id))))
+ #:unwind? #t))))
+
+(define (vacuum-log-parts-table conn)
+ (exec-query
+ conn
+ "VACUUM load_new_guix_revision_job_log_parts"))
+
(define default-max-processes
(max (round (/ (current-processor-count)
4))
@@ -67,6 +257,7 @@
(define* (process-jobs-concurrently
fetch-new-jobs
process-job
+ post-job
handle-job-failure
#:key
(max-processes default-max-processes)
@@ -108,9 +299,10 @@
;; No process to wait for
#f)
((pid . status)
- (unless (eq? status 0)
- (match (hash-ref processes pid)
- ((_ (id))
+ (match (hash-ref processes pid)
+ ((_ (id))
+ (post-job id)
+ (unless (eq? status 0)
(simple-format (current-error-port)
"pid ~A (job: ~A) failed with status ~A\n"
pid id status)
@@ -161,20 +353,6 @@
(kill pid SIGTERM)))
processes))
- (define (fork-and-process-job job-args)
- (match (primitive-fork)
- (0
- (dynamic-wind
- (const #t)
- (lambda ()
- (apply process-job job-args))
- (lambda ()
- (primitive-exit 127))))
- (pid
- (hashv-set! processes pid
- (list (current-time) job-args))
- #t)))
-
(define exit?
(make-atomic-box #f))
@@ -206,6 +384,9 @@
(if priority?
priority-max-processes
max-processes))
- (fork-and-process-job (list job-id))))))
+ (let ((pid (process-job job-id)))
+ (peek "PID" pid)
+ (hashv-set! processes pid
+ (list (current-time) (list job-id))))))))
jobs)))
(sleep 15)))
diff --git a/guix-data-service/jobs/load-new-guix-revision.scm b/guix-data-service/jobs/load-new-guix-revision.scm
index 796bfc5..705ec41 100644
--- a/guix-data-service/jobs/load-new-guix-revision.scm
+++ b/guix-data-service/jobs/load-new-guix-revision.scm
@@ -19,13 +19,16 @@
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-11)
#:use-module (srfi srfi-43)
+ #:use-module (srfi srfi-71)
#:use-module (ice-9 match)
#:use-module (ice-9 threads)
+ #:use-module (ice-9 custom-ports)
#:use-module (ice-9 textual-ports)
#:use-module (ice-9 hash-table)
#:use-module (rnrs exceptions)
#:use-module (json)
#:use-module (squee)
+ #:use-module (fibers)
#:use-module (guix monads)
#:use-module (guix store)
#:use-module (guix channels)
@@ -61,10 +64,7 @@
#:use-module (guix-data-service model package-metadata)
#:use-module (guix-data-service model derivation)
#:use-module (guix-data-service model system-test)
- #:export (log-for-job
- count-log-parts
- combine-log-parts!
- fetch-unlocked-jobs
+ #:export (fetch-unlocked-jobs
process-load-new-guix-revision-job
select-load-new-guix-revision-job-metrics
select-job-for-commit
@@ -77,259 +77,6 @@
enqueue-load-new-guix-revision-job
most-recent-n-load-new-guix-revision-jobs))
-(define (log-part-sequence-name job-id)
- (simple-format #f "load_new_guix_revision_job_log_parts_id_seq_~A" job-id))
-
-(define* (log-port job-id conn
- #:key
- delete-existing-log-parts?
- real-output-port)
- (define output-port
- (or real-output-port
- (current-output-port)))
-
- (define buffer "")
-
- (define (insert job_id s)
- (exec-query
- conn
- (string-append
- "
-INSERT INTO load_new_guix_revision_job_log_parts (id, job_id, contents)
-VALUES (nextval('" (log-part-sequence-name job_id) "'), $1, $2)")
- (list job_id s)))
-
- (define (log-string s)
- (if (string-contains s "\n")
- (let ((output (string-append buffer s)))
- (set! buffer "") ; clear the buffer
- (catch #t
- (lambda ()
- (insert job-id output)
- (display output output-port))
- (lambda (key . args)
- (display
- (simple-format
- #f
- "
-error: ~A: ~A
-error: could not insert log part: '~A'\n\n"
- key args output)
- output-port)
- (catch #t
- (lambda ()
- (insert
- job-id
- (simple-format
- #f
- "
-guix-data-service: error: missing log line: ~A
-\n" key)))
- (lambda ()
- #t)))))
- (set! buffer (string-append buffer s))))
-
- (exec-query
- conn
- (simple-format #f "CREATE SEQUENCE IF NOT EXISTS ~A"
- (log-part-sequence-name job-id)))
- (when delete-existing-log-parts?
- ;; TODO, this is useful when re-running jobs, but I'm not sure that should
- ;; be a thing, jobs should probably be only attempted once.
- (exec-query
- conn
- "DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1"
- (list job-id)))
-
- (let ((port
- (make-soft-port
- (vector (lambda (c)
- (set! buffer (string-append buffer (string c))))
- log-string
- (lambda ()
- (force-output output-port))
- #f ; fetch one character
- (lambda ()
- ;; close port
- #f)
- #f) ; number of characters that can be read
- "w")))
- (setvbuf port 'line)
- port))
-
-(define (setup-port-for-inferior-error-output job-id real-output-port)
- (define (insert conn job_id s)
- (exec-query
- conn
- (string-append "
-INSERT INTO load_new_guix_revision_job_log_parts (id, job_id, contents)
-VALUES (nextval('" (log-part-sequence-name job_id) "'), $1, $2)")
- (list job_id s)))
-
- (match (pipe)
- ((port-to-read-from . port-to-write-to)
-
- (setvbuf port-to-read-from 'line)
- (setvbuf port-to-write-to 'line)
- (call-with-new-thread
- (lambda ()
- (catch 'system-error
- (lambda ()
- (set-thread-name "inferior logging"))
- (const #t))
-
- (with-postgresql-connection
- (simple-format #f "~A inferior error logging" job-id)
- (lambda (logging-conn)
- (let loop ((line (get-line port-to-read-from)))
- (let ((line-with-newline
- (string-append line "\n")))
- (catch #t
- (lambda ()
- (insert logging-conn job-id line-with-newline)
- (display line-with-newline real-output-port))
- (lambda (key . args)
- (display
- (simple-format
- #f
- "
-error: ~A: ~A
-error: could not insert log part: '~A'\n\n"
- key args line)
- real-output-port)
- (catch #t
- (lambda ()
- (insert
- logging-conn
- job-id
- (simple-format
- #f
- "
-guix-data-service: error: missing log line: ~A
-\n" key)))
- (lambda ()
- #t)))))
- (loop (get-line port-to-read-from)))))))
-
- port-to-write-to)))
-
-(define real-error-port
- (make-parameter (current-error-port)))
-
-(define inferior-error-port
- (make-parameter (current-error-port)))
-
-(define* (log-for-job conn job-id
- #:key
- character-limit
- start-character)
- (define (sql-html-escape s)
- (string-append
- "replace("
- (string-append
- "replace("
- (string-append
- "replace("
- s
- ",'&','&amp;')")
- ",'<','&lt;')")
- ",'>','&gt;')"))
-
- (define (get-characters s)
- (if start-character
- (simple-format #f "substr(~A, ~A, ~A)"
- s start-character
- character-limit)
- (simple-format #f "right(~A, ~A)" s character-limit)))
-
- (define log-query
- (string-append
- "SELECT "
- (sql-html-escape (get-characters "contents"))
- " FROM load_new_guix_revision_job_logs"
- " WHERE job_id = $1 AND contents IS NOT NULL"))
-
- (define parts-query
- (string-append
- "SELECT "
- (sql-html-escape
- (get-characters "STRING_AGG(contents, '' ORDER BY id ASC)"))
- " FROM load_new_guix_revision_job_log_parts WHERE job_id = $1"))
-
- (match (exec-query conn log-query (list job-id))
- (((contents))
- contents)
- (()
- (match (exec-query conn parts-query (list job-id))
- (((contents))
- contents)))))
-
-(define (insert-empty-log-entry conn job-id)
- (exec-query
- conn
- "DELETE FROM load_new_guix_revision_job_logs WHERE job_id = $1"
- (list job-id))
- (exec-query
- conn
- "INSERT INTO load_new_guix_revision_job_logs (job_id, contents) VALUES
-($1, NULL)"
- (list job-id)))
-
-(define (count-log-parts conn job-id)
- (match (exec-query
- conn
- "
-SELECT COUNT(*)
-FROM load_new_guix_revision_job_log_parts
-WHERE job_id = $1"
- (list job-id))
- (((id))
- (string->number id))))
-
-(define (combine-log-parts! conn job-id)
- (with-postgresql-transaction
- conn
- (lambda (conn)
- (exec-query
- conn
- (string-append
- "
-UPDATE load_new_guix_revision_job_logs SET contents = (
- SELECT STRING_AGG(contents, '' ORDER BY id ASC)
- FROM load_new_guix_revision_job_log_parts
- WHERE job_id = $1
- GROUP BY job_id
-)
-WHERE job_id = $1")
- (list job-id))
- (exec-query
- conn
- "DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1"
- (list job-id)))))
-
-(define (drop-log-parts-sequence conn job-id)
- (with-postgresql-transaction
- conn
- (lambda (conn)
- (exec-query conn
- "SET LOCAL lock_timeout = '10s'")
- (with-exception-handler
- (lambda (exn)
- (simple-format (current-error-port)
- "error when dropping sequence: ~A"
- exn))
- (lambda ()
- (exec-query conn
- (string-append
- "DROP SEQUENCE IF EXISTS "
- (log-part-sequence-name job-id))))
- #:unwind? #t))))
-
-(define (vacuum-log-parts-table conn)
- (exec-query
- conn
- "VACUUM load_new_guix_revision_job_log_parts"))
-
(define inferior-package-id
(@@ (guix inferior) inferior-package-id))
@@ -845,9 +592,12 @@ WHERE job_id = $1")
(a-name (inferior-package-name a))
(b-name (inferior-package-name b))
(a-version (inferior-package-version a))
- (b-version (inferior-package-version b)))
+ (b-version (inferior-package-version b))
+ (a-replacement (inferior-package-replacement a))
+ (b-replacement (inferior-package-replacement b)))
(if (and (string=? a-name b-name)
- (string=? a-version b-version))
+ (string=? a-version b-version)
+ (eq? a-replacement b-replacement))
(begin
(simple-format (current-error-port)
"warning: ignoring duplicate package: ~A (~A)\n"
@@ -926,7 +676,17 @@ WHERE job_id = $1")
(list->vector deduplicated-packages)))
-(define* (all-inferior-packages-data inf packages #:key (process-replacements? #t))
+(define* (all-inferior-packages-data inf packages)
+ (define inferior-package-id->packages-index-hash-table
+ (let ((hash-table (make-hash-table)))
+ (vector-for-each
+ (lambda (i pkg)
+ (hash-set! hash-table
+ (inferior-package-id pkg)
+ i))
+ packages)
+ hash-table))
+
(let* ((package-license-data
(with-time-logging "fetching inferior package license metadata"
(inferior-packages->license-data inf)))
@@ -944,27 +704,21 @@ WHERE job_id = $1")
(cdr translated-package-descriptions-and-synopsis))))
packages)))
(package-replacement-data
- (if process-replacements?
- (vector-map
- (lambda (_ package)
- (let ((replacement (inferior-package-replacement package)))
- (if replacement
- ;; I'm not sure if replacements can themselves be
- ;; replaced, but I do know for sure that there are
- ;; infinite chains of replacements (python(2)-urllib3
- ;; in 7c4c781aa40c42d4cd10b8d9482199f3db345e1b for
- ;; example).
- ;;
- ;; This code currently just capures the first level
- ;; of replacements
- (first
- (all-inferior-packages-data
- inf
- (vector replacement)
- #:process-replacements? #f))
- #f)))
- packages)
- #f)))
+ (vector-map
+ (lambda (_ pkg)
+ (let ((replacement (inferior-package-replacement pkg)))
+ (if replacement
+ ;; I'm not sure if replacements can themselves be
+ ;; replaced, but I do know for sure that there are
+ ;; infinite chains of replacements (python(2)-urllib3
+ ;; in 7c4c781aa40c42d4cd10b8d9482199f3db345e1b for
+ ;; example).
+ ;;
+ ;; So this might be #f in these cases
+ (hash-ref inferior-package-id->packages-index-hash-table
+ (inferior-package-id pkg))
+ #f)))
+ packages)))
`((names . ,(vector-map (lambda (_ pkg) (inferior-package-name pkg))
packages))
@@ -972,53 +726,57 @@ WHERE job_id = $1")
packages))
(license-data . ,package-license-data)
(metadata . ,package-metadata)
- (replacemnets . ,package-replacement-data))))
+ (replacements . ,package-replacement-data))))
(define (insert-packages conn inferior-packages-data)
- (let*-values
- (((package-license-set-ids)
- (inferior-packages->license-set-ids
- conn
- (inferior-packages->license-id-lists
- conn
- ;; TODO Don't needlessly convert
- (vector->list
- (assq-ref inferior-packages-data 'license-data)))))
- ((all-package-metadata-ids new-package-metadata-ids)
- (with-time-logging "inserting package metadata entries"
- (inferior-packages->package-metadata-ids
+ (let* ((names (assq-ref inferior-packages-data 'names))
+ (versions (assq-ref inferior-packages-data 'versions))
+ (package-license-set-ids
+ (inferior-packages->license-set-ids
conn
- ;; TODO Don't needlessly convert
- (vector->list
- (assq-ref inferior-packages-data 'metadata))
- package-license-set-ids)))
- ((replacement-ids)
- (or (and=> (assq-ref inferior-packages-data 'replacements)
- (lambda (all-replacement-data)
- (with-time-logging "inserting package replacements"
- (vector-map
- (lambda (_ replacement-data)
- (if replacement-data
- (first
- (insert-packages conn (list replacement-data)))
- (cons "integer" NULL)))
- all-replacement-data))))
- (make-vector (length package-license-set-ids)
- (cons "integer" NULL)))))
+ (inferior-packages->license-id-lists
+ conn
+ ;; TODO Don't needlessly convert
+ (vector->list
+ (assq-ref inferior-packages-data 'license-data)))))
+ (all-package-metadata-ids
+ new-package-metadata-ids
+ (with-time-logging "inserting package metadata entries"
+ (inferior-packages->package-metadata-ids
+ conn
+ ;; TODO Don't needlessly convert
+ (vector->list
+ (assq-ref inferior-packages-data 'metadata))
+ package-license-set-ids)))
+ (replacement-package-ids
+ (vector-map
+ (lambda (_ package-index-or-false)
+ (if package-index-or-false
+ (first
+ (inferior-packages->package-ids
+ conn
+ (list (list (vector-ref names package-index-or-false)
+ (vector-ref versions package-index-or-false)
+ (list-ref all-package-metadata-ids
+ package-index-or-false)
+ (cons "integer" NULL)))))
+ (cons "integer" NULL)))
+ (assq-ref inferior-packages-data 'replacements))))
(unless (null? new-package-metadata-ids)
- (with-time-logging "fetching package metadata tsvector entries"
+ (with-time-logging "inserting package metadata tsvector entries"
(insert-package-metadata-tsvector-entries
conn new-package-metadata-ids)))
- (with-time-logging "getting package-ids"
- (inferior-packages->package-ids
- conn
- ;; TODO Do this more efficiently
- (zip (vector->list (assq-ref inferior-packages-data 'names))
- (vector->list (assq-ref inferior-packages-data 'versions))
- all-package-metadata-ids
- (vector->list replacement-ids))))))
+ (with-time-logging "getting package-ids (without replacements)"
+ (list->vector
+ (inferior-packages->package-ids
+ conn
+ ;; TODO Do this more efficiently
+ (zip (vector->list names)
+ (vector->list versions)
+ all-package-metadata-ids
+ (vector->list replacement-package-ids)))))))
(define (insert-lint-warnings conn
package-ids
@@ -1082,11 +840,11 @@ WHERE job_id = $1")
conn
derivations-vector)))
- (insert-package-derivations conn
- (car system-and-target)
- (or (cdr system-and-target) "")
- package-ids
- derivation-ids)))
+ (insert-package-derivations conn
+ (car system-and-target)
+ (or (cdr system-and-target) "")
+ package-ids
+ derivation-ids)))
inferior-packages-system-and-target-to-derivations-alist))
(define guix-store-path
@@ -1213,8 +971,7 @@ WHERE job_id = $1")
"SSL_CERT_DIR=" (nss-certs-store-path store))))
(begin
(simple-format #t "debug: using open-inferior\n")
- (open-inferior (guix-store-path store)
- #:error-port (inferior-error-port))))))
+ (open-inferior (guix-store-path store))))))
(define (start-inferior-and-return-derivation-file-names)
;; /etc is only missing if open-inferior/container has been used
@@ -1344,8 +1101,7 @@ WHERE job_id = $1")
'("/gnu/store"))
(begin
(simple-format #t "debug: using open-inferior\n")
- (open-inferior store-path
- #:error-port (inferior-error-port))))))
+ (open-inferior store-path)))))
(inferior-eval '(use-modules (srfi srfi-1)
(srfi srfi-34)
(guix grafts)
@@ -1400,8 +1156,7 @@ WHERE job_id = $1")
(begin
(setenv "GUIX_LOCPATH" guix-locpath)
(simple-format #t "debug: using open-inferior\n")
- (open-inferior store-path
- #:error-port (inferior-error-port)))))))
+ (open-inferior store-path))))))
(setenv "GUIX_LOCPATH" guix-locpath) ; restore GUIX_LOCPATH
(when (eq? inf #f)
@@ -2040,44 +1795,6 @@ SKIP LOCKED")
(prevent-inlining-for-tests with-store-connection)
-(define (setup-logging id thunk)
- (let* ((previous-output-port (current-output-port))
- (previous-error-port (current-error-port))
- (result
- (with-postgresql-connection
- (simple-format #f "load-new-guix-revision ~A logging" id)
- (lambda (logging-conn)
- (insert-empty-log-entry logging-conn id)
- (let ((logging-port
- (log-port id logging-conn
- #:delete-existing-log-parts? #t)))
- (set-current-output-port logging-port)
- (set-current-error-port logging-port)
- (let ((result
- (parameterize ((current-build-output-port logging-port)
- (real-error-port previous-error-port)
- (inferior-error-port
- (setup-port-for-inferior-error-output
- id previous-error-port)))
- (thunk))))
- (set-current-output-port previous-output-port)
- (set-current-error-port previous-error-port)
-
- ;; This can happen with GC, so do it explicitly
- (close-port logging-port)
-
- (combine-log-parts! logging-conn id)
-
- result))))))
- result))
-
-(define (cleanup-logging id conn)
- (drop-log-parts-sequence conn id)
- (with-time-logging "vacuuming log parts"
- (vacuum-log-parts-table conn)))
-
-(prevent-inlining-for-tests setup-logging)
-
(define* (process-load-new-guix-revision-job id #:key skip-system-tests?)
(with-postgresql-connection
(simple-format #f "load-new-guix-revision ~A" id)
@@ -2104,28 +1821,24 @@ SKIP LOCKED")
(if (eq?
(with-time-logging (string-append "processing revision " commit)
- (setup-logging
- id
- (lambda ()
- (with-exception-handler
- (const #f)
- (lambda ()
- (with-throw-handler #t
- (lambda ()
- (with-store-connection
- (lambda (store)
- (load-new-guix-revision conn
- store
- git-repository-id
- commit
- #:skip-system-tests?
- skip-system-tests?))))
- (lambda (key . args)
- (simple-format (current-error-port)
- "error: load-new-guix-revision: ~A ~A\n"
- key args)
- (backtrace))))
- #:unwind? #t))))
+ (with-exception-handler
+ (const #f)
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (with-store-connection
+ (lambda (store)
+ (load-new-guix-revision conn
+ store
+ git-repository-id
+ commit
+ #:skip-system-tests? #t))))
+ (lambda (key . args)
+ (simple-format (current-error-port)
+ "error: load-new-guix-revision: ~A ~A\n"
+ key args)
+ (backtrace))))
+ #:unwind? #t))
#t)
(begin
(record-job-succeeded conn id)
@@ -2133,10 +1846,6 @@ SKIP LOCKED")
(exec-query conn "COMMIT")
(with-time-logging
- "cleanup logging"
- (cleanup-logging id conn))
-
- (with-time-logging
"vacuuming package derivations by guix revision range table"
(vacuum-package-derivations-table conn))
@@ -2170,10 +1879,6 @@ SKIP LOCKED")
(exec-query conn "ROLLBACK")
(record-job-event conn id "failure")
- (with-time-logging
- "cleanup logging"
- (cleanup-logging id conn))
-
#f)))
(()
(exec-query conn "ROLLBACK")
diff --git a/guix-data-service/model/package.scm b/guix-data-service/model/package.scm
index 263f46c..7ec2b09 100644
--- a/guix-data-service/model/package.scm
+++ b/guix-data-service/model/package.scm
@@ -264,12 +264,11 @@ INSERT INTO packages (name, version, package_metadata_id) VALUES "
RETURNING id"))
(define (inferior-packages->package-ids conn package-entries)
- (list->vector
- (insert-missing-data-and-return-all-ids
- conn
- "packages"
- '(name version package_metadata_id replacement_package_id)
- package-entries)))
+ (insert-missing-data-and-return-all-ids
+ conn
+ "packages"
+ '(name version package_metadata_id replacement_package_id)
+ package-entries))
(define (select-package-versions-for-revision conn
commit
diff --git a/guix-data-service/web/jobs/controller.scm b/guix-data-service/web/jobs/controller.scm
index b8b494d..7e5084f 100644
--- a/guix-data-service/web/jobs/controller.scm
+++ b/guix-data-service/web/jobs/controller.scm
@@ -23,6 +23,7 @@
#:use-module (guix-data-service web controller)
#:use-module (guix-data-service web query-parameters)
#:use-module (guix-data-service web util)
+ #:use-module (guix-data-service jobs)
#:use-module (guix-data-service jobs load-new-guix-revision)
#:use-module (guix-data-service web jobs html)
#:export (jobs-controller))
diff --git a/scripts/guix-data-service-process-job.in b/scripts/guix-data-service-process-job.in
index c6d06c6..89e31b9 100644
--- a/scripts/guix-data-service-process-job.in
+++ b/scripts/guix-data-service-process-job.in
@@ -25,6 +25,8 @@
(use-modules (srfi srfi-1)
(srfi srfi-37)
(ice-9 match)
+ (ice-9 suspendable-ports)
+ (fibers)
(guix-data-service database)
(guix-data-service data-deletion)
(guix-data-service model package-derivation-by-guix-revision-range)
@@ -38,12 +40,21 @@
;; Make stack traces more useful
(setenv "COLUMNS" "256")
+(install-suspendable-ports!)
+
(define %options
(list (option '("skip-system-tests") #f #f
(lambda (opt name _ result)
- (alist-cons 'skip-system-tests #t result)))))
+ (alist-cons 'skip-system-tests #t result)))
+ (option '("parallelism") #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'parallelism
+ (string->number arg)
+ (alist-delete 'parallelism
+ result))))))
-(define %default-options '())
+(define %default-options
+ '((parallelism . 1)))
(define (parse-options args)
(args-fold
@@ -62,6 +73,11 @@
(let ((opts (parse-options (cdr (program-arguments)))))
(match (assq-ref opts 'arguments)
((job)
- (process-load-new-guix-revision-job
- job
- #:skip-system-tests? (assq-ref opts 'skip-system-tests)))))
+ (run-fibers
+ (lambda ()
+ (process-load-new-guix-revision-job
+ job
+ #:skip-system-tests? (assq-ref opts 'skip-system-tests)))
+ #:hz 0
+ #:parallelism (assq-ref opts 'parallelism)
+ #:drain? #t))))
diff --git a/tests/jobs-load-new-guix-revision.scm b/tests/jobs-load-new-guix-revision.scm
index 0eaad3f..8213afb 100644
--- a/tests/jobs-load-new-guix-revision.scm
+++ b/tests/jobs-load-new-guix-revision.scm
@@ -48,43 +48,37 @@
(mock
((guix-data-service jobs load-new-guix-revision)
- setup-logging
- (lambda (conn thunk)
- (thunk)))
+ channel-derivations-by-system->guix-store-item
+ (lambda (store channel-derivations-by-system)
+ "/gnu/store/test"))
(mock
((guix-data-service jobs load-new-guix-revision)
- channel-derivations-by-system->guix-store-item
- (lambda (store channel-derivations-by-system)
- "/gnu/store/test"))
+ extract-information-from
+ (lambda* (conn store guix-revision-id commit
+ guix-source store-path
+ #:key skip-system-tests?)
+ #t))
(mock
- ((guix-data-service jobs load-new-guix-revision)
- extract-information-from
- (lambda* (conn store guix-revision-id commit
- guix-source store-path
- #:key skip-system-tests?)
+ ((guix-data-service model channel-instance)
+ insert-channel-instances
+ (lambda (conn guix-revision-id derivations-by-system)
#t))
(mock
- ((guix-data-service model channel-instance)
- insert-channel-instances
- (lambda (conn guix-revision-id derivations-by-system)
- #t))
-
- (mock
- ((guix channels)
- channel-news-for-commit
- (lambda (channel commit)
- '()))
-
- (match (enqueue-load-new-guix-revision-job
- conn
- (git-repository-url->git-repository-id conn "test-url")
- "test-commit"
- "test-source")
- ((id)
- (process-load-new-guix-revision-job id)))))))))))
+ ((guix channels)
+ channel-news-for-commit
+ (lambda (channel commit)
+ '()))
+
+ (match (enqueue-load-new-guix-revision-job
+ conn
+ (git-repository-url->git-repository-id conn "test-url")
+ "test-commit"
+ "test-source")
+ ((id)
+ (process-load-new-guix-revision-job id))))))))))
(exec-query conn "TRUNCATE guix_revisions CASCADE")
(exec-query conn "TRUNCATE load_new_guix_revision_jobs CASCADE")