diff options
Diffstat (limited to 'guix-build-coordinator/utils/fibers.scm')
-rw-r--r-- | guix-build-coordinator/utils/fibers.scm | 115 |
1 files changed, 115 insertions, 0 deletions
diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm new file mode 100644 index 0000000..a39b3bb --- /dev/null +++ b/guix-build-coordinator/utils/fibers.scm @@ -0,0 +1,115 @@ +(define-module (guix-build-coordinator utils fibers) + #:use-module (ice-9 match) + #:use-module (ice-9 threads) + #:use-module (fibers) + #:use-module (fibers channels) + #:use-module (fibers conditions) + #:export (make-worker-thread-channel + call-with-worker-thread + + call-with-sigint + + run-server/patched)) + +(define %worker-thread-args + (make-parameter #f)) + +(define* (make-worker-thread-channel initializer + #:key (parallelism 1)) + "Return a channel used to offload work to a dedicated thread. ARGS are the +arguments of the worker thread procedure." + (parameterize (((@@ (fibers internal) current-fiber) #f)) + (let ((channel (make-channel))) + (for-each + (lambda _ + (let ((args (initializer))) + (call-with-new-thread + (lambda () + (parameterize ((%worker-thread-args args)) + (let loop () + (match (get-message channel) + (((? channel? reply) . (? procedure? proc)) + (put-message + reply + (with-exception-handler + (lambda (exn) + (cons 'worker-thread-error exn)) + (lambda () + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "worker-thread: exception: ~A\n" exn) + (backtrace) + (raise-exception exn)) + (lambda () + (call-with-values + (lambda () + (apply proc args)) + (lambda vals vals))))) + #:unwind? #t)))) + (loop))))))) + (iota parallelism)) + channel))) + +(define (call-with-worker-thread channel proc) + "Send PROC to the worker thread through CHANNEL. Return the result of PROC. +If already in the worker thread, call PROC immediately." + (let ((args (%worker-thread-args))) + (if args + (apply proc args) + (let ((reply (make-channel))) + (put-message channel (cons reply proc)) + (match (get-message reply) + (('worker-thread-error . exn) + (raise-exception exn)) + (result + (apply values result))))))) + +;; Copied from (fibers web server) +(define (call-with-sigint thunk cvar) + (let ((handler #f)) + (dynamic-wind + (lambda () + (set! handler + (sigaction SIGINT (lambda (sig) (signal-condition! cvar))))) + thunk + (lambda () + (if handler + ;; restore Scheme handler, SIG_IGN or SIG_DFL. + (sigaction SIGINT (car handler) (cdr handler)) + ;; restore original C handler. + (sigaction SIGINT #f)))))) + +;; This variant of run-server from the fibers library supports running +;; multiple servers within one process. +(define run-server/patched + (let ((fibers-web-server-module + (resolve-module '(fibers web server)))) + + (define set-nonblocking! + (module-ref fibers-web-server-module 'set-nonblocking!)) + + (define make-default-socket + (module-ref fibers-web-server-module 'make-default-socket)) + + (define socket-loop + (module-ref fibers-web-server-module 'socket-loop)) + + (lambda* (handler + #:key + (host #f) + (family AF_INET) + (addr (if host + (inet-pton family host) + INADDR_LOOPBACK)) + (port 8080) + (socket (make-default-socket family addr port))) + ;; We use a large backlog by default. If the server is suddenly hit + ;; with a number of connections on a small backlog, clients won't + ;; receive confirmation for their SYN, leading them to retry -- + ;; probably successfully, but with a large latency. + (listen socket 1024) + (set-nonblocking! socket) + (sigaction SIGPIPE SIG_IGN) + (spawn-fiber (lambda () (socket-loop socket handler)))))) |