aboutsummaryrefslogtreecommitdiff
path: root/guix-build-coordinator/utils/fibers.scm
diff options
context:
space:
mode:
Diffstat (limited to 'guix-build-coordinator/utils/fibers.scm')
-rw-r--r--guix-build-coordinator/utils/fibers.scm115
1 files changed, 115 insertions, 0 deletions
diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm
new file mode 100644
index 0000000..a39b3bb
--- /dev/null
+++ b/guix-build-coordinator/utils/fibers.scm
@@ -0,0 +1,115 @@
+(define-module (guix-build-coordinator utils fibers)
+ #:use-module (ice-9 match)
+ #:use-module (ice-9 threads)
+ #:use-module (fibers)
+ #:use-module (fibers channels)
+ #:use-module (fibers conditions)
+ #:export (make-worker-thread-channel
+ call-with-worker-thread
+
+ call-with-sigint
+
+ run-server/patched))
+
+(define %worker-thread-args
+ (make-parameter #f))
+
+(define* (make-worker-thread-channel initializer
+ #:key (parallelism 1))
+ "Return a channel used to offload work to a dedicated thread. ARGS are the
+arguments of the worker thread procedure."
+ (parameterize (((@@ (fibers internal) current-fiber) #f))
+ (let ((channel (make-channel)))
+ (for-each
+ (lambda _
+ (let ((args (initializer)))
+ (call-with-new-thread
+ (lambda ()
+ (parameterize ((%worker-thread-args args))
+ (let loop ()
+ (match (get-message channel)
+ (((? channel? reply) . (? procedure? proc))
+ (put-message
+ reply
+ (with-exception-handler
+ (lambda (exn)
+ (cons 'worker-thread-error exn))
+ (lambda ()
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "worker-thread: exception: ~A\n" exn)
+ (backtrace)
+ (raise-exception exn))
+ (lambda ()
+ (call-with-values
+ (lambda ()
+ (apply proc args))
+ (lambda vals vals)))))
+ #:unwind? #t))))
+ (loop)))))))
+ (iota parallelism))
+ channel)))
+
+(define (call-with-worker-thread channel proc)
+ "Send PROC to the worker thread through CHANNEL. Return the result of PROC.
+If already in the worker thread, call PROC immediately."
+ (let ((args (%worker-thread-args)))
+ (if args
+ (apply proc args)
+ (let ((reply (make-channel)))
+ (put-message channel (cons reply proc))
+ (match (get-message reply)
+ (('worker-thread-error . exn)
+ (raise-exception exn))
+ (result
+ (apply values result)))))))
+
+;; Copied from (fibers web server)
+(define (call-with-sigint thunk cvar)
+ (let ((handler #f))
+ (dynamic-wind
+ (lambda ()
+ (set! handler
+ (sigaction SIGINT (lambda (sig) (signal-condition! cvar)))))
+ thunk
+ (lambda ()
+ (if handler
+ ;; restore Scheme handler, SIG_IGN or SIG_DFL.
+ (sigaction SIGINT (car handler) (cdr handler))
+ ;; restore original C handler.
+ (sigaction SIGINT #f))))))
+
+;; This variant of run-server from the fibers library supports running
+;; multiple servers within one process.
+(define run-server/patched
+ (let ((fibers-web-server-module
+ (resolve-module '(fibers web server))))
+
+ (define set-nonblocking!
+ (module-ref fibers-web-server-module 'set-nonblocking!))
+
+ (define make-default-socket
+ (module-ref fibers-web-server-module 'make-default-socket))
+
+ (define socket-loop
+ (module-ref fibers-web-server-module 'socket-loop))
+
+ (lambda* (handler
+ #:key
+ (host #f)
+ (family AF_INET)
+ (addr (if host
+ (inet-pton family host)
+ INADDR_LOOPBACK))
+ (port 8080)
+ (socket (make-default-socket family addr port)))
+ ;; We use a large backlog by default. If the server is suddenly hit
+ ;; with a number of connections on a small backlog, clients won't
+ ;; receive confirmation for their SYN, leading them to retry --
+ ;; probably successfully, but with a large latency.
+ (listen socket 1024)
+ (set-nonblocking! socket)
+ (sigaction SIGPIPE SIG_IGN)
+ (spawn-fiber (lambda () (socket-loop socket handler))))))