aboutsummaryrefslogtreecommitdiff
path: root/guix-data-service/jobs.scm
diff options
context:
space:
mode:
Diffstat (limited to 'guix-data-service/jobs.scm')
-rw-r--r--guix-data-service/jobs.scm241
1 files changed, 211 insertions, 30 deletions
diff --git a/guix-data-service/jobs.scm b/guix-data-service/jobs.scm
index a0f59dc..b151367 100644
--- a/guix-data-service/jobs.scm
+++ b/guix-data-service/jobs.scm
@@ -20,11 +20,93 @@
#:use-module (ice-9 format)
#:use-module (ice-9 threads)
#:use-module (ice-9 atomic)
+ #:use-module (ice-9 textual-ports)
+ #:use-module (squee)
+ #:use-module (guix-data-service utils)
+ #:use-module (guix-data-service database)
#:use-module (guix-data-service jobs load-new-guix-revision)
- #:export (process-jobs
+ #:export (log-for-job
+ count-log-parts
+ combine-log-parts!
+
+ process-jobs
default-max-processes))
+;; Return the name of the PostgreSQL sequence used to number the log parts
+;; of the job identified by JOB-ID.
+(define (log-part-sequence-name job-id)
+  (string-append "load_new_guix_revision_job_log_parts_id_seq_"
+                 (simple-format #f "~A" job-id)))
+
+;; Create a pipe for the output of the process handling JOB-ID and start a
+;; thread that records what is written to it.  Each line read from the pipe
+;; is inserted as a row in load_new_guix_revision_job_log_parts and echoed
+;; to the current output port.  The thread stops, and closes the readable
+;; end of the pipe, once end of file is reached.
+;;
+;; Returns the writable end of the pipe.
+(define (start-thread-for-process-output job-id)
+  ;; Insert S as a new log part for JOB_ID, taking the id from the per-job
+  ;; sequence.
+  (define (insert conn job_id s)
+    (exec-query
+     conn
+     (string-append "
+INSERT INTO load_new_guix_revision_job_log_parts (id, job_id, contents)
+VALUES (nextval('" (log-part-sequence-name job_id) "'), $1, $2)")
+     (list job_id s)))
+
+  ;; Record LINE, putting back the newline that get-line strips.  If the
+  ;; insert fails, report the error on the current output port and try to
+  ;; record a placeholder instead.
+  (define (record-line conn line)
+    (let ((line-with-newline (string-append line "\n")))
+      (catch #t
+        (lambda ()
+          (insert conn job-id line-with-newline)
+          (display line-with-newline))
+        (lambda (key . args)
+          (display
+           (simple-format
+            #f
+            "
+error: ~A: ~A
+error: could not insert log part: '~A'\n\n"
+            key args line))
+          ;; Best effort: mark the gap in the stored log, ignoring any
+          ;; further failure.
+          (catch #t
+            (lambda ()
+              (insert
+               conn
+               job-id
+               (simple-format
+                #f
+                "
+guix-data-service: error: missing log line: ~A
+\n" key)))
+            (lambda _
+              #t))))))
+
+  (match (pipe)
+    ((port-to-read-from . port-to-write-to)
+
+     (setvbuf port-to-read-from 'line)
+     (setvbuf port-to-write-to 'line)
+
+     ;; Put both ends of the pipe in non-blocking mode.
+     (let ((flags (fcntl port-to-read-from F_GETFL)))
+       (fcntl port-to-read-from F_SETFL (logior O_NONBLOCK flags)))
+     (let ((flags (fcntl port-to-write-to F_GETFL)))
+       (fcntl port-to-write-to F_SETFL (logior O_NONBLOCK flags)))
+
+     (call-with-new-thread
+      (lambda ()
+        (with-postgresql-connection
+         (simple-format #f "~A job logging" job-id)
+         (lambda (logging-conn)
+           (exec-query
+            logging-conn
+            (simple-format #f "CREATE SEQUENCE IF NOT EXISTS ~A"
+                           (log-part-sequence-name job-id)))
+           ;; Remove any existing log parts for this job before recording
+           ;; new ones.
+           (exec-query
+            logging-conn
+            "DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1"
+            (list job-id))
+
+           (let loop ((line (get-line port-to-read-from)))
+             ;; get-line returns the EOF object once every writer has
+             ;; closed the pipe; passing that to string-append would raise,
+             ;; so finish cleanly instead.
+             (if (eof-object? line)
+                 (close-port port-to-read-from)
+                 (begin
+                   (record-line logging-conn line)
+                   (loop (get-line port-to-read-from)))))))))
+
+     port-to-write-to)))
+
+;; Tidy up the logging state for JOB-ID: drop its log part sequence, then
+;; vacuum the log parts table (timed via with-time-logging).
+(define (cleanup-logging conn job-id)
+  (drop-log-parts-sequence conn job-id)
+  (with-time-logging
+   "vacuuming log parts"
+   (vacuum-log-parts-table conn)))
+
(define* (process-jobs conn #:key max-processes
latest-branch-revision-max-processes
skip-system-tests?)
@@ -32,29 +114,137 @@
(fetch-unlocked-jobs conn))
(define (process-job job-id)
- (apply execlp
- "guix-data-service-process-job"
- "guix-data-service-process-job"
- job-id
- (if skip-system-tests?
- '("--skip-system-tests")
- '())))
+ (let ((log-port (start-thread-for-process-output job-id)))
+ (spawn
+ "guix-data-service-process-job"
+ (cons* "guix-data-service-process-job"
+ job-id
+ (if skip-system-tests?
+ '("--skip-system-tests")
+ '()))
+ #:output log-port
+ #:error log-port)))
+
+ (define (post-job job-id)
+ (when (> (count-log-parts conn job-id)
+ 0)
+ (combine-log-parts! conn job-id)
+ (cleanup-logging conn job-id)))
(define (handle-job-failure job-id)
(record-job-event conn job-id "failure")
(display (simple-format #f "recording failure for job ~A\n" job-id)
- (current-error-port))
- (when (> (count-log-parts conn job-id)
- 0)
- (combine-log-parts! conn job-id)))
+ (current-error-port)))
(process-jobs-concurrently fetch-new-jobs
process-job
+ post-job
handle-job-failure
#:max-processes max-processes
#:priority-max-processes
latest-branch-revision-max-processes))
+
+;; Return the log for the job identified by JOB-ID, with &, < and > escaped
+;; for inclusion in HTML (the escaping is done in SQL).
+;;
+;; The combined log in load_new_guix_revision_job_logs is used when it has
+;; non-NULL contents; otherwise the log is assembled from the rows in
+;; load_new_guix_revision_job_log_parts, ordered by id.
+;;
+;; START-CHARACTER and CHARACTER-LIMIT select part of the log:
+;;  - both given: CHARACTER-LIMIT characters starting at START-CHARACTER;
+;;  - only CHARACTER-LIMIT: the last CHARACTER-LIMIT characters;
+;;  - only START-CHARACTER: everything from START-CHARACTER onwards;
+;;  - neither: the whole log.
+;;
+;; NOTE(review): both values are written directly into the SQL text, so
+;; callers must only pass numbers here - confirm against the callers.
+(define* (log-for-job conn job-id
+                      #:key
+                      character-limit
+                      start-character)
+  ;; Wrap the SQL expression S so that &, < and > become HTML entities.
+  ;; The ampersand is replaced first (innermost call) so the entities
+  ;; produced for < and > are not escaped a second time.
+  (define (sql-html-escape s)
+    (string-append
+     "replace(replace(replace("
+     s
+     ",'&','&amp;')"
+     ",'<','&lt;')"
+     ",'>','&gt;')"))
+
+  ;; Wrap the SQL expression S so only the requested characters are
+  ;; returned.  A missing limit is left out rather than rendered as #f,
+  ;; which would be invalid SQL.
+  (define (get-characters s)
+    (cond
+     ((and start-character character-limit)
+      (simple-format #f "substr(~A, ~A, ~A)"
+                     s start-character
+                     character-limit))
+     (start-character
+      (simple-format #f "substr(~A, ~A)" s start-character))
+     (character-limit
+      (simple-format #f "right(~A, ~A)" s character-limit))
+     (else s)))
+
+  (define log-query
+    (string-append
+     "SELECT "
+     (sql-html-escape (get-characters "contents"))
+     " FROM load_new_guix_revision_job_logs"
+     " WHERE job_id = $1 AND contents IS NOT NULL"))
+
+  (define parts-query
+    (string-append
+     "SELECT "
+     (sql-html-escape
+      (get-characters "STRING_AGG(contents, '' ORDER BY id ASC)"))
+     " FROM load_new_guix_revision_job_log_parts WHERE job_id = $1"))
+
+  (match (exec-query conn log-query (list job-id))
+    (((contents))
+     contents)
+    (()
+     ;; No combined log stored yet, fall back to the individual parts.
+     (match (exec-query conn parts-query (list job-id))
+       (((contents))
+        contents)))))
+
+;; Return, as a number, how many rows load_new_guix_revision_job_log_parts
+;; holds for JOB-ID.
+(define (count-log-parts conn job-id)
+  (define query
+    "
+SELECT COUNT(*)
+FROM load_new_guix_revision_job_log_parts
+WHERE job_id = $1")
+
+  (let ((rows (exec-query conn query (list job-id))))
+    (match rows
+      (((count))
+       (string->number count)))))
+
+;; Within a single transaction, store the concatenation of the log parts for
+;; JOB-ID (ordered by id) as the contents of its row in
+;; load_new_guix_revision_job_logs, then delete those parts.
+(define (combine-log-parts! conn job-id)
+  (define update-query
+    "
+UPDATE load_new_guix_revision_job_logs SET contents = (
+ SELECT STRING_AGG(contents, '' ORDER BY id ASC)
+ FROM load_new_guix_revision_job_log_parts
+ WHERE job_id = $1
+ GROUP BY job_id
+)
+WHERE job_id = $1")
+
+  (define delete-query
+    "DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1")
+
+  (with-postgresql-transaction
+   conn
+   (lambda (txn-conn)
+     (exec-query txn-conn update-query (list job-id))
+     (exec-query txn-conn delete-query (list job-id)))))
+
+;; Drop the log part sequence for JOB-ID, if it exists, inside a transaction
+;; with a 10 second lock timeout.  An error raised while dropping is written
+;; to the current error port rather than propagated.
+(define (drop-log-parts-sequence conn job-id)
+  (define drop-query
+    (simple-format #f "DROP SEQUENCE IF EXISTS ~A"
+                   (log-part-sequence-name job-id)))
+
+  (define (report-error exn)
+    (simple-format (current-error-port)
+                   "error when dropping sequence: ~A"
+                   exn))
+
+  (with-postgresql-transaction
+   conn
+   (lambda (txn-conn)
+     (exec-query txn-conn "SET LOCAL lock_timeout = '10s'")
+     (with-exception-handler
+       report-error
+       (lambda ()
+         (exec-query txn-conn drop-query))
+       #:unwind? #t))))
+
+;; Run VACUUM on the load_new_guix_revision_job_log_parts table using CONN.
+(define (vacuum-log-parts-table conn)
+  (let ((vacuum-query "VACUUM load_new_guix_revision_job_log_parts"))
+    (exec-query conn vacuum-query)))
+
(define default-max-processes
(max (round (/ (current-processor-count)
4))
@@ -67,6 +257,7 @@
(define* (process-jobs-concurrently
fetch-new-jobs
process-job
+ post-job
handle-job-failure
#:key
(max-processes default-max-processes)
@@ -108,9 +299,10 @@
;; No process to wait for
#f)
((pid . status)
- (unless (eq? status 0)
- (match (hash-ref processes pid)
- ((_ (id))
+ (match (hash-ref processes pid)
+ ((_ (id))
+ (post-job id)
+ (unless (eq? status 0)
(simple-format (current-error-port)
"pid ~A (job: ~A) failed with status ~A\n"
pid id status)
@@ -161,20 +353,6 @@
(kill pid SIGTERM)))
processes))
- (define (fork-and-process-job job-args)
- (match (primitive-fork)
- (0
- (dynamic-wind
- (const #t)
- (lambda ()
- (apply process-job job-args))
- (lambda ()
- (primitive-exit 127))))
- (pid
- (hashv-set! processes pid
- (list (current-time) job-args))
- #t)))
-
(define exit?
(make-atomic-box #f))
@@ -206,6 +384,9 @@
(if priority?
priority-max-processes
max-processes))
- (fork-and-process-job (list job-id))))))
+ (let ((pid (process-job job-id)))
+ (peek "PID" pid)
+ (hashv-set! processes pid
+ (list (current-time) (list job-id))))))))
jobs)))
(sleep 15)))