aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2021-05-31 23:07:13 +0100
committerChristopher Baines <mail@cbaines.net>2021-05-31 23:07:13 +0100
commitf9af463e81d9fcc712d7c58a491101d8d17457e8 (patch)
tree4bdf642be2bed124a91280c6f87d58b644e2ed2b
parent953ef433118442a026b20e71033ba81aa81771b7 (diff)
downloadbuild-coordinator-f9af463e81d9fcc712d7c58a491101d8d17457e8.tar
build-coordinator-f9af463e81d9fcc712d7c58a491101d8d17457e8.tar.gz
Refactor the code around work queues
-rw-r--r--guix-build-coordinator/utils.scm98
1 files changed, 47 insertions, 51 deletions
diff --git a/guix-build-coordinator/utils.scm b/guix-build-coordinator/utils.scm
index 45302c8..1b7556e 100644
--- a/guix-build-coordinator/utils.scm
+++ b/guix-build-coordinator/utils.scm
@@ -711,42 +711,22 @@ References: ~a~%"
'()
running-job-args))))
- (define (thread-process-job thread-index)
- (and=>
- (with-exception-handler
- (lambda (exn)
- (simple-format (current-error-port)
- "thread ~A: exception for deq!: ~A\n"
- thread-index exn)
- (unlock-mutex queue-mutex)
- #f)
- (lambda ()
- (deq! queue))
- #:unwind? #t)
- (lambda (job-args)
- (hash-set! running-job-args
- thread-index
- job-args)
- (unlock-mutex queue-mutex)
- (with-exception-handler
- (lambda (exn)
- (simple-format (current-error-port)
- "job raised exception: ~A\n"
- job-args))
- (lambda ()
- (with-throw-handler #t
- (lambda ()
- (apply proc job-args))
- (lambda (key . args)
- (simple-format (current-error-port)
- "exception when handling job: ~A ~A\n"
- key args)
- (backtrace))))
- #:unwind? #t)
- (with-mutex queue-mutex
- (hash-set! running-job-args
- thread-index
- #f)))))
+ (define (thread-process-job job-args)
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format (current-error-port)
+ "job raised exception: ~A\n"
+ job-args))
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (apply proc job-args))
+ (lambda (key . args)
+ (simple-format (current-error-port)
+ "exception when handling job: ~A ~A\n"
+ key args)
+ (backtrace))))
+ #:unwind? #t))
(define (start-thread thread-index)
(define (too-many-threads?)
@@ -774,21 +754,37 @@ References: ~a~%"
(if (too-many-threads?)
(stop-thread)
- (if (q-empty? queue)
- ;; #f from wait-condition-variable indicates a timeout
- (if (wait-condition-variable
- job-available
- queue-mutex
- (+ 9 (time-second (current-time))))
- ;; Another thread could have taken the job in the
- ;; mean time
- (if (q-empty? queue)
- (unlock-mutex queue-mutex)
- ;; thread-process-job will unlock queue-mutex
- (thread-process-job thread-index))
- (unlock-mutex queue-mutex))
- (thread-process-job thread-index)))
- (loop)))))
+ (let ((job-args
+ (if (q-empty? queue)
+ ;; #f from wait-condition-variable indicates a timeout
+ (if (wait-condition-variable
+ job-available
+ queue-mutex
+ (+ 9 (time-second (current-time))))
+ ;; Another thread could have taken
+ ;; the job in the mean time
+ (if (q-empty? queue)
+ #f
+ (deq! queue))
+ #f)
+ (deq! queue))))
+
+ (if job-args
+ (begin
+ (hash-set! running-job-args
+ thread-index
+ job-args)
+
+ (unlock-mutex queue-mutex)
+ (thread-process-job job-args)
+
+ (with-mutex queue-mutex
+ (hash-set! running-job-args
+ thread-index
+ #f)))
+ (unlock-mutex queue-mutex))
+
+ (loop)))))))
(define start-new-threads-if-necessary
(let ((previous-thread-started-at (make-time time-monotonic 0 0)))