diff options
Diffstat (limited to 'guix-data-service/utils.scm')
-rw-r--r-- | guix-data-service/utils.scm | 670 |
1 files changed, 23 insertions, 647 deletions
diff --git a/guix-data-service/utils.scm b/guix-data-service/utils.scm index d01fb5c..7cd7342 100644 --- a/guix-data-service/utils.scm +++ b/guix-data-service/utils.scm @@ -17,7 +17,10 @@ (define-module (guix-data-service utils) #:use-module (srfi srfi-1) + #:use-module (srfi srfi-9) #:use-module (srfi srfi-11) + #:use-module (srfi srfi-71) + #:use-module (ice-9 q) #:use-module (ice-9 ftw) #:use-module (ice-9 match) #:use-module (ice-9 atomic) @@ -33,23 +36,12 @@ #:use-module (fibers timers) #:use-module (fibers conditions) #:use-module (fibers scheduler) + #:use-module (knots timeout) #:use-module (prometheus) #:export (call-with-time-logging with-time-logging prevent-inlining-for-tests - resource-pool-default-timeout - %resource-pool-timeout-handler - make-resource-pool - destroy-resource-pool - call-with-resource-from-pool - with-resource-from-pool - resource-pool-stats - - parallel-via-fibers - par-map& - letpar& - chunk chunk! chunk-for-each! @@ -58,10 +50,9 @@ get-guix-metrics-updater - call-with-sigint - run-server/patched + spawn-port-monitoring-fiber - spawn-port-monitoring-fiber)) + make-queueing-channel)) (define (call-with-time-logging action thunk) (simple-format #t "debug: Starting ~A\n" action) @@ -80,464 +71,31 @@ (define-syntax-rule (prevent-inlining-for-tests var) (set! var var)) -(define* (make-resource-pool initializer max-size - #:key (min-size max-size) - (idle-seconds #f) - (delay-logger (const #f)) - (duration-logger (const #f)) - destructor - lifetime - (name "unnamed")) - (define (initializer/safe) - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "exception running ~A resource pool initializer: ~A:\n ~A\n" - name - initializer - exn) - #f) - (lambda () - (with-throw-handler #t - initializer - (lambda args - (backtrace)))) - #:unwind? #t)) - - (define (destructor/safe args) - (let ((success? - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "exception running resource pool destructor (~A): ~A:\n ~A\n" - name - destructor - exn) - #f) - (lambda () - (with-throw-handler #t - (lambda () - (destructor args) - #t) - (lambda _ - (backtrace)))) - #:unwind? #t))) - - (or success? - #t - (begin - (sleep 5) - (destructor/safe args))))) - - (let ((channel (make-channel)) - (checkout-failure-count 0)) - (spawn-fiber - (lambda () - (while #t - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "exception in the ~A pool fiber: ~A\n" - name - exn)) - (lambda () - (let loop ((resources '()) - (available '()) - (waiters '()) - (resources-last-used '())) - - (match (if idle-seconds - (perform-operation - (choice-operation - (get-operation channel) - (wrap-operation - ;; TODO Do something smarter - (sleep-operation 10) - (const '(check-for-idle-resources))))) - (get-message channel)) - (('checkout reply) - (if (null? available) - (if (= (length resources) max-size) - (loop resources - available - (cons reply waiters) - resources-last-used) - (let ((new-resource (initializer/safe))) - (if new-resource - (let ((checkout-success? - (perform-operation - (choice-operation - (wrap-operation - (put-operation reply new-resource) - (const #t)) - (wrap-operation (sleep-operation 1) - (const #f)))))) - (unless checkout-success? - (set! checkout-failure-count - (+ 1 checkout-failure-count))) - - (loop (cons new-resource resources) - (if checkout-success? - available - (cons new-resource available)) - waiters - (cons (get-internal-real-time) - resources-last-used))) - (loop resources - available - (cons reply waiters) - resources-last-used)))) - (let ((checkout-success? - (perform-operation - (choice-operation - (wrap-operation - (put-operation reply (car available)) - (const #t)) - (wrap-operation (sleep-operation 1) - (const #f)))))) - (unless checkout-success? - (set! checkout-failure-count - (+ 1 checkout-failure-count))) - - (if checkout-success? - (loop resources - (cdr available) - waiters - resources-last-used) - (loop resources - available - waiters - resources-last-used))))) - (('return resource) - ;; When a resource is returned, prompt all the waiters to request - ;; again. This is to avoid the pool waiting on channels that may - ;; be dead. - (for-each - (lambda (waiter) - (spawn-fiber - (lambda () - (perform-operation - (choice-operation - (put-operation waiter 'resource-pool-retry-checkout) - (sleep-operation 0.2)))))) - waiters) - - (loop resources - (cons resource available) - ;; clear waiters, as they've been notified - '() - (begin - (list-set! - resources-last-used - (list-index (lambda (x) - (eq? x resource)) - resources) - (get-internal-real-time)) - resources-last-used))) - (('stats reply) - (let ((stats - `((resources . ,(length resources)) - (available . ,(length available)) - (waiters . ,(length waiters)) - (checkout-failure-count . ,checkout-failure-count)))) - - (perform-operation - (choice-operation - (wrap-operation - (put-operation reply stats) - (const #t)) - (wrap-operation (sleep-operation 0.2) - (const #f))))) - - (loop resources - available - waiters - resources-last-used)) - (('check-for-idle-resources) - (let* ((resources-last-used-seconds - (map - (lambda (internal-time) - (/ (- (get-internal-real-time) internal-time) - internal-time-units-per-second)) - resources-last-used)) - (resources-to-destroy - (filter-map - (lambda (resource last-used-seconds) - (if (and (member resource available) - (> last-used-seconds idle-seconds)) - resource - #f)) - resources - resources-last-used-seconds))) - - (for-each - (lambda (resource) - (destructor/safe resource)) - resources-to-destroy) - - (loop (lset-difference eq? resources resources-to-destroy) - (lset-difference eq? available resources-to-destroy) - waiters - (filter-map - (lambda (resource last-used) - (if (memq resource resources-to-destroy) - #f - last-used)) - resources - resources-last-used)))) - (('destroy reply) - (if (= (length resources) (length available)) - (begin - (for-each - (lambda (resource) - (destructor/safe resource)) - resources) - (put-message reply 'destroy-success)) - (begin - (spawn-fiber - (lambda () - (perform-operation - (choice-operation - (put-operation reply 'resource-pool-destroy-failed) - (sleep-operation 10))))) - (loop resources - available - waiters - resources-last-used)))) - (unknown - (simple-format - (current-error-port) - "unrecognised message to ~A resource pool channel: ~A\n" - name - unknown) - (loop resources - available - waiters - resources-last-used))))) - #:unwind? #t)))) - - channel)) - -(define (destroy-resource-pool pool) - (let ((reply (make-channel))) - (put-message pool (list 'destroy reply)) - (let ((msg (get-message reply))) - (unless (eq? msg 'destroy-success) - (error msg))))) - -(define resource-pool-default-timeout - (make-parameter #f)) - -(define &resource-pool-timeout - (make-exception-type '&recource-pool-timeout - &error - '())) - -(define make-resource-pool-timeout-error - (record-constructor &resource-pool-timeout)) - -(define resource-pool-timeout-error? - (record-predicate &resource-pool-timeout)) - -(define %resource-pool-timeout-handler - (make-parameter #f)) - -(define* (call-with-resource-from-pool pool proc #:key (timeout 'default) - (timeout-handler (%resource-pool-timeout-handler))) - "Call PROC with a resource from POOL, blocking until a resource becomes -available. Return the resource once PROC has returned." - - (define timeout-or-default - (if (eq? timeout 'default) - (resource-pool-default-timeout) - timeout)) - - (let ((resource - (let ((reply (make-channel))) - (if timeout-or-default - (let loop ((start-time (get-internal-real-time))) - (perform-operation - (choice-operation - (wrap-operation - (put-operation pool `(checkout ,reply)) - (const #t)) - (wrap-operation (sleep-operation timeout-or-default) - (const #f)))) - - (let ((time-remaining - (- timeout-or-default - (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second)))) - (if (> time-remaining 0) - (let ((response - (perform-operation - (choice-operation - (get-operation reply) - (wrap-operation (sleep-operation time-remaining) - (const #f)))))) - (if (or (not response) - (eq? response 'resource-pool-retry-checkout)) - (if (> (- timeout-or-default - (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second)) - 0) - (loop start-time) - #f) - response)) - #f))) - (let loop () - (put-message pool `(checkout ,reply)) - (let ((response (get-message reply))) - (if (eq? response 'resource-pool-retry-checkout) - (loop) - response))))))) - - (when (or (not resource) - (eq? resource 'resource-pool-retry-checkout)) - (when timeout-handler - (timeout-handler pool proc timeout)) - - (raise-exception - (make-resource-pool-timeout-error))) - - (with-exception-handler - (lambda (exception) - (put-message pool `(return ,resource)) - (raise-exception exception)) - (lambda () - (call-with-values - (lambda () - (with-throw-handler #t - (lambda () - (proc resource)) - (lambda _ - (backtrace)))) - (lambda vals - (put-message pool `(return ,resource)) - (apply values vals)))) - #:unwind? #t))) - -(define-syntax-rule (with-resource-from-pool pool resource exp ...) - (call-with-resource-from-pool - pool - (lambda (resource) exp ...))) - -(define* (resource-pool-stats pool #:key (timeout 5)) - (let ((reply (make-channel)) - (start-time (get-internal-real-time))) - (perform-operation - (choice-operation - (wrap-operation - (put-operation pool `(stats ,reply)) - (const #t)) - (wrap-operation (sleep-operation timeout) - (const #f)))) - - (let ((time-remaining - (- timeout - (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second)))) - (if (> time-remaining 0) - (let ((response - (perform-operation - (choice-operation - (get-operation reply) - (wrap-operation (sleep-operation time-remaining) - (const #f)))))) - response) - (raise-exception - (make-resource-pool-timeout-error)))))) - -(define (defer-to-parallel-fiber thunk) - (let ((reply (make-channel))) - (spawn-fiber - (lambda () - (with-exception-handler - (lambda (exn) - (put-message reply (cons 'exception exn))) - (lambda () - (call-with-values - (lambda () - (with-throw-handler #t - thunk - (lambda _ - (backtrace)))) - (lambda vals - (put-message reply vals)))) - #:unwind? #t)) - #:parallel? #t) - reply)) - -(define (fetch-result-of-defered-thunks . reply-channels) - (let ((responses (map get-message - reply-channels))) - (map - (match-lambda - (('exception . exn) - (raise-exception exn)) - (result - (apply values result))) - responses))) - -(define-syntax parallel-via-fibers - (lambda (x) - (syntax-case x () - ((_ e0 ...) - (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...))))) - #'(let ((tmp0 (defer-to-parallel-fiber - (lambda () - e0))) - ...) - (apply values (fetch-result-of-defered-thunks tmp0 ...)))))))) - -(define-syntax-rule (letpar& ((v e) ...) b0 b1 ...) - (call-with-values - (lambda () (parallel-via-fibers e ...)) - (lambda (v ...) - b0 b1 ...))) - -(define (par-mapper' mapper cons) - (lambda (proc . lists) - (apply - fetch-result-of-defered-thunks - (let loop ((lists lists)) - (match lists - (((heads tails ...) ...) - (let ((tail (loop tails)) - (head (defer-to-parallel-fiber - (lambda () - (apply proc heads))))) - (cons head tail))) - (_ - '())))))) - -(define par-map& (par-mapper' map cons)) - (define (chunk lst max-length) - (if (> (length lst) - max-length) + (let ((len (length lst))) + (cond + ((= 0 len) '()) + ((> (length lst) max-length) (call-with-values (lambda () (split-at lst max-length)) (lambda (first-lst rest) (cons first-lst - (chunk rest max-length)))) - (list lst))) + (chunk rest max-length))))) + (else + (list lst))))) (define (chunk! lst max-length) - (if (> (length lst) - max-length) + (let ((len (length lst))) + (cond + ((= 0 len) '()) + ((> (length lst) max-length) (call-with-values (lambda () (split-at! lst max-length)) (lambda (first-lst rest) (cons first-lst - (chunk! rest max-length)))) - (list lst))) + (chunk! rest max-length))))) + (else + (list lst))))) (define* (chunk-for-each! proc chunk-size #:rest lsts) (define (do-one-iteration lsts) @@ -558,10 +116,10 @@ available. Return the resource once PROC has returned." (apply proc lsts))) (let ((list-lengths (map length lsts))) - (unless (eq? 1 (length (delete-duplicates list-lengths))) + (unless (= 1 (length (delete-duplicates list-lengths))) (error "lists not equal length")) - (unless (eq? 0 (first list-lengths)) + (unless (= 0 (first list-lengths)) (do-one-iteration lsts))) #t) @@ -606,173 +164,6 @@ available. Return the resource once PROC has returned." 0))) #:unwind? #t)))) -;; This variant of run-server from the fibers library supports running -;; multiple servers within one process. -(define run-server/patched - (let ((fibers-web-server-module - (resolve-module '(fibers web server)))) - - (define set-nonblocking! - (module-ref fibers-web-server-module 'set-nonblocking!)) - - (define make-default-socket - (module-ref fibers-web-server-module 'make-default-socket)) - - (define socket-loop - (module-ref fibers-web-server-module 'socket-loop)) - - (lambda* (handler - #:key - (host #f) - (family AF_INET) - (addr (if host - (inet-pton family host) - INADDR_LOOPBACK)) - (port 8080) - (socket (make-default-socket family addr port))) - ;; We use a large backlog by default. If the server is suddenly hit - ;; with a number of connections on a small backlog, clients won't - ;; receive confirmation for their SYN, leading them to retry -- - ;; probably successfully, but with a large latency. - (listen socket 1024) - (set-nonblocking! socket) - (sigaction SIGPIPE SIG_IGN) - (spawn-fiber (lambda () (socket-loop socket handler)))))) - -(define &port-timeout - (make-exception-type '&port-timeout - &external-error - '(port))) - -(define make-port-timeout-error - (record-constructor &port-timeout)) - -(define port-timeout-error? - (record-predicate &port-timeout)) - -(define &port-read-timeout - (make-exception-type '&port-read-timeout - &port-timeout - '())) - -(define make-port-read-timeout-error - (record-constructor &port-read-timeout)) - -(define port-read-timeout-error? - (record-predicate &port-read-timeout)) - -(define &port-write-timeout - (make-exception-type '&port-write-timeout - &port-timeout - '())) - -(define make-port-write-timeout-error - (record-constructor &port-write-timeout)) - -(define port-write-timeout-error? - (record-predicate &port-write-timeout)) - -;; These procedure are subject to spurious wakeups. - -(define (readable? port) - "Test if PORT is writable." - (match (select (vector port) #() #() 0) - ((#() #() #()) #f) - ((#(_) #() #()) #t))) - -(define (writable? port) - "Test if PORT is writable." - (match (select #() (vector port) #() 0) - ((#() #() #()) #f) - ((#() #(_) #()) #t))) - -(define (make-wait-operation ready? schedule-when-ready port - port-ready-fd this-procedure) - (make-base-operation - #f - (lambda _ - (and (ready? (port-ready-fd port)) values)) - (lambda (flag sched resume) - (define (commit) - (match (atomic-box-compare-and-swap! flag 'W 'S) - ('W (resume values)) - ('C (commit)) - ('S #f))) - (schedule-when-ready - sched (port-ready-fd port) commit)))) - -(define (wait-until-port-readable-operation port) - "Make an operation that will succeed when PORT is readable." - (unless (input-port? port) - (error "refusing to wait forever for input on non-input port")) - (make-wait-operation readable? schedule-task-when-fd-readable port - port-read-wait-fd - wait-until-port-readable-operation)) - -(define (wait-until-port-writable-operation port) - "Make an operation that will succeed when PORT is writable." - (unless (output-port? port) - (error "refusing to wait forever for output on non-output port")) - (make-wait-operation writable? schedule-task-when-fd-writable port - port-write-wait-fd - wait-until-port-writable-operation)) - -(define* (with-fibers-port-timeouts thunk - #:key timeout - (read-timeout timeout) - (write-timeout timeout)) - (define (no-fibers-wait port mode timeout) - (define poll-timeout-ms 200) - - ;; When the GC runs, it restarts the poll syscall, but the timeout - ;; remains unchanged! When the timeout is longer than the time - ;; between the syscall restarting, I think this renders the - ;; timeout useless. Therefore, this code uses a short timeout, and - ;; repeatedly calls poll while watching the clock to see if it has - ;; timed out overall. - (let ((timeout-internal - (+ (get-internal-real-time) - (* internal-time-units-per-second - (/ timeout 1000))))) - (let loop ((poll-value - (port-poll port mode poll-timeout-ms))) - (if (= poll-value 0) - (if (> (get-internal-real-time) - timeout-internal) - (raise-exception - (if (string=? mode "r") - (make-port-read-timeout-error port) - (make-port-write-timeout-error port))) - (loop (port-poll port mode poll-timeout-ms))) - poll-value)))) - - (parameterize - ((current-read-waiter - (lambda (port) - (if (current-scheduler) - (perform-operation - (choice-operation - (wait-until-port-readable-operation port) - (wrap-operation - (sleep-operation read-timeout) - (lambda () - (raise-exception - (make-port-read-timeout-error thunk port)))))) - (no-fibers-wait port "r" read-timeout)))) - (current-write-waiter - (lambda (port) - (if (current-scheduler) - (perform-operation - (choice-operation - (wait-until-port-writable-operation port) - (wrap-operation - (sleep-operation write-timeout) - (lambda () - (raise-exception - (make-port-write-timeout-error thunk port)))))) - (no-fibers-wait port "w" write-timeout))))) - (thunk))) - (define (spawn-port-monitoring-fiber port error-condition) (spawn-fiber (lambda () @@ -785,25 +176,10 @@ available. Return the resource once PROC has returned." port exn) (signal-condition! error-condition)) (lambda () - (with-fibers-port-timeouts + (with-port-timeouts (lambda () (let ((sock (socket PF_INET SOCK_STREAM 0))) (connect sock AF_INET INADDR_LOOPBACK port) (close-port sock))) #:timeout 20)) #:unwind? #t))))) - -;; Copied from (fibers web server) -(define (call-with-sigint thunk cvar) - (let ((handler #f)) - (dynamic-wind - (lambda () - (set! handler - (sigaction SIGINT (lambda (sig) (signal-condition! cvar))))) - thunk - (lambda () - (if handler - ;; restore Scheme handler, SIG_IGN or SIG_DFL. - (sigaction SIGINT (car handler) (cdr handler)) - ;; restore original C handler. - (sigaction SIGINT #f)))))) |