aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2025-02-04 12:52:12 +0000
committerChristopher Baines <mail@cbaines.net>2025-02-04 13:16:10 +0000
commita11cd24e57ad334c26700691dca0164f410cc392 (patch)
tree70431ab47ac4b651f7b9d991607be3307c3835d7
parentaadbee0d0e452670719d030e5a40ff115ba10c68 (diff)
downloadknots-a11cd24e57ad334c26700691dca0164f410cc392.tar
knots-a11cd24e57ad334c26700691dca0164f410cc392.tar.gz
Improve resource pool performance when there are lots of waiters
-rw-r--r--knots/resource-pool.scm323
-rw-r--r--tests/resource-pool.scm40
2 files changed, 223 insertions, 140 deletions
diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm
index fddbb60..8f7bcc9 100644
--- a/knots/resource-pool.scm
+++ b/knots/resource-pool.scm
@@ -29,6 +29,7 @@
#:use-module (fibers channels)
#:use-module (fibers scheduler)
#:use-module (fibers operations)
+ #:use-module (knots)
#:use-module (knots parallelism)
#:export (resource-pool?
@@ -38,9 +39,6 @@
resource-pool-configuration
destroy-resource-pool
- resource-pool-default-timeout
- resource-pool-retry-checkout-timeout
-
&resource-pool-timeout
resource-pool-timeout-error-pool
resource-pool-timeout-error?
@@ -87,8 +85,9 @@
lifetime
scheduler
(name "unnamed")
- (reply-timeout 0.5)
- (add-resources-parallelism 1))
+ (reply-timeout 1)
+ (add-resources-parallelism 1)
+ default-checkout-timeout)
(define channel (make-channel))
(define pool
@@ -104,7 +103,8 @@
(lifetime . ,lifetime)
(scheduler . ,scheduler)
(name . ,name)
- (reply-timeout . ,reply-timeout))))
+ (reply-timeout . ,reply-timeout)
+ (default-checkout-timeout . ,default-checkout-timeout))))
(define checkout-failure-count 0)
@@ -232,23 +232,38 @@
(cons (get-internal-real-time)
resources-last-used))
- (begin
- (if reply-timeout
- ;; Don't sleep in this fiber, so spawn a new
- ;; fiber to handle handing over the
- ;; resource, and returning it if there's a
- ;; timeout
- (spawn-fiber-for-checkout (last waiters)
- resource)
- (put-message (last waiters) resource))
-
- (loop (cons resource resources)
- available
- (drop-right! waiters 1)
- (cons (get-internal-real-time)
- resources-last-used))))))
-
- (('checkout reply)
+ (let* ((current-internal-time (get-internal-real-time))
+ (alive-waiters
+ dead-waiters
+ (partition!
+ (match-lambda
+ ((reply . timeout)
+ (or (not timeout)
+ (> timeout current-internal-time))))
+ waiters)))
+ (if (null? alive-waiters)
+ (loop (cons resource resources)
+ (cons resource available)
+ '()
+ (cons (get-internal-real-time)
+ resources-last-used))
+ (begin
+ (if reply-timeout
+ ;; Don't sleep in this fiber, so spawn a new
+ ;; fiber to handle handing over the
+ ;; resource, and returning it if there's a
+ ;; timeout
+ (spawn-fiber-for-checkout (car (last alive-waiters))
+ resource)
+ (put-message (car (last alive-waiters)) resource))
+
+ (loop (cons resource resources)
+ available
+ (drop-right! alive-waiters 1)
+ (cons (get-internal-real-time)
+ resources-last-used))))))))
+
+ (('checkout reply timeout-time)
(if (null? available)
(begin
(unless (= (length resources) max-size)
@@ -256,22 +271,31 @@
(loop resources
available
- (cons reply waiters)
+ (cons (cons reply timeout-time)
+ waiters)
resources-last-used))
- (let ((resource (car available)))
- (if reply-timeout
- ;; Don't sleep in this fiber, so spawn a
- ;; new fiber to handle handing over the
- ;; resource, and returning it if there's a
- ;; timeout
- (spawn-fiber-for-checkout reply resource)
- (put-message reply resource))
+ ;; If this client is still waiting
+ (if (> timeout-time
+ (get-internal-real-time))
+ (let ((resource (car available)))
+ (if reply-timeout
+ ;; Don't sleep in this fiber, so spawn a
+ ;; new fiber to handle handing over the
+ ;; resource, and returning it if there's a
+ ;; timeout
+ (spawn-fiber-for-checkout reply
+ resource)
+ (put-message reply resource))
- (loop resources
- (cdr available)
- waiters
- resources-last-used))))
+ (loop resources
+ (cdr available)
+ waiters
+ resources-last-used))
+ (loop resources
+ available
+ waiters
+ resources-last-used))))
(((and (or 'return
'return-failed-checkout)
@@ -296,27 +320,49 @@
(get-internal-real-time))
resources-last-used))
- (begin
- (if reply-timeout
- ;; Don't sleep in this fiber, so spawn a new
- ;; fiber to handle handing over the
- ;; resource, and returning it if there's a
- ;; timeout
- (spawn-fiber-for-checkout (last waiters)
- resource)
- (put-message (last waiters) resource))
-
- (loop resources
- available
- (drop-right! waiters 1)
- (begin
- (list-set!
- resources-last-used
- (list-index (lambda (x)
- (eq? x resource))
- resources)
- (get-internal-real-time))
- resources-last-used)))))
+ (let* ((current-internal-time (get-internal-real-time))
+ (alive-waiters
+ dead-waiters
+ (partition!
+ (match-lambda
+ ((reply . timeout)
+ (or (not timeout)
+ (> timeout current-internal-time))))
+ waiters)))
+ (if (null? alive-waiters)
+ (loop resources
+ (cons resource available)
+ '()
+ (begin
+ (when (eq? return-type 'return)
+ (list-set!
+ resources-last-used
+ (list-index (lambda (x)
+ (eq? x resource))
+ resources)
+ (get-internal-real-time)))
+ resources-last-used))
+ (begin
+ (if reply-timeout
+ ;; Don't sleep in this fiber, so spawn a new
+ ;; fiber to handle handing over the
+ ;; resource, and returning it if there's a
+ ;; timeout
+ (spawn-fiber-for-checkout (car (last alive-waiters))
+ resource)
+ (put-message (car (last alive-waiters)) resource))
+
+ (loop resources
+ available
+ (drop-right! alive-waiters 1)
+ (begin
+ (list-set!
+ resources-last-used
+ (list-index (lambda (x)
+ (eq? x resource))
+ resources)
+ (get-internal-real-time))
+ resources-last-used)))))))
(('remove resource)
(let ((index
@@ -359,8 +405,8 @@
(wrap-operation
(put-operation reply stats)
(const #t))
- (wrap-operation (sleep-operation
- reply-timeout)
+ (wrap-operation (sleep-operation (or reply-timeout
+ 5))
(const #f)))))))
(loop resources
@@ -495,12 +541,6 @@
(unless (eq? msg 'destroy-success)
(error msg)))))
-(define resource-pool-default-timeout
- (make-parameter #f))
-
-(define resource-pool-retry-checkout-timeout
- (make-parameter 5))
-
(define &resource-pool-timeout
(make-exception-type '&recource-pool-timeout
&error
@@ -526,87 +566,104 @@
"Call PROC with a resource from POOL, blocking until a resource becomes
available. Return the resource once PROC has returned."
- (define retry-timeout
- (resource-pool-retry-checkout-timeout))
-
(define timeout-or-default
(if (eq? timeout 'default)
- (resource-pool-default-timeout)
+ (assq-ref (resource-pool-configuration pool)
+ 'default-checkout-timeout)
timeout))
+ (define resource-pool-reply-timeout
+ (assq-ref (resource-pool-configuration pool)
+ 'reply-timeout))
+
(let ((resource
- (let ((reply (make-channel)))
- (let loop ((start-time (get-internal-real-time)))
- (let ((request-success?
- (perform-operation
- (choice-operation
- (wrap-operation
- (put-operation (resource-pool-channel pool)
- `(checkout ,reply))
- (const #t))
- (wrap-operation (sleep-operation (or timeout-or-default
- retry-timeout))
- (const #f))))))
- (if request-success?
- (let ((time-remaining
- (- (or timeout-or-default
- retry-timeout)
- (/ (- (get-internal-real-time)
- start-time)
- internal-time-units-per-second))))
- (if (> time-remaining 0)
- (let ((response
- (perform-operation
- (choice-operation
- (get-operation reply)
- (wrap-operation (sleep-operation time-remaining)
- (const #f))))))
- (if (or (not response)
- (eq? response 'resource-pool-retry-checkout))
- (if (> (- (or timeout-or-default
- retry-timeout)
- (/ (- (get-internal-real-time)
- start-time)
+ (if timeout-or-default
+ (let loop ((reply (make-channel))
+ (start-time (get-internal-real-time)))
+ (let ((request-success?
+ (perform-operation
+ (choice-operation
+ (wrap-operation
+ (put-operation (resource-pool-channel pool)
+ (list 'checkout
+ reply
+ (+ start-time
+ (* timeout-or-default
+ internal-time-units-per-second))))
+ (const #t))
+ (wrap-operation (sleep-operation timeout-or-default)
+ (const #f))))))
+ (if request-success?
+ (let ((time-remaining
+ (- timeout-or-default
+ (/ (- (get-internal-real-time)
+ start-time)
+ internal-time-units-per-second))))
+ (if (> time-remaining 0)
+ (let ((response
+ (perform-operation
+ (choice-operation
+ (get-operation reply)
+ (wrap-operation (sleep-operation time-remaining)
+ (const #f))))))
+ (if (or (not response)
+ (eq? response 'resource-pool-retry-checkout))
+ (if (> (- timeout-or-default
+ (/ (- (get-internal-real-time)
+ start-time)
+ internal-time-units-per-second))
+ 0)
+ (loop (make-channel)
+ start-time)
+ #f)
+ response))
+ #f)))))
+ (let loop ((reply (make-channel)))
+ (put-message (resource-pool-channel pool)
+ (list 'checkout
+ reply
+ (if resource-pool-reply-timeout
+ (+ (get-internal-real-time)
+ (* resource-pool-reply-timeout
internal-time-units-per-second))
- 0)
- (loop start-time)
- (if (eq? timeout-or-default #f)
- (loop (get-internal-real-time))
- #f))
- response))
- (if (eq? timeout-or-default #f)
- (loop (get-internal-real-time))
- #f)))
- (if (eq? timeout-or-default #f)
- (loop (get-internal-real-time))
- #f)))))))
-
- (when (or (not resource)
- (eq? resource 'resource-pool-retry-checkout))
+ #f)))
+ (if resource-pool-reply-timeout
+ (let ((resource-or-timeout
+ (perform-operation
+ (choice-operation
+ (get-operation reply)
+ (wrap-operation
+ (sleep-operation resource-pool-reply-timeout)
+ (const 'resource-pool-reply-timeout))))))
+ (if (or (eq? resource-or-timeout
+ 'resource-pool-reply-timeout)
+ (eq? resource-or-timeout
+ 'resource-pool-retry-checkout))
+ (loop (make-channel))
+ resource-or-timeout))
+ (get-message reply))))))
+
+ (when (not resource)
(when timeout-handler
(timeout-handler pool proc timeout))
(raise-exception
(make-resource-pool-timeout-error pool)))
- (with-exception-handler
- (lambda (exception)
- (put-message (resource-pool-channel pool)
- `(return ,resource))
- (raise-exception exception))
- (lambda ()
- (call-with-values
+ (call-with-values
+ (lambda ()
+ (with-exception-handler
+ (lambda (exn)
+ (print-backtrace-and-exception/knots exn)
+ (put-message (resource-pool-channel pool)
+ `(return ,resource))
+ (raise-exception exn))
(lambda ()
- (with-throw-handler #t
- (lambda ()
- (proc resource))
- (lambda _
- (backtrace))))
- (lambda vals
- (put-message (resource-pool-channel pool)
- `(return ,resource))
- (apply values vals))))
- #:unwind? #t)))
+ (proc resource))))
+ (lambda vals
+ (put-message (resource-pool-channel pool)
+ `(return ,resource))
+ (apply values vals)))))
(define-syntax-rule (with-resource-from-pool pool resource exp ...)
(call-with-resource-from-pool
diff --git a/tests/resource-pool.scm b/tests/resource-pool.scm
index 50dadbd..cf108de 100644
--- a/tests/resource-pool.scm
+++ b/tests/resource-pool.scm
@@ -105,15 +105,41 @@
(error "collision detected")))
(new-number))
1)))
- (fibers-for-each
+ (fibers-batch-for-each
+ (lambda _
+ (with-resource-from-pool
+ resource-pool res
+ (let ((start-val counter))
+ (sleep 0.05)
+ (if (= start-val counter)
+ (set! counter (+ 1 counter))
+ (error "collision detected")))))
+ 20
+ (iota 50)))))
+
+(run-fibers-for-tests
+ (lambda ()
+ (let* ((counter 0)
+ (resource-pool (make-resource-pool
+ (lambda ()
+ (let ((start-val counter))
+ (sleep 0.05)
+ (if (= start-val counter)
+ (set! counter (+ 1 counter))
+ (error "collision detected")))
+ (new-number))
+ 1
+ #:reply-timeout #f)))
+ (fibers-batch-for-each
(lambda _
(with-resource-from-pool
- resource-pool res
- (let ((start-val counter))
- (sleep 0.05)
- (if (= start-val counter)
- (set! counter (+ 1 counter))
- (error "collision detected")))))
+ resource-pool res
+ (let ((start-val counter))
+ (sleep 0.05)
+ (if (= start-val counter)
+ (set! counter (+ 1 counter))
+ (error "collision detected")))))
+ 20
(iota 50)))))
(display "resource-pool test finished successfully\n")