diff options
Diffstat (limited to 'guix-build-coordinator/utils')
-rw-r--r-- | guix-build-coordinator/utils/fibers.scm | 587 | ||||
-rw-r--r-- | guix-build-coordinator/utils/timeout.scm | 81 |
2 files changed, 140 insertions, 528 deletions
diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm index 450c36b..d836ceb 100644 --- a/guix-build-coordinator/utils/fibers.scm +++ b/guix-build-coordinator/utils/fibers.scm @@ -1,4 +1,5 @@ (define-module (guix-build-coordinator utils fibers) + #:use-module (srfi srfi-9) #:use-module (ice-9 q) #:use-module (ice-9 match) #:use-module (ice-9 atomic) @@ -12,305 +13,20 @@ #:use-module (fibers scheduler) #:use-module (fibers operations) #:use-module (fibers conditions) + #:use-module (knots timeout) #:use-module ((guix build syscalls) #:select (set-thread-name)) #:use-module (guix-build-coordinator utils) - #:export (make-worker-thread-channel - %worker-thread-default-timeout - call-with-worker-thread - worker-thread-timeout-error? + #:export (spawn-port-monitoring-fiber - call-with-sigint + make-discrete-priority-queueing-channels - run-server/patched - - spawn-port-monitoring-fiber - - letpar& - - port-timeout-error? - port-read-timeout-error? - port-write-timeout-error? - with-fibers-timeout - with-fibers-port-timeouts - - make-queueing-channel - make-discrete-priority-queueing-channels) + make-reusable-condition + reusable-condition? + signal-reusable-condition! + reusable-condition-wait) #:replace (retry-on-error)) -(define %worker-thread-args - (make-parameter #f)) - -(define* (make-worker-thread-channel initializer - #:key (parallelism 1) - (delay-logger (lambda _ #f)) - (duration-logger (const #f)) - destructor - lifetime - (log-exception? (const #t)) - (expire-on-exception? #f) - (name "unnamed")) - "Return a channel used to offload work to a dedicated thread. ARGS are the -arguments of the worker thread procedure." - (define thread-proc-vector - (make-vector parallelism #f)) - - (define (initializer/safe) - (let ((args - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "exception running initializer in worker thread (~A): ~A:\n ~A\n" - name - initializer - exn) - #f) - (lambda () - (with-throw-handler #t - initializer - (lambda args - (backtrace)))) - #:unwind? #t))) - - (if args - args - ;; never give up, just keep retrying - (begin - (sleep 1) - (initializer/safe))))) - - (define (destructor/safe args) - (let ((success? - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "exception running destructor in worker thread (~A): ~A:\n ~A\n" - name - destructor - exn) - #f) - (lambda () - (with-throw-handler #t - (lambda () - (apply destructor args) - #t) - (lambda _ - (backtrace)))) - #:unwind? #t))) - - (or success? - #t - (begin - (sleep 1) - (destructor/safe args))))) - - (define (process thread-index channel args) - (let loop ((current-lifetime lifetime)) - (let ((exception? - (match (get-message channel) - (((? channel? reply) sent-time (? procedure? proc)) - (let ((time-delay - (- (get-internal-real-time) - sent-time))) - (delay-logger (/ time-delay - internal-time-units-per-second)) - - (let* ((start-time (get-internal-real-time)) - (response - (with-exception-handler - (lambda (exn) - (list 'worker-thread-error - (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second) - exn)) - (lambda () - (vector-set! thread-proc-vector - thread-index - proc) - (with-throw-handler #t - (lambda () - (call-with-values - (lambda () - (start-stack - 'worker-thread - (apply proc args))) - (lambda vals - (cons (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second) - vals)))) - (lambda args - (when (match args - (('%exception exn) - (log-exception? exn)) - (_ #t)) - (simple-format - (current-error-port) - "worker-thread: exception: ~A\n" args) - (backtrace))))) - #:unwind? #t))) - (put-message reply - response) - - (vector-set! thread-proc-vector - thread-index - #f) - - (match response - (('worker-thread-error duration _) - (when duration-logger - (duration-logger duration proc)) - #t) - ((duration . _) - (when duration-logger - (duration-logger duration proc)) - #f)))))))) - (unless (and expire-on-exception? - exception?) - (if (number? current-lifetime) - (unless (< current-lifetime 0) - (loop (if current-lifetime - (- current-lifetime 1) - #f))) - (loop #f)))))) - - (let ((channel (make-channel))) - (for-each - (lambda (thread-index) - (call-with-new-thread - (lambda () - (catch 'system-error - (lambda () - (set-thread-name - (string-append - name " w t " - (number->string thread-index)))) - (const #t)) - - (let init ((args (initializer/safe))) - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "worker-thread-channel: exception: ~A\n" exn)) - (lambda () - (parameterize ((%worker-thread-args args)) - (process thread-index channel args))) - #:unwind? #t) - - (when destructor - (destructor/safe args)) - - (init (initializer/safe)))))) - (iota parallelism)) - - (values channel - thread-proc-vector))) - -(define &worker-thread-timeout - (make-exception-type '&worker-thread-timeout - &error - '())) - -(define make-worker-thread-timeout-error - (record-constructor &worker-thread-timeout)) - -(define worker-thread-timeout-error? - (record-predicate &worker-thread-timeout)) - -(define %worker-thread-default-timeout - (make-parameter 30)) - -(define* (call-with-worker-thread channel proc #:key duration-logger - (timeout (%worker-thread-default-timeout))) - "Send PROC to the worker thread through CHANNEL. Return the result of PROC. -If already in the worker thread, call PROC immediately." - (let ((args (%worker-thread-args))) - (if args - (call-with-delay-logging proc #:args args) - (let* ((reply (make-channel)) - (operation-success? - (perform-operation - (let ((put - (wrap-operation - (put-operation channel - (list reply - (get-internal-real-time) - proc)) - (const #t)))) - - (if timeout - (choice-operation - put - (wrap-operation (sleep-operation timeout) - (const #f))) - put))))) - - (unless operation-success? - (raise-exception - (make-worker-thread-timeout-error))) - - (match (get-message reply) - (('worker-thread-error duration exn) - (when duration-logger - (duration-logger duration)) - (raise-exception exn)) - ((duration . result) - (when duration-logger - (duration-logger duration)) - (apply values result))))))) - -;; Copied from (fibers web server) -(define (call-with-sigint thunk cvar) - (let ((handler #f)) - (dynamic-wind - (lambda () - (set! handler - (sigaction SIGINT (lambda (sig) (signal-condition! cvar))))) - thunk - (lambda () - (if handler - ;; restore Scheme handler, SIG_IGN or SIG_DFL. - (sigaction SIGINT (car handler) (cdr handler)) - ;; restore original C handler. - (sigaction SIGINT #f)))))) - -;; This variant of run-server from the fibers library supports running -;; multiple servers within one process. -(define run-server/patched - (let ((fibers-web-server-module - (resolve-module '(fibers web server)))) - - (define set-nonblocking! - (module-ref fibers-web-server-module 'set-nonblocking!)) - - (define make-default-socket - (module-ref fibers-web-server-module 'make-default-socket)) - - (define socket-loop - (module-ref fibers-web-server-module 'socket-loop)) - - (lambda* (handler - #:key - (host #f) - (family AF_INET) - (addr (if host - (inet-pton family host) - INADDR_LOOPBACK)) - (port 8080) - (socket (make-default-socket family addr port))) - ;; We use a large backlog by default. If the server is suddenly hit - ;; with a number of connections on a small backlog, clients won't - ;; receive confirmation for their SYN, leading them to retry -- - ;; probably successfully, but with a large latency. - (listen socket 1024) - (set-nonblocking! socket) - (sigaction SIGPIPE SIG_IGN) - (spawn-fiber (lambda () (socket-loop socket handler)))))) - (define (spawn-port-monitoring-fiber port error-condition) (spawn-fiber (lambda () @@ -326,7 +42,7 @@ If already in the worker thread, call PROC immediately." "port monitoring fiber error-condition unresponsive") (primitive-exit 1)) (lambda () - (with-fibers-port-timeouts + (with-port-timeouts (lambda () (let ((sock (non-blocking-port @@ -337,218 +53,6 @@ If already in the worker thread, call PROC immediately." #:unwind? #t) (sleep 20))))) -(define (defer-to-fiber thunk) - (let ((reply (make-channel))) - (spawn-fiber - (lambda () - (put-message - reply - (with-exception-handler - (lambda (exn) - (cons 'worker-fiber-error exn)) - (lambda () - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "worker fiber: exception: ~A\n" - exn) - (backtrace) - (raise-exception exn)) - (lambda () - (call-with-values - thunk - (lambda vals - vals))))) - #:unwind? #t))) - #:parallel? #t) - reply)) - -(define (fetch-result-of-defered-thunks . reply-channels) - (let ((responses (map get-message reply-channels))) - (map - (match-lambda - (('worker-thread-error . exn) - (raise-exception exn)) - (result - (apply values result))) - responses))) - -(define-syntax parallel-via-fibers - (lambda (x) - (syntax-case x () - ((_ e0 ...) - (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...))))) - #'(let ((tmp0 (defer-to-fiber - (lambda () - e0))) - ...) - (apply values (fetch-result-of-defered-thunks tmp0 ...)))))))) - -(define-syntax-rule (letpar& ((v e) ...) b0 b1 ...) - (call-with-values - (lambda () (parallel-via-fibers e ...)) - (lambda (v ...) - b0 b1 ...))) - -(define* (with-fibers-timeout thunk #:key timeout on-timeout) - (let ((channel (make-channel))) - (spawn-fiber - (lambda () - (with-exception-handler - (lambda (exn) - (perform-operation - (choice-operation - (put-operation channel (cons 'exception exn)) - (sleep-operation timeout)))) - (lambda () - (call-with-values thunk - (lambda vals - (perform-operation - (choice-operation - (put-operation channel vals) - (sleep-operation timeout)))))) - #:unwind? #t))) - - (match (perform-operation - (choice-operation - (get-operation channel) - (wrap-operation (sleep-operation timeout) - (const 'timeout)))) - ('timeout - (on-timeout)) - (('exception . exn) - (raise-exception exn)) - (vals - (apply values vals))))) - -(define &port-timeout - (make-exception-type '&port-timeout - &external-error - '(thunk port))) - -(define make-port-timeout-error - (record-constructor &port-timeout)) - -(define port-timeout-error? - (record-predicate &port-timeout)) - -(define &port-read-timeout - (make-exception-type '&port-read-timeout - &port-timeout - '())) - -(define make-port-read-timeout-error - (record-constructor &port-read-timeout)) - -(define port-read-timeout-error? - (record-predicate &port-read-timeout)) - -(define &port-write-timeout - (make-exception-type '&port-write-timeout - &port-timeout - '())) - -(define make-port-write-timeout-error - (record-constructor &port-write-timeout)) - -(define port-write-timeout-error? - (record-predicate &port-write-timeout)) - -;; These procedure are subject to spurious wakeups. - -(define (readable? port) - "Test if PORT is writable." - (= 1 (port-poll port "r" 0))) - -(define (writable? port) - "Test if PORT is writable." - (= 1 (port-poll port "w" 0))) - -(define (make-wait-operation ready? schedule-when-ready port port-ready-fd this-procedure) - (make-base-operation #f - (lambda _ - (and (ready? port) values)) - (lambda (flag sched resume) - (define (commit) - (match (atomic-box-compare-and-swap! flag 'W 'S) - ('W (resume values)) - ('C (commit)) - ('S #f))) - (schedule-when-ready - sched (port-ready-fd port) commit)))) - -(define (wait-until-port-readable-operation port) - "Make an operation that will succeed when PORT is readable." - (unless (input-port? port) - (error "refusing to wait forever for input on non-input port")) - (make-wait-operation readable? schedule-task-when-fd-readable port - port-read-wait-fd - wait-until-port-readable-operation)) - -(define (wait-until-port-writable-operation port) - "Make an operation that will succeed when PORT is writable." - (unless (output-port? port) - (error "refusing to wait forever for output on non-output port")) - (make-wait-operation writable? schedule-task-when-fd-writable port - port-write-wait-fd - wait-until-port-writable-operation)) - -(define* (with-fibers-port-timeouts thunk - #:key timeout - (read-timeout timeout) - (write-timeout timeout)) - (define (no-fibers-wait thunk port mode timeout) - (define poll-timeout-ms 200) - - ;; When the GC runs, it restarts the poll syscall, but the timeout - ;; remains unchanged! When the timeout is longer than the time - ;; between the syscall restarting, I think this renders the - ;; timeout useless. Therefore, this code uses a short timeout, and - ;; repeatedly calls poll while watching the clock to see if it has - ;; timed out overall. - (let ((timeout-internal - (+ (get-internal-real-time) - (* internal-time-units-per-second timeout)))) - (let loop ((poll-value - (port-poll port mode poll-timeout-ms))) - (if (= poll-value 0) - (if (> (get-internal-real-time) - timeout-internal) - (raise-exception - (if (string=? mode "r") - (make-port-read-timeout-error thunk port) - (make-port-write-timeout-error thunk port))) - (loop (port-poll port mode poll-timeout-ms))) - poll-value)))) - - (parameterize - ((current-read-waiter - (lambda (port) - (if (current-scheduler) - (perform-operation - (choice-operation - (wait-until-port-readable-operation port) - (wrap-operation - (sleep-operation read-timeout) - (lambda () - (raise-exception - (make-port-read-timeout-error thunk port)))))) - (no-fibers-wait thunk port "r" read-timeout)))) - (current-write-waiter - (lambda (port) - (if (current-scheduler) - (perform-operation - (choice-operation - (wait-until-port-writable-operation port) - (wrap-operation - (sleep-operation write-timeout) - (lambda () - (raise-exception - (make-port-write-timeout-error thunk port)))))) - (no-fibers-wait thunk port "w" write-timeout))))) - (thunk))) - ;; Use the fibers sleep (define (retry-on-error . args) (apply @@ -557,28 +61,6 @@ If already in the worker thread, call PROC immediately." args (list #:sleep-impl sleep)))) -(define (make-queueing-channel channel) - (define queue (make-q)) - - (let ((queue-channel (make-channel))) - (spawn-fiber - (lambda () - (while #t - (if (q-empty? queue) - (enq! queue - (perform-operation - (get-operation queue-channel))) - (let ((front (q-front queue))) - (perform-operation - (choice-operation - (wrap-operation (get-operation queue-channel) - (lambda (val) - (enq! queue val))) - (wrap-operation (put-operation channel front) - (lambda _ - (q-pop! queue)))))))))) - queue-channel)) - (define (make-discrete-priority-queueing-channels channel num-priorities) (define all-queues (map (lambda _ (make-q)) @@ -588,6 +70,11 @@ If already in the worker thread, call PROC immediately." (map (lambda _ (make-channel)) (iota num-priorities))) + (define (stats) + (map (lambda (queue) + `((length . ,(q-length queue)))) + all-queues)) + (spawn-fiber (lambda () (while #t @@ -620,4 +107,48 @@ If already in the worker thread, call PROC immediately." (enq! queue val)))) all-queues queue-channels))))))))))) - (apply values queue-channels)) + (values (list-copy queue-channels) + stats)) + +(define-record-type <reusable-condition> + (%make-reusable-condition atomic-box channel) + reusable-condition? + (atomic-box reusable-condition-atomic-box) + (channel reusable-condition-channel)) + +(define (make-reusable-condition) + (%make-reusable-condition (make-atomic-box #f) + (make-channel))) + +(define* (signal-reusable-condition! reusable-condition + #:optional (scheduler (current-scheduler))) + (match (atomic-box-compare-and-swap! + (reusable-condition-atomic-box reusable-condition) + #f + #t) + (#f + (spawn-fiber + (lambda () + (put-message (reusable-condition-channel reusable-condition) + #t)) + scheduler) + #t) + (#t #f))) + +(define* (reusable-condition-wait reusable-condition + #:key (timeout #f)) + (let ((val + (if (atomic-box-ref (reusable-condition-atomic-box reusable-condition)) + #t + ;; Not great as this is subject to race conditions, but it should + ;; roughly work + (if timeout + (perform-operation + (choice-operation + (get-operation (reusable-condition-channel reusable-condition)) + (wrap-operation (sleep-operation timeout) + (const #f)))) + (get-message (reusable-condition-channel reusable-condition)))))) + (atomic-box-set! (reusable-condition-atomic-box reusable-condition) + #f) + val)) diff --git a/guix-build-coordinator/utils/timeout.scm b/guix-build-coordinator/utils/timeout.scm new file mode 100644 index 0000000..bb133d7 --- /dev/null +++ b/guix-build-coordinator/utils/timeout.scm @@ -0,0 +1,81 @@ +(define-module (guix-build-coordinator utils timeout) + #:use-module (ice-9 exceptions) + #:use-module (ice-9 suspendable-ports) + #:use-module ((ice-9 ports internal) #:select (port-poll)) + #:export (&port-timeout + &port-read-timeout + &port-write-timeout + + port-timeout-error? + port-read-timeout-error? + port-write-timeout-error? + + with-port-timeouts)) + +(define &port-timeout + (make-exception-type '&port-timeout + &external-error + '(port))) + +(define make-port-timeout-error + (record-constructor &port-timeout)) + +(define port-timeout-error? + (record-predicate &port-timeout)) + +(define &port-read-timeout + (make-exception-type '&port-read-timeout + &port-timeout + '())) + +(define make-port-read-timeout-error + (record-constructor &port-read-timeout)) + +(define port-read-timeout-error? + (record-predicate &port-read-timeout)) + +(define &port-write-timeout + (make-exception-type '&port-write-timeout + &port-timeout + '())) + +(define make-port-write-timeout-error + (record-constructor &port-write-timeout)) + +(define port-write-timeout-error? + (record-predicate &port-write-timeout)) + +(define* (with-port-timeouts thunk #:key timeout) + + ;; When the GC runs, it restarts the poll syscall, but the timeout remains + ;; unchanged! When the timeout is longer than the time between the syscall + ;; restarting, I think this renders the timeout useless. Therefore, this + ;; code uses a short timeout, and repeatedly calls poll while watching the + ;; clock to see if it has timed out overall. + (define poll-timeout-ms 200) + + (define (wait port mode) + (let ((timeout-internal + (+ (get-internal-real-time) + (* internal-time-units-per-second timeout)))) + (let loop ((poll-value + (port-poll port mode poll-timeout-ms))) + (if (= poll-value 0) + (if (> (get-internal-real-time) + timeout-internal) + (raise-exception + (if (string=? mode "r") + (make-port-read-timeout-error port) + (make-port-write-timeout-error port))) + (loop (port-poll port mode poll-timeout-ms))) + poll-value)))) + + (parameterize + ((current-read-waiter + (lambda (port) + (wait port "r"))) + (current-write-waiter + (lambda (port) + (wait port "w")))) + (thunk))) + |