diff options
Diffstat (limited to 'guix-data-service/utils.scm')
-rw-r--r-- | guix-data-service/utils.scm | 448 |
1 files changed, 412 insertions, 36 deletions
diff --git a/guix-data-service/utils.scm b/guix-data-service/utils.scm index d01fb5c..393db95 100644 --- a/guix-data-service/utils.scm +++ b/guix-data-service/utils.scm @@ -17,7 +17,9 @@ (define-module (guix-data-service utils) #:use-module (srfi srfi-1) + #:use-module (srfi srfi-9) #:use-module (srfi srfi-11) + #:use-module (srfi srfi-71) #:use-module (ice-9 ftw) #:use-module (ice-9 match) #:use-module (ice-9 atomic) @@ -27,6 +29,8 @@ #:use-module (ice-9 ports internal) #:use-module (ice-9 suspendable-ports) #:use-module (lzlib) + #:use-module ((guix build syscalls) + #:select (set-thread-name)) #:use-module (fibers) #:use-module (fibers channels) #:use-module (fibers operations) @@ -40,12 +44,26 @@ resource-pool-default-timeout %resource-pool-timeout-handler + resource-pool-timeout-error? make-resource-pool destroy-resource-pool call-with-resource-from-pool with-resource-from-pool resource-pool-stats + make-worker-thread-channel + %worker-thread-default-timeout + call-with-worker-thread + worker-thread-timeout-error? + + fiberize + + fibers-delay + fibers-force + + fibers-batch-for-each + fibers-for-each + parallel-via-fibers par-map& letpar& @@ -80,6 +98,12 @@ (define-syntax-rule (prevent-inlining-for-tests var) (set! var var)) +(define-record-type <resource-pool> + (make-resource-pool-record name channel) + resource-pool? + (name resource-pool-name) + (channel resource-pool-channel)) + (define* (make-resource-pool initializer max-size #:key (min-size max-size) (idle-seconds #f) @@ -87,6 +111,7 @@ (duration-logger (const #f)) destructor lifetime + scheduler (name "unnamed")) (define (initializer/safe) (with-exception-handler @@ -135,6 +160,13 @@ (checkout-failure-count 0)) (spawn-fiber (lambda () + (when idle-seconds + (spawn-fiber + (lambda () + (while #t + (sleep idle-seconds) + (put-message channel '(check-for-idle-resources)))))) + (while #t (with-exception-handler (lambda (exn) @@ -149,15 +181,7 @@ (waiters '()) (resources-last-used '())) - (match (if idle-seconds - (perform-operation - (choice-operation - (get-operation channel) - (wrap-operation - ;; TODO Do something smarter - (sleep-operation 10) - (const '(check-for-idle-resources))))) - (get-message channel)) + (match (get-message channel) (('checkout reply) (if (null? available) (if (= (length resources) max-size) @@ -244,13 +268,15 @@ (waiters . ,(length waiters)) (checkout-failure-count . ,checkout-failure-count)))) - (perform-operation - (choice-operation - (wrap-operation - (put-operation reply stats) - (const #t)) - (wrap-operation (sleep-operation 0.2) - (const #f))))) + (spawn-fiber + (lambda () + (perform-operation + (choice-operation + (wrap-operation + (put-operation reply stats) + (const #t)) + (wrap-operation (sleep-operation 1) + (const #f))))))) (loop resources available @@ -317,13 +343,16 @@ available waiters resources-last-used))))) - #:unwind? #t)))) + #:unwind? #t))) + (or scheduler + (current-scheduler))) - channel)) + (make-resource-pool-record name channel))) (define (destroy-resource-pool pool) (let ((reply (make-channel))) - (put-message pool (list 'destroy reply)) + (put-message (resource-pool-channel pool) + (list 'destroy reply)) (let ((msg (get-message reply))) (unless (eq? msg 'destroy-success) (error msg))))) @@ -334,7 +363,7 @@ (define &resource-pool-timeout (make-exception-type '&recource-pool-timeout &error - '())) + '(name))) (define make-resource-pool-timeout-error (record-constructor &resource-pool-timeout)) @@ -362,7 +391,8 @@ available. Return the resource once PROC has returned." (perform-operation (choice-operation (wrap-operation - (put-operation pool `(checkout ,reply)) + (put-operation (resource-pool-channel pool) + `(checkout ,reply)) (const #t)) (wrap-operation (sleep-operation timeout-or-default) (const #f)))) @@ -391,7 +421,8 @@ available. Return the resource once PROC has returned." response)) #f))) (let loop () - (put-message pool `(checkout ,reply)) + (put-message (resource-pool-channel pool) + `(checkout ,reply)) (let ((response (get-message reply))) (if (eq? response 'resource-pool-retry-checkout) (loop) @@ -403,11 +434,12 @@ available. Return the resource once PROC has returned." (timeout-handler pool proc timeout)) (raise-exception - (make-resource-pool-timeout-error))) + (make-resource-pool-timeout-error (resource-pool-name pool)))) (with-exception-handler (lambda (exception) - (put-message pool `(return ,resource)) + (put-message (resource-pool-channel pool) + `(return ,resource)) (raise-exception exception)) (lambda () (call-with-values @@ -418,14 +450,15 @@ available. Return the resource once PROC has returned." (lambda _ (backtrace)))) (lambda vals - (put-message pool `(return ,resource)) + (put-message (resource-pool-channel pool) + `(return ,resource)) (apply values vals)))) #:unwind? #t))) (define-syntax-rule (with-resource-from-pool pool resource exp ...) (call-with-resource-from-pool - pool - (lambda (resource) exp ...))) + pool + (lambda (resource) exp ...))) (define* (resource-pool-stats pool #:key (timeout 5)) (let ((reply (make-channel)) @@ -433,10 +466,13 @@ available. Return the resource once PROC has returned." (perform-operation (choice-operation (wrap-operation - (put-operation pool `(stats ,reply)) + (put-operation (resource-pool-channel pool) + `(stats ,reply)) (const #t)) (wrap-operation (sleep-operation timeout) - (const #f)))) + (lambda _ + (raise-exception + (make-resource-pool-timeout-error)))))) (let ((time-remaining (- timeout @@ -444,16 +480,356 @@ available. Return the resource once PROC has returned." start-time) internal-time-units-per-second)))) (if (> time-remaining 0) - (let ((response - (perform-operation - (choice-operation - (get-operation reply) - (wrap-operation (sleep-operation time-remaining) - (const #f)))))) - response) + (perform-operation + (choice-operation + (get-operation reply) + (wrap-operation (sleep-operation time-remaining) + (lambda _ + (raise-exception + (make-resource-pool-timeout-error)))))) (raise-exception (make-resource-pool-timeout-error)))))) +(define %worker-thread-args + (make-parameter #f)) + +(define* (make-worker-thread-channel initializer + #:key (parallelism 1) + (delay-logger (lambda _ #f)) + (duration-logger (const #f)) + destructor + lifetime + (log-exception? (const #t)) + (expire-on-exception? #f) + (name "unnamed")) + "Return a channel used to offload work to a dedicated thread. ARGS are the +arguments of the worker thread procedure." + (define thread-proc-vector + (make-vector parallelism #f)) + + (define (initializer/safe) + (let ((args + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "exception running initializer in worker thread (~A): ~A:\n ~A\n" + name + initializer + exn) + #f) + (lambda () + (with-throw-handler #t + initializer + (lambda args + (backtrace)))) + #:unwind? #t))) + + (if args + args + ;; never give up, just keep retrying + (begin + (sleep 1) + (initializer/safe))))) + + (define (destructor/safe args) + (let ((success? + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "exception running destructor in worker thread (~A): ~A:\n ~A\n" + name + destructor + exn) + #f) + (lambda () + (with-throw-handler #t + (lambda () + (apply destructor args) + #t) + (lambda _ + (backtrace)))) + #:unwind? #t))) + + (or success? + #t + (begin + (sleep 1) + (destructor/safe args))))) + + (define (process thread-index channel args) + (let loop ((current-lifetime lifetime)) + (let ((exception? + (match (get-message channel) + (((? channel? reply) sent-time (? procedure? proc)) + (let ((time-delay + (- (get-internal-real-time) + sent-time))) + (delay-logger (/ time-delay + internal-time-units-per-second)) + + (let* ((start-time (get-internal-real-time)) + (response + (with-exception-handler + (lambda (exn) + (list 'worker-thread-error + (/ (- (get-internal-real-time) + start-time) + internal-time-units-per-second) + exn)) + (lambda () + (vector-set! thread-proc-vector + thread-index + proc) + (with-throw-handler #t + (lambda () + (call-with-values + (lambda () + (start-stack + 'worker-thread + (apply proc args))) + (lambda vals + (cons (/ (- (get-internal-real-time) + start-time) + internal-time-units-per-second) + vals)))) + (lambda args + (when (match args + (('%exception exn) + (log-exception? exn)) + (_ #t)) + (simple-format + (current-error-port) + "worker-thread: exception: ~A\n" args) + (backtrace))))) + #:unwind? #t))) + (put-message reply + response) + + (vector-set! thread-proc-vector + thread-index + #f) + + (match response + (('worker-thread-error duration _) + (when duration-logger + (duration-logger duration proc)) + #t) + ((duration . _) + (when duration-logger + (duration-logger duration proc)) + #f)))))))) + (unless (and expire-on-exception? + exception?) + (if (number? current-lifetime) + (unless (< current-lifetime 0) + (loop (if current-lifetime + (- current-lifetime 1) + #f))) + (loop #f)))))) + + (let ((channel (make-channel))) + (for-each + (lambda (thread-index) + (call-with-new-thread + (lambda () + (catch 'system-error + (lambda () + (set-thread-name + (string-append + name " w t " + (number->string thread-index)))) + (const #t)) + + (let init ((args (initializer/safe))) + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "worker-thread-channel: exception: ~A\n" exn)) + (lambda () + (parameterize ((%worker-thread-args args)) + (process thread-index channel args))) + #:unwind? #t) + + (when destructor + (destructor/safe args)) + + (init (initializer/safe)))))) + (iota parallelism)) + + (values channel + thread-proc-vector))) + +(define &worker-thread-timeout + (make-exception-type '&worker-thread-timeout + &error + '())) + +(define make-worker-thread-timeout-error + (record-constructor &worker-thread-timeout)) + +(define worker-thread-timeout-error? + (record-predicate &worker-thread-timeout)) + +(define %worker-thread-default-timeout + (make-parameter 30)) + +(define* (call-with-worker-thread channel proc #:key duration-logger + (timeout (%worker-thread-default-timeout))) + "Send PROC to the worker thread through CHANNEL. Return the result of PROC. +If already in the worker thread, call PROC immediately." + (let ((args (%worker-thread-args))) + (if args + (apply proc args) + (let* ((reply (make-channel)) + (operation-success? + (perform-operation + (let ((put + (wrap-operation + (put-operation channel + (list reply + (get-internal-real-time) + proc)) + (const #t)))) + + (if timeout + (choice-operation + put + (wrap-operation (sleep-operation timeout) + (const #f))) + put))))) + + (unless operation-success? + (raise-exception + (make-worker-thread-timeout-error))) + + (match (get-message reply) + (('worker-thread-error duration exn) + (when duration-logger + (duration-logger duration)) + (raise-exception exn)) + ((duration . result) + (when duration-logger + (duration-logger duration)) + (apply values result))))))) + +(define* (fiberize proc #:key (parallelism 1)) + (let ((channel (make-channel))) + (for-each + (lambda _ + (spawn-fiber + (lambda () + (while #t + (let ((reply-channel args (car+cdr + (get-message channel)))) + (put-message + reply-channel + (with-exception-handler + (lambda (exn) + (cons 'exception exn)) + (lambda () + (with-throw-handler #t + (lambda () + (call-with-values + (lambda () + (apply proc args)) + (lambda vals + (cons 'result vals)))) + (lambda _ + (backtrace)))) + #:unwind? #t))))) + #:parallel? #t)) + (iota parallelism)) + + (lambda args + (let ((reply-channel (make-channel))) + (put-message channel (cons reply-channel args)) + (match (get-message reply-channel) + (('result . vals) (apply values vals)) + (('exception . exn) (raise-exception exn))))))) + +(define-record-type <fibers-promise> + (make-fibers-promise thunk values-box evaluated-condition) + fibers-promise? + (thunk fibers-promise-thunk) + (values-box fibers-promise-values-box) + (evaluated-condition fibers-promise-evaluated-condition)) + +(define (fibers-delay thunk) + (make-fibers-promise + thunk + (make-atomic-box #f) + (make-condition))) + +(define (fibers-force fp) + (let ((res (atomic-box-compare-and-swap! + (fibers-promise-values-box fp) + #f + 'started))) + (if (eq? #f res) + (call-with-values + (lambda () + (with-exception-handler + (lambda (exn) + (atomic-box-set! (fibers-promise-values-box fp) + exn) + (signal-condition! + (fibers-promise-evaluated-condition fp)) + (raise-exception exn)) + (fibers-promise-thunk fp) + #:unwind? #t)) + (lambda vals + (atomic-box-set! (fibers-promise-values-box fp) + vals) + (signal-condition! + (fibers-promise-evaluated-condition fp)) + (apply values vals))) + (if (eq? res 'started) + (begin + (wait (fibers-promise-evaluated-condition fp)) + (let ((result (atomic-box-ref (fibers-promise-values-box fp)))) + (if (exception? result) + (raise-exception result) + (apply values result)))) + (if (exception? res) + (raise-exception res) + (apply values res)))))) + +(define (fibers-batch-for-each proc batch-size . lists) + ;; Like split-at, but don't care about the order of the resulting lists, and + ;; don't error if the list is shorter than i elements + (define (split-at* lst i) + (let lp ((l lst) (n i) (acc '())) + (if (or (<= n 0) (null? l)) + (values (reverse! acc) l) + (lp (cdr l) (- n 1) (cons (car l) acc))))) + + ;; As this can be called with lists with tens of thousands of items in them, + ;; batch the + (define (get-batch lists) + (let ((split-lists + (map (lambda (lst) + (let ((batch rest (split-at* lst batch-size))) + (cons batch rest))) + lists))) + (values (map car split-lists) + (map cdr split-lists)))) + + (let loop ((lists lists)) + (call-with-values + (lambda () + (get-batch lists)) + (lambda (batch rest) + (apply par-map& proc batch) + (unless (null? (car rest)) + (loop rest))))) + *unspecified*) + +(define (fibers-for-each proc . lists) + (apply fibers-batch-for-each proc 20 lists)) + (define (defer-to-parallel-fiber thunk) (let ((reply (make-channel))) (spawn-fiber |