aboutsummaryrefslogtreecommitdiff
path: root/guix-data-service/utils.scm
diff options
context:
space:
mode:
Diffstat (limited to 'guix-data-service/utils.scm')
-rw-r--r--guix-data-service/utils.scm670
1 files changed, 23 insertions, 647 deletions
diff --git a/guix-data-service/utils.scm b/guix-data-service/utils.scm
index d01fb5c..7cd7342 100644
--- a/guix-data-service/utils.scm
+++ b/guix-data-service/utils.scm
@@ -17,7 +17,10 @@
(define-module (guix-data-service utils)
#:use-module (srfi srfi-1)
+ #:use-module (srfi srfi-9)
#:use-module (srfi srfi-11)
+ #:use-module (srfi srfi-71)
+ #:use-module (ice-9 q)
#:use-module (ice-9 ftw)
#:use-module (ice-9 match)
#:use-module (ice-9 atomic)
@@ -33,23 +36,12 @@
#:use-module (fibers timers)
#:use-module (fibers conditions)
#:use-module (fibers scheduler)
+ #:use-module (knots timeout)
#:use-module (prometheus)
#:export (call-with-time-logging
with-time-logging
prevent-inlining-for-tests
- resource-pool-default-timeout
- %resource-pool-timeout-handler
- make-resource-pool
- destroy-resource-pool
- call-with-resource-from-pool
- with-resource-from-pool
- resource-pool-stats
-
- parallel-via-fibers
- par-map&
- letpar&
-
chunk
chunk!
chunk-for-each!
@@ -58,10 +50,9 @@
get-guix-metrics-updater
- call-with-sigint
- run-server/patched
+ spawn-port-monitoring-fiber
- spawn-port-monitoring-fiber))
+ make-queueing-channel))
(define (call-with-time-logging action thunk)
(simple-format #t "debug: Starting ~A\n" action)
@@ -80,464 +71,31 @@
(define-syntax-rule (prevent-inlining-for-tests var)
(set! var var))
-(define* (make-resource-pool initializer max-size
- #:key (min-size max-size)
- (idle-seconds #f)
- (delay-logger (const #f))
- (duration-logger (const #f))
- destructor
- lifetime
- (name "unnamed"))
- (define (initializer/safe)
- (with-exception-handler
- (lambda (exn)
- (simple-format
- (current-error-port)
- "exception running ~A resource pool initializer: ~A:\n ~A\n"
- name
- initializer
- exn)
- #f)
- (lambda ()
- (with-throw-handler #t
- initializer
- (lambda args
- (backtrace))))
- #:unwind? #t))
-
- (define (destructor/safe args)
- (let ((success?
- (with-exception-handler
- (lambda (exn)
- (simple-format
- (current-error-port)
- "exception running resource pool destructor (~A): ~A:\n ~A\n"
- name
- destructor
- exn)
- #f)
- (lambda ()
- (with-throw-handler #t
- (lambda ()
- (destructor args)
- #t)
- (lambda _
- (backtrace))))
- #:unwind? #t)))
-
- (or success?
- #t
- (begin
- (sleep 5)
- (destructor/safe args)))))
-
- (let ((channel (make-channel))
- (checkout-failure-count 0))
- (spawn-fiber
- (lambda ()
- (while #t
- (with-exception-handler
- (lambda (exn)
- (simple-format
- (current-error-port)
- "exception in the ~A pool fiber: ~A\n"
- name
- exn))
- (lambda ()
- (let loop ((resources '())
- (available '())
- (waiters '())
- (resources-last-used '()))
-
- (match (if idle-seconds
- (perform-operation
- (choice-operation
- (get-operation channel)
- (wrap-operation
- ;; TODO Do something smarter
- (sleep-operation 10)
- (const '(check-for-idle-resources)))))
- (get-message channel))
- (('checkout reply)
- (if (null? available)
- (if (= (length resources) max-size)
- (loop resources
- available
- (cons reply waiters)
- resources-last-used)
- (let ((new-resource (initializer/safe)))
- (if new-resource
- (let ((checkout-success?
- (perform-operation
- (choice-operation
- (wrap-operation
- (put-operation reply new-resource)
- (const #t))
- (wrap-operation (sleep-operation 1)
- (const #f))))))
- (unless checkout-success?
- (set! checkout-failure-count
- (+ 1 checkout-failure-count)))
-
- (loop (cons new-resource resources)
- (if checkout-success?
- available
- (cons new-resource available))
- waiters
- (cons (get-internal-real-time)
- resources-last-used)))
- (loop resources
- available
- (cons reply waiters)
- resources-last-used))))
- (let ((checkout-success?
- (perform-operation
- (choice-operation
- (wrap-operation
- (put-operation reply (car available))
- (const #t))
- (wrap-operation (sleep-operation 1)
- (const #f))))))
- (unless checkout-success?
- (set! checkout-failure-count
- (+ 1 checkout-failure-count)))
-
- (if checkout-success?
- (loop resources
- (cdr available)
- waiters
- resources-last-used)
- (loop resources
- available
- waiters
- resources-last-used)))))
- (('return resource)
- ;; When a resource is returned, prompt all the waiters to request
- ;; again. This is to avoid the pool waiting on channels that may
- ;; be dead.
- (for-each
- (lambda (waiter)
- (spawn-fiber
- (lambda ()
- (perform-operation
- (choice-operation
- (put-operation waiter 'resource-pool-retry-checkout)
- (sleep-operation 0.2))))))
- waiters)
-
- (loop resources
- (cons resource available)
- ;; clear waiters, as they've been notified
- '()
- (begin
- (list-set!
- resources-last-used
- (list-index (lambda (x)
- (eq? x resource))
- resources)
- (get-internal-real-time))
- resources-last-used)))
- (('stats reply)
- (let ((stats
- `((resources . ,(length resources))
- (available . ,(length available))
- (waiters . ,(length waiters))
- (checkout-failure-count . ,checkout-failure-count))))
-
- (perform-operation
- (choice-operation
- (wrap-operation
- (put-operation reply stats)
- (const #t))
- (wrap-operation (sleep-operation 0.2)
- (const #f)))))
-
- (loop resources
- available
- waiters
- resources-last-used))
- (('check-for-idle-resources)
- (let* ((resources-last-used-seconds
- (map
- (lambda (internal-time)
- (/ (- (get-internal-real-time) internal-time)
- internal-time-units-per-second))
- resources-last-used))
- (resources-to-destroy
- (filter-map
- (lambda (resource last-used-seconds)
- (if (and (member resource available)
- (> last-used-seconds idle-seconds))
- resource
- #f))
- resources
- resources-last-used-seconds)))
-
- (for-each
- (lambda (resource)
- (destructor/safe resource))
- resources-to-destroy)
-
- (loop (lset-difference eq? resources resources-to-destroy)
- (lset-difference eq? available resources-to-destroy)
- waiters
- (filter-map
- (lambda (resource last-used)
- (if (memq resource resources-to-destroy)
- #f
- last-used))
- resources
- resources-last-used))))
- (('destroy reply)
- (if (= (length resources) (length available))
- (begin
- (for-each
- (lambda (resource)
- (destructor/safe resource))
- resources)
- (put-message reply 'destroy-success))
- (begin
- (spawn-fiber
- (lambda ()
- (perform-operation
- (choice-operation
- (put-operation reply 'resource-pool-destroy-failed)
- (sleep-operation 10)))))
- (loop resources
- available
- waiters
- resources-last-used))))
- (unknown
- (simple-format
- (current-error-port)
- "unrecognised message to ~A resource pool channel: ~A\n"
- name
- unknown)
- (loop resources
- available
- waiters
- resources-last-used)))))
- #:unwind? #t))))
-
- channel))
-
-(define (destroy-resource-pool pool)
- (let ((reply (make-channel)))
- (put-message pool (list 'destroy reply))
- (let ((msg (get-message reply)))
- (unless (eq? msg 'destroy-success)
- (error msg)))))
-
-(define resource-pool-default-timeout
- (make-parameter #f))
-
-(define &resource-pool-timeout
- (make-exception-type '&recource-pool-timeout
- &error
- '()))
-
-(define make-resource-pool-timeout-error
- (record-constructor &resource-pool-timeout))
-
-(define resource-pool-timeout-error?
- (record-predicate &resource-pool-timeout))
-
-(define %resource-pool-timeout-handler
- (make-parameter #f))
-
-(define* (call-with-resource-from-pool pool proc #:key (timeout 'default)
- (timeout-handler (%resource-pool-timeout-handler)))
- "Call PROC with a resource from POOL, blocking until a resource becomes
-available. Return the resource once PROC has returned."
-
- (define timeout-or-default
- (if (eq? timeout 'default)
- (resource-pool-default-timeout)
- timeout))
-
- (let ((resource
- (let ((reply (make-channel)))
- (if timeout-or-default
- (let loop ((start-time (get-internal-real-time)))
- (perform-operation
- (choice-operation
- (wrap-operation
- (put-operation pool `(checkout ,reply))
- (const #t))
- (wrap-operation (sleep-operation timeout-or-default)
- (const #f))))
-
- (let ((time-remaining
- (- timeout-or-default
- (/ (- (get-internal-real-time)
- start-time)
- internal-time-units-per-second))))
- (if (> time-remaining 0)
- (let ((response
- (perform-operation
- (choice-operation
- (get-operation reply)
- (wrap-operation (sleep-operation time-remaining)
- (const #f))))))
- (if (or (not response)
- (eq? response 'resource-pool-retry-checkout))
- (if (> (- timeout-or-default
- (/ (- (get-internal-real-time)
- start-time)
- internal-time-units-per-second))
- 0)
- (loop start-time)
- #f)
- response))
- #f)))
- (let loop ()
- (put-message pool `(checkout ,reply))
- (let ((response (get-message reply)))
- (if (eq? response 'resource-pool-retry-checkout)
- (loop)
- response)))))))
-
- (when (or (not resource)
- (eq? resource 'resource-pool-retry-checkout))
- (when timeout-handler
- (timeout-handler pool proc timeout))
-
- (raise-exception
- (make-resource-pool-timeout-error)))
-
- (with-exception-handler
- (lambda (exception)
- (put-message pool `(return ,resource))
- (raise-exception exception))
- (lambda ()
- (call-with-values
- (lambda ()
- (with-throw-handler #t
- (lambda ()
- (proc resource))
- (lambda _
- (backtrace))))
- (lambda vals
- (put-message pool `(return ,resource))
- (apply values vals))))
- #:unwind? #t)))
-
-(define-syntax-rule (with-resource-from-pool pool resource exp ...)
- (call-with-resource-from-pool
- pool
- (lambda (resource) exp ...)))
-
-(define* (resource-pool-stats pool #:key (timeout 5))
- (let ((reply (make-channel))
- (start-time (get-internal-real-time)))
- (perform-operation
- (choice-operation
- (wrap-operation
- (put-operation pool `(stats ,reply))
- (const #t))
- (wrap-operation (sleep-operation timeout)
- (const #f))))
-
- (let ((time-remaining
- (- timeout
- (/ (- (get-internal-real-time)
- start-time)
- internal-time-units-per-second))))
- (if (> time-remaining 0)
- (let ((response
- (perform-operation
- (choice-operation
- (get-operation reply)
- (wrap-operation (sleep-operation time-remaining)
- (const #f))))))
- response)
- (raise-exception
- (make-resource-pool-timeout-error))))))
-
-(define (defer-to-parallel-fiber thunk)
- (let ((reply (make-channel)))
- (spawn-fiber
- (lambda ()
- (with-exception-handler
- (lambda (exn)
- (put-message reply (cons 'exception exn)))
- (lambda ()
- (call-with-values
- (lambda ()
- (with-throw-handler #t
- thunk
- (lambda _
- (backtrace))))
- (lambda vals
- (put-message reply vals))))
- #:unwind? #t))
- #:parallel? #t)
- reply))
-
-(define (fetch-result-of-defered-thunks . reply-channels)
- (let ((responses (map get-message
- reply-channels)))
- (map
- (match-lambda
- (('exception . exn)
- (raise-exception exn))
- (result
- (apply values result)))
- responses)))
-
-(define-syntax parallel-via-fibers
- (lambda (x)
- (syntax-case x ()
- ((_ e0 ...)
- (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
- #'(let ((tmp0 (defer-to-parallel-fiber
- (lambda ()
- e0)))
- ...)
- (apply values (fetch-result-of-defered-thunks tmp0 ...))))))))
-
-(define-syntax-rule (letpar& ((v e) ...) b0 b1 ...)
- (call-with-values
- (lambda () (parallel-via-fibers e ...))
- (lambda (v ...)
- b0 b1 ...)))
-
-(define (par-mapper' mapper cons)
- (lambda (proc . lists)
- (apply
- fetch-result-of-defered-thunks
- (let loop ((lists lists))
- (match lists
- (((heads tails ...) ...)
- (let ((tail (loop tails))
- (head (defer-to-parallel-fiber
- (lambda ()
- (apply proc heads)))))
- (cons head tail)))
- (_
- '()))))))
-
-(define par-map& (par-mapper' map cons))
-
(define (chunk lst max-length)
- (if (> (length lst)
- max-length)
+ (let ((len (length lst)))
+ (cond
+ ((= 0 len) '())
+ ((> (length lst) max-length)
(call-with-values (lambda ()
(split-at lst max-length))
(lambda (first-lst rest)
(cons first-lst
- (chunk rest max-length))))
- (list lst)))
+ (chunk rest max-length)))))
+ (else
+ (list lst)))))
(define (chunk! lst max-length)
- (if (> (length lst)
- max-length)
+ (let ((len (length lst)))
+ (cond
+ ((= 0 len) '())
+ ((> (length lst) max-length)
(call-with-values (lambda ()
(split-at! lst max-length))
(lambda (first-lst rest)
(cons first-lst
- (chunk! rest max-length))))
- (list lst)))
+ (chunk! rest max-length)))))
+ (else
+ (list lst)))))
(define* (chunk-for-each! proc chunk-size #:rest lsts)
(define (do-one-iteration lsts)
@@ -558,10 +116,10 @@ available. Return the resource once PROC has returned."
(apply proc lsts)))
(let ((list-lengths (map length lsts)))
- (unless (eq? 1 (length (delete-duplicates list-lengths)))
+ (unless (= 1 (length (delete-duplicates list-lengths)))
(error "lists not equal length"))
- (unless (eq? 0 (first list-lengths))
+ (unless (= 0 (first list-lengths))
(do-one-iteration lsts)))
#t)
@@ -606,173 +164,6 @@ available. Return the resource once PROC has returned."
0)))
#:unwind? #t))))
-;; This variant of run-server from the fibers library supports running
-;; multiple servers within one process.
-(define run-server/patched
- (let ((fibers-web-server-module
- (resolve-module '(fibers web server))))
-
- (define set-nonblocking!
- (module-ref fibers-web-server-module 'set-nonblocking!))
-
- (define make-default-socket
- (module-ref fibers-web-server-module 'make-default-socket))
-
- (define socket-loop
- (module-ref fibers-web-server-module 'socket-loop))
-
- (lambda* (handler
- #:key
- (host #f)
- (family AF_INET)
- (addr (if host
- (inet-pton family host)
- INADDR_LOOPBACK))
- (port 8080)
- (socket (make-default-socket family addr port)))
- ;; We use a large backlog by default. If the server is suddenly hit
- ;; with a number of connections on a small backlog, clients won't
- ;; receive confirmation for their SYN, leading them to retry --
- ;; probably successfully, but with a large latency.
- (listen socket 1024)
- (set-nonblocking! socket)
- (sigaction SIGPIPE SIG_IGN)
- (spawn-fiber (lambda () (socket-loop socket handler))))))
-
-(define &port-timeout
- (make-exception-type '&port-timeout
- &external-error
- '(port)))
-
-(define make-port-timeout-error
- (record-constructor &port-timeout))
-
-(define port-timeout-error?
- (record-predicate &port-timeout))
-
-(define &port-read-timeout
- (make-exception-type '&port-read-timeout
- &port-timeout
- '()))
-
-(define make-port-read-timeout-error
- (record-constructor &port-read-timeout))
-
-(define port-read-timeout-error?
- (record-predicate &port-read-timeout))
-
-(define &port-write-timeout
- (make-exception-type '&port-write-timeout
- &port-timeout
- '()))
-
-(define make-port-write-timeout-error
- (record-constructor &port-write-timeout))
-
-(define port-write-timeout-error?
- (record-predicate &port-write-timeout))
-
-;; These procedure are subject to spurious wakeups.
-
-(define (readable? port)
- "Test if PORT is writable."
- (match (select (vector port) #() #() 0)
- ((#() #() #()) #f)
- ((#(_) #() #()) #t)))
-
-(define (writable? port)
- "Test if PORT is writable."
- (match (select #() (vector port) #() 0)
- ((#() #() #()) #f)
- ((#() #(_) #()) #t)))
-
-(define (make-wait-operation ready? schedule-when-ready port
- port-ready-fd this-procedure)
- (make-base-operation
- #f
- (lambda _
- (and (ready? (port-ready-fd port)) values))
- (lambda (flag sched resume)
- (define (commit)
- (match (atomic-box-compare-and-swap! flag 'W 'S)
- ('W (resume values))
- ('C (commit))
- ('S #f)))
- (schedule-when-ready
- sched (port-ready-fd port) commit))))
-
-(define (wait-until-port-readable-operation port)
- "Make an operation that will succeed when PORT is readable."
- (unless (input-port? port)
- (error "refusing to wait forever for input on non-input port"))
- (make-wait-operation readable? schedule-task-when-fd-readable port
- port-read-wait-fd
- wait-until-port-readable-operation))
-
-(define (wait-until-port-writable-operation port)
- "Make an operation that will succeed when PORT is writable."
- (unless (output-port? port)
- (error "refusing to wait forever for output on non-output port"))
- (make-wait-operation writable? schedule-task-when-fd-writable port
- port-write-wait-fd
- wait-until-port-writable-operation))
-
-(define* (with-fibers-port-timeouts thunk
- #:key timeout
- (read-timeout timeout)
- (write-timeout timeout))
- (define (no-fibers-wait port mode timeout)
- (define poll-timeout-ms 200)
-
- ;; When the GC runs, it restarts the poll syscall, but the timeout
- ;; remains unchanged! When the timeout is longer than the time
- ;; between the syscall restarting, I think this renders the
- ;; timeout useless. Therefore, this code uses a short timeout, and
- ;; repeatedly calls poll while watching the clock to see if it has
- ;; timed out overall.
- (let ((timeout-internal
- (+ (get-internal-real-time)
- (* internal-time-units-per-second
- (/ timeout 1000)))))
- (let loop ((poll-value
- (port-poll port mode poll-timeout-ms)))
- (if (= poll-value 0)
- (if (> (get-internal-real-time)
- timeout-internal)
- (raise-exception
- (if (string=? mode "r")
- (make-port-read-timeout-error port)
- (make-port-write-timeout-error port)))
- (loop (port-poll port mode poll-timeout-ms)))
- poll-value))))
-
- (parameterize
- ((current-read-waiter
- (lambda (port)
- (if (current-scheduler)
- (perform-operation
- (choice-operation
- (wait-until-port-readable-operation port)
- (wrap-operation
- (sleep-operation read-timeout)
- (lambda ()
- (raise-exception
- (make-port-read-timeout-error thunk port))))))
- (no-fibers-wait port "r" read-timeout))))
- (current-write-waiter
- (lambda (port)
- (if (current-scheduler)
- (perform-operation
- (choice-operation
- (wait-until-port-writable-operation port)
- (wrap-operation
- (sleep-operation write-timeout)
- (lambda ()
- (raise-exception
- (make-port-write-timeout-error thunk port))))))
- (no-fibers-wait port "w" write-timeout)))))
- (thunk)))
-
(define (spawn-port-monitoring-fiber port error-condition)
(spawn-fiber
(lambda ()
@@ -785,25 +176,10 @@ available. Return the resource once PROC has returned."
port exn)
(signal-condition! error-condition))
(lambda ()
- (with-fibers-port-timeouts
+ (with-port-timeouts
(lambda ()
(let ((sock (socket PF_INET SOCK_STREAM 0)))
(connect sock AF_INET INADDR_LOOPBACK port)
(close-port sock)))
#:timeout 20))
#:unwind? #t)))))
-
-;; Copied from (fibers web server)
-(define (call-with-sigint thunk cvar)
- (let ((handler #f))
- (dynamic-wind
- (lambda ()
- (set! handler
- (sigaction SIGINT (lambda (sig) (signal-condition! cvar)))))
- thunk
- (lambda ()
- (if handler
- ;; restore Scheme handler, SIG_IGN or SIG_DFL.
- (sigaction SIGINT (car handler) (cdr handler))
- ;; restore original C handler.
- (sigaction SIGINT #f))))))