aboutsummaryrefslogtreecommitdiff
path: root/guix-data-service/jobs.scm
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2023-11-05 13:42:03 +0000
committerChristopher Baines <mail@cbaines.net>2023-11-05 13:46:20 +0000
commitc3cb04cb8052492dae715012586f9e3d2d64579d (patch)
tree1f56c415ef46e68d60d623e0a0bea70c4bd74ceb /guix-data-service/jobs.scm
parentf5acc60288e0ad9f0c1093f3d50af1347e4df1df (diff)
downloaddata-service-c3cb04cb8052492dae715012586f9e3d2d64579d.tar
data-service-c3cb04cb8052492dae715012586f9e3d2d64579d.tar.gz
Use fibers when processing new revisions
Just have one fiber at the moment, but this will enable using fibers for parallelism in the future. Fibers seemed to cause problems with the logging setup, which was a bit odd in the first place. So move logging to the parent process, which is better anyway.
Diffstat (limited to 'guix-data-service/jobs.scm')
-rw-r--r--guix-data-service/jobs.scm241
1 file changed, 211 insertions, 30 deletions
diff --git a/guix-data-service/jobs.scm b/guix-data-service/jobs.scm
index a0f59dc..b151367 100644
--- a/guix-data-service/jobs.scm
+++ b/guix-data-service/jobs.scm
@@ -20,11 +20,93 @@
#:use-module (ice-9 format)
#:use-module (ice-9 threads)
#:use-module (ice-9 atomic)
+ #:use-module (ice-9 textual-ports)
+ #:use-module (squee)
+ #:use-module (guix-data-service utils)
+ #:use-module (guix-data-service database)
#:use-module (guix-data-service jobs load-new-guix-revision)
- #:export (process-jobs
+ #:export (log-for-job
+ count-log-parts
+ combine-log-parts!
+
+ process-jobs
default-max-processes))
+(define (log-part-sequence-name job-id)
+ (simple-format #f "load_new_guix_revision_job_log_parts_id_seq_~A" job-id))
+
+(define (start-thread-for-process-output job-id)
+ (define (insert conn job_id s)
+ (exec-query
+ conn
+ (string-append "
+INSERT INTO load_new_guix_revision_job_log_parts (id, job_id, contents)
+VALUES (nextval('" (log-part-sequence-name job_id) "'), $1, $2)")
+ (list job_id s)))
+
+
+ (match (pipe)
+ ((port-to-read-from . port-to-write-to)
+
+ (setvbuf port-to-read-from 'line)
+ (setvbuf port-to-write-to 'line)
+
+ (let ((flags (fcntl port-to-read-from F_GETFL)))
+ (fcntl port-to-read-from F_SETFL (logior O_NONBLOCK flags)))
+ (let ((flags (fcntl port-to-write-to F_GETFL)))
+ (fcntl port-to-write-to F_SETFL (logior O_NONBLOCK flags)))
+
+ (call-with-new-thread
+ (lambda ()
+ (with-postgresql-connection
+ (simple-format #f "~A job logging" job-id)
+ (lambda (logging-conn)
+ (exec-query
+ logging-conn
+ (simple-format #f "CREATE SEQUENCE IF NOT EXISTS ~A"
+ (log-part-sequence-name job-id)))
+ (exec-query
+ logging-conn
+ "DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1"
+ (list job-id))
+
+ (let loop ((line (get-line port-to-read-from)))
+ (let ((line-with-newline
+ (string-append line "\n")))
+ (catch #t
+ (lambda ()
+ (insert logging-conn job-id line-with-newline)
+ (display line-with-newline))
+ (lambda (key . args)
+ (display
+ (simple-format
+ #f
+ "
+error: ~A: ~A
+error: could not insert log part: '~A'\n\n"
+ key args line))
+ (catch #t
+ (lambda ()
+ (insert
+ logging-conn
+ job-id
+ (simple-format
+ #f
+ "
+guix-data-service: error: missing log line: ~A
+\n" key)))
+ (lambda _
+ #t)))))
+ (loop (get-line port-to-read-from)))))))
+
+ port-to-write-to)))
+
+(define (cleanup-logging conn job-id)
+ (drop-log-parts-sequence conn job-id)
+ (with-time-logging "vacuuming log parts"
+ (vacuum-log-parts-table conn)))
+
(define* (process-jobs conn #:key max-processes
latest-branch-revision-max-processes
skip-system-tests?)
@@ -32,29 +114,137 @@
(fetch-unlocked-jobs conn))
(define (process-job job-id)
- (apply execlp
- "guix-data-service-process-job"
- "guix-data-service-process-job"
- job-id
- (if skip-system-tests?
- '("--skip-system-tests")
- '())))
+ (let ((log-port (start-thread-for-process-output job-id)))
+ (spawn
+ "guix-data-service-process-job"
+ (cons* "guix-data-service-process-job"
+ job-id
+ (if skip-system-tests?
+ '("--skip-system-tests")
+ '()))
+ #:output log-port
+ #:error log-port)))
+
+ (define (post-job job-id)
+ (when (> (count-log-parts conn job-id)
+ 0)
+ (combine-log-parts! conn job-id)
+ (cleanup-logging conn job-id)))
(define (handle-job-failure job-id)
(record-job-event conn job-id "failure")
(display (simple-format #f "recording failure for job ~A\n" job-id)
- (current-error-port))
- (when (> (count-log-parts conn job-id)
- 0)
- (combine-log-parts! conn job-id)))
+ (current-error-port)))
(process-jobs-concurrently fetch-new-jobs
process-job
+ post-job
handle-job-failure
#:max-processes max-processes
#:priority-max-processes
latest-branch-revision-max-processes))
+
+(define* (log-for-job conn job-id
+ #:key
+ character-limit
+ start-character)
+ (define (sql-html-escape s)
+ (string-append
+ "replace("
+ (string-append
+ "replace("
+ (string-append
+ "replace("
+ s
+ ",'&','&amp;')")
+ ",'<','&lt;')")
+ ",'>','&gt;')"))
+
+ (define (get-characters s)
+ (if start-character
+ (simple-format #f "substr(~A, ~A, ~A)"
+ s start-character
+ character-limit)
+ (simple-format #f "right(~A, ~A)" s character-limit)))
+
+ (define log-query
+ (string-append
+ "SELECT "
+ (sql-html-escape (get-characters "contents"))
+ " FROM load_new_guix_revision_job_logs"
+ " WHERE job_id = $1 AND contents IS NOT NULL"))
+
+ (define parts-query
+ (string-append
+ "SELECT "
+ (sql-html-escape
+ (get-characters "STRING_AGG(contents, '' ORDER BY id ASC)"))
+ " FROM load_new_guix_revision_job_log_parts WHERE job_id = $1"))
+
+ (match (exec-query conn log-query (list job-id))
+ (((contents))
+ contents)
+ (()
+ (match (exec-query conn parts-query (list job-id))
+ (((contents))
+ contents)))))
+
+(define (count-log-parts conn job-id)
+ (match (exec-query
+ conn
+ "
+SELECT COUNT(*)
+FROM load_new_guix_revision_job_log_parts
+WHERE job_id = $1"
+ (list job-id))
+ (((id))
+ (string->number id))))
+
+(define (combine-log-parts! conn job-id)
+ (with-postgresql-transaction
+ conn
+ (lambda (conn)
+ (exec-query
+ conn
+ (string-append
+ "
+UPDATE load_new_guix_revision_job_logs SET contents = (
+ SELECT STRING_AGG(contents, '' ORDER BY id ASC)
+ FROM load_new_guix_revision_job_log_parts
+ WHERE job_id = $1
+ GROUP BY job_id
+)
+WHERE job_id = $1")
+ (list job-id))
+ (exec-query
+ conn
+ "DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1"
+ (list job-id)))))
+
+(define (drop-log-parts-sequence conn job-id)
+ (with-postgresql-transaction
+ conn
+ (lambda (conn)
+ (exec-query conn
+ "SET LOCAL lock_timeout = '10s'")
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format (current-error-port)
+ "error when dropping sequence: ~A"
+ exn))
+ (lambda ()
+ (exec-query conn
+ (string-append
+ "DROP SEQUENCE IF EXISTS "
+ (log-part-sequence-name job-id))))
+ #:unwind? #t))))
+
+(define (vacuum-log-parts-table conn)
+ (exec-query
+ conn
+ "VACUUM load_new_guix_revision_job_log_parts"))
+
(define default-max-processes
(max (round (/ (current-processor-count)
4))
@@ -67,6 +257,7 @@
(define* (process-jobs-concurrently
fetch-new-jobs
process-job
+ post-job
handle-job-failure
#:key
(max-processes default-max-processes)
@@ -108,9 +299,10 @@
;; No process to wait for
#f)
((pid . status)
- (unless (eq? status 0)
- (match (hash-ref processes pid)
- ((_ (id))
+ (match (hash-ref processes pid)
+ ((_ (id))
+ (post-job id)
+ (unless (eq? status 0)
(simple-format (current-error-port)
"pid ~A (job: ~A) failed with status ~A\n"
pid id status)
@@ -161,20 +353,6 @@
(kill pid SIGTERM)))
processes))
- (define (fork-and-process-job job-args)
- (match (primitive-fork)
- (0
- (dynamic-wind
- (const #t)
- (lambda ()
- (apply process-job job-args))
- (lambda ()
- (primitive-exit 127))))
- (pid
- (hashv-set! processes pid
- (list (current-time) job-args))
- #t)))
-
(define exit?
(make-atomic-box #f))
@@ -206,6 +384,9 @@
(if priority?
priority-max-processes
max-processes))
- (fork-and-process-job (list job-id))))))
+ (let ((pid (process-job job-id)))
+ (peek "PID" pid)
+ (hashv-set! processes pid
+ (list (current-time) (list job-id))))))))
jobs)))
(sleep 15)))