aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2019-07-12 19:58:37 +0100
committerChristopher Baines <mail@cbaines.net>2019-07-12 23:00:44 +0100
commit83ef624b978f196892d2a28fc59797a15cded131 (patch)
tree539e6b6b67edb6507be8ba9c77ddab531c27dbea
parent09d927cb99d488b1d9024ec7592b900b6645d065 (diff)
downloaddata-service-83ef624b978f196892d2a28fc59797a15cded131.tar
data-service-83ef624b978f196892d2a28fc59797a15cded131.tar.gz
Switch to processing jobs in parallel
This should speed up processing new revisions, reduce latency between finding out about new revisions and processing them, as well as help manage memory usage, by processing each job in a process that then exits.
-rw-r--r--guix-data-service/jobs.scm100
-rw-r--r--guix-data-service/jobs/load-new-guix-revision.scm121
-rw-r--r--tests/jobs-load-new-guix-revision.scm42
3 files changed, 194 insertions, 69 deletions
diff --git a/guix-data-service/jobs.scm b/guix-data-service/jobs.scm
index 7fe361b..2d515ca 100644
--- a/guix-data-service/jobs.scm
+++ b/guix-data-service/jobs.scm
@@ -1,11 +1,103 @@
(define-module (guix-data-service jobs)
#:use-module (ice-9 match)
+ #:use-module (ice-9 format)
#:use-module (guix-data-service jobs load-new-guix-revision)
#:export (process-jobs))
(define (process-jobs conn)
+  ;; Entry point: hand process-jobs-concurrently a thunk that reads the
+  ;; unlocked jobs through CONN, plus a procedure that runs a single job.
+  (process-jobs-concurrently
+   (lambda ()
+     (fetch-unlocked-jobs conn))
+   (lambda (job-id)
+     ;; execlp replaces the calling process image, so this is only meant to
+     ;; be called from a process that was forked for this one job.
+     (let ((program "guix-data-service-process-job"))
+       (execlp program program job-id)))))
+
+(define default-max-processes
+  ;; A quarter of the processors available to this process, rounded to an
+  ;; integer, but never fewer than one.
+  (let ((quarter-of-processors (round (/ (current-processor-count) 4))))
+    (max quarter-of-processors 1)))
+
+(define* (process-jobs-concurrently fetch-new-jobs
+ process-job
+ #:key (max-processes
+ default-max-processes))
+ (define processes
+ (make-hash-table))
+
+ (define (display-status)
+   ;; Print how many jobs are currently tracked in PROCESSES, followed by one
+   ;; line per tracked process giving its pid and the job arguments.
+   (let* ((job-count (hash-count (const #t) processes))
+          (summary
+           (case job-count
+             ((0) "status: 0 running jobs")
+             ((1) "status: 1 running job")
+             (else
+              (simple-format #f "status: ~A running jobs"
+                             job-count))))
+          (process-lines
+           (hash-map->list
+            (lambda (pid job-args)
+              (format #f " pid: ~5d job args: ~a\n"
+                      pid job-args))
+            processes)))
+     (display
+      (string-append "\n\n"
+                     summary
+                     "\n"
+                     (string-concatenate process-lines)
+                     "\n"))))
+
+ (define (wait-on-processes)
+   ;; Reap, without blocking, every child process that has already exited.
+   ;; Each reaped pid is removed from PROCESSES and the outcome is reported on
+   ;; the current error port.  The return value is not meaningful.
+   (catch
+     #t
+     (lambda ()
+       (match (waitpid WAIT_ANY WNOHANG)
+         ((0 . status)
+          ;; There are children, but none of them has exited yet
+          #f)
+         ((pid . status)
+          (let ((job-args (hashv-ref processes pid)))
+            (hashv-remove! processes pid)
+            ;; status:exit-val is #f when the child was killed by a signal,
+            ;; so only a normal exit with code 0 is reported as a success.
+            (if (eqv? 0 (status:exit-val status))
+                (simple-format
+                 (current-error-port)
+                 "pid ~A (job args: ~A) finished successfully\n"
+                 pid job-args)
+                (simple-format
+                 (current-error-port)
+                 "pid ~A (job args: ~A) failed with status ~A\n"
+                 pid job-args status)))
+          ;; Another child may have exited as well, so try again
+          (wait-on-processes))))
+     (lambda (key . args)
+       ;; waitpid raises a system-error with errno ECHILD when there are no
+       ;; child processes at all.  That is the normal idle state, so it is
+       ;; not worth a log line; anything else is still reported as before.
+       (unless (and (eq? key 'system-error)
+                    (= ECHILD (system-error-errno (cons key args))))
+         (simple-format #t "key ~A args ~A\n"
+                        key args)))))
+
+ (define (fork-and-process-job job-args)
+   ;; Fork a child that applies PROCESS-JOB to JOB-ARGS.  In the parent, the
+   ;; child's pid is recorded against JOB-ARGS in PROCESSES and #t is returned.
+   (let ((fork-result (primitive-fork)))
+     (if (eqv? fork-result 0)
+         ;; Child: whichever way PROCESS-JOB leaves (normal return or
+         ;; non-local exit), the unwind handler exits the process with
+         ;; status 127, so the child never falls back into the caller's loop.
+         (dynamic-wind
+           (const #t)
+           (lambda ()
+             (apply process-job job-args))
+           (lambda ()
+             (primitive-exit 127)))
+         ;; Parent: FORK-RESULT is the pid of the new child.
+         (begin
+           (hashv-set! processes fork-result job-args)
+           #t))))
+
(while #t
- (match (process-next-load-new-guix-revision-job conn)
- (#f (unless (eq? 0 (sleep 5))
- (exit 0)))
- (_ (simple-format #t "\nFinished processing job\n\n")))))
+ (wait-on-processes)
+ (display-status)
+ (match (fetch-new-jobs)
+ (()
+ ;; Nothing to do
+ #f)
+ ((jobs ...)
+ (for-each
+ (lambda (job-args)
+ (let ((current-processes
+ (hash-count (const #t) processes)))
+ (when (< current-processes
+ max-processes)
+ (fork-and-process-job job-args))))
+ jobs)))
+ (unless (eq? 0 (sleep 15))
+ (exit 0))))
diff --git a/guix-data-service/jobs/load-new-guix-revision.scm b/guix-data-service/jobs/load-new-guix-revision.scm
index a69b690..a1e22ba 100644
--- a/guix-data-service/jobs/load-new-guix-revision.scm
+++ b/guix-data-service/jobs/load-new-guix-revision.scm
@@ -24,7 +24,8 @@
#:use-module (guix-data-service model package-metadata)
#:use-module (guix-data-service model derivation)
#:export (log-for-job
- process-next-load-new-guix-revision-job
+ fetch-unlocked-jobs
+ process-load-new-guix-revision-job
select-job-for-commit
select-jobs-and-events
enqueue-load-new-guix-revision-job
@@ -671,18 +672,20 @@ ORDER BY load_new_guix_revision_jobs.id DESC")
(list (number->string n)))))
result))
-(define (select-next-job-to-process conn)
+(define (select-job-for-update conn id) ; query CONN for the row of job ID, provided it has not succeeded and has no 'failure' event
(exec-query
conn
(string-append
"SELECT id, commit, source, git_repository_id "
"FROM load_new_guix_revision_jobs "
- "WHERE succeeded_at IS NULL AND NOT EXISTS ("
+ "WHERE id = $1 AND succeeded_at IS NULL AND NOT EXISTS (" ; $1 is bound to ID by the parameter list below
"SELECT 1 "
"FROM load_new_guix_revision_job_events "
;; Skip jobs that have failed, to avoid trying them over and over again
"WHERE job_id = load_new_guix_revision_jobs.id AND event = 'failure'"
- ") ORDER BY id DESC LIMIT 1")))
+ ") ORDER BY id DESC "
+ "FOR NO KEY UPDATE SKIP LOCKED") ; row-locks the match; a row already locked by another transaction is skipped rather than waited for
+ (list id)))
(define (record-job-event conn job-id event)
(exec-query
@@ -701,43 +704,73 @@ ORDER BY load_new_guix_revision_jobs.id DESC")
"WHERE id = $1 ")
(list id)))
-(define (process-next-load-new-guix-revision-job conn)
- (match (select-next-job-to-process conn)
- (((id commit source git-repository-id))
- (let ((previous-output-port (current-output-port))
- (previous-error-port (current-error-port)))
- (record-job-event conn id "start")
- (simple-format #t "Processing job ~A (commit: ~A, source: ~A)\n\n"
- id commit source)
- (exec-query conn "BEGIN")
- (if (or (guix-revision-exists? conn git-repository-id commit)
- (eq? (log-time
- (string-append "loading revision " commit)
- (lambda ()
- (let ((result
- (with-postgresql-connection
- (simple-format #f "load-new-guix-revision ~A logging" id)
- (lambda (logging-conn)
- (insert-empty-log-entry logging-conn id)
- (let ((logging-port (log-port id logging-conn)))
- (set-current-output-port logging-port)
- (set-current-error-port logging-port)
- (let ((result
- (parameterize ((current-build-output-port logging-port))
- (load-new-guix-revision conn git-repository-id commit))))
- (combine-log-parts! logging-conn id)
- result))))))
- (set-current-output-port previous-output-port)
- (set-current-error-port previous-error-port)
- result)))
- #t))
- (begin
- (record-job-succeeded conn id)
- (record-job-event conn id "success")
- (exec-query conn "COMMIT")
- #t)
- (begin
- (exec-query conn "ROLLBACK")
- (record-job-event conn id "failure")
- #f))))
- (_ #f)))
+(define (fetch-unlocked-jobs conn) ; query CONN for the ids of jobs with no succeeded_at and no 'failure' event, highest id first
+ (exec-query
+ conn
+ "
+SELECT id FROM load_new_guix_revision_jobs
+WHERE
+ succeeded_at IS NULL AND
+ NOT EXISTS (
+ SELECT 1
+ FROM load_new_guix_revision_job_events
+ -- Skip jobs that have failed, to avoid trying them over and over again
+ WHERE job_id = load_new_guix_revision_jobs.id AND event = 'failure'
+ )
+ORDER BY id DESC
+FOR NO KEY UPDATE SKIP LOCKED")) ; SKIP LOCKED leaves out rows that another transaction currently has row-locked; presumably those are the jobs already being processed - verify against select-job-for-update
+
+(define (process-load-new-guix-revision-job id) ; process job ID on a connection of its own; #t when the job succeeds, #f when loading fails
+ (with-postgresql-connection
+ (simple-format #f "load-new-guix-revision ~A" id)
+ (lambda (conn)
+ (exec-query conn "BEGIN") ; ended by the COMMIT or ROLLBACK in the branches below
+
+ (match (select-job-for-update conn id)
+ (((id commit source git-repository-id)) ; exactly one row came back; this ID shadows the parameter
+
+ ;; With a separate connection, outside of the transaction so the event
+ ;; gets persisted regardless.
+ (with-postgresql-connection
+ (simple-format #f "load-new-guix-revision ~A start-event" id)
+ (lambda (start-event-conn)
+ (record-job-event start-event-conn id "start")))
+
+ (simple-format #t "Processing job ~A (commit: ~A, source: ~A)\n\n"
+ id commit source)
+
+ (if (or (guix-revision-exists? conn git-repository-id commit) ; a revision that already exists counts as success without loading anything
+ (eq? (log-time
+ (string-append "loading revision " commit)
+ (lambda ()
+ (let* ((previous-output-port (current-output-port)) ; remembered so both ports can be put back after loading
+ (previous-error-port (current-error-port))
+ (result
+ (with-postgresql-connection
+ (simple-format #f "load-new-guix-revision ~A logging" id)
+ (lambda (logging-conn)
+ (insert-empty-log-entry logging-conn id)
+ (let ((logging-port (log-port id logging-conn))) ; presumably a port that stores what is written to it as log parts for this job - TODO confirm
+ (set-current-output-port logging-port) ; from here on, output and errors go to LOGGING-PORT
+ (set-current-error-port logging-port)
+ (let ((result
+ (parameterize ((current-build-output-port logging-port))
+ (load-new-guix-revision conn git-repository-id commit))))
+ (combine-log-parts! logging-conn id) ; runs while the current ports still point at LOGGING-PORT
+ result))))))
+ (set-current-output-port previous-output-port) ; NOTE(review): not reached on a non-local exit from the load - confirm that is acceptable
+ (set-current-error-port previous-error-port)
+ result)))
+ #t)) ; only a result that is eq? to #t is treated as success
+ (begin
+ (record-job-succeeded conn id)
+ (record-job-event conn id "success")
+ (exec-query conn "COMMIT")
+ #t)
+ (begin
+ (exec-query conn "ROLLBACK")
+ (record-job-event conn id "failure") ; issued after the ROLLBACK, so the failure event is not rolled back with it
+ #f)))
+ (() ; no row: per the query in select-job-for-update the job is missing, already succeeded or failed, or its row is locked
+ (simple-format #t "job ~A not found to be processed\n" ; NOTE(review): the BEGIN above is not explicitly ended on this path - confirm closing the connection is enough
+ id))))))
diff --git a/tests/jobs-load-new-guix-revision.scm b/tests/jobs-load-new-guix-revision.scm
index 944feee..aee6225 100644
--- a/tests/jobs-load-new-guix-revision.scm
+++ b/tests/jobs-load-new-guix-revision.scm
@@ -32,13 +32,13 @@
(lambda (conn git-repository-id commit store-path)
#t))
- (enqueue-load-new-guix-revision-job
- conn
- (git-repository-url->git-repository-id conn "test-url")
- "test-commit"
- "test-source")
-
- (process-next-load-new-guix-revision-job conn))))
+ (match (enqueue-load-new-guix-revision-job
+ conn
+ (git-repository-url->git-repository-id conn "test-url")
+ "test-commit"
+ "test-source")
+ ((id)
+ (process-load-new-guix-revision-job id))))))
(test-equal "test build store item failure"
#f
@@ -48,13 +48,13 @@
(lambda (conn git-repository-id commit)
#f))
- (enqueue-load-new-guix-revision-job
- conn
- (git-repository-url->git-repository-id conn "test-url")
- "test-commit"
- "test-source")
-
- (process-next-load-new-guix-revision-job conn)))
+ (match (enqueue-load-new-guix-revision-job
+ conn
+ (git-repository-url->git-repository-id conn "test-url")
+ "test-commit"
+ "test-source")
+ ((id)
+ (process-load-new-guix-revision-job id)))))
(test-equal "test extract information failure"
#f
@@ -70,12 +70,12 @@
(lambda (conn git-repository-id commit store-path)
#f))
- (enqueue-load-new-guix-revision-job
- conn
- (git-repository-url->git-repository-id conn "test-url")
- "test-commit"
- "test-source")
-
- (process-next-load-new-guix-revision-job conn))))))
+ (match (enqueue-load-new-guix-revision-job
+ conn
+ (git-repository-url->git-repository-id conn "test-url")
+ "test-commit"
+ "test-source")
+ ((id)
+ (process-load-new-guix-revision-job id))))))))
(test-end)