diff options
Diffstat (limited to 'guix-data-service/jobs.scm')
-rw-r--r-- | guix-data-service/jobs.scm | 241 |
1 files changed, 211 insertions, 30 deletions
diff --git a/guix-data-service/jobs.scm b/guix-data-service/jobs.scm index a0f59dc..b151367 100644 --- a/guix-data-service/jobs.scm +++ b/guix-data-service/jobs.scm @@ -20,11 +20,93 @@ #:use-module (ice-9 format) #:use-module (ice-9 threads) #:use-module (ice-9 atomic) + #:use-module (ice-9 textual-ports) + #:use-module (squee) + #:use-module (guix-data-service utils) + #:use-module (guix-data-service database) #:use-module (guix-data-service jobs load-new-guix-revision) - #:export (process-jobs + #:export (log-for-job + count-log-parts + combine-log-parts! + + process-jobs default-max-processes)) +(define (log-part-sequence-name job-id) + (simple-format #f "load_new_guix_revision_job_log_parts_id_seq_~A" job-id)) + +(define (start-thread-for-process-output job-id) + (define (insert conn job_id s) + (exec-query + conn + (string-append " +INSERT INTO load_new_guix_revision_job_log_parts (id, job_id, contents) +VALUES (nextval('" (log-part-sequence-name job_id) "'), $1, $2)") + (list job_id s))) + + + (match (pipe) + ((port-to-read-from . port-to-write-to) + + (setvbuf port-to-read-from 'line) + (setvbuf port-to-write-to 'line) + + (let ((flags (fcntl port-to-read-from F_GETFL))) + (fcntl port-to-read-from F_SETFL (logior O_NONBLOCK flags))) + (let ((flags (fcntl port-to-write-to F_GETFL))) + (fcntl port-to-write-to F_SETFL (logior O_NONBLOCK flags))) + + (call-with-new-thread + (lambda () + (with-postgresql-connection + (simple-format #f "~A job logging" job-id) + (lambda (logging-conn) + (exec-query + logging-conn + (simple-format #f "CREATE SEQUENCE IF NOT EXISTS ~A" + (log-part-sequence-name job-id))) + (exec-query + logging-conn + "DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1" + (list job-id)) + + (let loop ((line (get-line port-to-read-from))) + (let ((line-with-newline + (string-append line "\n"))) + (catch #t + (lambda () + (insert logging-conn job-id line-with-newline) + (display line-with-newline)) + (lambda (key . 
args) + (display + (simple-format + #f + " +error: ~A: ~A +error: could not insert log part: '~A'\n\n" + key args line)) + (catch #t + (lambda () + (insert + logging-conn + job-id + (simple-format + #f + " +guix-data-service: error: missing log line: ~A +\n" key))) + (lambda _ + #t))))) + (loop (get-line port-to-read-from))))))) + + port-to-write-to))) + +(define (cleanup-logging conn job-id) + (drop-log-parts-sequence conn job-id) + (with-time-logging "vacuuming log parts" + (vacuum-log-parts-table conn))) + (define* (process-jobs conn #:key max-processes latest-branch-revision-max-processes skip-system-tests?) @@ -32,29 +114,137 @@ (fetch-unlocked-jobs conn)) (define (process-job job-id) - (apply execlp - "guix-data-service-process-job" - "guix-data-service-process-job" - job-id - (if skip-system-tests? - '("--skip-system-tests") - '()))) + (let ((log-port (start-thread-for-process-output job-id))) + (spawn + "guix-data-service-process-job" + (cons* "guix-data-service-process-job" + job-id + (if skip-system-tests? + '("--skip-system-tests") + '())) + #:output log-port + #:error log-port))) + + (define (post-job job-id) + (when (> (count-log-parts conn job-id) + 0) + (combine-log-parts! conn job-id) + (cleanup-logging conn job-id))) (define (handle-job-failure job-id) (record-job-event conn job-id "failure") (display (simple-format #f "recording failure for job ~A\n" job-id) - (current-error-port)) - (when (> (count-log-parts conn job-id) - 0) - (combine-log-parts! 
conn job-id))) + (current-error-port))) (process-jobs-concurrently fetch-new-jobs process-job + post-job handle-job-failure #:max-processes max-processes #:priority-max-processes latest-branch-revision-max-processes)) + +(define* (log-for-job conn job-id + #:key + character-limit + start-character) + (define (sql-html-escape s) + (string-append + "replace(" + (string-append + "replace(" + (string-append + "replace(" + s + ",'&','&amp;')") + ",'<','&lt;')") + ",'>','&gt;')")) + + (define (get-characters s) + (if start-character + (simple-format #f "substr(~A, ~A, ~A)" + s start-character + character-limit) + (simple-format #f "right(~A, ~A)" s character-limit))) + + (define log-query + (string-append + "SELECT " + (sql-html-escape (get-characters "contents")) + " FROM load_new_guix_revision_job_logs" + " WHERE job_id = $1 AND contents IS NOT NULL")) + + (define parts-query + (string-append + "SELECT " + (sql-html-escape + (get-characters "STRING_AGG(contents, '' ORDER BY id ASC)")) + " FROM load_new_guix_revision_job_log_parts WHERE job_id = $1")) + + (match (exec-query conn log-query (list job-id)) + (((contents)) + contents) + (() + (match (exec-query conn parts-query (list job-id)) + (((contents)) + contents))))) + +(define (count-log-parts conn job-id) + (match (exec-query + conn + " +SELECT COUNT(*) +FROM load_new_guix_revision_job_log_parts +WHERE job_id = $1" + (list job-id)) + (((id)) + (string->number id)))) + +(define (combine-log-parts! 
conn job-id) + (with-postgresql-transaction + conn + (lambda (conn) + (exec-query + conn + (string-append + " +UPDATE load_new_guix_revision_job_logs SET contents = ( + SELECT STRING_AGG(contents, '' ORDER BY id ASC) + FROM load_new_guix_revision_job_log_parts + WHERE job_id = $1 + GROUP BY job_id +) +WHERE job_id = $1") + (list job-id)) + (exec-query + conn + "DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1" + (list job-id))))) + +(define (drop-log-parts-sequence conn job-id) + (with-postgresql-transaction + conn + (lambda (conn) + (exec-query conn + "SET LOCAL lock_timeout = '10s'") + (with-exception-handler + (lambda (exn) + (simple-format (current-error-port) + "error when dropping sequence: ~A" + exn)) + (lambda () + (exec-query conn + (string-append + "DROP SEQUENCE IF EXISTS " + (log-part-sequence-name job-id)))) + #:unwind? #t)))) + +(define (vacuum-log-parts-table conn) + (exec-query + conn + "VACUUM load_new_guix_revision_job_log_parts")) + (define default-max-processes (max (round (/ (current-processor-count) 4)) @@ -67,6 +257,7 @@ (define* (process-jobs-concurrently fetch-new-jobs process-job + post-job handle-job-failure #:key (max-processes default-max-processes) @@ -108,9 +299,10 @@ ;; No process to wait for #f) ((pid . status) - (unless (eq? status 0) - (match (hash-ref processes pid) - ((_ (id)) + (match (hash-ref processes pid) + ((_ (id)) + (post-job id) + (unless (eq? status 0) (simple-format (current-error-port) "pid ~A (job: ~A) failed with status ~A\n" pid id status) @@ -161,20 +353,6 @@ (kill pid SIGTERM))) processes)) - (define (fork-and-process-job job-args) - (match (primitive-fork) - (0 - (dynamic-wind - (const #t) - (lambda () - (apply process-job job-args)) - (lambda () - (primitive-exit 127)))) - (pid - (hashv-set! processes pid - (list (current-time) job-args)) - #t))) - (define exit? (make-atomic-box #f)) @@ -206,6 +384,9 @@ (if priority? 
priority-max-processes max-processes)) - (fork-and-process-job (list job-id)))))) + (let ((pid (process-job job-id))) + (peek "PID" pid) + (hashv-set! processes pid + (list (current-time) (list job-id)))))))) jobs))) (sleep 15))) |