diff options
author | Christopher Baines <mail@cbaines.net> | 2020-02-15 17:42:07 +0000 |
---|---|---|
committer | Christopher Baines <mail@cbaines.net> | 2020-02-15 17:42:07 +0000 |
commit | 36254f98e366657189b435af0bf130c142ceb73a (patch) | |
tree | f9a8a36fe96d295cec4376c4c04267e05b906e71 | |
parent | 40f6de27f6168c160f65b1334fd3cf7d586949be (diff) | |
download | data-service-36254f98e366657189b435af0bf130c142ceb73a.tar data-service-36254f98e366657189b435af0bf130c142ceb73a.tar.gz |
Improve the job logging
Switch to using a sequence for the ids in the log parts table, and spawn a
thread to listen for output from the inferior processes, and enter it in to
the database.
-rw-r--r-- | guix-data-service/jobs/load-new-guix-revision.scm | 61 |
1 files changed, 54 insertions, 7 deletions
diff --git a/guix-data-service/jobs/load-new-guix-revision.scm b/guix-data-service/jobs/load-new-guix-revision.scm index 9fef9c5..efeb788 100644 --- a/guix-data-service/jobs/load-new-guix-revision.scm +++ b/guix-data-service/jobs/load-new-guix-revision.scm @@ -18,6 +18,7 @@ (define-module (guix-data-service jobs load-new-guix-revision) #:use-module (srfi srfi-1) #:use-module (ice-9 match) + #:use-module (ice-9 textual-ports) #:use-module (ice-9 hash-table) #:use-module (rnrs exceptions) #:use-module (json) @@ -65,11 +66,17 @@ enqueue-load-new-guix-revision-job most-recent-n-load-new-guix-revision-jobs)) -(define* (log-port job-id conn #:key delete-existing-log-parts?) +(define (log-part-sequence-name job-id) + (simple-format #f "load_new_guix_revision_job_log_parts_id_seq_~A" job-id)) + +(define* (log-port job-id conn + #:key + delete-existing-log-parts? + real-output-port) (define output-port - (current-output-port)) + (or real-output-port + (current-output-port))) - (define id 0) (define buffer "") (define (insert job_id s) @@ -77,18 +84,21 @@ conn (string-append "INSERT INTO load_new_guix_revision_job_log_parts (id, job_id, contents) " - "VALUES ($1, $2, $3)") - (list (number->string id) job_id s))) + "VALUES (nextval('" (log-part-sequence-name job_id) "'), $1, $2)") + (list job_id s))) (define (log-string s) (if (string-contains s "\n") (let ((output (string-append buffer s))) - (set! id (+ 1 id)) ; increment id (set! buffer "") ; clear the buffer (insert job-id output) (display output output-port)) (set! buffer (string-append buffer s)))) + (exec-query + conn + (simple-format #f "CREATE SEQUENCE IF NOT EXISTS ~A" + (log-part-sequence-name job-id))) (when delete-existing-log-parts? ;; TODO, this is useful when re-running jobs, but I'm not sure that should ;; be a thing, jobs should probably be only attempted once. @@ -113,6 +123,34 @@ (setvbuf port 'line) port)) +(define (setup-port-for-inferior-error-output job-id real-output-port) + (define (insert conn job_id s) + (exec-query + conn + (string-append " +INSERT INTO load_new_guix_revision_job_log_parts (id, job_id, contents) +VALUES (nextval('" (log-part-sequence-name job_id) "'), $1, $2)") + (list job_id s))) + + (match (pipe) + ((port-to-read-from . port-to-write-to) + + (setvbuf port-to-read-from 'line) + (setvbuf port-to-write-to 'line) + (call-with-new-thread + (lambda () + (with-postgresql-connection + (simple-format #f "~A inferior error logging" job-id) + (lambda (logging-conn) + (let loop ((line (get-line port-to-read-from))) + (let ((line-with-newline + (string-append line "\n"))) + (insert logging-conn job-id line-with-newline) + (display line-with-newline real-output-port)) + (loop (get-line port-to-read-from)))))))3 + + port-to-write-to))) + (define real-error-port (make-parameter (current-error-port))) @@ -206,6 +244,13 @@ WHERE job_id = $1" "DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1" (list job-id))))) +(define (drop-log-parts-sequence conn job-id) + (exec-query + conn + (string-append + "DROP SEQUENCE " + (log-part-sequence-name job-id)))) + (define inferior-package-id (@@ (guix inferior) inferior-package-id)) @@ -1554,7 +1599,8 @@ SKIP LOCKED") (let ((result (parameterize ((current-build-output-port logging-port) (real-error-port previous-error-port) - (inferior-error-port previous-error-port)) + (inferior-error-port + (setup-port-for-inferior-error-output id previous-error-port))) (catch #t (lambda () (with-store-connection @@ -1570,6 +1616,7 @@ SKIP LOCKED") key args) #f))))) (combine-log-parts! logging-conn id) + (drop-log-parts-sequence logging-conn id) ;; This can happen with GC, so do it explicitly (close-port logging-port) |