aboutsummaryrefslogtreecommitdiff
path: root/guix-data-service/utils.scm
diff options
context:
space:
mode:
Diffstat (limited to 'guix-data-service/utils.scm')
-rw-r--r--guix-data-service/utils.scm232
1 files changed, 232 insertions, 0 deletions
diff --git a/guix-data-service/utils.scm b/guix-data-service/utils.scm
index 736e24d..e5b4a45 100644
--- a/guix-data-service/utils.scm
+++ b/guix-data-service/utils.scm
@@ -29,6 +29,8 @@
#:use-module (ice-9 ports internal)
#:use-module (ice-9 suspendable-ports)
#:use-module (lzlib)
+ #:use-module ((guix build syscalls)
+ #:select (set-thread-name))
#:use-module (fibers)
#:use-module (fibers channels)
#:use-module (fibers operations)
@@ -48,6 +50,11 @@
with-resource-from-pool
resource-pool-stats
+ make-worker-thread-channel
+ %worker-thread-default-timeout
+ call-with-worker-thread
+ worker-thread-timeout-error?
+
fibers-delay
fibers-force
@@ -464,6 +471,231 @@ available. Return the resource once PROC has returned."
(raise-exception
(make-resource-pool-timeout-error))))))
+(define %worker-thread-args
+ (make-parameter #f))
+
+(define* (make-worker-thread-channel initializer
+ #:key (parallelism 1)
+ (delay-logger (lambda _ #f))
+ (duration-logger (const #f))
+ destructor
+ lifetime
+ (log-exception? (const #t))
+ (expire-on-exception? #f)
+ (name "unnamed"))
+ "Return a channel used to offload work to a dedicated thread. ARGS are the
+arguments of the worker thread procedure."
+ (define thread-proc-vector
+ (make-vector parallelism #f))
+
+ (define (initializer/safe)
+ (let ((args
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "exception running initializer in worker thread (~A): ~A:\n ~A\n"
+ name
+ initializer
+ exn)
+ #f)
+ (lambda ()
+ (with-throw-handler #t
+ initializer
+ (lambda args
+ (backtrace))))
+ #:unwind? #t)))
+
+ (if args
+ args
+ ;; never give up, just keep retrying
+ (begin
+ (sleep 1)
+ (initializer/safe)))))
+
+ (define (destructor/safe args)
+ (let ((success?
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "exception running destructor in worker thread (~A): ~A:\n ~A\n"
+ name
+ destructor
+ exn)
+ #f)
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (apply destructor args)
+ #t)
+ (lambda _
+ (backtrace))))
+ #:unwind? #t)))
+
+ (or success?
+ #t
+ (begin
+ (sleep 1)
+ (destructor/safe args)))))
+
+ (define (process thread-index channel args)
+ (let loop ((current-lifetime lifetime))
+ (let ((exception?
+ (match (get-message channel)
+ (((? channel? reply) sent-time (? procedure? proc))
+ (let ((time-delay
+ (- (get-internal-real-time)
+ sent-time)))
+ (delay-logger (/ time-delay
+ internal-time-units-per-second))
+
+ (let* ((start-time (get-internal-real-time))
+ (response
+ (with-exception-handler
+ (lambda (exn)
+ (list 'worker-thread-error
+ (/ (- (get-internal-real-time)
+ start-time)
+ internal-time-units-per-second)
+ exn))
+ (lambda ()
+ (vector-set! thread-proc-vector
+ thread-index
+ proc)
+ (with-throw-handler #t
+ (lambda ()
+ (call-with-values
+ (lambda ()
+ (start-stack
+ 'worker-thread
+ (apply proc args)))
+ (lambda vals
+ (cons (/ (- (get-internal-real-time)
+ start-time)
+ internal-time-units-per-second)
+ vals))))
+ (lambda args
+ (when (match args
+ (('%exception exn)
+ (log-exception? exn))
+ (_ #t))
+ (simple-format
+ (current-error-port)
+ "worker-thread: exception: ~A\n" args)
+ (backtrace)))))
+ #:unwind? #t)))
+ (put-message reply
+ response)
+
+ (vector-set! thread-proc-vector
+ thread-index
+ #f)
+
+ (match response
+ (('worker-thread-error duration _)
+ (when duration-logger
+ (duration-logger duration proc))
+ #t)
+ ((duration . _)
+ (when duration-logger
+ (duration-logger duration proc))
+ #f))))))))
+ (unless (and expire-on-exception?
+ exception?)
+ (if (number? current-lifetime)
+ (unless (< current-lifetime 0)
+ (loop (if current-lifetime
+ (- current-lifetime 1)
+ #f)))
+ (loop #f))))))
+
+ (let ((channel (make-channel)))
+ (for-each
+ (lambda (thread-index)
+ (call-with-new-thread
+ (lambda ()
+ (catch 'system-error
+ (lambda ()
+ (set-thread-name
+ (string-append
+ name " w t "
+ (number->string thread-index))))
+ (const #t))
+
+ (let init ((args (initializer/safe)))
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "worker-thread-channel: exception: ~A\n" exn))
+ (lambda ()
+ (parameterize ((%worker-thread-args args))
+ (process thread-index channel args)))
+ #:unwind? #t)
+
+ (when destructor
+ (destructor/safe args))
+
+ (init (initializer/safe))))))
+ (iota parallelism))
+
+ (values channel
+ thread-proc-vector)))
+
+(define &worker-thread-timeout
+ (make-exception-type '&worker-thread-timeout
+ &error
+ '()))
+
+(define make-worker-thread-timeout-error
+ (record-constructor &worker-thread-timeout))
+
+(define worker-thread-timeout-error?
+ (record-predicate &worker-thread-timeout))
+
+(define %worker-thread-default-timeout
+ (make-parameter 30))
+
+(define* (call-with-worker-thread channel proc #:key duration-logger
+ (timeout (%worker-thread-default-timeout)))
+ "Send PROC to the worker thread through CHANNEL. Return the result of PROC.
+If already in the worker thread, call PROC immediately."
+ (let ((args (%worker-thread-args)))
+ (if args
+ (apply proc args)
+ (let* ((reply (make-channel))
+ (operation-success?
+ (perform-operation
+ (let ((put
+ (wrap-operation
+ (put-operation channel
+ (list reply
+ (get-internal-real-time)
+ proc))
+ (const #t))))
+
+ (if timeout
+ (choice-operation
+ put
+ (wrap-operation (sleep-operation timeout)
+ (const #f)))
+ put)))))
+
+ (unless operation-success?
+ (raise-exception
+ (make-worker-thread-timeout-error)))
+
+ (match (get-message reply)
+ (('worker-thread-error duration exn)
+ (when duration-logger
+ (duration-logger duration))
+ (raise-exception exn))
+ ((duration . result)
+ (when duration-logger
+ (duration-logger duration))
+ (apply values result)))))))
+
(define-record-type <fibers-promise>
(make-fibers-promise thunk values-box evaluated-condition)
fibers-promise?