aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--guix-data-service/jobs.scm100
-rw-r--r--guix-data-service/jobs/load-new-guix-revision.scm121
-rw-r--r--tests/jobs-load-new-guix-revision.scm42
3 files changed, 194 insertions, 69 deletions
diff --git a/guix-data-service/jobs.scm b/guix-data-service/jobs.scm
index 7fe361b..2d515ca 100644
--- a/guix-data-service/jobs.scm
+++ b/guix-data-service/jobs.scm
@@ -1,11 +1,103 @@
(define-module (guix-data-service jobs)
#:use-module (ice-9 match)
+ #:use-module (ice-9 format)
#:use-module (guix-data-service jobs load-new-guix-revision)
#:export (process-jobs))
(define (process-jobs conn)
+ (define (fetch-new-jobs)
+ (fetch-unlocked-jobs conn))
+
+ (define (process-job job-id)
+ (execlp "guix-data-service-process-job"
+ "guix-data-service-process-job"
+ job-id))
+
+ (process-jobs-concurrently fetch-new-jobs
+ process-job))
+
+(define default-max-processes
+ (max (round (/ (current-processor-count)
+ 4))
+ 1))
+
+(define* (process-jobs-concurrently fetch-new-jobs
+ process-job
+ #:key (max-processes
+ default-max-processes))
+ (define processes
+ (make-hash-table))
+
+ (define (display-status)
+ (display
+ (string-append
+ "\n\n"
+ (let ((running-jobs (hash-count (const #t) processes)))
+ (cond
+ ((eq? running-jobs 0)
+ "status: 0 running jobs")
+ ((eq? running-jobs 1)
+ "status: 1 running job")
+ (else
+ (simple-format #f "status: ~A running jobs"
+ running-jobs))))
+ "\n"
+ (string-concatenate
+ (hash-map->list
+ (lambda (pid job-args)
+ (format #f " pid: ~5d job args: ~a\n"
+ pid job-args))
+ processes))
+ "\n")))
+
+ (define (wait-on-processes)
+ (catch
+ #t
+ (lambda ()
+ (match (waitpid WAIT_ANY WNOHANG)
+ ((0 . status)
+ ;; No process to wait for
+ #f)
+ ((pid . status)
+ (let ((job-args (hashv-ref processes pid)))
+ (hashv-remove! processes pid)
+ (simple-format
+ (current-error-port)
+ "pid ~A failed with status ~A\n"
+ pid status))
+ (wait-on-processes))))
+ (lambda (key . args)
+ (simple-format #t "key ~A args ~A\n"
+ key args))))
+
+ (define (fork-and-process-job job-args)
+ (match (primitive-fork)
+ (0
+ (dynamic-wind
+ (const #t)
+ (lambda ()
+ (apply process-job job-args))
+ (lambda ()
+ (primitive-exit 127))))
+ (pid
+ (hashv-set! processes pid job-args)
+ #t)))
+
(while #t
- (match (process-next-load-new-guix-revision-job conn)
- (#f (unless (eq? 0 (sleep 5))
- (exit 0)))
- (_ (simple-format #t "\nFinished processing job\n\n")))))
+ (wait-on-processes)
+ (display-status)
+ (match (fetch-new-jobs)
+ (()
+ ;; Nothing to do
+ #f)
+ ((jobs ...)
+ (for-each
+ (lambda (job-args)
+ (let ((current-processes
+ (hash-count (const #t) processes)))
+ (when (< current-processes
+ max-processes)
+ (fork-and-process-job job-args))))
+ jobs)))
+ (unless (eq? 0 (sleep 15))
+ (exit 0))))
diff --git a/guix-data-service/jobs/load-new-guix-revision.scm b/guix-data-service/jobs/load-new-guix-revision.scm
index a69b690..a1e22ba 100644
--- a/guix-data-service/jobs/load-new-guix-revision.scm
+++ b/guix-data-service/jobs/load-new-guix-revision.scm
@@ -24,7 +24,8 @@
#:use-module (guix-data-service model package-metadata)
#:use-module (guix-data-service model derivation)
#:export (log-for-job
- process-next-load-new-guix-revision-job
+ fetch-unlocked-jobs
+ process-load-new-guix-revision-job
select-job-for-commit
select-jobs-and-events
enqueue-load-new-guix-revision-job
@@ -671,18 +672,20 @@ ORDER BY load_new_guix_revision_jobs.id DESC")
(list (number->string n)))))
result))
-(define (select-next-job-to-process conn)
+(define (select-job-for-update conn id)
(exec-query
conn
(string-append
"SELECT id, commit, source, git_repository_id "
"FROM load_new_guix_revision_jobs "
- "WHERE succeeded_at IS NULL AND NOT EXISTS ("
+ "WHERE id = $1 AND succeeded_at IS NULL AND NOT EXISTS ("
"SELECT 1 "
"FROM load_new_guix_revision_job_events "
;; Skip jobs that have failed, to avoid trying them over and over again
"WHERE job_id = load_new_guix_revision_jobs.id AND event = 'failure'"
- ") ORDER BY id DESC LIMIT 1")))
+ ") ORDER BY id DESC "
+ "FOR NO KEY UPDATE SKIP LOCKED")
+ (list id)))
(define (record-job-event conn job-id event)
(exec-query
@@ -701,43 +704,73 @@ ORDER BY load_new_guix_revision_jobs.id DESC")
"WHERE id = $1 ")
(list id)))
-(define (process-next-load-new-guix-revision-job conn)
- (match (select-next-job-to-process conn)
- (((id commit source git-repository-id))
- (let ((previous-output-port (current-output-port))
- (previous-error-port (current-error-port)))
- (record-job-event conn id "start")
- (simple-format #t "Processing job ~A (commit: ~A, source: ~A)\n\n"
- id commit source)
- (exec-query conn "BEGIN")
- (if (or (guix-revision-exists? conn git-repository-id commit)
- (eq? (log-time
- (string-append "loading revision " commit)
- (lambda ()
- (let ((result
- (with-postgresql-connection
- (simple-format #f "load-new-guix-revision ~A logging" id)
- (lambda (logging-conn)
- (insert-empty-log-entry logging-conn id)
- (let ((logging-port (log-port id logging-conn)))
- (set-current-output-port logging-port)
- (set-current-error-port logging-port)
- (let ((result
- (parameterize ((current-build-output-port logging-port))
- (load-new-guix-revision conn git-repository-id commit))))
- (combine-log-parts! logging-conn id)
- result))))))
- (set-current-output-port previous-output-port)
- (set-current-error-port previous-error-port)
- result)))
- #t))
- (begin
- (record-job-succeeded conn id)
- (record-job-event conn id "success")
- (exec-query conn "COMMIT")
- #t)
- (begin
- (exec-query conn "ROLLBACK")
- (record-job-event conn id "failure")
- #f))))
- (_ #f)))
+(define (fetch-unlocked-jobs conn)
+ (exec-query
+ conn
+ "
+SELECT id FROM load_new_guix_revision_jobs
+WHERE
+ succeeded_at IS NULL AND
+ NOT EXISTS (
+ SELECT 1
+ FROM load_new_guix_revision_job_events
+ -- Skip jobs that have failed, to avoid trying them over and over again
+ WHERE job_id = load_new_guix_revision_jobs.id AND event = 'failure'
+ )
+ORDER BY id DESC
+FOR NO KEY UPDATE SKIP LOCKED"))
+
+(define (process-load-new-guix-revision-job id)
+ (with-postgresql-connection
+ (simple-format #f "load-new-guix-revision ~A" id)
+ (lambda (conn)
+ (exec-query conn "BEGIN")
+
+ (match (select-job-for-update conn id)
+ (((id commit source git-repository-id))
+
+ ;; With a separate connection, outside of the transaction so the event
+ ;; gets persisted regardless.
+ (with-postgresql-connection
+ (simple-format #f "load-new-guix-revision ~A start-event" id)
+ (lambda (start-event-conn)
+ (record-job-event start-event-conn id "start")))
+
+ (simple-format #t "Processing job ~A (commit: ~A, source: ~A)\n\n"
+ id commit source)
+
+ (if (or (guix-revision-exists? conn git-repository-id commit)
+ (eq? (log-time
+ (string-append "loading revision " commit)
+ (lambda ()
+ (let* ((previous-output-port (current-output-port))
+ (previous-error-port (current-error-port))
+ (result
+ (with-postgresql-connection
+ (simple-format #f "load-new-guix-revision ~A logging" id)
+ (lambda (logging-conn)
+ (insert-empty-log-entry logging-conn id)
+ (let ((logging-port (log-port id logging-conn)))
+ (set-current-output-port logging-port)
+ (set-current-error-port logging-port)
+ (let ((result
+ (parameterize ((current-build-output-port logging-port))
+ (load-new-guix-revision conn git-repository-id commit))))
+ (combine-log-parts! logging-conn id)
+ result))))))
+ (set-current-output-port previous-output-port)
+ (set-current-error-port previous-error-port)
+ result)))
+ #t))
+ (begin
+ (record-job-succeeded conn id)
+ (record-job-event conn id "success")
+ (exec-query conn "COMMIT")
+ #t)
+ (begin
+ (exec-query conn "ROLLBACK")
+ (record-job-event conn id "failure")
+ #f)))
+ (()
+ (simple-format #t "job ~A not found to be processed\n"
+ id))))))
diff --git a/tests/jobs-load-new-guix-revision.scm b/tests/jobs-load-new-guix-revision.scm
index 944feee..aee6225 100644
--- a/tests/jobs-load-new-guix-revision.scm
+++ b/tests/jobs-load-new-guix-revision.scm
@@ -32,13 +32,13 @@
(lambda (conn git-repository-id commit store-path)
#t))
- (enqueue-load-new-guix-revision-job
- conn
- (git-repository-url->git-repository-id conn "test-url")
- "test-commit"
- "test-source")
-
- (process-next-load-new-guix-revision-job conn))))
+ (match (enqueue-load-new-guix-revision-job
+ conn
+ (git-repository-url->git-repository-id conn "test-url")
+ "test-commit"
+ "test-source")
+ ((id)
+ (process-load-new-guix-revision-job id))))))
(test-equal "test build store item failure"
#f
@@ -48,13 +48,13 @@
(lambda (conn git-repository-id commit)
#f))
- (enqueue-load-new-guix-revision-job
- conn
- (git-repository-url->git-repository-id conn "test-url")
- "test-commit"
- "test-source")
-
- (process-next-load-new-guix-revision-job conn)))
+ (match (enqueue-load-new-guix-revision-job
+ conn
+ (git-repository-url->git-repository-id conn "test-url")
+ "test-commit"
+ "test-source")
+ ((id)
+ (process-load-new-guix-revision-job id)))))
(test-equal "test extract information failure"
#f
@@ -70,12 +70,12 @@
(lambda (conn git-repository-id commit store-path)
#f))
- (enqueue-load-new-guix-revision-job
- conn
- (git-repository-url->git-repository-id conn "test-url")
- "test-commit"
- "test-source")
-
- (process-next-load-new-guix-revision-job conn))))))
+ (match (enqueue-load-new-guix-revision-job
+ conn
+ (git-repository-url->git-repository-id conn "test-url")
+ "test-commit"
+ "test-source")
+ ((id)
+ (process-load-new-guix-revision-job id))))))))
(test-end)