diff options
author | Christopher Baines <mail@cbaines.net> | 2023-05-20 11:36:39 +0100 |
---|---|---|
committer | Christopher Baines <mail@cbaines.net> | 2023-05-20 11:36:39 +0100 |
commit | 01ca366eb547dec12ab34b5f6bb2c8ae97e40287 (patch) | |
tree | 61d2d4a512faeaeefe6a732fe0bbe36f8357648b | |
parent | 27cca18e6a1e480203ee3541e08532196c24a411 (diff) | |
download | nar-herder-01ca366eb547dec12ab34b5f6bb2c8ae97e40287.tar nar-herder-01ca366eb547dec12ab34b5f6bb2c8ae97e40287.tar.gz |
Port worker-thread improvements from the build coordiantor
-rw-r--r-- | nar-herder/utils.scm | 170 |
1 files changed, 120 insertions, 50 deletions
diff --git a/nar-herder/utils.scm b/nar-herder/utils.scm index af78c0a..50c6cf9 100644 --- a/nar-herder/utils.scm +++ b/nar-herder/utils.scm @@ -337,69 +337,139 @@ falling back to en_US.utf8\n" (define* (make-worker-thread-set initializer #:key (parallelism 1) (delay-logger (lambda _ #f)) + (duration-logger (const #f)) destructor lifetime - (log-exception? (const #t))) - "Return a channel used to offload work to a dedicated thread. ARGS are the -arguments of the worker thread procedure." + (log-exception? (const #t)) + (expire-on-exception? #f) + (name "unnamed")) (define param (make-parameter #f)) + (define (initializer/safe) + (let ((args + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "exception running initializer in worker thread (~A): ~A:\n ~A\n" + name + initializer + exn) + #f) + (lambda () + (with-throw-handler #t + initializer + (lambda args + (backtrace)))) + #:unwind? #t))) + + (if args + args + ;; never give up, just keep retrying + (begin + (sleep 5) + (initializer/safe))))) + + (define (destructor/safe args) + (let ((success? + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "exception running destructor in worker thread (~A): ~A:\n ~A\n" + name + initializer + exn) + #f) + (lambda () + (with-throw-handler #t + (lambda () + (apply destructor args) + #t) + (lambda _ + (backtrace)))) + #:unwind? #t))) + + (or success? + #t + (begin + (sleep 5) + (destructor/safe args))))) + (let ((channel (make-channel))) (for-each (lambda _ (call-with-new-thread (lambda () - (let init ((args (initializer))) + (let init ((args (initializer/safe))) (parameterize ((param args)) (let loop ((current-lifetime lifetime)) - (match (get-message channel) - (((? channel? reply) sent-time (? procedure? proc)) - (let ((time-delay - (- (get-internal-real-time) - sent-time))) - (delay-logger (/ time-delay - internal-time-units-per-second)) - (put-message - reply - (let ((start-time (get-internal-real-time))) - (with-exception-handler - (lambda (exn) - (list 'worker-thread-error - (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second) - exn)) - (lambda () - (with-throw-handler #t - (lambda () - (call-with-values - (lambda () - (apply proc args)) - (lambda vals - (cons (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second) - vals)))) - (lambda args - (when (match args - (('%exception exn) - (log-exception? exn)) - (_ #t)) - (simple-format - (current-error-port) - "worker-thread: exception: ~A\n" args) - (backtrace))))) - #:unwind? #t)))))) - (if (number? current-lifetime) - (unless (< current-lifetime 0) - (loop (if current-lifetime - (- current-lifetime 1) - #f))) - (loop #f)))) + (let ((exception? + (match (get-message channel) + (((? channel? reply) sent-time (? procedure? proc)) + (let ((time-delay + (- (get-internal-real-time) + sent-time))) + (delay-logger (/ time-delay + internal-time-units-per-second)) + + (let* ((start-time (get-internal-real-time)) + (response + (with-exception-handler + (lambda (exn) + (list 'worker-thread-error + (/ (- (get-internal-real-time) + start-time) + internal-time-units-per-second) + exn)) + (lambda () + (with-throw-handler #t + (lambda () + (call-with-values + (lambda () + (apply proc args)) + (lambda vals + (cons (/ (- (get-internal-real-time) + start-time) + internal-time-units-per-second) + vals)))) + (lambda args + (when (match args + (('%exception exn) + (log-exception? exn)) + (_ #t)) + (simple-format + (current-error-port) + "worker-thread: exception: ~A\n" args) + (backtrace))))) + #:unwind? #t))) + (put-message reply + response) + + (match response + (('worker-thread-error duration _) + (when duration-logger + (duration-logger duration proc)) + #t) + ((duration . _) + (when duration-logger + (duration-logger duration proc)) + #f)))))))) + + (unless (and expire-on-exception? + exception?) + (if (number? current-lifetime) + (unless (< current-lifetime 0) + (loop (if current-lifetime + (- current-lifetime 1) + #f))) + (loop #f)))))) + (when destructor - (apply destructor args)) - (init (initializer)))))) + (destructor/safe args)) + + (init (initializer/safe)))))) (iota parallelism)) (worker-thread-set channel |