aboutsummaryrefslogtreecommitdiff
path: root/guix-build-coordinator/utils
diff options
context:
space:
mode:
Diffstat (limited to 'guix-build-coordinator/utils')
-rw-r--r--guix-build-coordinator/utils/fibers.scm58
1 files changed, 57 insertions, 1 deletions
diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm
index a8b6738..5481191 100644
--- a/guix-build-coordinator/utils/fibers.scm
+++ b/guix-build-coordinator/utils/fibers.scm
@@ -10,7 +10,9 @@
call-with-sigint
- run-server/patched))
+ run-server/patched
+
+ letpar&))
(define %worker-thread-args
(make-parameter #f))
@@ -147,3 +149,57 @@ If already in the worker thread, call PROC immediately."
(set-nonblocking! socket)
(sigaction SIGPIPE SIG_IGN)
(spawn-fiber (lambda () (socket-loop socket handler))))))
+
+(define (defer-to-fiber thunk)
+ (let ((reply (make-channel)))
+ (spawn-fiber
+ (lambda ()
+ (put-message
+ reply
+ (with-exception-handler
+ (lambda (exn)
+ (cons 'worker-fiber-error exn))
+ (lambda ()
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "worker fiber: exception: ~A\n"
+ exn)
+ (backtrace)
+ (raise-exception exn))
+ (lambda ()
+ (call-with-values
+ thunk
+ (lambda vals
+ vals)))))
+ #:unwind? #t)))
+ #:parallel? #t)
+ reply))
+
+(define (fetch-result-of-defered-thunks . reply-channels)
+ (let ((responses (map get-message reply-channels)))
+ (map
+ (match-lambda
+ (('worker-thread-error . exn)
+ (raise-exception exn))
+ (result
+ (apply values result)))
+ responses)))
+
+(define-syntax parallel-via-fibers
+ (lambda (x)
+ (syntax-case x ()
+ ((_ e0 ...)
+ (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
+ #'(let ((tmp0 (defer-to-fiber
+ (lambda ()
+ e0)))
+ ...)
+ (apply values (fetch-result-of-defered-thunks tmp0 ...))))))))
+
+(define-syntax-rule (letpar& ((v e) ...) b0 b1 ...)
+ (call-with-values
+ (lambda () (parallel-via-fibers e ...))
+ (lambda (v ...)
+ b0 b1 ...)))