From 5af6233e5b411d1735f704d311ee46c0eec8ab6f Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Thu, 18 Jan 2024 14:39:39 +0000 Subject: Make it possible to destroy a resource pool And implement removing idle resources. --- guix-data-service/utils.scm | 141 +++++++++++++++++++++++++++++++++++++++----- 1 file changed, 127 insertions(+), 14 deletions(-) (limited to 'guix-data-service') diff --git a/guix-data-service/utils.scm b/guix-data-service/utils.scm index c91b10b..e1c6d84 100644 --- a/guix-data-service/utils.scm +++ b/guix-data-service/utils.scm @@ -34,6 +34,7 @@ resource-pool-default-timeout make-resource-pool + destroy-resource-pool call-with-resource-from-pool with-resource-from-pool resource-pool-stats @@ -73,7 +74,7 @@ (define* (make-resource-pool initializer max-size #:key (min-size max-size) - (idle-duration #f) + (idle-seconds #f) (delay-logger (const #f)) (duration-logger (const #f)) destructor @@ -96,6 +97,32 @@ (backtrace)))) #:unwind? #t)) + (define (destructor/safe args) + (let ((success? + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "exception running resource pool destructor (~A): ~A:\n ~A\n" + name + destructor + exn) + #f) + (lambda () + (with-throw-handler #t + (lambda () + (destructor args) + #t) + (lambda _ + (backtrace)))) + #:unwind? #t))) + + (or success? + #t + (begin + (sleep 5) + (destructor/safe args))))) + (let ((channel (make-channel))) (spawn-fiber (lambda () @@ -110,15 +137,25 @@ (lambda () (let loop ((resources '()) (available '()) - (waiters '())) - - (match (get-message channel) + (waiters '()) + (resources-last-used '())) + + (match (if idle-seconds + (perform-operation + (choice-operation + (get-operation channel) + (wrap-operation + ;; TODO Do something smarter + (sleep-operation 10) + (const '(check-for-idle-resources))))) + (get-message channel)) (('checkout reply) (if (null? available) (if (= (length resources) max-size) (loop resources available - (cons reply waiters)) + (cons reply waiters) + resources-last-used) (let ((new-resource (initializer/safe))) (if new-resource (let ((checkout-success? @@ -133,10 +170,13 @@ (if checkout-success? available (cons new-resource available)) - waiters)) + waiters + (cons (get-internal-real-time) + resources-last-used))) (loop resources available - (cons reply waiters))))) + (cons reply waiters) + resources-last-used)))) (let ((checkout-success? (perform-operation (choice-operation @@ -148,10 +188,12 @@ (if checkout-success? (loop resources (cdr available) - waiters) + waiters + resources-last-used) (loop resources available - waiters))))) + waiters + resources-last-used))))) (('return resource) ;; When a resource is returned, prompt all the waiters to request ;; again. This is to avoid the pool waiting on channels that may @@ -169,7 +211,15 @@ (loop resources (cons resource available) ;; clear waiters, as they've been notified - '())) + '() + (begin + (list-set! + resources-last-used + (list-index (lambda (x) + (eq? x resource)) + resources) + (get-internal-real-time)) + resources-last-used))) (('stats reply) (let ((stats `((resources . ,(length resources)) @@ -186,7 +236,59 @@ (loop resources available - waiters)) + waiters + resources-last-used)) + (('check-for-idle-resources) + (let* ((resources-last-used-seconds + (map + (lambda (internal-time) + (/ (- (get-internal-real-time) internal-time) + internal-time-units-per-second)) + resources-last-used)) + (resources-to-destroy + (filter-map + (lambda (resource last-used-seconds) + (if (and (member resource available) + (> last-used-seconds idle-seconds)) + resource + #f)) + resources + resources-last-used-seconds))) + + (for-each + (lambda (resource) + (destructor/safe resource)) + resources-to-destroy) + + (loop (lset-difference eq? resources resources-to-destroy) + (lset-difference eq? available resources-to-destroy) + waiters + (filter-map + (lambda (resource last-used) + (if (memq resource resources-to-destroy) + #f + last-used)) + resources + resources-last-used)))) + (('destroy reply) + (if (= (length resources) (length available)) + (begin + (for-each + (lambda (resource) + (destructor/safe resource)) + resources) + (put-message reply 'destroy-success)) + (begin + (spawn-fiber + (lambda () + (perform-operation + (choice-operation + (put-operation reply 'resource-pool-destroy-failed) + (sleep-operation 10))))) + (loop resources + available + waiters + resources-last-used)))) (unknown (simple-format (current-error-port) @@ -195,11 +297,19 @@ unknown) (loop resources available - waiters))))) + waiters + resources-last-used))))) #:unwind? #t)))) channel)) +(define (destroy-resource-pool pool) + (let ((reply (make-channel))) + (put-message pool (list 'destroy reply)) + (let ((msg (get-message reply))) + (unless (eq? msg 'destroy-success) + (error msg))))) + (define resource-pool-default-timeout (make-parameter #f)) @@ -258,9 +368,12 @@ available. Return the resource once PROC has returned." #f) response)) #f))) - (begin + (let loop () (put-message pool `(checkout ,reply)) - (get-message reply)))))) + (let ((response (get-message reply))) + (if (eq? response 'resource-pool-retry-checkout) + (loop) + response))))))) (when (or (not resource) (eq? resource 'resource-pool-retry-checkout)) -- cgit v1.2.3