From f9af463e81d9fcc712d7c58a491101d8d17457e8 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Mon, 31 May 2021 23:07:13 +0100 Subject: Refactor the code around work queues --- guix-build-coordinator/utils.scm | 98 +++++++++++++++++++--------------------- 1 file changed, 47 insertions(+), 51 deletions(-) diff --git a/guix-build-coordinator/utils.scm b/guix-build-coordinator/utils.scm index 45302c8..1b7556e 100644 --- a/guix-build-coordinator/utils.scm +++ b/guix-build-coordinator/utils.scm @@ -711,42 +711,22 @@ References: ~a~%" '() running-job-args)))) - (define (thread-process-job thread-index) - (and=> - (with-exception-handler - (lambda (exn) - (simple-format (current-error-port) - "thread ~A: exception for deq!: ~A\n" - thread-index exn) - (unlock-mutex queue-mutex) - #f) - (lambda () - (deq! queue)) - #:unwind? #t) - (lambda (job-args) - (hash-set! running-job-args - thread-index - job-args) - (unlock-mutex queue-mutex) - (with-exception-handler - (lambda (exn) - (simple-format (current-error-port) - "job raised exception: ~A\n" - job-args)) - (lambda () - (with-throw-handler #t - (lambda () - (apply proc job-args)) - (lambda (key . args) - (simple-format (current-error-port) - "exception when handling job: ~A ~A\n" - key args) - (backtrace)))) - #:unwind? #t) - (with-mutex queue-mutex - (hash-set! running-job-args - thread-index - #f))))) + (define (thread-process-job job-args) + (with-exception-handler + (lambda (exn) + (simple-format (current-error-port) + "job raised exception: ~A\n" + job-args)) + (lambda () + (with-throw-handler #t + (lambda () + (apply proc job-args)) + (lambda (key . args) + (simple-format (current-error-port) + "exception when handling job: ~A ~A\n" + key args) + (backtrace)))) + #:unwind? #t)) (define (start-thread thread-index) (define (too-many-threads?) @@ -774,21 +754,37 @@ References: ~a~%" (if (too-many-threads?) (stop-thread) - (if (q-empty? queue) - ;; #f from wait-condition-variable indicates a timeout - (if (wait-condition-variable - job-available - queue-mutex - (+ 9 (time-second (current-time)))) - ;; Another thread could have taken the job in the - ;; mean time - (if (q-empty? queue) - (unlock-mutex queue-mutex) - ;; thread-process-job will unlock queue-mutex - (thread-process-job thread-index)) - (unlock-mutex queue-mutex)) - (thread-process-job thread-index))) - (loop))))) + (let ((job-args + (if (q-empty? queue) + ;; #f from wait-condition-variable indicates a timeout + (if (wait-condition-variable + job-available + queue-mutex + (+ 9 (time-second (current-time)))) + ;; Another thread could have taken + ;; the job in the mean time + (if (q-empty? queue) + #f + (deq! queue)) + #f) + (deq! queue)))) + + (if job-args + (begin + (hash-set! running-job-args + thread-index + job-args) + + (unlock-mutex queue-mutex) + (thread-process-job job-args) + + (with-mutex queue-mutex + (hash-set! running-job-args + thread-index + #f))) + (unlock-mutex queue-mutex)) + + (loop))))))) (define start-new-threads-if-necessary (let ((previous-thread-started-at (make-time time-monotonic 0 0))) -- cgit v1.2.3