(define-module (guix-build-coordinator utils fibers) #:use-module (ice-9 q) #:use-module (ice-9 match) #:use-module (ice-9 atomic) #:use-module (ice-9 threads) #:use-module (ice-9 exceptions) #:use-module (ice-9 ports internal) #:use-module (ice-9 suspendable-ports) #:use-module (fibers) #:use-module (fibers timers) #:use-module (fibers channels) #:use-module (fibers scheduler) #:use-module (fibers operations) #:use-module (fibers conditions) #:use-module ((guix build syscalls) #:select (set-thread-name)) #:use-module (guix-build-coordinator utils) #:export (make-worker-thread-channel %worker-thread-default-timeout call-with-worker-thread worker-thread-timeout-error? call-with-sigint run-server/patched spawn-port-monitoring-fiber letpar& port-timeout-error? port-read-timeout-error? port-write-timeout-error? with-fibers-timeout with-fibers-port-timeouts make-queueing-channel) #:replace (retry-on-error)) (define %worker-thread-args (make-parameter #f)) (define* (make-worker-thread-channel initializer #:key (parallelism 1) (delay-logger (lambda _ #f)) (duration-logger (const #f)) destructor lifetime (log-exception? (const #t)) (expire-on-exception? #f) (name "unnamed")) "Return a channel used to offload work to a dedicated thread. ARGS are the arguments of the worker thread procedure." (define thread-proc-vector (make-vector parallelism #f)) (define (initializer/safe) (let ((args (with-exception-handler (lambda (exn) (simple-format (current-error-port) "exception running initializer in worker thread (~A): ~A:\n ~A\n" name initializer exn) #f) (lambda () (with-throw-handler #t initializer (lambda args (backtrace)))) #:unwind? #t))) (if args args ;; never give up, just keep retrying (begin (sleep 1) (initializer/safe))))) (define (destructor/safe args) (let ((success? (with-exception-handler (lambda (exn) (simple-format (current-error-port) "exception running destructor in worker thread (~A): ~A:\n ~A\n" name destructor exn) #f) (lambda () (with-throw-handler #t (lambda () (apply destructor args) #t) (lambda _ (backtrace)))) #:unwind? #t))) (or success? #t (begin (sleep 1) (destructor/safe args))))) (define (process thread-index channel args) (let loop ((current-lifetime lifetime)) (let ((exception? (match (get-message channel) (((? channel? reply) sent-time (? procedure? proc)) (let ((time-delay (- (get-internal-real-time) sent-time))) (delay-logger (/ time-delay internal-time-units-per-second)) (let* ((start-time (get-internal-real-time)) (response (with-exception-handler (lambda (exn) (list 'worker-thread-error (/ (- (get-internal-real-time) start-time) internal-time-units-per-second) exn)) (lambda () (vector-set! thread-proc-vector thread-index proc) (with-throw-handler #t (lambda () (call-with-values (lambda () (start-stack 'worker-thread (apply proc args))) (lambda vals (cons (/ (- (get-internal-real-time) start-time) internal-time-units-per-second) vals)))) (lambda args (when (match args (('%exception exn) (log-exception? exn)) (_ #t)) (simple-format (current-error-port) "worker-thread: exception: ~A\n" args) (backtrace))))) #:unwind? #t))) (put-message reply response) (vector-set! thread-proc-vector thread-index #f) (match response (('worker-thread-error duration _) (when duration-logger (duration-logger duration proc)) #t) ((duration . _) (when duration-logger (duration-logger duration proc)) #f)))))))) (unless (and expire-on-exception? exception?) (if (number? current-lifetime) (unless (< current-lifetime 0) (loop (if current-lifetime (- current-lifetime 1) #f))) (loop #f)))))) (let ((channel (make-channel))) (for-each (lambda (thread-index) (call-with-new-thread (lambda () (catch 'system-error (lambda () (set-thread-name (string-append name " w t " (number->string thread-index)))) (const #t)) (let init ((args (initializer/safe))) (with-exception-handler (lambda (exn) (simple-format (current-error-port) "worker-thread-channel: exception: ~A\n" exn)) (lambda () (parameterize ((%worker-thread-args args)) (process thread-index channel args))) #:unwind? #t) (when destructor (destructor/safe args)) (init (initializer/safe)))))) (iota parallelism)) (values channel thread-proc-vector))) (define &worker-thread-timeout (make-exception-type '&worker-thread-timeout &error '())) (define make-worker-thread-timeout-error (record-constructor &worker-thread-timeout)) (define worker-thread-timeout-error? (record-predicate &worker-thread-timeout)) (define %worker-thread-default-timeout (make-parameter 30)) (define* (call-with-worker-thread channel proc #:key duration-logger (timeout (%worker-thread-default-timeout))) "Send PROC to the worker thread through CHANNEL. Return the result of PROC. If already in the worker thread, call PROC immediately." (let ((args (%worker-thread-args))) (if args (call-with-delay-logging proc #:args args) (let* ((reply (make-channel)) (operation-success? (perform-operation (let ((put (wrap-operation (put-operation channel (list reply (get-internal-real-time) proc)) (const #t)))) (if timeout (choice-operation put (wrap-operation (sleep-operation timeout) (const #f))) put))))) (unless operation-success? (raise-exception (make-worker-thread-timeout-error))) (match (get-message reply) (('worker-thread-error duration exn) (when duration-logger (duration-logger duration)) (raise-exception exn)) ((duration . result) (when duration-logger (duration-logger duration)) (apply values result))))))) ;; Copied from (fibers web server) (define (call-with-sigint thunk cvar) (let ((handler #f)) (dynamic-wind (lambda () (set! handler (sigaction SIGINT (lambda (sig) (signal-condition! cvar))))) thunk (lambda () (if handler ;; restore Scheme handler, SIG_IGN or SIG_DFL. (sigaction SIGINT (car handler) (cdr handler)) ;; restore original C handler. (sigaction SIGINT #f)))))) ;; This variant of run-server from the fibers library supports running ;; multiple servers within one process. (define run-server/patched (let ((fibers-web-server-module (resolve-module '(fibers web server)))) (define set-nonblocking! (module-ref fibers-web-server-module 'set-nonblocking!)) (define make-default-socket (module-ref fibers-web-server-module 'make-default-socket)) (define socket-loop (module-ref fibers-web-server-module 'socket-loop)) (lambda* (handler #:key (host #f) (family AF_INET) (addr (if host (inet-pton family host) INADDR_LOOPBACK)) (port 8080) (socket (make-default-socket family addr port))) ;; We use a large backlog by default. If the server is suddenly hit ;; with a number of connections on a small backlog, clients won't ;; receive confirmation for their SYN, leading them to retry -- ;; probably successfully, but with a large latency. (listen socket 1024) (set-nonblocking! socket) (sigaction SIGPIPE SIG_IGN) (spawn-fiber (lambda () (socket-loop socket handler)))))) (define (spawn-port-monitoring-fiber port error-condition) (spawn-fiber (lambda () (while #t (with-exception-handler (lambda (exn) (simple-format (current-error-port) "port monitoring fiber failed to connect to ~A: ~A\n" port exn) (signal-condition! error-condition) (sleep 10) (simple-format (current-error-port) "port monitoring fiber error-condition unresponsive") (primitive-exit 1)) (lambda () (with-fibers-port-timeouts (lambda () (let ((sock (non-blocking-port (socket PF_INET SOCK_STREAM 0)))) (connect sock AF_INET INADDR_LOOPBACK port) (close-port sock))) #:timeout 20)) #:unwind? #t) (sleep 20))))) (define (defer-to-fiber thunk) (let ((reply (make-channel))) (spawn-fiber (lambda () (put-message reply (with-exception-handler (lambda (exn) (cons 'worker-fiber-error exn)) (lambda () (with-exception-handler (lambda (exn) (simple-format (current-error-port) "worker fiber: exception: ~A\n" exn) (backtrace) (raise-exception exn)) (lambda () (call-with-values thunk (lambda vals vals))))) #:unwind? #t))) #:parallel? #t) reply)) (define (fetch-result-of-defered-thunks . reply-channels) (let ((responses (map get-message reply-channels))) (map (match-lambda (('worker-thread-error . exn) (raise-exception exn)) (result (apply values result))) responses))) (define-syntax parallel-via-fibers (lambda (x) (syntax-case x () ((_ e0 ...) (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...))))) #'(let ((tmp0 (defer-to-fiber (lambda () e0))) ...) (apply values (fetch-result-of-defered-thunks tmp0 ...)))))))) (define-syntax-rule (letpar& ((v e) ...) b0 b1 ...) (call-with-values (lambda () (parallel-via-fibers e ...)) (lambda (v ...) b0 b1 ...))) (define* (with-fibers-timeout thunk #:key timeout on-timeout) (let ((channel (make-channel))) (spawn-fiber (lambda () (with-exception-handler (lambda (exn) (perform-operation (choice-operation (put-operation channel (cons 'exception exn)) (sleep-operation timeout)))) (lambda () (call-with-values thunk (lambda vals (perform-operation (choice-operation (put-operation channel vals) (sleep-operation timeout)))))) #:unwind? #t))) (match (perform-operation (choice-operation (get-operation channel) (wrap-operation (sleep-operation timeout) (const 'timeout)))) ('timeout (on-timeout)) (('exception . exn) (raise-exception exn)) (vals (apply values vals))))) (define &port-timeout (make-exception-type '&port-timeout &external-error '(thunk port))) (define make-port-timeout-error (record-constructor &port-timeout)) (define port-timeout-error? (record-predicate &port-timeout)) (define &port-read-timeout (make-exception-type '&port-read-timeout &port-timeout '())) (define make-port-read-timeout-error (record-constructor &port-read-timeout)) (define port-read-timeout-error? (record-predicate &port-read-timeout)) (define &port-write-timeout (make-exception-type '&port-write-timeout &port-timeout '())) (define make-port-write-timeout-error (record-constructor &port-write-timeout)) (define port-write-timeout-error? (record-predicate &port-write-timeout)) ;; These procedure are subject to spurious wakeups. (define (readable? port) "Test if PORT is writable." (= 1 (port-poll port "r" 0))) (define (writable? port) "Test if PORT is writable." (= 1 (port-poll port "w" 0))) (define (make-wait-operation ready? schedule-when-ready port port-ready-fd this-procedure) (make-base-operation #f (lambda _ (and (ready? port) values)) (lambda (flag sched resume) (define (commit) (match (atomic-box-compare-and-swap! flag 'W 'S) ('W (resume values)) ('C (commit)) ('S #f))) (schedule-when-ready sched (port-ready-fd port) commit)))) (define (wait-until-port-readable-operation port) "Make an operation that will succeed when PORT is readable." (unless (input-port? port) (error "refusing to wait forever for input on non-input port")) (make-wait-operation readable? schedule-task-when-fd-readable port port-read-wait-fd wait-until-port-readable-operation)) (define (wait-until-port-writable-operation port) "Make an operation that will succeed when PORT is writable." (unless (output-port? port) (error "refusing to wait forever for output on non-output port")) (make-wait-operation writable? schedule-task-when-fd-writable port port-write-wait-fd wait-until-port-writable-operation)) (define* (with-fibers-port-timeouts thunk #:key timeout (read-timeout timeout) (write-timeout timeout)) (define (no-fibers-wait thunk port mode timeout) (define poll-timeout-ms 200) ;; When the GC runs, it restarts the poll syscall, but the timeout ;; remains unchanged! When the timeout is longer than the time ;; between the syscall restarting, I think this renders the ;; timeout useless. Therefore, this code uses a short timeout, and ;; repeatedly calls poll while watching the clock to see if it has ;; timed out overall. (let ((timeout-internal (+ (get-internal-real-time) (* internal-time-units-per-second timeout)))) (let loop ((poll-value (port-poll port mode poll-timeout-ms))) (if (= poll-value 0) (if (> (get-internal-real-time) timeout-internal) (raise-exception (if (string=? mode "r") (make-port-read-timeout-error thunk port) (make-port-write-timeout-error thunk port))) (loop (port-poll port mode poll-timeout-ms))) poll-value)))) (parameterize ((current-read-waiter (lambda (port) (if (current-scheduler) (perform-operation (choice-operation (wait-until-port-readable-operation port) (wrap-operation (sleep-operation read-timeout) (lambda () (raise-exception (make-port-read-timeout-error thunk port)))))) (no-fibers-wait thunk port "r" read-timeout)))) (current-write-waiter (lambda (port) (if (current-scheduler) (perform-operation (choice-operation (wait-until-port-writable-operation port) (wrap-operation (sleep-operation write-timeout) (lambda () (raise-exception (make-port-write-timeout-error thunk port)))))) (no-fibers-wait thunk port "w" write-timeout))))) (thunk))) ;; Use the fibers sleep (define (retry-on-error . args) (apply (@ (guix-build-coordinator utils) retry-on-error) (append args (list #:sleep-impl sleep)))) (define (make-queueing-channel channel) (define queue (make-q)) (let ((queue-channel (make-channel))) (spawn-fiber (lambda () (while #t (if (q-empty? queue) (enq! queue (perform-operation (get-operation queue-channel))) (let ((front (q-front queue))) (perform-operation (choice-operation (wrap-operation (get-operation queue-channel) (lambda (val) (enq! queue val))) (wrap-operation (put-operation channel front) (lambda _ (q-pop! queue)))))))))) queue-channel))