(define-module (guix-build-coordinator utils fibers)
  #:use-module (ice-9 match)
  #:use-module (ice-9 threads)
  #:use-module (fibers)
  #:use-module (fibers channels)
  #:use-module (fibers conditions)
  #:use-module (guix-build-coordinator utils)
  #:export (make-worker-thread-channel
            call-with-worker-thread

            call-with-sigint
            run-server/patched

            letpar&))

;; Bound, inside a worker thread, to the argument list produced by that
;; thread's initializer; #f everywhere else.
(define %worker-thread-args
  (make-parameter #f))

(define* (make-worker-thread-channel initializer
                                     #:key (parallelism 1)
                                     (delay-logger (lambda _ #f))
                                     destructor
                                     lifetime
                                     (log-exception? (const #t)))
  "Return a channel used to offload work to dedicated threads.

PARALLELISM threads are started.  Each one calls INITIALIZER (a thunk) to
obtain a list of arguments, then waits for messages of the form
(REPLY-CHANNEL SENT-TIME PROC) on the returned channel and applies PROC
to that argument list.  The reply put on REPLY-CHANNEL is either
(DURATION . VALUES) on success or ('worker-thread-error DURATION EXN) on
failure, where DURATION is the run time of PROC in seconds.

DELAY-LOGGER is called with the number of seconds each message waited
between being sent and being picked up.

When LIFETIME is a number it is decremented after each message; once a
message has been handled while the counter is below zero, DESTRUCTOR (if
given) is applied to the argument list and INITIALIZER is called again to
obtain a fresh one.  When LIFETIME is not a number the arguments are
kept forever.

LOG-EXCEPTION? is called with the exception object when the throw
arguments have the shape ('%exception EXN); if it returns true (or the
throw arguments have any other shape) the exception and a backtrace are
written to the current error port."
  (let ((channel (make-channel)))
    (for-each
     (lambda _
       (call-with-new-thread
        (lambda ()
          (let init ((args (initializer)))
            (parameterize ((%worker-thread-args args))
              (let loop ((current-lifetime lifetime))
                (match (get-message channel)
                  (((? channel? reply) sent-time (? procedure? proc))
                   (let ((time-delay
                          (- (get-internal-real-time)
                             sent-time)))
                     (delay-logger (/ time-delay
                                      internal-time-units-per-second))
                     (put-message
                      reply
                      (let ((start-time (get-internal-real-time)))
                        ;; The outer handler unwinds and turns the
                        ;; exception into an error reply; the inner throw
                        ;; handler runs first, without unwinding, so the
                        ;; backtrace still shows where the error occurred.
                        (with-exception-handler
                            (lambda (exn)
                              (list 'worker-thread-error
                                    (/ (- (get-internal-real-time)
                                          start-time)
                                       internal-time-units-per-second)
                                    exn))
                          (lambda ()
                            (with-throw-handler #t
                              (lambda ()
                                (call-with-values
                                    (lambda ()
                                      (apply proc args))
                                  (lambda vals
                                    (cons (/ (- (get-internal-real-time)
                                                start-time)
                                             internal-time-units-per-second)
                                          vals))))
                              (lambda args
                                (when (match args
                                        (('%exception exn)
                                         (log-exception? exn))
                                        (_ #t))
                                  (simple-format
                                   (current-error-port)
                                   "worker-thread: exception: ~A\n"
                                   args)
                                  (backtrace)))))
                          #:unwind? #t))))))
                (if (number? current-lifetime)
                    (unless (< current-lifetime 0)
                      (loop (- current-lifetime 1)))
                    (loop #f))))
            ;; The lifetime has expired: release the old arguments and
            ;; start over with freshly initialized ones.
            (when destructor
              (apply destructor args))
            (init (initializer))))))
     (iota parallelism))
    channel))

(define* (call-with-worker-thread channel proc #:key duration-logger)
  "Send PROC to the worker thread through CHANNEL.  Return the result of
PROC.  If already in the worker thread, call PROC immediately.

If PROC raised an exception in the worker thread, that exception is
re-raised in the caller.  When DURATION-LOGGER is given, it is called with
the duration reported by the worker thread before returning or raising."
  (let ((args (%worker-thread-args)))
    (if args
        (call-with-delay-logging proc #:args args)
        (let ((reply (make-channel)))
          (put-message channel (list reply (get-internal-real-time) proc))
          (match (get-message reply)
            (('worker-thread-error duration exn)
             (when duration-logger
               (duration-logger duration))
             (raise-exception exn))
            ((duration . result)
             (when duration-logger
               (duration-logger duration))
             (apply values result)))))))

;; Copied from (fibers web server)
(define (call-with-sigint thunk cvar)
  "Call THUNK with a SIGINT handler installed that signals the condition
CVAR.  The previous SIGINT handler is restored when THUNK exits."
  (let ((handler #f))
    (dynamic-wind
      (lambda ()
        (set! handler
          (sigaction SIGINT (lambda (sig) (signal-condition! cvar)))))
      thunk
      (lambda ()
        (if handler
            ;; restore Scheme handler, SIG_IGN or SIG_DFL.
            (sigaction SIGINT (car handler) (cdr handler))
            ;; restore original C handler.
            (sigaction SIGINT #f))))))

;; This variant of run-server from the fibers library supports running
;; multiple servers within one process.
(define run-server/patched
  (let ((fibers-web-server-module
         (resolve-module '(fibers web server))))

    ;; These bindings are looked up with module-ref on the
    ;; (fibers web server) module rather than imported.
    (define set-nonblocking!
      (module-ref fibers-web-server-module 'set-nonblocking!))

    (define make-default-socket
      (module-ref fibers-web-server-module 'make-default-socket))

    (define socket-loop
      (module-ref fibers-web-server-module 'socket-loop))

    (lambda* (handler
              #:key
              (host #f)
              (family AF_INET)
              (addr (if host
                        (inet-pton family host)
                        INADDR_LOOPBACK))
              (port 8080)
              (socket (make-default-socket family addr port)))
      ;; We use a large backlog by default.  If the server is suddenly hit
      ;; with a number of connections on a small backlog, clients won't
      ;; receive confirmation for their SYN, leading them to retry --
      ;; probably successfully, but with a large latency.
      (listen socket 1024)
      (set-nonblocking! socket)
      (sigaction SIGPIPE SIG_IGN)
      ;; Run the accept loop in its own fiber and return immediately.
      (spawn-fiber (lambda () (socket-loop socket handler))))))

(define (defer-to-fiber thunk)
  "Run THUNK in a new fiber and return a channel on which the outcome will
be put: the list of values returned by THUNK, or the pair
('worker-fiber-error . EXN) if THUNK raised the exception EXN.  In the
latter case the exception and a backtrace are also written to the current
error port."
  (let ((reply (make-channel)))
    (spawn-fiber
     (lambda ()
       (put-message
        reply
        ;; Outer handler: unwind and convert the exception into an error
        ;; reply.  Inner handler: log it with a backtrace, then re-raise
        ;; so the outer handler sees it.
        (with-exception-handler
            (lambda (exn)
              (cons 'worker-fiber-error exn))
          (lambda ()
            (with-exception-handler
                (lambda (exn)
                  (simple-format
                   (current-error-port)
                   "worker fiber: exception: ~A\n"
                   exn)
                  (backtrace)
                  (raise-exception exn))
              (lambda ()
                (call-with-values
                    thunk
                  (lambda vals
                    vals)))))
          #:unwind? #t)))
     #:parallel? #t)
    reply))

(define (fetch-result-of-defered-thunks . reply-channels)
  "Wait for a message on each of REPLY-CHANNELS, as created by
defer-to-fiber, and return the list of results.  If any of the deferred
thunks failed, re-raise its exception."
  (let ((responses (map get-message reply-channels)))
    (map
     (match-lambda
       ;; This tag must match the one used by defer-to-fiber; otherwise
       ;; the error pair would fall through to the success clause and fail
       ;; in `apply' instead of re-raising the original exception.
       (('worker-fiber-error . exn)
        (raise-exception exn))
       (result
        (apply values result)))
     responses)))

;; Evaluate each expression in its own fiber and return their results as
;; multiple values, in the order of the expressions.
(define-syntax parallel-via-fibers
  (lambda (x)
    (syntax-case x ()
      ((_ e0 ...)
       (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
         #'(let ((tmp0 (defer-to-fiber
                         (lambda ()
                           e0)))
                 ...)
             (apply values (fetch-result-of-defered-thunks tmp0 ...))))))))

;; Like `let', but the binding expressions are evaluated in separate
;; fibers via parallel-via-fibers.
(define-syntax-rule (letpar& ((v e) ...)
                      b0 b1 ...)
  (call-with-values
      (lambda ()
        (parallel-via-fibers e ...))
    (lambda (v ...)
      b0 b1 ...)))