diff options
author | Christopher Baines <mail@cbaines.net> | 2025-02-04 12:52:12 +0000 |
---|---|---|
committer | Christopher Baines <mail@cbaines.net> | 2025-02-04 13:16:10 +0000 |
commit | a11cd24e57ad334c26700691dca0164f410cc392 (patch) | |
tree | 70431ab47ac4b651f7b9d991607be3307c3835d7 | |
parent | aadbee0d0e452670719d030e5a40ff115ba10c68 (diff) | |
download | knots-a11cd24e57ad334c26700691dca0164f410cc392.tar knots-a11cd24e57ad334c26700691dca0164f410cc392.tar.gz |
Improve resource pool performance when there are lots of waiters
-rw-r--r-- | knots/resource-pool.scm | 323 | ||||
-rw-r--r-- | tests/resource-pool.scm | 40 |
2 files changed, 223 insertions, 140 deletions
diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index fddbb60..8f7bcc9 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -29,6 +29,7 @@ #:use-module (fibers channels) #:use-module (fibers scheduler) #:use-module (fibers operations) + #:use-module (knots) #:use-module (knots parallelism) #:export (resource-pool? @@ -38,9 +39,6 @@ resource-pool-configuration destroy-resource-pool - resource-pool-default-timeout - resource-pool-retry-checkout-timeout - &resource-pool-timeout resource-pool-timeout-error-pool resource-pool-timeout-error? @@ -87,8 +85,9 @@ lifetime scheduler (name "unnamed") - (reply-timeout 0.5) - (add-resources-parallelism 1)) + (reply-timeout 1) + (add-resources-parallelism 1) + default-checkout-timeout) (define channel (make-channel)) (define pool @@ -104,7 +103,8 @@ (lifetime . ,lifetime) (scheduler . ,scheduler) (name . ,name) - (reply-timeout . ,reply-timeout)))) + (reply-timeout . ,reply-timeout) + (default-checkout-timeout . ,default-checkout-timeout)))) (define checkout-failure-count 0) @@ -232,23 +232,38 @@ (cons (get-internal-real-time) resources-last-used)) - (begin - (if reply-timeout - ;; Don't sleep in this fiber, so spawn a new - ;; fiber to handle handing over the - ;; resource, and returning it if there's a - ;; timeout - (spawn-fiber-for-checkout (last waiters) - resource) - (put-message (last waiters) resource)) - - (loop (cons resource resources) - available - (drop-right! waiters 1) - (cons (get-internal-real-time) - resources-last-used)))))) - - (('checkout reply) + (let* ((current-internal-time (get-internal-real-time)) + (alive-waiters + dead-waiters + (partition! + (match-lambda + ((reply . timeout) + (or (not timeout) + (> timeout current-internal-time)))) + waiters))) + (if (null? alive-waiters) + (loop (cons resource resources) + (cons resource available) + '() + (cons (get-internal-real-time) + resources-last-used)) + (begin + (if reply-timeout + ;; Don't sleep in this fiber, so spawn a new + ;; fiber to handle handing over the + ;; resource, and returning it if there's a + ;; timeout + (spawn-fiber-for-checkout (car (last alive-waiters)) + resource) + (put-message (car (last alive-waiters)) resource)) + + (loop (cons resource resources) + available + (drop-right! alive-waiters 1) + (cons (get-internal-real-time) + resources-last-used)))))))) + + (('checkout reply timeout-time) (if (null? available) (begin (unless (= (length resources) max-size) @@ -256,22 +271,31 @@ (loop resources available - (cons reply waiters) + (cons (cons reply timeout-time) + waiters) resources-last-used)) - (let ((resource (car available))) - (if reply-timeout - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's a - ;; timeout - (spawn-fiber-for-checkout reply resource) - (put-message reply resource)) + ;; If this client is still waiting + (if (> timeout-time + (get-internal-real-time)) + (let ((resource (car available))) + (if reply-timeout + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's a + ;; timeout + (spawn-fiber-for-checkout reply + resource) + (put-message reply resource)) - (loop resources - (cdr available) - waiters - resources-last-used)))) + (loop resources + (cdr available) + waiters + resources-last-used)) + (loop resources + available + waiters + resources-last-used)))) (((and (or 'return 'return-failed-checkout) @@ -296,27 +320,49 @@ (get-internal-real-time)) resources-last-used)) - (begin - (if reply-timeout - ;; Don't sleep in this fiber, so spawn a new - ;; fiber to handle handing over the - ;; resource, and returning it if there's a - ;; timeout - (spawn-fiber-for-checkout (last waiters) - resource) - (put-message (last waiters) resource)) - - (loop resources - available - (drop-right! waiters 1) - (begin - (list-set! - resources-last-used - (list-index (lambda (x) - (eq? x resource)) - resources) - (get-internal-real-time)) - resources-last-used))))) + (let* ((current-internal-time (get-internal-real-time)) + (alive-waiters + dead-waiters + (partition! + (match-lambda + ((reply . timeout) + (or (not timeout) + (> timeout current-internal-time)))) + waiters))) + (if (null? alive-waiters) + (loop resources + (cons resource available) + '() + (begin + (when (eq? return-type 'return) + (list-set! + resources-last-used + (list-index (lambda (x) + (eq? x resource)) + resources) + (get-internal-real-time))) + resources-last-used)) + (begin + (if reply-timeout + ;; Don't sleep in this fiber, so spawn a new + ;; fiber to handle handing over the + ;; resource, and returning it if there's a + ;; timeout + (spawn-fiber-for-checkout (car (last alive-waiters)) + resource) + (put-message (car (last alive-waiters)) resource)) + + (loop resources + available + (drop-right! alive-waiters 1) + (begin + (list-set! + resources-last-used + (list-index (lambda (x) + (eq? x resource)) + resources) + (get-internal-real-time)) + resources-last-used))))))) (('remove resource) (let ((index @@ -359,8 +405,8 @@ (wrap-operation (put-operation reply stats) (const #t)) - (wrap-operation (sleep-operation - reply-timeout) + (wrap-operation (sleep-operation (or reply-timeout + 5)) (const #f))))))) (loop resources @@ -495,12 +541,6 @@ (unless (eq? msg 'destroy-success) (error msg))))) -(define resource-pool-default-timeout - (make-parameter #f)) - -(define resource-pool-retry-checkout-timeout - (make-parameter 5)) - (define &resource-pool-timeout (make-exception-type '&recource-pool-timeout &error @@ -526,87 +566,104 @@ "Call PROC with a resource from POOL, blocking until a resource becomes available. Return the resource once PROC has returned." - (define retry-timeout - (resource-pool-retry-checkout-timeout)) - (define timeout-or-default (if (eq? timeout 'default) - (resource-pool-default-timeout) + (assq-ref (resource-pool-configuration pool) + 'default-checkout-timeout) timeout)) + (define resource-pool-reply-timeout + (assq-ref (resource-pool-configuration pool) + 'reply-timeout)) + (let ((resource - (let ((reply (make-channel))) - (let loop ((start-time (get-internal-real-time))) - (let ((request-success? - (perform-operation - (choice-operation - (wrap-operation - (put-operation (resource-pool-channel pool) - `(checkout ,reply)) - (const #t)) - (wrap-operation (sleep-operation (or timeout-or-default - retry-timeout)) - (const #f)))))) - (if request-success? - (let ((time-remaining - (- (or timeout-or-default - retry-timeout) - (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second)))) - (if (> time-remaining 0) - (let ((response - (perform-operation - (choice-operation - (get-operation reply) - (wrap-operation (sleep-operation time-remaining) - (const #f)))))) - (if (or (not response) - (eq? response 'resource-pool-retry-checkout)) - (if (> (- (or timeout-or-default - retry-timeout) - (/ (- (get-internal-real-time) - start-time) + (if timeout-or-default + (let loop ((reply (make-channel)) + (start-time (get-internal-real-time))) + (let ((request-success? + (perform-operation + (choice-operation + (wrap-operation + (put-operation (resource-pool-channel pool) + (list 'checkout + reply + (+ start-time + (* timeout-or-default + internal-time-units-per-second)))) + (const #t)) + (wrap-operation (sleep-operation timeout-or-default) + (const #f)))))) + (if request-success? + (let ((time-remaining + (- timeout-or-default + (/ (- (get-internal-real-time) + start-time) + internal-time-units-per-second)))) + (if (> time-remaining 0) + (let ((response + (perform-operation + (choice-operation + (get-operation reply) + (wrap-operation (sleep-operation time-remaining) + (const #f)))))) + (if (or (not response) + (eq? response 'resource-pool-retry-checkout)) + (if (> (- timeout-or-default + (/ (- (get-internal-real-time) + start-time) + internal-time-units-per-second)) + 0) + (loop (make-channel) + start-time) + #f) + response)) + #f))))) + (let loop ((reply (make-channel))) + (put-message (resource-pool-channel pool) + (list 'checkout + reply + (if resource-pool-reply-timeout + (+ (get-internal-real-time) + (* resource-pool-reply-timeout internal-time-units-per-second)) - 0) - (loop start-time) - (if (eq? timeout-or-default #f) - (loop (get-internal-real-time)) - #f)) - response)) - (if (eq? timeout-or-default #f) - (loop (get-internal-real-time)) - #f))) - (if (eq? timeout-or-default #f) - (loop (get-internal-real-time)) - #f))))))) - - (when (or (not resource) - (eq? resource 'resource-pool-retry-checkout)) + #f))) + (if resource-pool-reply-timeout + (let ((resource-or-timeout + (perform-operation + (choice-operation + (get-operation reply) + (wrap-operation + (sleep-operation resource-pool-reply-timeout) + (const 'resource-pool-reply-timeout)))))) + (if (or (eq? resource-or-timeout + 'resource-pool-reply-timeout) + (eq? resource-or-timeout + 'resource-pool-retry-checkout)) + (loop (make-channel)) + resource-or-timeout)) + (get-message reply)))))) + + (when (not resource) (when timeout-handler (timeout-handler pool proc timeout)) (raise-exception (make-resource-pool-timeout-error pool))) - (with-exception-handler - (lambda (exception) - (put-message (resource-pool-channel pool) - `(return ,resource)) - (raise-exception exception)) - (lambda () - (call-with-values + (call-with-values + (lambda () + (with-exception-handler + (lambda (exn) + (print-backtrace-and-exception/knots exn) + (put-message (resource-pool-channel pool) + `(return ,resource)) + (raise-exception exn)) (lambda () - (with-throw-handler #t - (lambda () - (proc resource)) - (lambda _ - (backtrace)))) - (lambda vals - (put-message (resource-pool-channel pool) - `(return ,resource)) - (apply values vals)))) - #:unwind? #t))) + (proc resource)))) + (lambda vals + (put-message (resource-pool-channel pool) + `(return ,resource)) + (apply values vals))))) (define-syntax-rule (with-resource-from-pool pool resource exp ...) (call-with-resource-from-pool diff --git a/tests/resource-pool.scm b/tests/resource-pool.scm index 50dadbd..cf108de 100644 --- a/tests/resource-pool.scm +++ b/tests/resource-pool.scm @@ -105,15 +105,41 @@ (error "collision detected"))) (new-number)) 1))) - (fibers-for-each + (fibers-batch-for-each + (lambda _ + (with-resource-from-pool + resource-pool res + (let ((start-val counter)) + (sleep 0.05) + (if (= start-val counter) + (set! counter (+ 1 counter)) + (error "collision detected"))))) + 20 + (iota 50))))) + +(run-fibers-for-tests + (lambda () + (let* ((counter 0) + (resource-pool (make-resource-pool + (lambda () + (let ((start-val counter)) + (sleep 0.05) + (if (= start-val counter) + (set! counter (+ 1 counter)) + (error "collision detected"))) + (new-number)) + 1 + #:reply-timeout #f))) + (fibers-batch-for-each (lambda _ (with-resource-from-pool - resource-pool res - (let ((start-val counter)) - (sleep 0.05) - (if (= start-val counter) - (set! counter (+ 1 counter)) - (error "collision detected"))))) + resource-pool res + (let ((start-val counter)) + (sleep 0.05) + (if (= start-val counter) + (set! counter (+ 1 counter)) + (error "collision detected"))))) + 20 (iota 50))))) (display "resource-pool test finished successfully\n") |