aboutsummaryrefslogtreecommitdiff
path: root/guix-data-service/utils.scm
diff options
context:
space:
mode:
Diffstat (limited to 'guix-data-service/utils.scm')
-rw-r--r--guix-data-service/utils.scm448
1 files changed, 412 insertions, 36 deletions
diff --git a/guix-data-service/utils.scm b/guix-data-service/utils.scm
index d01fb5c..393db95 100644
--- a/guix-data-service/utils.scm
+++ b/guix-data-service/utils.scm
@@ -17,7 +17,9 @@
(define-module (guix-data-service utils)
#:use-module (srfi srfi-1)
+ #:use-module (srfi srfi-9)
#:use-module (srfi srfi-11)
+ #:use-module (srfi srfi-71)
#:use-module (ice-9 ftw)
#:use-module (ice-9 match)
#:use-module (ice-9 atomic)
@@ -27,6 +29,8 @@
#:use-module (ice-9 ports internal)
#:use-module (ice-9 suspendable-ports)
#:use-module (lzlib)
+ #:use-module ((guix build syscalls)
+ #:select (set-thread-name))
#:use-module (fibers)
#:use-module (fibers channels)
#:use-module (fibers operations)
@@ -40,12 +44,26 @@
resource-pool-default-timeout
%resource-pool-timeout-handler
+ resource-pool-timeout-error?
make-resource-pool
destroy-resource-pool
call-with-resource-from-pool
with-resource-from-pool
resource-pool-stats
+ make-worker-thread-channel
+ %worker-thread-default-timeout
+ call-with-worker-thread
+ worker-thread-timeout-error?
+
+ fiberize
+
+ fibers-delay
+ fibers-force
+
+ fibers-batch-for-each
+ fibers-for-each
+
parallel-via-fibers
par-map&
letpar&
@@ -80,6 +98,12 @@
(define-syntax-rule (prevent-inlining-for-tests var)
(set! var var))
+(define-record-type <resource-pool>
+ (make-resource-pool-record name channel)
+ resource-pool?
+ (name resource-pool-name)
+ (channel resource-pool-channel))
+
(define* (make-resource-pool initializer max-size
#:key (min-size max-size)
(idle-seconds #f)
@@ -87,6 +111,7 @@
(duration-logger (const #f))
destructor
lifetime
+ scheduler
(name "unnamed"))
(define (initializer/safe)
(with-exception-handler
@@ -135,6 +160,13 @@
(checkout-failure-count 0))
(spawn-fiber
(lambda ()
+ (when idle-seconds
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (sleep idle-seconds)
+ (put-message channel '(check-for-idle-resources))))))
+
(while #t
(with-exception-handler
(lambda (exn)
@@ -149,15 +181,7 @@
(waiters '())
(resources-last-used '()))
- (match (if idle-seconds
- (perform-operation
- (choice-operation
- (get-operation channel)
- (wrap-operation
- ;; TODO Do something smarter
- (sleep-operation 10)
- (const '(check-for-idle-resources)))))
- (get-message channel))
+ (match (get-message channel)
(('checkout reply)
(if (null? available)
(if (= (length resources) max-size)
@@ -244,13 +268,15 @@
(waiters . ,(length waiters))
(checkout-failure-count . ,checkout-failure-count))))
- (perform-operation
- (choice-operation
- (wrap-operation
- (put-operation reply stats)
- (const #t))
- (wrap-operation (sleep-operation 0.2)
- (const #f)))))
+ (spawn-fiber
+ (lambda ()
+ (perform-operation
+ (choice-operation
+ (wrap-operation
+ (put-operation reply stats)
+ (const #t))
+ (wrap-operation (sleep-operation 1)
+ (const #f)))))))
(loop resources
available
@@ -317,13 +343,16 @@
available
waiters
resources-last-used)))))
- #:unwind? #t))))
+ #:unwind? #t)))
+ (or scheduler
+ (current-scheduler)))
- channel))
+ (make-resource-pool-record name channel)))
(define (destroy-resource-pool pool)
(let ((reply (make-channel)))
- (put-message pool (list 'destroy reply))
+ (put-message (resource-pool-channel pool)
+ (list 'destroy reply))
(let ((msg (get-message reply)))
(unless (eq? msg 'destroy-success)
(error msg)))))
@@ -334,7 +363,7 @@
(define &resource-pool-timeout
(make-exception-type '&recource-pool-timeout
&error
- '()))
+ '(name)))
(define make-resource-pool-timeout-error
(record-constructor &resource-pool-timeout))
@@ -362,7 +391,8 @@ available. Return the resource once PROC has returned."
(perform-operation
(choice-operation
(wrap-operation
- (put-operation pool `(checkout ,reply))
+ (put-operation (resource-pool-channel pool)
+ `(checkout ,reply))
(const #t))
(wrap-operation (sleep-operation timeout-or-default)
(const #f))))
@@ -391,7 +421,8 @@ available. Return the resource once PROC has returned."
response))
#f)))
(let loop ()
- (put-message pool `(checkout ,reply))
+ (put-message (resource-pool-channel pool)
+ `(checkout ,reply))
(let ((response (get-message reply)))
(if (eq? response 'resource-pool-retry-checkout)
(loop)
@@ -403,11 +434,12 @@ available. Return the resource once PROC has returned."
(timeout-handler pool proc timeout))
(raise-exception
- (make-resource-pool-timeout-error)))
+ (make-resource-pool-timeout-error (resource-pool-name pool))))
(with-exception-handler
(lambda (exception)
- (put-message pool `(return ,resource))
+ (put-message (resource-pool-channel pool)
+ `(return ,resource))
(raise-exception exception))
(lambda ()
(call-with-values
@@ -418,14 +450,15 @@ available. Return the resource once PROC has returned."
(lambda _
(backtrace))))
(lambda vals
- (put-message pool `(return ,resource))
+ (put-message (resource-pool-channel pool)
+ `(return ,resource))
(apply values vals))))
#:unwind? #t)))
(define-syntax-rule (with-resource-from-pool pool resource exp ...)
(call-with-resource-from-pool
- pool
- (lambda (resource) exp ...)))
+ pool
+ (lambda (resource) exp ...)))
(define* (resource-pool-stats pool #:key (timeout 5))
(let ((reply (make-channel))
@@ -433,10 +466,13 @@ available. Return the resource once PROC has returned."
(perform-operation
(choice-operation
(wrap-operation
- (put-operation pool `(stats ,reply))
+ (put-operation (resource-pool-channel pool)
+ `(stats ,reply))
(const #t))
(wrap-operation (sleep-operation timeout)
- (const #f))))
+ (lambda _
+ (raise-exception
+ (make-resource-pool-timeout-error))))))
(let ((time-remaining
(- timeout
@@ -444,16 +480,356 @@ available. Return the resource once PROC has returned."
start-time)
internal-time-units-per-second))))
(if (> time-remaining 0)
- (let ((response
- (perform-operation
- (choice-operation
- (get-operation reply)
- (wrap-operation (sleep-operation time-remaining)
- (const #f))))))
- response)
+ (perform-operation
+ (choice-operation
+ (get-operation reply)
+ (wrap-operation (sleep-operation time-remaining)
+ (lambda _
+ (raise-exception
+ (make-resource-pool-timeout-error))))))
(raise-exception
(make-resource-pool-timeout-error))))))
+(define %worker-thread-args
+ (make-parameter #f))
+
+(define* (make-worker-thread-channel initializer
+ #:key (parallelism 1)
+ (delay-logger (lambda _ #f))
+ (duration-logger (const #f))
+ destructor
+ lifetime
+ (log-exception? (const #t))
+ (expire-on-exception? #f)
+ (name "unnamed"))
+ "Return a channel used to offload work to a dedicated thread. ARGS are the
+arguments of the worker thread procedure."
+ (define thread-proc-vector
+ (make-vector parallelism #f))
+
+ (define (initializer/safe)
+ (let ((args
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "exception running initializer in worker thread (~A): ~A:\n ~A\n"
+ name
+ initializer
+ exn)
+ #f)
+ (lambda ()
+ (with-throw-handler #t
+ initializer
+ (lambda args
+ (backtrace))))
+ #:unwind? #t)))
+
+ (if args
+ args
+ ;; never give up, just keep retrying
+ (begin
+ (sleep 1)
+ (initializer/safe)))))
+
+ (define (destructor/safe args)
+ (let ((success?
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "exception running destructor in worker thread (~A): ~A:\n ~A\n"
+ name
+ destructor
+ exn)
+ #f)
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (apply destructor args)
+ #t)
+ (lambda _
+ (backtrace))))
+ #:unwind? #t)))
+
+ (or success?
+ #t
+ (begin
+ (sleep 1)
+ (destructor/safe args)))))
+
+ (define (process thread-index channel args)
+ (let loop ((current-lifetime lifetime))
+ (let ((exception?
+ (match (get-message channel)
+ (((? channel? reply) sent-time (? procedure? proc))
+ (let ((time-delay
+ (- (get-internal-real-time)
+ sent-time)))
+ (delay-logger (/ time-delay
+ internal-time-units-per-second))
+
+ (let* ((start-time (get-internal-real-time))
+ (response
+ (with-exception-handler
+ (lambda (exn)
+ (list 'worker-thread-error
+ (/ (- (get-internal-real-time)
+ start-time)
+ internal-time-units-per-second)
+ exn))
+ (lambda ()
+ (vector-set! thread-proc-vector
+ thread-index
+ proc)
+ (with-throw-handler #t
+ (lambda ()
+ (call-with-values
+ (lambda ()
+ (start-stack
+ 'worker-thread
+ (apply proc args)))
+ (lambda vals
+ (cons (/ (- (get-internal-real-time)
+ start-time)
+ internal-time-units-per-second)
+ vals))))
+ (lambda args
+ (when (match args
+ (('%exception exn)
+ (log-exception? exn))
+ (_ #t))
+ (simple-format
+ (current-error-port)
+ "worker-thread: exception: ~A\n" args)
+ (backtrace)))))
+ #:unwind? #t)))
+ (put-message reply
+ response)
+
+ (vector-set! thread-proc-vector
+ thread-index
+ #f)
+
+ (match response
+ (('worker-thread-error duration _)
+ (when duration-logger
+ (duration-logger duration proc))
+ #t)
+ ((duration . _)
+ (when duration-logger
+ (duration-logger duration proc))
+ #f))))))))
+ (unless (and expire-on-exception?
+ exception?)
+ (if (number? current-lifetime)
+ (unless (< current-lifetime 0)
+ (loop (if current-lifetime
+ (- current-lifetime 1)
+ #f)))
+ (loop #f))))))
+
+ (let ((channel (make-channel)))
+ (for-each
+ (lambda (thread-index)
+ (call-with-new-thread
+ (lambda ()
+ (catch 'system-error
+ (lambda ()
+ (set-thread-name
+ (string-append
+ name " w t "
+ (number->string thread-index))))
+ (const #t))
+
+ (let init ((args (initializer/safe)))
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "worker-thread-channel: exception: ~A\n" exn))
+ (lambda ()
+ (parameterize ((%worker-thread-args args))
+ (process thread-index channel args)))
+ #:unwind? #t)
+
+ (when destructor
+ (destructor/safe args))
+
+ (init (initializer/safe))))))
+ (iota parallelism))
+
+ (values channel
+ thread-proc-vector)))
+
+(define &worker-thread-timeout
+ (make-exception-type '&worker-thread-timeout
+ &error
+ '()))
+
+(define make-worker-thread-timeout-error
+ (record-constructor &worker-thread-timeout))
+
+(define worker-thread-timeout-error?
+ (record-predicate &worker-thread-timeout))
+
+(define %worker-thread-default-timeout
+ (make-parameter 30))
+
+(define* (call-with-worker-thread channel proc #:key duration-logger
+ (timeout (%worker-thread-default-timeout)))
+ "Send PROC to the worker thread through CHANNEL. Return the result of PROC.
+If already in the worker thread, call PROC immediately."
+ (let ((args (%worker-thread-args)))
+ (if args
+ (apply proc args)
+ (let* ((reply (make-channel))
+ (operation-success?
+ (perform-operation
+ (let ((put
+ (wrap-operation
+ (put-operation channel
+ (list reply
+ (get-internal-real-time)
+ proc))
+ (const #t))))
+
+ (if timeout
+ (choice-operation
+ put
+ (wrap-operation (sleep-operation timeout)
+ (const #f)))
+ put)))))
+
+ (unless operation-success?
+ (raise-exception
+ (make-worker-thread-timeout-error)))
+
+ (match (get-message reply)
+ (('worker-thread-error duration exn)
+ (when duration-logger
+ (duration-logger duration))
+ (raise-exception exn))
+ ((duration . result)
+ (when duration-logger
+ (duration-logger duration))
+ (apply values result)))))))
+
+(define* (fiberize proc #:key (parallelism 1))
+ (let ((channel (make-channel)))
+ (for-each
+ (lambda _
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (let ((reply-channel args (car+cdr
+ (get-message channel))))
+ (put-message
+ reply-channel
+ (with-exception-handler
+ (lambda (exn)
+ (cons 'exception exn))
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (call-with-values
+ (lambda ()
+ (apply proc args))
+ (lambda vals
+ (cons 'result vals))))
+ (lambda _
+ (backtrace))))
+ #:unwind? #t)))))
+ #:parallel? #t))
+ (iota parallelism))
+
+ (lambda args
+ (let ((reply-channel (make-channel)))
+ (put-message channel (cons reply-channel args))
+ (match (get-message reply-channel)
+ (('result . vals) (apply values vals))
+ (('exception . exn) (raise-exception exn)))))))
+
+(define-record-type <fibers-promise>
+ (make-fibers-promise thunk values-box evaluated-condition)
+ fibers-promise?
+ (thunk fibers-promise-thunk)
+ (values-box fibers-promise-values-box)
+ (evaluated-condition fibers-promise-evaluated-condition))
+
+(define (fibers-delay thunk)
+ (make-fibers-promise
+ thunk
+ (make-atomic-box #f)
+ (make-condition)))
+
+(define (fibers-force fp)
+ (let ((res (atomic-box-compare-and-swap!
+ (fibers-promise-values-box fp)
+ #f
+ 'started)))
+ (if (eq? #f res)
+ (call-with-values
+ (lambda ()
+ (with-exception-handler
+ (lambda (exn)
+ (atomic-box-set! (fibers-promise-values-box fp)
+ exn)
+ (signal-condition!
+ (fibers-promise-evaluated-condition fp))
+ (raise-exception exn))
+ (fibers-promise-thunk fp)
+ #:unwind? #t))
+ (lambda vals
+ (atomic-box-set! (fibers-promise-values-box fp)
+ vals)
+ (signal-condition!
+ (fibers-promise-evaluated-condition fp))
+ (apply values vals)))
+ (if (eq? res 'started)
+ (begin
+ (wait (fibers-promise-evaluated-condition fp))
+ (let ((result (atomic-box-ref (fibers-promise-values-box fp))))
+ (if (exception? result)
+ (raise-exception result)
+ (apply values result))))
+ (if (exception? res)
+ (raise-exception res)
+ (apply values res))))))
+
+(define (fibers-batch-for-each proc batch-size . lists)
+ ;; Like split-at, but don't care about the order of the resulting lists, and
+ ;; don't error if the list is shorter than i elements
+ (define (split-at* lst i)
+ (let lp ((l lst) (n i) (acc '()))
+ (if (or (<= n 0) (null? l))
+ (values (reverse! acc) l)
+ (lp (cdr l) (- n 1) (cons (car l) acc)))))
+
+ ;; As this can be called with lists with tens of thousands of items in them,
+ ;; batch the
+ (define (get-batch lists)
+ (let ((split-lists
+ (map (lambda (lst)
+ (let ((batch rest (split-at* lst batch-size)))
+ (cons batch rest)))
+ lists)))
+ (values (map car split-lists)
+ (map cdr split-lists))))
+
+ (let loop ((lists lists))
+ (call-with-values
+ (lambda ()
+ (get-batch lists))
+ (lambda (batch rest)
+ (apply par-map& proc batch)
+ (unless (null? (car rest))
+ (loop rest)))))
+ *unspecified*)
+
+(define (fibers-for-each proc . lists)
+ (apply fibers-batch-for-each proc 20 lists))
+
(define (defer-to-parallel-fiber thunk)
(let ((reply (make-channel)))
(spawn-fiber