From 36254f98e366657189b435af0bf130c142ceb73a Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Sat, 15 Feb 2020 17:42:07 +0000 Subject: Improve the job logging Switch to using a sequence for the ids in the log parts table, and spawn a thread to listen for output from the inferior processes, and enter it in to the database. --- guix-data-service/jobs/load-new-guix-revision.scm | 61 ++++++++++++++++++++--- 1 file changed, 54 insertions(+), 7 deletions(-) diff --git a/guix-data-service/jobs/load-new-guix-revision.scm b/guix-data-service/jobs/load-new-guix-revision.scm index 9fef9c5..efeb788 100644 --- a/guix-data-service/jobs/load-new-guix-revision.scm +++ b/guix-data-service/jobs/load-new-guix-revision.scm @@ -18,6 +18,7 @@ (define-module (guix-data-service jobs load-new-guix-revision) #:use-module (srfi srfi-1) #:use-module (ice-9 match) + #:use-module (ice-9 textual-ports) #:use-module (ice-9 hash-table) #:use-module (rnrs exceptions) #:use-module (json) @@ -65,11 +66,17 @@ enqueue-load-new-guix-revision-job most-recent-n-load-new-guix-revision-jobs)) -(define* (log-port job-id conn #:key delete-existing-log-parts?) +(define (log-part-sequence-name job-id) + (simple-format #f "load_new_guix_revision_job_log_parts_id_seq_~A" job-id)) + +(define* (log-port job-id conn + #:key + delete-existing-log-parts? + real-output-port) (define output-port - (current-output-port)) + (or real-output-port + (current-output-port))) - (define id 0) (define buffer "") (define (insert job_id s) @@ -77,18 +84,21 @@ conn (string-append "INSERT INTO load_new_guix_revision_job_log_parts (id, job_id, contents) " - "VALUES ($1, $2, $3)") - (list (number->string id) job_id s))) + "VALUES (nextval('" (log-part-sequence-name job_id) "'), $1, $2)") + (list job_id s))) (define (log-string s) (if (string-contains s "\n") (let ((output (string-append buffer s))) - (set! id (+ 1 id)) ; increment id (set! buffer "") ; clear the buffer (insert job-id output) (display output output-port)) (set! buffer (string-append buffer s)))) + (exec-query + conn + (simple-format #f "CREATE SEQUENCE IF NOT EXISTS ~A" + (log-part-sequence-name job-id))) (when delete-existing-log-parts? ;; TODO, this is useful when re-running jobs, but I'm not sure that should ;; be a thing, jobs should probably be only attempted once. @@ -113,6 +123,34 @@ (setvbuf port 'line) port)) +(define (setup-port-for-inferior-error-output job-id real-output-port) + (define (insert conn job_id s) + (exec-query + conn + (string-append " +INSERT INTO load_new_guix_revision_job_log_parts (id, job_id, contents) +VALUES (nextval('" (log-part-sequence-name job_id) "'), $1, $2)") + (list job_id s))) + + (match (pipe) + ((port-to-read-from . port-to-write-to) + + (setvbuf port-to-read-from 'line) + (setvbuf port-to-write-to 'line) + (call-with-new-thread + (lambda () + (with-postgresql-connection + (simple-format #f "~A inferior error logging" job-id) + (lambda (logging-conn) + (let loop ((line (get-line port-to-read-from))) + (let ((line-with-newline + (string-append line "\n"))) + (insert logging-conn job-id line-with-newline) + (display line-with-newline real-output-port)) + (loop (get-line port-to-read-from)))))))3 + + port-to-write-to))) + (define real-error-port (make-parameter (current-error-port))) @@ -206,6 +244,13 @@ WHERE job_id = $1" "DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1" (list job-id))))) +(define (drop-log-parts-sequence conn job-id) + (exec-query + conn + (string-append + "DROP SEQUENCE " + (log-part-sequence-name job-id)))) + (define inferior-package-id (@@ (guix inferior) inferior-package-id)) @@ -1554,7 +1599,8 @@ SKIP LOCKED") (let ((result (parameterize ((current-build-output-port logging-port) (real-error-port previous-error-port) - (inferior-error-port previous-error-port)) + (inferior-error-port + (setup-port-for-inferior-error-output id previous-error-port))) (catch #t (lambda () (with-store-connection @@ -1570,6 +1616,7 @@ SKIP LOCKED") key args) #f))))) (combine-log-parts! logging-conn id) + (drop-log-parts-sequence logging-conn id) ;; This can happen with GC, so do it explicitly (close-port logging-port) -- cgit v1.2.3