diff options
author | Christopher Baines <mail@cbaines.net> | 2024-10-31 16:43:13 +0000 |
---|---|---|
committer | Christopher Baines <mail@cbaines.net> | 2024-10-31 16:43:13 +0000 |
commit | 55af7c82e8e69954bc9f9625a62426dc10997919 (patch) | |
tree | 5961e3388d22769a427f190712cbdbb1aa5682b7 | |
parent | f8ac6e3dd920d3dd7c05c1d57c92865f1e340313 (diff) | |
download | data-service-55af7c82e8e69954bc9f9625a62426dc10997919.tar data-service-55af7c82e8e69954bc9f9625a62426dc10997919.tar.gz |
Add new fibers utilities
The new fibers-map uses the same batching approach that fibers-for-each uses,
and fibers-map-with-progress allows tracking on the results while the map is
happening.
-rw-r--r-- | guix-data-service/utils.scm | 147 |
1 files changed, 115 insertions, 32 deletions
diff --git a/guix-data-service/utils.scm b/guix-data-service/utils.scm index 5f70f38..25624e4 100644 --- a/guix-data-service/utils.scm +++ b/guix-data-service/utils.scm @@ -65,10 +65,13 @@ fibers-batch-for-each fibers-for-each + fibers-batch-map + fibers-map parallel-via-fibers par-map& letpar& + fibers-map-with-progress chunk chunk! @@ -805,38 +808,24 @@ If already in the worker thread, call PROC immediately." (atomic-box-set! (fibers-promise-values-box fp) #f)) -(define (fibers-batch-for-each proc batch-size . lists) - ;; Like split-at, but don't care about the order of the resulting lists, and - ;; don't error if the list is shorter than i elements - (define (split-at* lst i) - (let lp ((l lst) (n i) (acc '())) - (if (or (<= n 0) (null? l)) - (values (reverse! acc) l) - (lp (cdr l) (- n 1) (cons (car l) acc))))) - - ;; As this can be called with lists with tens of thousands of items in them, - ;; batch the - (define (get-batch lists) - (let ((split-lists - (map (lambda (lst) - (let ((batch rest (split-at* lst batch-size))) - (cons batch rest))) - lists))) - (values (map car split-lists) - (map cdr split-lists)))) - - (let loop ((lists lists)) - (call-with-values - (lambda () - (get-batch lists)) - (lambda (batch rest) - (apply par-map& proc batch) - (unless (null? (car rest)) - (loop rest))))) - *unspecified*) - -(define (fibers-for-each proc . lists) - (apply fibers-batch-for-each proc 20 lists)) +;; Like split-at, but don't care about the order of the resulting lists, and +;; don't error if the list is shorter than i elements +(define (split-at* lst i) + (let lp ((l lst) (n i) (acc '())) + (if (or (<= n 0) (null? l)) + (values (reverse! acc) l) + (lp (cdr l) (- n 1) (cons (car l) acc))))) + +;; As this can be called with lists with tens of thousands of items in them, +;; batch the +(define (get-batch batch-size lists) + (let ((split-lists + (map (lambda (lst) + (let ((batch rest (split-at* lst batch-size))) + (cons batch rest))) + lists))) + (values (map car split-lists) + (map cdr split-lists)))) (define (defer-to-parallel-fiber thunk) (let ((reply (make-channel))) @@ -869,6 +858,50 @@ If already in the worker thread, call PROC immediately." (apply values result))) responses))) +(define (fibers-batch-map proc batch-size . lists) + (let loop ((lists lists) + (result '())) + (let ((batch + rest + (get-batch batch-size lists))) + (if (any null? batch) + result + (let ((response-channels + (apply map + (lambda args + (defer-to-parallel-fiber + (lambda () + (apply proc args)))) + batch))) + (loop rest + (append! result + (apply fetch-result-of-defered-thunks + response-channels)))))))) + +(define (fibers-map proc . lists) + (apply fibers-batch-map proc 20 lists)) + +(define (fibers-batch-for-each proc batch-size . lists) + (let loop ((lists lists)) + (let ((batch + rest + (get-batch batch-size lists))) + (if (any null? batch) + *unspecified* + (let ((response-channels + (apply map + (lambda args + (defer-to-parallel-fiber + (lambda () + (apply proc args)))) + batch))) + (apply fetch-result-of-defered-thunks + response-channels) + (loop rest)))))) + +(define (fibers-for-each proc . lists) + (apply fibers-batch-for-each proc 20 lists)) + (define-syntax parallel-via-fibers (lambda (x) (syntax-case x () @@ -903,6 +936,56 @@ If already in the worker thread, call PROC immediately." (define par-map& (par-mapper' map cons)) +(define* (fibers-map-with-progress proc lists #:key report) + (let loop ((channels-to-results + (apply map + (lambda args + (cons (defer-to-parallel-fiber + (lambda () + (apply proc args))) + #f)) + lists))) + (let ((active-channels + (filter-map car channels-to-results))) + (when report + (report (apply map + list + (map cdr channels-to-results) + lists))) + (if (null? active-channels) + (map + (match-lambda + ((#f . ('exception . exn)) + (raise-exception exn)) + ((#f . ('result . val)) + val)) + channels-to-results) + (loop + (perform-operation + (apply + choice-operation + (filter-map + (lambda (p) + (match p + ((channel . _) + (if channel + (wrap-operation + (get-operation channel) + (lambda (result) + (map (match-lambda + ((c . r) + (if (eq? channel c) + (cons #f + (match result + (('exception . exn) + result) + (_ + (cons 'result result)))) + (cons c r)))) + channels-to-results))) + #f)))) + channels-to-results)))))))) + (define (chunk lst max-length) (if (> (length lst) max-length) |