(define-module (guix-build-coordinator utils fibers)
  #:use-module (ice-9 match)
  #:use-module (ice-9 threads)
  #:use-module (fibers)
  #:use-module (fibers channels)
  #:use-module (fibers conditions)
  #:export (make-worker-thread-channel
            call-with-worker-thread

            call-with-sigint
            run-server/patched))

;; Defaults to #f.  Inside a worker thread's message loop it is bound to the
;; list returned by the INITIALIZER, which lets call-with-worker-thread
;; detect that it is already running in a worker thread.
(define %worker-thread-args
  (make-parameter #f))

(define* (make-worker-thread-channel initializer
                                     #:key (parallelism 1)
                                     (delay-logger (lambda _ #f))
                                     destructor
                                     lifetime)
  "Start PARALLELISM worker threads and return the channel used to send
work to them (see 'call-with-worker-thread').

INITIALIZER is a thunk called in each worker thread.  It must return a
list; that list is applied as the arguments of every procedure the thread
runs, and of DESTRUCTOR.

DELAY-LOGGER is called for each message with the time, in seconds, between
the message being sent and a worker thread picking it up.

When LIFETIME is a number, a worker thread stops taking messages once its
counter (starting at LIFETIME and decremented after each message) is found
to be negative; a non-negative LIFETIME of N therefore covers N + 2
messages.  The thread then applies DESTRUCTOR (if given) to its arguments,
calls INITIALIZER again and resumes.  When LIFETIME is not a number, the
thread keeps its arguments forever and DESTRUCTOR is never called."
  ;; NOTE(review): binding current-fiber to #f presumably makes the channel
  ;; operations performed by the new threads behave as if called from
  ;; outside a fiber, even when this procedure is -- confirm against the
  ;; fibers internals.
  (parameterize (((@@ (fibers internal) current-fiber) #f))
    (let ((channel (make-channel)))
      (for-each
       (lambda _
         (call-with-new-thread
          (lambda ()
            (let init ((args (initializer)))
              (parameterize ((%worker-thread-args args))
                (let loop ((current-lifetime lifetime))
                  (match (get-message channel)
                    (((? channel? reply) sent-time (? procedure? proc))
                     (let ((time-delay
                            (- (get-internal-real-time)
                               sent-time)))
                       (delay-logger (/ time-delay
                                        internal-time-units-per-second))
                       (put-message
                        reply
                        ;; On success the reply is the list of values
                        ;; returned by PROC; on failure it is a pair tagged
                        ;; with 'worker-thread-error carrying the
                        ;; exception, which the caller re-raises.
                        (with-exception-handler
                            (lambda (exn)
                              (cons 'worker-thread-error exn))
                          (lambda ()
                            ;; Print the error and a backtrace before the
                            ;; stack is unwound by the outer handler.
                            (with-throw-handler #t
                              (lambda ()
                                (call-with-values
                                    (lambda ()
                                      (apply proc args))
                                  (lambda vals vals)))
                              (lambda (key . throw-args)
                                (simple-format
                                 (current-error-port)
                                 "worker-thread: exception: ~A ~A\n"
                                 key throw-args)
                                (backtrace))))
                          #:unwind? #t)))))
                  (if (number? current-lifetime)
                      ;; Falling out of the loop here recycles the thread's
                      ;; arguments below.
                      (unless (< current-lifetime 0)
                        (loop (- current-lifetime 1)))
                      (loop #f))))
              (when destructor
                (apply destructor args))
              (init (initializer))))))
       (iota parallelism))
      channel)))

(define (call-with-worker-thread channel proc)
  "Send PROC to a worker thread through CHANNEL and return the values
returned by PROC, which is applied to the worker thread's arguments.  If
PROC raises an exception in the worker thread, that exception is re-raised
in the caller.  If already in a worker thread, apply PROC immediately to
the current worker thread's arguments."
  (let ((args (%worker-thread-args)))
    (if args
        (apply proc args)
        (let ((reply (make-channel)))
          ;; The send time lets the worker report how long the message
          ;; waited before being picked up.
          (put-message channel (list reply (get-internal-real-time) proc))
          (match (get-message reply)
            (('worker-thread-error . exn)
             (raise-exception exn))
            (result
             (apply values result)))))))

;; Copied from (fibers web server)
;;
;; Call THUNK with a SIGINT handler installed that signals the condition
;; CVAR.  The previous SIGINT handler is restored when THUNK's dynamic
;; extent is left.
(define (call-with-sigint thunk cvar)
  (let ((handler #f))
    (dynamic-wind
      (lambda ()
        (set! handler
              (sigaction SIGINT (lambda (sig) (signal-condition! cvar)))))
      thunk
      (lambda ()
        (if handler
            ;; restore Scheme handler, SIG_IGN or SIG_DFL.
            (sigaction SIGINT (car handler) (cdr handler))
            ;; restore original C handler.
            (sigaction SIGINT #f))))))

;; This variant of run-server from the fibers library supports running
;; multiple servers within one process.  It spawns a fiber running the
;; fibers socket loop for HANDLER and returns, so it must be called from
;; within a running fibers scheduler.
(define run-server/patched
  (let ((fibers-web-server-module
         (resolve-module '(fibers web server))))

    ;; These procedures are looked up by name in the (fibers web server)
    ;; module at load time.
    (define set-nonblocking!
      (module-ref fibers-web-server-module 'set-nonblocking!))

    (define make-default-socket
      (module-ref fibers-web-server-module 'make-default-socket))

    (define socket-loop
      (module-ref fibers-web-server-module 'socket-loop))

    (lambda* (handler
              #:key
              (host #f)
              (family AF_INET)
              (addr (if host
                        (inet-pton family host)
                        INADDR_LOOPBACK))
              (port 8080)
              (socket (make-default-socket family addr port)))
      ;; We use a large backlog by default.  If the server is suddenly hit
      ;; with a number of connections on a small backlog, clients won't
      ;; receive confirmation for their SYN, leading them to retry --
      ;; probably successfully, but with a large latency.
      (listen socket 1024)
      (set-nonblocking! socket)
      (sigaction SIGPIPE SIG_IGN)
      (spawn-fiber (lambda () (socket-loop socket handler))))))