aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.dir-locals.el1
-rw-r--r--guix-build-coordinator/utils/fibers.scm58
2 files changed, 58 insertions, 1 deletions
diff --git a/.dir-locals.el b/.dir-locals.el
index 4cbc8f8..60601df 100644
--- a/.dir-locals.el
+++ b/.dir-locals.el
@@ -10,5 +10,6 @@
(eval put 'with-db-worker-thread 'scheme-indent-function 1)
(eval put 'with-time-logging 'scheme-indent-function 1)
(eval put 'with-timeout 'scheme-indent-function 1)
+ (eval put 'letpar& 'scheme-indent-function 1)
(eval . (put 'call-with-lzip-output-port 'scheme-indent-function 1))
(eval . (put 'with-store 'scheme-indent-function 1))))
diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm
index a8b6738..5481191 100644
--- a/guix-build-coordinator/utils/fibers.scm
+++ b/guix-build-coordinator/utils/fibers.scm
@@ -10,7 +10,9 @@
call-with-sigint
- run-server/patched))
+ run-server/patched
+
+ letpar&))
(define %worker-thread-args
(make-parameter #f))
@@ -147,3 +149,57 @@ If already in the worker thread, call PROC immediately."
(set-nonblocking! socket)
(sigaction SIGPIPE SIG_IGN)
(spawn-fiber (lambda () (socket-loop socket handler))))))
+
+(define (defer-to-fiber thunk)
+ (let ((reply (make-channel)))
+ (spawn-fiber
+ (lambda ()
+ (put-message
+ reply
+ (with-exception-handler
+ (lambda (exn)
+ (cons 'worker-fiber-error exn))
+ (lambda ()
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "worker fiber: exception: ~A\n"
+ exn)
+ (backtrace)
+ (raise-exception exn))
+ (lambda ()
+ (call-with-values
+ thunk
+ (lambda vals
+ vals)))))
+ #:unwind? #t)))
+ #:parallel? #t)
+ reply))
+
+(define (fetch-result-of-defered-thunks . reply-channels)
+ (let ((responses (map get-message reply-channels)))
+ (map
+ (match-lambda
+ (('worker-thread-error . exn)
+ (raise-exception exn))
+ (result
+ (apply values result)))
+ responses)))
+
+(define-syntax parallel-via-fibers
+ (lambda (x)
+ (syntax-case x ()
+ ((_ e0 ...)
+ (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
+ #'(let ((tmp0 (defer-to-fiber
+ (lambda ()
+ e0)))
+ ...)
+ (apply values (fetch-result-of-defered-thunks tmp0 ...))))))))
+
+(define-syntax-rule (letpar& ((v e) ...) b0 b1 ...)
+ (call-with-values
+ (lambda () (parallel-via-fibers e ...))
+ (lambda (v ...)
+ b0 b1 ...)))