diff options
author | Christopher Baines <mail@cbaines.net> | 2020-10-03 21:32:46 +0100 |
---|---|---|
committer | Christopher Baines <mail@cbaines.net> | 2020-10-03 21:32:46 +0100 |
commit | e2e55c69de1eceb77998ab059a943711ef7779fd (patch) | |
tree | 48cb91e3130c3cc8718529cbc699fc09df0ea94d /guix-data-service/utils.scm | |
parent | 18b6dd9e6d4463e47ce457187d956c1c3dd8dd08 (diff) | |
download | data-service-e2e55c69de1eceb77998ab059a943711ef7779fd.tar data-service-e2e55c69de1eceb77998ab059a943711ef7779fd.tar.gz |
Rework the shortlived PostgreSQL specific connection channel
In to a generic thing more like (ice-9 futures). Including copying some bits
from the (ice-9 threads) module and adapting them to work with this fibers
approach, rather than futures. The advantage being that using fibers channels
doesn't block the threads being used by fibers, whereas futures would.
Diffstat (limited to 'guix-data-service/utils.scm')
-rw-r--r-- | guix-data-service/utils.scm | 102 |
1 files changed, 101 insertions, 1 deletions
diff --git a/guix-data-service/utils.scm b/guix-data-service/utils.scm index 738f839..855c819 100644 --- a/guix-data-service/utils.scm +++ b/guix-data-service/utils.scm @@ -17,9 +17,18 @@ (define-module (guix-data-service utils) #:use-module (srfi srfi-11) + #:use-module (ice-9 match) + #:use-module (ice-9 threads) + #:use-module (fibers) + #:use-module (fibers channels) + #:use-module (fibers conditions) #:export (call-with-time-logging with-time-logging - prevent-inlining-for-tests)) + prevent-inlining-for-tests + + parallel-via-thread-pool-channel + par-map& + letpar&)) (define (call-with-time-logging action thunk) (simple-format #t "debug: Starting ~A\n" action) @@ -38,3 +47,94 @@ (define-syntax-rule (prevent-inlining-for-tests var) (set! var var)) + + +(define* (make-thread-pool-channel #:key (threads 8)) + (parameterize (((@@ (fibers internal) current-fiber) #f)) + (let ((channel (make-channel))) + (for-each + (lambda _ + (call-with-new-thread + (lambda () + (let loop () + (match (get-message channel) + (((? channel? reply) . (? procedure? proc)) + (put-message + reply + (with-exception-handler + (lambda (exn) + (cons 'worker-thread-error exn)) + (lambda () + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "worker thread: exception: ~A\n" + exn) + (backtrace) + (raise-exception exn)) + (lambda () + (call-with-values + proc + (lambda vals + vals))))) + #:unwind? #t)) + (loop)) + (_ #f)))))) + (iota threads)) + channel))) + +(define %thread-pool-mutex (make-mutex)) +(define %thread-pool-channel #f) + +(define (make-thread-pool-channel!') + (with-mutex %thread-pool-mutex + (unless %thread-pool-channel + (set! %thread-pool-channel (make-thread-pool-channel)) + (set! make-thread-pool-channel! (lambda () #t))))) + +(define make-thread-pool-channel! + (lambda () (make-thread-pool-channel!'))) + +(define (defer-to-thread-pool-channel thunk) + (make-thread-pool-channel!) + (let ((reply (make-channel))) + (put-message %thread-pool-channel (cons reply thunk)) + reply)) + +(define (fetch-result-of-defered-thunk reply-channel) + (match (get-message reply-channel) + (('worker-thread-error . exn) + (raise-exception exn)) + (result + (apply values result)))) + +(define-syntax parallel-via-thread-pool-channel + (lambda (x) + (syntax-case x () + ((_ e0 ...) + (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...))))) + #'(let ((tmp0 (defer-to-thread-pool-channel + (lambda () + e0))) + ...) + (values (fetch-result-of-defered-thunk tmp0) ...))))))) + +(define-syntax-rule (letpar& ((v e) ...) b0 b1 ...) + (call-with-values + (lambda () (parallel-via-thread-pool-channel e ...)) + (lambda (v ...) + b0 b1 ...))) + +(define (par-mapper' mapper cons) + (lambda (proc . lists) + (let loop ((lists lists)) + (match lists + (((heads tails ...) ...) + (let ((tail (defer-to-thread-pool-channel (loop tails))) + (head (apply proc heads))) + (cons head (fetch-result-of-defered-thunk tail)))) + (_ + '()))))) + +(define par-map& (par-mapper' map cons)) |