diff options
-rw-r--r-- | guix-data-service/jobs.scm | 100 | ||||
-rw-r--r-- | guix-data-service/jobs/load-new-guix-revision.scm | 121 | ||||
-rw-r--r-- | tests/jobs-load-new-guix-revision.scm | 42 |
3 files changed, 194 insertions, 69 deletions
diff --git a/guix-data-service/jobs.scm b/guix-data-service/jobs.scm index 7fe361b..2d515ca 100644 --- a/guix-data-service/jobs.scm +++ b/guix-data-service/jobs.scm @@ -1,11 +1,103 @@ (define-module (guix-data-service jobs) #:use-module (ice-9 match) + #:use-module (ice-9 format) #:use-module (guix-data-service jobs load-new-guix-revision) #:export (process-jobs)) (define (process-jobs conn) + (define (fetch-new-jobs) + (fetch-unlocked-jobs conn)) + + (define (process-job job-id) + (execlp "guix-data-service-process-job" + "guix-data-service-process-job" + job-id)) + + (process-jobs-concurrently fetch-new-jobs + process-job)) + +(define default-max-processes + (max (round (/ (current-processor-count) + 4)) + 1)) + +(define* (process-jobs-concurrently fetch-new-jobs + process-job + #:key (max-processes + default-max-processes)) + (define processes + (make-hash-table)) + + (define (display-status) + (display + (string-append + "\n\n" + (let ((running-jobs (hash-count (const #t) processes))) + (cond + ((eq? running-jobs 0) + "status: 0 running jobs") + ((eq? running-jobs 1) + "status: 1 running job") + (else + (simple-format #f "status: ~A running jobs" + running-jobs)))) + "\n" + (string-concatenate + (hash-map->list + (lambda (pid job-args) + (format #f " pid: ~5d job args: ~a\n" + pid job-args)) + processes)) + "\n"))) + + (define (wait-on-processes) + (catch + #t + (lambda () + (match (waitpid WAIT_ANY WNOHANG) + ((0 . status) + ;; No process to wait for + #f) + ((pid . status) + (let ((job-args (hashv-ref processes pid))) + (hashv-remove! processes pid) + (simple-format + (current-error-port) + "pid ~A failed with status ~A\n" + pid status)) + (wait-on-processes)))) + (lambda (key . args) + (simple-format #t "key ~A args ~A\n" + key args)))) + + (define (fork-and-process-job job-args) + (match (primitive-fork) + (0 + (dynamic-wind + (const #t) + (lambda () + (apply process-job job-args)) + (lambda () + (primitive-exit 127)))) + (pid + (hashv-set! processes pid job-args) + #t))) + (while #t - (match (process-next-load-new-guix-revision-job conn) - (#f (unless (eq? 0 (sleep 5)) - (exit 0))) - (_ (simple-format #t "\nFinished processing job\n\n"))))) + (wait-on-processes) + (display-status) + (match (fetch-new-jobs) + (() + ;; Nothing to do + #f) + ((jobs ...) + (for-each + (lambda (job-args) + (let ((current-processes + (hash-count (const #t) processes))) + (when (< current-processes + max-processes) + (fork-and-process-job job-args)))) + jobs))) + (unless (eq? 0 (sleep 15)) + (exit 0)))) diff --git a/guix-data-service/jobs/load-new-guix-revision.scm b/guix-data-service/jobs/load-new-guix-revision.scm index a69b690..a1e22ba 100644 --- a/guix-data-service/jobs/load-new-guix-revision.scm +++ b/guix-data-service/jobs/load-new-guix-revision.scm @@ -24,7 +24,8 @@ #:use-module (guix-data-service model package-metadata) #:use-module (guix-data-service model derivation) #:export (log-for-job - process-next-load-new-guix-revision-job + fetch-unlocked-jobs + process-load-new-guix-revision-job select-job-for-commit select-jobs-and-events enqueue-load-new-guix-revision-job @@ -671,18 +672,20 @@ ORDER BY load_new_guix_revision_jobs.id DESC") (list (number->string n))))) result)) -(define (select-next-job-to-process conn) +(define (select-job-for-update conn id) (exec-query conn (string-append "SELECT id, commit, source, git_repository_id " "FROM load_new_guix_revision_jobs " - "WHERE succeeded_at IS NULL AND NOT EXISTS (" + "WHERE id = $1 AND succeeded_at IS NULL AND NOT EXISTS (" "SELECT 1 " "FROM load_new_guix_revision_job_events " ;; Skip jobs that have failed, to avoid trying them over and over again "WHERE job_id = load_new_guix_revision_jobs.id AND event = 'failure'" - ") ORDER BY id DESC LIMIT 1"))) + ") ORDER BY id DESC " + "FOR NO KEY UPDATE SKIP LOCKED") + (list id))) (define (record-job-event conn job-id event) (exec-query @@ -701,43 +704,73 @@ ORDER BY load_new_guix_revision_jobs.id DESC") "WHERE id = $1 ") (list id))) -(define (process-next-load-new-guix-revision-job conn) - (match (select-next-job-to-process conn) - (((id commit source git-repository-id)) - (let ((previous-output-port (current-output-port)) - (previous-error-port (current-error-port))) - (record-job-event conn id "start") - (simple-format #t "Processing job ~A (commit: ~A, source: ~A)\n\n" - id commit source) - (exec-query conn "BEGIN") - (if (or (guix-revision-exists? conn git-repository-id commit) - (eq? (log-time - (string-append "loading revision " commit) - (lambda () - (let ((result - (with-postgresql-connection - (simple-format #f "load-new-guix-revision ~A logging" id) - (lambda (logging-conn) - (insert-empty-log-entry logging-conn id) - (let ((logging-port (log-port id logging-conn))) - (set-current-output-port logging-port) - (set-current-error-port logging-port) - (let ((result - (parameterize ((current-build-output-port logging-port)) - (load-new-guix-revision conn git-repository-id commit)))) - (combine-log-parts! logging-conn id) - result)))))) - (set-current-output-port previous-output-port) - (set-current-error-port previous-error-port) - result))) - #t)) - (begin - (record-job-succeeded conn id) - (record-job-event conn id "success") - (exec-query conn "COMMIT") - #t) - (begin - (exec-query conn "ROLLBACK") - (record-job-event conn id "failure") - #f)))) - (_ #f))) +(define (fetch-unlocked-jobs conn) + (exec-query + conn + " +SELECT id FROM load_new_guix_revision_jobs +WHERE + succeeded_at IS NULL AND + NOT EXISTS ( + SELECT 1 + FROM load_new_guix_revision_job_events + -- Skip jobs that have failed, to avoid trying them over and over again + WHERE job_id = load_new_guix_revision_jobs.id AND event = 'failure' + ) +ORDER BY id DESC +FOR NO KEY UPDATE SKIP LOCKED")) + +(define (process-load-new-guix-revision-job id) + (with-postgresql-connection + (simple-format #f "load-new-guix-revision ~A" id) + (lambda (conn) + (exec-query conn "BEGIN") + + (match (select-job-for-update conn id) + (((id commit source git-repository-id)) + + ;; With a separate connection, outside of the transaction so the event + ;; gets persisted regardless. + (with-postgresql-connection + (simple-format #f "load-new-guix-revision ~A start-event" id) + (lambda (start-event-conn) + (record-job-event start-event-conn id "start"))) + + (simple-format #t "Processing job ~A (commit: ~A, source: ~A)\n\n" + id commit source) + + (if (or (guix-revision-exists? conn git-repository-id commit) + (eq? (log-time + (string-append "loading revision " commit) + (lambda () + (let* ((previous-output-port (current-output-port)) + (previous-error-port (current-error-port)) + (result + (with-postgresql-connection + (simple-format #f "load-new-guix-revision ~A logging" id) + (lambda (logging-conn) + (insert-empty-log-entry logging-conn id) + (let ((logging-port (log-port id logging-conn))) + (set-current-output-port logging-port) + (set-current-error-port logging-port) + (let ((result + (parameterize ((current-build-output-port logging-port)) + (load-new-guix-revision conn git-repository-id commit)))) + (combine-log-parts! logging-conn id) + result)))))) + (set-current-output-port previous-output-port) + (set-current-error-port previous-error-port) + result))) + #t)) + (begin + (record-job-succeeded conn id) + (record-job-event conn id "success") + (exec-query conn "COMMIT") + #t) + (begin + (exec-query conn "ROLLBACK") + (record-job-event conn id "failure") + #f))) + (() + (simple-format #t "job ~A not found to be processed\n" + id)))))) diff --git a/tests/jobs-load-new-guix-revision.scm b/tests/jobs-load-new-guix-revision.scm index 944feee..aee6225 100644 --- a/tests/jobs-load-new-guix-revision.scm +++ b/tests/jobs-load-new-guix-revision.scm @@ -32,13 +32,13 @@ (lambda (conn git-repository-id commit store-path) #t)) - (enqueue-load-new-guix-revision-job - conn - (git-repository-url->git-repository-id conn "test-url") - "test-commit" - "test-source") - - (process-next-load-new-guix-revision-job conn)))) + (match (enqueue-load-new-guix-revision-job + conn + (git-repository-url->git-repository-id conn "test-url") + "test-commit" + "test-source") + ((id) + (process-load-new-guix-revision-job id)))))) (test-equal "test build store item failure" #f @@ -48,13 +48,13 @@ (lambda (conn git-repository-id commit) #f)) - (enqueue-load-new-guix-revision-job - conn - (git-repository-url->git-repository-id conn "test-url") - "test-commit" - "test-source") - - (process-next-load-new-guix-revision-job conn))) + (match (enqueue-load-new-guix-revision-job + conn + (git-repository-url->git-repository-id conn "test-url") + "test-commit" + "test-source") + ((id) + (process-load-new-guix-revision-job id))))) (test-equal "test extract information failure" #f @@ -70,12 +70,12 @@ (lambda (conn git-repository-id commit store-path) #f)) - (enqueue-load-new-guix-revision-job - conn - (git-repository-url->git-repository-id conn "test-url") - "test-commit" - "test-source") - - (process-next-load-new-guix-revision-job conn)))))) + (match (enqueue-load-new-guix-revision-job + conn + (git-repository-url->git-repository-id conn "test-url") + "test-commit" + "test-source") + ((id) + (process-load-new-guix-revision-job id)))))))) (test-end) |