aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2023-05-20 11:36:39 +0100
committerChristopher Baines <mail@cbaines.net>2023-05-20 11:36:39 +0100
commit01ca366eb547dec12ab34b5f6bb2c8ae97e40287 (patch)
tree61d2d4a512faeaeefe6a732fe0bbe36f8357648b
parent27cca18e6a1e480203ee3541e08532196c24a411 (diff)
downloadnar-herder-01ca366eb547dec12ab34b5f6bb2c8ae97e40287.tar
nar-herder-01ca366eb547dec12ab34b5f6bb2c8ae97e40287.tar.gz
Port worker-thread improvements from the build coordiantor
-rw-r--r--nar-herder/utils.scm170
1 files changed, 120 insertions, 50 deletions
diff --git a/nar-herder/utils.scm b/nar-herder/utils.scm
index af78c0a..50c6cf9 100644
--- a/nar-herder/utils.scm
+++ b/nar-herder/utils.scm
@@ -337,69 +337,139 @@ falling back to en_US.utf8\n"
(define* (make-worker-thread-set initializer
#:key (parallelism 1)
(delay-logger (lambda _ #f))
+ (duration-logger (const #f))
destructor
lifetime
- (log-exception? (const #t)))
- "Return a channel used to offload work to a dedicated thread. ARGS are the
-arguments of the worker thread procedure."
+ (log-exception? (const #t))
+ (expire-on-exception? #f)
+ (name "unnamed"))
(define param
(make-parameter #f))
+ (define (initializer/safe)
+ (let ((args
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "exception running initializer in worker thread (~A): ~A:\n ~A\n"
+ name
+ initializer
+ exn)
+ #f)
+ (lambda ()
+ (with-throw-handler #t
+ initializer
+ (lambda args
+ (backtrace))))
+ #:unwind? #t)))
+
+ (if args
+ args
+ ;; never give up, just keep retrying
+ (begin
+ (sleep 5)
+ (initializer/safe)))))
+
+ (define (destructor/safe args)
+ (let ((success?
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "exception running destructor in worker thread (~A): ~A:\n ~A\n"
+ name
+ initializer
+ exn)
+ #f)
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (apply destructor args)
+ #t)
+ (lambda _
+ (backtrace))))
+ #:unwind? #t)))
+
+ (or success?
+ #t
+ (begin
+ (sleep 5)
+ (destructor/safe args)))))
+
(let ((channel (make-channel)))
(for-each
(lambda _
(call-with-new-thread
(lambda ()
- (let init ((args (initializer)))
+ (let init ((args (initializer/safe)))
(parameterize ((param args))
(let loop ((current-lifetime lifetime))
- (match (get-message channel)
- (((? channel? reply) sent-time (? procedure? proc))
- (let ((time-delay
- (- (get-internal-real-time)
- sent-time)))
- (delay-logger (/ time-delay
- internal-time-units-per-second))
- (put-message
- reply
- (let ((start-time (get-internal-real-time)))
- (with-exception-handler
- (lambda (exn)
- (list 'worker-thread-error
- (/ (- (get-internal-real-time)
- start-time)
- internal-time-units-per-second)
- exn))
- (lambda ()
- (with-throw-handler #t
- (lambda ()
- (call-with-values
- (lambda ()
- (apply proc args))
- (lambda vals
- (cons (/ (- (get-internal-real-time)
- start-time)
- internal-time-units-per-second)
- vals))))
- (lambda args
- (when (match args
- (('%exception exn)
- (log-exception? exn))
- (_ #t))
- (simple-format
- (current-error-port)
- "worker-thread: exception: ~A\n" args)
- (backtrace)))))
- #:unwind? #t))))))
- (if (number? current-lifetime)
- (unless (< current-lifetime 0)
- (loop (if current-lifetime
- (- current-lifetime 1)
- #f)))
- (loop #f))))
+ (let ((exception?
+ (match (get-message channel)
+ (((? channel? reply) sent-time (? procedure? proc))
+ (let ((time-delay
+ (- (get-internal-real-time)
+ sent-time)))
+ (delay-logger (/ time-delay
+ internal-time-units-per-second))
+
+ (let* ((start-time (get-internal-real-time))
+ (response
+ (with-exception-handler
+ (lambda (exn)
+ (list 'worker-thread-error
+ (/ (- (get-internal-real-time)
+ start-time)
+ internal-time-units-per-second)
+ exn))
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (call-with-values
+ (lambda ()
+ (apply proc args))
+ (lambda vals
+ (cons (/ (- (get-internal-real-time)
+ start-time)
+ internal-time-units-per-second)
+ vals))))
+ (lambda args
+ (when (match args
+ (('%exception exn)
+ (log-exception? exn))
+ (_ #t))
+ (simple-format
+ (current-error-port)
+ "worker-thread: exception: ~A\n" args)
+ (backtrace)))))
+ #:unwind? #t)))
+ (put-message reply
+ response)
+
+ (match response
+ (('worker-thread-error duration _)
+ (when duration-logger
+ (duration-logger duration proc))
+ #t)
+ ((duration . _)
+ (when duration-logger
+ (duration-logger duration proc))
+ #f))))))))
+
+ (unless (and expire-on-exception?
+ exception?)
+ (if (number? current-lifetime)
+ (unless (< current-lifetime 0)
+ (loop (if current-lifetime
+ (- current-lifetime 1)
+ #f)))
+ (loop #f))))))
+
(when destructor
- (apply destructor args))
- (init (initializer))))))
+ (destructor/safe args))
+
+ (init (initializer/safe))))))
(iota parallelism))
(worker-thread-set channel