aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2020-02-15 17:42:07 +0000
committerChristopher Baines <mail@cbaines.net>2020-02-15 17:42:07 +0000
commit36254f98e366657189b435af0bf130c142ceb73a (patch)
treef9a8a36fe96d295cec4376c4c04267e05b906e71
parent40f6de27f6168c160f65b1334fd3cf7d586949be (diff)
downloaddata-service-36254f98e366657189b435af0bf130c142ceb73a.tar
data-service-36254f98e366657189b435af0bf130c142ceb73a.tar.gz
Improve the job logging
Switch to using a sequence for the ids in the log parts table, and spawn a thread to listen for output from the inferior processes, and enter it in to the database.
-rw-r--r--guix-data-service/jobs/load-new-guix-revision.scm61
1 files changed, 54 insertions, 7 deletions
diff --git a/guix-data-service/jobs/load-new-guix-revision.scm b/guix-data-service/jobs/load-new-guix-revision.scm
index 9fef9c5..efeb788 100644
--- a/guix-data-service/jobs/load-new-guix-revision.scm
+++ b/guix-data-service/jobs/load-new-guix-revision.scm
@@ -18,6 +18,7 @@
(define-module (guix-data-service jobs load-new-guix-revision)
#:use-module (srfi srfi-1)
#:use-module (ice-9 match)
+ #:use-module (ice-9 textual-ports)
#:use-module (ice-9 hash-table)
#:use-module (rnrs exceptions)
#:use-module (json)
@@ -65,11 +66,17 @@
enqueue-load-new-guix-revision-job
most-recent-n-load-new-guix-revision-jobs))
-(define* (log-port job-id conn #:key delete-existing-log-parts?)
+(define (log-part-sequence-name job-id)
+ (simple-format #f "load_new_guix_revision_job_log_parts_id_seq_~A" job-id))
+
+(define* (log-port job-id conn
+ #:key
+ delete-existing-log-parts?
+ real-output-port)
(define output-port
- (current-output-port))
+ (or real-output-port
+ (current-output-port)))
- (define id 0)
(define buffer "")
(define (insert job_id s)
@@ -77,18 +84,21 @@
conn
(string-append
"INSERT INTO load_new_guix_revision_job_log_parts (id, job_id, contents) "
- "VALUES ($1, $2, $3)")
- (list (number->string id) job_id s)))
+ "VALUES (nextval('" (log-part-sequence-name job_id) "'), $1, $2)")
+ (list job_id s)))
(define (log-string s)
(if (string-contains s "\n")
(let ((output (string-append buffer s)))
- (set! id (+ 1 id)) ; increment id
(set! buffer "") ; clear the buffer
(insert job-id output)
(display output output-port))
(set! buffer (string-append buffer s))))
+ (exec-query
+ conn
+ (simple-format #f "CREATE SEQUENCE IF NOT EXISTS ~A"
+ (log-part-sequence-name job-id)))
(when delete-existing-log-parts?
;; TODO, this is useful when re-running jobs, but I'm not sure that should
;; be a thing, jobs should probably be only attempted once.
@@ -113,6 +123,34 @@
(setvbuf port 'line)
port))
+(define (setup-port-for-inferior-error-output job-id real-output-port)
+ (define (insert conn job_id s)
+ (exec-query
+ conn
+ (string-append "
+INSERT INTO load_new_guix_revision_job_log_parts (id, job_id, contents)
+VALUES (nextval('" (log-part-sequence-name job_id) "'), $1, $2)")
+ (list job_id s)))
+
+ (match (pipe)
+ ((port-to-read-from . port-to-write-to)
+
+ (setvbuf port-to-read-from 'line)
+ (setvbuf port-to-write-to 'line)
+ (call-with-new-thread
+ (lambda ()
+ (with-postgresql-connection
+ (simple-format #f "~A inferior error logging" job-id)
+ (lambda (logging-conn)
+ (let loop ((line (get-line port-to-read-from)))
+ (let ((line-with-newline
+ (string-append line "\n")))
+ (insert logging-conn job-id line-with-newline)
+ (display line-with-newline real-output-port))
+ (loop (get-line port-to-read-from)))))))3
+
+ port-to-write-to)))
+
(define real-error-port
(make-parameter (current-error-port)))
@@ -206,6 +244,13 @@ WHERE job_id = $1"
"DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1"
(list job-id)))))
+(define (drop-log-parts-sequence conn job-id)
+ (exec-query
+ conn
+ (string-append
+ "DROP SEQUENCE "
+ (log-part-sequence-name job-id))))
+
(define inferior-package-id
(@@ (guix inferior) inferior-package-id))
@@ -1554,7 +1599,8 @@ SKIP LOCKED")
(let ((result
(parameterize ((current-build-output-port logging-port)
(real-error-port previous-error-port)
- (inferior-error-port previous-error-port))
+ (inferior-error-port
+ (setup-port-for-inferior-error-output id previous-error-port)))
(catch #t
(lambda ()
(with-store-connection
@@ -1570,6 +1616,7 @@ SKIP LOCKED")
key args)
#f)))))
(combine-log-parts! logging-conn id)
+ (drop-log-parts-sequence logging-conn id)
;; This can happen with GC, so do it explicitly
(close-port logging-port)