diff options
author | Christopher Baines <mail@cbaines.net> | 2024-07-20 00:20:20 +0100 |
---|---|---|
committer | Christopher Baines <mail@cbaines.net> | 2024-07-20 00:20:20 +0100 |
commit | 2a80304e0c6b9c3c6fbb65afe8f2249d9b23fc1c (patch) | |
tree | deacb5e5fd0537fc74779b6ab40119f71421517f | |
parent | 7df7fd3e5269e6106ee879c42c94c4805746c151 (diff) | |
download | data-service-2a80304e0c6b9c3c6fbb65afe8f2249d9b23fc1c.tar data-service-2a80304e0c6b9c3c6fbb65afe8f2249d9b23fc1c.tar.gz |
Add worker thread utils
-rw-r--r-- | guix-data-service/utils.scm | 232 |
1 files changed, 232 insertions, 0 deletions
diff --git a/guix-data-service/utils.scm b/guix-data-service/utils.scm index 736e24d..e5b4a45 100644 --- a/guix-data-service/utils.scm +++ b/guix-data-service/utils.scm @@ -29,6 +29,8 @@ #:use-module (ice-9 ports internal) #:use-module (ice-9 suspendable-ports) #:use-module (lzlib) + #:use-module ((guix build syscalls) + #:select (set-thread-name)) #:use-module (fibers) #:use-module (fibers channels) #:use-module (fibers operations) @@ -48,6 +50,11 @@ with-resource-from-pool resource-pool-stats + make-worker-thread-channel + %worker-thread-default-timeout + call-with-worker-thread + worker-thread-timeout-error? + fibers-delay fibers-force @@ -464,6 +471,231 @@ available. Return the resource once PROC has returned." (raise-exception (make-resource-pool-timeout-error)))))) +(define %worker-thread-args + (make-parameter #f)) + +(define* (make-worker-thread-channel initializer + #:key (parallelism 1) + (delay-logger (lambda _ #f)) + (duration-logger (const #f)) + destructor + lifetime + (log-exception? (const #t)) + (expire-on-exception? #f) + (name "unnamed")) + "Return a channel used to offload work to a dedicated thread. ARGS are the +arguments of the worker thread procedure." + (define thread-proc-vector + (make-vector parallelism #f)) + + (define (initializer/safe) + (let ((args + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "exception running initializer in worker thread (~A): ~A:\n ~A\n" + name + initializer + exn) + #f) + (lambda () + (with-throw-handler #t + initializer + (lambda args + (backtrace)))) + #:unwind? #t))) + + (if args + args + ;; never give up, just keep retrying + (begin + (sleep 1) + (initializer/safe))))) + + (define (destructor/safe args) + (let ((success? + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "exception running destructor in worker thread (~A): ~A:\n ~A\n" + name + destructor + exn) + #f) + (lambda () + (with-throw-handler #t + (lambda () + (apply destructor args) + #t) + (lambda _ + (backtrace)))) + #:unwind? #t))) + + (or success? + #t + (begin + (sleep 1) + (destructor/safe args))))) + + (define (process thread-index channel args) + (let loop ((current-lifetime lifetime)) + (let ((exception? + (match (get-message channel) + (((? channel? reply) sent-time (? procedure? proc)) + (let ((time-delay + (- (get-internal-real-time) + sent-time))) + (delay-logger (/ time-delay + internal-time-units-per-second)) + + (let* ((start-time (get-internal-real-time)) + (response + (with-exception-handler + (lambda (exn) + (list 'worker-thread-error + (/ (- (get-internal-real-time) + start-time) + internal-time-units-per-second) + exn)) + (lambda () + (vector-set! thread-proc-vector + thread-index + proc) + (with-throw-handler #t + (lambda () + (call-with-values + (lambda () + (start-stack + 'worker-thread + (apply proc args))) + (lambda vals + (cons (/ (- (get-internal-real-time) + start-time) + internal-time-units-per-second) + vals)))) + (lambda args + (when (match args + (('%exception exn) + (log-exception? exn)) + (_ #t)) + (simple-format + (current-error-port) + "worker-thread: exception: ~A\n" args) + (backtrace))))) + #:unwind? #t))) + (put-message reply + response) + + (vector-set! thread-proc-vector + thread-index + #f) + + (match response + (('worker-thread-error duration _) + (when duration-logger + (duration-logger duration proc)) + #t) + ((duration . _) + (when duration-logger + (duration-logger duration proc)) + #f)))))))) + (unless (and expire-on-exception? + exception?) + (if (number? current-lifetime) + (unless (< current-lifetime 0) + (loop (if current-lifetime + (- current-lifetime 1) + #f))) + (loop #f)))))) + + (let ((channel (make-channel))) + (for-each + (lambda (thread-index) + (call-with-new-thread + (lambda () + (catch 'system-error + (lambda () + (set-thread-name + (string-append + name " w t " + (number->string thread-index)))) + (const #t)) + + (let init ((args (initializer/safe))) + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "worker-thread-channel: exception: ~A\n" exn)) + (lambda () + (parameterize ((%worker-thread-args args)) + (process thread-index channel args))) + #:unwind? #t) + + (when destructor + (destructor/safe args)) + + (init (initializer/safe)))))) + (iota parallelism)) + + (values channel + thread-proc-vector))) + +(define &worker-thread-timeout + (make-exception-type '&worker-thread-timeout + &error + '())) + +(define make-worker-thread-timeout-error + (record-constructor &worker-thread-timeout)) + +(define worker-thread-timeout-error? + (record-predicate &worker-thread-timeout)) + +(define %worker-thread-default-timeout + (make-parameter 30)) + +(define* (call-with-worker-thread channel proc #:key duration-logger + (timeout (%worker-thread-default-timeout))) + "Send PROC to the worker thread through CHANNEL. Return the result of PROC. +If already in the worker thread, call PROC immediately." + (let ((args (%worker-thread-args))) + (if args + (apply proc args) + (let* ((reply (make-channel)) + (operation-success? + (perform-operation + (let ((put + (wrap-operation + (put-operation channel + (list reply + (get-internal-real-time) + proc)) + (const #t)))) + + (if timeout + (choice-operation + put + (wrap-operation (sleep-operation timeout) + (const #f))) + put))))) + + (unless operation-success? + (raise-exception + (make-worker-thread-timeout-error))) + + (match (get-message reply) + (('worker-thread-error duration exn) + (when duration-logger + (duration-logger duration)) + (raise-exception exn)) + ((duration . result) + (when duration-logger + (duration-logger duration)) + (apply values result))))))) + (define-record-type <fibers-promise> (make-fibers-promise thunk values-box evaluated-condition) fibers-promise? |