aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2024-10-31 16:43:13 +0000
committerChristopher Baines <mail@cbaines.net>2024-10-31 16:43:13 +0000
commit55af7c82e8e69954bc9f9625a62426dc10997919 (patch)
tree5961e3388d22769a427f190712cbdbb1aa5682b7
parentf8ac6e3dd920d3dd7c05c1d57c92865f1e340313 (diff)
downloaddata-service-55af7c82e8e69954bc9f9625a62426dc10997919.tar
data-service-55af7c82e8e69954bc9f9625a62426dc10997919.tar.gz
Add new fibers utilities
The new fibers-map uses the same batching approach that fibers-for-each uses, and fibers-map-with-progress allows tracking on the results while the map is happening.
-rw-r--r--guix-data-service/utils.scm147
1 files changed, 115 insertions, 32 deletions
diff --git a/guix-data-service/utils.scm b/guix-data-service/utils.scm
index 5f70f38..25624e4 100644
--- a/guix-data-service/utils.scm
+++ b/guix-data-service/utils.scm
@@ -65,10 +65,13 @@
fibers-batch-for-each
fibers-for-each
+ fibers-batch-map
+ fibers-map
parallel-via-fibers
par-map&
letpar&
+ fibers-map-with-progress
chunk
chunk!
@@ -805,38 +808,24 @@ If already in the worker thread, call PROC immediately."
(atomic-box-set! (fibers-promise-values-box fp)
#f))
-(define (fibers-batch-for-each proc batch-size . lists)
- ;; Like split-at, but don't care about the order of the resulting lists, and
- ;; don't error if the list is shorter than i elements
- (define (split-at* lst i)
- (let lp ((l lst) (n i) (acc '()))
- (if (or (<= n 0) (null? l))
- (values (reverse! acc) l)
- (lp (cdr l) (- n 1) (cons (car l) acc)))))
-
- ;; As this can be called with lists with tens of thousands of items in them,
- ;; batch the
- (define (get-batch lists)
- (let ((split-lists
- (map (lambda (lst)
- (let ((batch rest (split-at* lst batch-size)))
- (cons batch rest)))
- lists)))
- (values (map car split-lists)
- (map cdr split-lists))))
-
- (let loop ((lists lists))
- (call-with-values
- (lambda ()
- (get-batch lists))
- (lambda (batch rest)
- (apply par-map& proc batch)
- (unless (null? (car rest))
- (loop rest)))))
- *unspecified*)
-
-(define (fibers-for-each proc . lists)
- (apply fibers-batch-for-each proc 20 lists))
+;; Like split-at, but don't care about the order of the resulting lists, and
+;; don't error if the list is shorter than i elements
+(define (split-at* lst i)
+ (let lp ((l lst) (n i) (acc '()))
+ (if (or (<= n 0) (null? l))
+ (values (reverse! acc) l)
+ (lp (cdr l) (- n 1) (cons (car l) acc)))))
+
+;; As this can be called with lists with tens of thousands of items in them,
+;; batch the
+(define (get-batch batch-size lists)
+ (let ((split-lists
+ (map (lambda (lst)
+ (let ((batch rest (split-at* lst batch-size)))
+ (cons batch rest)))
+ lists)))
+ (values (map car split-lists)
+ (map cdr split-lists))))
(define (defer-to-parallel-fiber thunk)
(let ((reply (make-channel)))
@@ -869,6 +858,50 @@ If already in the worker thread, call PROC immediately."
(apply values result)))
responses)))
+(define (fibers-batch-map proc batch-size . lists)
+ (let loop ((lists lists)
+ (result '()))
+ (let ((batch
+ rest
+ (get-batch batch-size lists)))
+ (if (any null? batch)
+ result
+ (let ((response-channels
+ (apply map
+ (lambda args
+ (defer-to-parallel-fiber
+ (lambda ()
+ (apply proc args))))
+ batch)))
+ (loop rest
+ (append! result
+ (apply fetch-result-of-defered-thunks
+ response-channels))))))))
+
+(define (fibers-map proc . lists)
+ (apply fibers-batch-map proc 20 lists))
+
+(define (fibers-batch-for-each proc batch-size . lists)
+ (let loop ((lists lists))
+ (let ((batch
+ rest
+ (get-batch batch-size lists)))
+ (if (any null? batch)
+ *unspecified*
+ (let ((response-channels
+ (apply map
+ (lambda args
+ (defer-to-parallel-fiber
+ (lambda ()
+ (apply proc args))))
+ batch)))
+ (apply fetch-result-of-defered-thunks
+ response-channels)
+ (loop rest))))))
+
+(define (fibers-for-each proc . lists)
+ (apply fibers-batch-for-each proc 20 lists))
+
(define-syntax parallel-via-fibers
(lambda (x)
(syntax-case x ()
@@ -903,6 +936,56 @@ If already in the worker thread, call PROC immediately."
(define par-map& (par-mapper' map cons))
+(define* (fibers-map-with-progress proc lists #:key report)
+ (let loop ((channels-to-results
+ (apply map
+ (lambda args
+ (cons (defer-to-parallel-fiber
+ (lambda ()
+ (apply proc args)))
+ #f))
+ lists)))
+ (let ((active-channels
+ (filter-map car channels-to-results)))
+ (when report
+ (report (apply map
+ list
+ (map cdr channels-to-results)
+ lists)))
+ (if (null? active-channels)
+ (map
+ (match-lambda
+ ((#f . ('exception . exn))
+ (raise-exception exn))
+ ((#f . ('result . val))
+ val))
+ channels-to-results)
+ (loop
+ (perform-operation
+ (apply
+ choice-operation
+ (filter-map
+ (lambda (p)
+ (match p
+ ((channel . _)
+ (if channel
+ (wrap-operation
+ (get-operation channel)
+ (lambda (result)
+ (map (match-lambda
+ ((c . r)
+ (if (eq? channel c)
+ (cons #f
+ (match result
+ (('exception . exn)
+ result)
+ (_
+ (cons 'result result))))
+ (cons c r))))
+ channels-to-results)))
+ #f))))
+ channels-to-results))))))))
+
(define (chunk lst max-length)
(if (> (length lst)
max-length)