diff options
Diffstat (limited to 'nar-herder/utils.scm')
-rw-r--r-- | nar-herder/utils.scm | 837 |
1 files changed, 486 insertions, 351 deletions
diff --git a/nar-herder/utils.scm b/nar-herder/utils.scm index 2d62360..4755d33 100644 --- a/nar-herder/utils.scm +++ b/nar-herder/utils.scm @@ -18,44 +18,37 @@ (define-module (nar-herder utils) #:use-module (srfi srfi-1) + #:use-module (srfi srfi-9) #:use-module (srfi srfi-19) ; time #:use-module (ice-9 q) - ;; #:use-module (ice-9 ftw) - ;; #:use-module (ice-9 popen) #:use-module (ice-9 iconv) #:use-module (ice-9 match) #:use-module (ice-9 format) #:use-module (ice-9 threads) + #:use-module (ice-9 atomic) #:use-module (ice-9 textual-ports) #:use-module (ice-9 rdelim) #:use-module (ice-9 binary-ports) #:use-module (ice-9 exceptions) #:use-module (rnrs bytevectors) + #:use-module (ice-9 suspendable-ports) + #:use-module ((ice-9 ports internal) #:select (port-poll + port-read-wait-fd + port-write-wait-fd)) #:use-module (web uri) #:use-module (web http) #:use-module (web client) #:use-module (web request) #:use-module (web response) #:use-module (fibers) + #:use-module (fibers timers) #:use-module (fibers channels) + #:use-module (fibers scheduler) #:use-module (fibers conditions) - ;; #:use-module (gcrypt pk-crypto) - ;; #:use-module (gcrypt hash) - ;; #:use-module (gcrypt random) - ;; #:use-module (json) - ;; #:use-module (guix pki) - ;; #:use-module (guix utils) - ;; #:use-module (guix config) - ;; #:use-module (guix store) - ;; #:use-module (guix status) - ;; #:use-module (guix base64) - ;; #:use-module (guix scripts substitute) - #:export (call-with-streaming-http-request - &chunked-input-ended-prematurely - chunked-input-ended-prematurely-error? - make-chunked-input-port* - - make-worker-thread-channel + #:use-module (fibers operations) + #:use-module ((guix build syscalls) + #:select (set-thread-name)) + #:export (make-worker-thread-set call-with-worker-thread call-with-time-logging @@ -65,171 +58,20 @@ create-work-queue - check-locale!)) - -;; Chunked Responses -(define (read-chunk-header port) - "Read a chunk header from PORT and return the size in bytes of the -upcoming chunk." - (match (read-line port) - ((? eof-object?) - ;; Connection closed prematurely: there's nothing left to read. - (error "chunked input ended prematurely")) - (str - (let ((extension-start (string-index str - (lambda (c) - (or (char=? c #\;) - (char=? c #\return)))))) - (string->number (if extension-start ; unnecessary? - (substring str 0 extension-start) - str) - 16))))) - -(define &chunked-input-ended-prematurely - (make-exception-type '&chunked-input-error-prematurely - &external-error - '())) + check-locale! -(define make-chunked-input-ended-prematurely-error - (record-constructor &chunked-input-ended-prematurely)) - -(define chunked-input-ended-prematurely-error? - (record-predicate &chunked-input-ended-prematurely)) - -(define* (make-chunked-input-port* port #:key (keep-alive? #f)) - (define (close) - (unless keep-alive? - (close-port port))) - - (define chunk-size 0) ;size of the current chunk - (define remaining 0) ;number of bytes left from the current chunk - (define finished? #f) ;did we get all the chunks? - - (define (read! bv idx to-read) - (define (loop to-read num-read) - (cond ((or finished? (zero? to-read)) - num-read) - ((zero? remaining) ;get a new chunk - (let ((size (read-chunk-header port))) - (set! chunk-size size) - (set! remaining size) - (cond - ((zero? size) - (set! finished? #t) - (get-bytevector-n port 2) ; \r\n follows the last chunk - num-read) - (else - (loop to-read num-read))))) - (else ;read from the current chunk - (let* ((ask-for (min to-read remaining)) - (read (get-bytevector-n! port bv (+ idx num-read) - ask-for))) - (cond - ((eof-object? read) ;premature termination - (raise-exception - (make-chunked-input-ended-prematurely-error))) - (else - (let ((left (- remaining read))) - (set! remaining left) - (when (zero? left) - ;; We're done with this chunk; read CR and LF. - (get-u8 port) (get-u8 port)) - (loop (- to-read read) - (+ num-read read))))))))) - (loop to-read 0)) - - (make-custom-binary-input-port "chunked input port" read! #f #f close)) - -(define* (make-chunked-output-port* port #:key (keep-alive? #f) - (buffering 1200) - report-bytes-sent) - (define heap-allocated-limit - (expt 2 20)) ;; 1MiB - - (define (%put-string s) - (unless (string-null? s) - (let* ((bv (string->bytevector s "ISO-8859-1")) - (length (bytevector-length bv))) - (put-string port (number->string length 16)) - (put-string port "\r\n") - (put-bytevector port bv) - (put-string port "\r\n") - - (when report-bytes-sent - (report-bytes-sent length)) - (let* ((stats (gc-stats)) - (initial-gc-times - (assq-ref stats 'gc-times))) - (when (> (assq-ref stats 'heap-allocated-since-gc) - heap-allocated-limit) - (while (let ((updated-stats (gc-stats))) - (= (assq-ref updated-stats 'gc-times) - initial-gc-times)) - (gc) - (usleep 50))))))) - - (define (%put-char c) - (%put-string (list->string (list c)))) - - (define (flush) #t) - (define (close) - (put-string port "0\r\n\r\n") - (force-output port) - (unless keep-alive? - (close-port port))) - (let ((ret (make-soft-port - (vector %put-char %put-string flush #f close) "w"))) - (setvbuf ret 'block buffering) - ret)) - -(define* (call-with-streaming-http-request uri callback - #:key (headers '()) - (method 'PUT) - report-bytes-sent) - (let* ((port (open-socket-for-uri uri)) - (request - (build-request - uri - #:method method - #:version '(1 . 1) - #:headers `((connection close) - (Transfer-Encoding . "chunked") - (Content-Type . "application/octet-stream") - ,@headers) - #:port port))) - - (set-port-encoding! port "ISO-8859-1") - (setvbuf port 'block (expt 2 13)) - (with-exception-handler - (lambda (exp) - (simple-format #t "error: PUT ~A: ~A\n" (uri-path uri) exp) - (close-port port) - (raise-exception exp)) - (lambda () - (let ((request (write-request request port))) - (let* ((chunked-output-port - (make-chunked-output-port* - port - #:buffering (expt 2 12) - #:keep-alive? #t - #:report-bytes-sent report-bytes-sent))) - - ;; A SIGPIPE will kill Guile, so ignore it - (sigaction SIGPIPE - (lambda (arg) - (simple-format (current-error-port) "warning: SIGPIPE\n"))) - - (set-port-encoding! chunked-output-port "ISO-8859-1") - (callback chunked-output-port) - (close-port chunked-output-port) - - (let ((response (read-response port))) - (let ((body (read-response-body response))) - (close-port port) - (values response - body))))))))) - -(define* (retry-on-error f #:key times delay ignore) + open-socket-for-uri* + + call-with-sigint + run-server/patched + + timeout-error? + + port-read-timeout-error? + port-write-timeout-error? + with-port-timeouts)) + +(define* (retry-on-error f #:key times delay ignore error-hook) (let loop ((attempt 1)) (match (with-exception-handler (lambda (exn) @@ -259,15 +101,26 @@ upcoming chunk." times)) (apply values return-values)) ((#f . exn) - (if (>= attempt times) + (if (>= attempt + (- times 1)) (begin (simple-format (current-error-port) - "error: ~A:\n ~A,\n giving up after ~A attempts\n" + "error: ~A:\n ~A,\n attempt ~A of ~A, last retry in ~A\n" f exn - times) - (raise-exception exn)) + attempt + times + delay) + (when error-hook + (error-hook attempt exn)) + (sleep delay) + (simple-format + (current-error-port) + "running last retry of ~A after ~A failed attempts\n" + f + attempt) + (f)) (begin (simple-format (current-error-port) @@ -277,71 +130,11 @@ upcoming chunk." attempt times delay) + (when error-hook + (error-hook attempt exn)) (sleep delay) (loop (+ 1 attempt)))))))) -(define delay-logging-fluid - (make-thread-local-fluid)) -(define delay-logging-depth-fluid - (make-thread-local-fluid 0)) - -(define (log-delay proc duration) - (and=> (fluid-ref delay-logging-fluid) - (lambda (recorder) - (recorder proc duration)))) - -(define* (call-with-delay-logging proc #:key (threshold 1) (args '())) - (let ((start (get-internal-real-time)) - (trace '()) - (root-logger? (eq? #f (fluid-ref delay-logging-fluid)))) - - (define (format-seconds seconds) - (format #f "~4f" seconds)) - - (call-with-values - (lambda () - (with-fluid* delay-logging-depth-fluid - (+ 1 (fluid-ref delay-logging-depth-fluid)) - (lambda () - (if root-logger? - (with-fluid* delay-logging-fluid - (lambda (proc duration) - (set! trace - (cons (list proc - duration - (fluid-ref delay-logging-depth-fluid)) - trace)) - #t) - (lambda () - (apply proc args))) - (apply proc args))))) - (lambda vals - (let ((elapsed-seconds - (/ (- (get-internal-real-time) - start) - internal-time-units-per-second))) - (if (and (> elapsed-seconds threshold) - root-logger?) - (let ((lines - (cons - (simple-format #f "warning: delay of ~A seconds: ~A" - (format-seconds elapsed-seconds) - proc) - (map (match-lambda - ((proc duration depth) - (string-append - (make-string (* 2 depth) #\space) - (simple-format #f "~A: ~A" - (format-seconds duration) - proc)))) - trace)))) - (display (string-append - (string-join lines "\n") - "\n"))) - (unless root-logger? - ((fluid-ref delay-logging-fluid) proc elapsed-seconds)))) - (apply values vals))))) - (define (call-with-time-logging name thunk) (let ((start (current-time time-utc))) (call-with-values @@ -364,7 +157,9 @@ upcoming chunk." (define* (create-work-queue thread-count-parameter proc #:key thread-start-delay (thread-stop-delay - (make-time time-duration 0 0))) + (make-time time-duration 0 0)) + (name "unnamed") + priority<?) (let ((queue (make-q)) (queue-mutex (make-mutex)) (job-available (make-condition-variable)) @@ -384,11 +179,26 @@ upcoming chunk." (else thread-count-parameter))) - (define (process-job . args) - (with-mutex queue-mutex - (enq! queue args) - (start-new-threads-if-necessary (get-thread-count)) - (signal-condition-variable job-available))) + (define process-job + (if priority<? + (lambda* (args #:key priority) + (with-mutex queue-mutex + (enq! queue (cons priority args)) + (set-car! + queue + (stable-sort! (car queue) + (lambda (a b) + (priority<? + (car a) + (car b))))) + (sync-q! queue) + (start-new-threads-if-necessary (get-thread-count)) + (signal-condition-variable job-available))) + (lambda args + (with-mutex queue-mutex + (enq! queue args) + (start-new-threads-if-necessary (get-thread-count)) + (signal-condition-variable job-available))))) (define (count-threads) (with-mutex queue-mutex @@ -403,11 +213,12 @@ upcoming chunk." (define (list-jobs) (with-mutex queue-mutex - (append (list-copy - (car queue)) + (append (if priority<? + (map cdr (car queue)) + (list-copy (car queue))) (hash-fold (lambda (key val result) - (or (and val - (cons val result)) + (if val + (cons val result) result)) '() running-job-args)))) @@ -416,16 +227,17 @@ upcoming chunk." (with-exception-handler (lambda (exn) (simple-format (current-error-port) - "job raised exception: ~A\n" - job-args)) + "~A work queue, job raised exception ~A: ~A\n" + name job-args exn)) (lambda () (with-throw-handler #t (lambda () (apply proc job-args)) (lambda (key . args) - (simple-format (current-error-port) - "exception when handling job: ~A ~A\n" - key args) + (simple-format + (current-error-port) + "~A work queue, exception when handling job: ~A ~A\n" + name key args) (backtrace)))) #:unwind? #t)) @@ -453,6 +265,13 @@ upcoming chunk." (call-with-new-thread (lambda () + (catch 'system-error + (lambda () + (set-thread-name + (string-append name " q t " + (number->string thread-index)))) + (const #t)) + (let loop ((last-job-finished-at (current-time time-monotonic))) (lock-mutex queue-mutex) @@ -469,9 +288,13 @@ upcoming chunk." ;; the job in the mean time (if (q-empty? queue) #f - (deq! queue)) + (if priority<? + (cdr (deq! queue)) + (deq! queue))) #f) - (deq! queue)))) + (if priority<? + (cdr (deq! queue)) + (deq! queue))))) (if job-args (begin @@ -499,32 +322,38 @@ upcoming chunk." (define start-new-threads-if-necessary (let ((previous-thread-started-at (make-time time-monotonic 0 0))) (lambda (desired-count) - (let* ((thread-count - (hash-count (const #t) running-job-args)) - (threads-to-start - (- desired-count thread-count))) - (when (> threads-to-start 0) - (for-each - (lambda (thread-index) - (when (eq? (hash-ref running-job-args - thread-index - 'slot-free) - 'slot-free) - (let* ((now (current-time time-monotonic)) - (elapsed (time-difference now - previous-thread-started-at))) - (when (or (eq? #f thread-start-delay) - (time>=? elapsed thread-start-delay)) - (set! previous-thread-started-at now) - (hash-set! running-job-args - thread-index - #f) - (start-thread thread-index))))) - (iota desired-count))))))) + (let* ((thread-count + (hash-count (const #t) running-job-args)) + (threads-to-start + (- desired-count thread-count))) + (when (> threads-to-start 0) + (for-each + (lambda (thread-index) + (when (eq? (hash-ref running-job-args + thread-index + 'slot-free) + 'slot-free) + (let* ((now (current-time time-monotonic)) + (elapsed (time-difference now + previous-thread-started-at))) + (when (or (eq? #f thread-start-delay) + (time>=? elapsed thread-start-delay)) + (set! previous-thread-started-at now) + (hash-set! running-job-args + thread-index + #f) + (start-thread thread-index))))) + (iota desired-count))))))) (if (procedure? thread-count-parameter) (call-with-new-thread (lambda () + (catch 'system-error + (lambda () + (set-thread-name + (string-append name " q t"))) + (const #t)) + (while #t (sleep 15) (with-mutex queue-mutex @@ -565,83 +394,171 @@ falling back to en_US.utf8\n" (setlocale LC_ALL "")) #:unwind? #t)) -(define %worker-thread-args - (make-parameter #f)) - -(define* (make-worker-thread-channel initializer - #:key (parallelism 1) - (delay-logger (lambda _ #f)) - destructor - lifetime - (log-exception? (const #t))) - "Return a channel used to offload work to a dedicated thread. ARGS are the -arguments of the worker thread procedure." +(define-record-type <worker-thread-set> + (worker-thread-set channel arguments-parameter) + worker-thread-set? + (channel worker-thread-set-channel) + (arguments-parameter worker-thread-set-arguments-parameter)) + +(define* (make-worker-thread-set initializer + #:key (parallelism 1) + (delay-logger (lambda _ #f)) + (duration-logger (const #f)) + destructor + lifetime + (log-exception? (const #t)) + (expire-on-exception? #f) + (name "unnamed")) + (define param + (make-parameter #f)) + + (define (initializer/safe) + (let ((args + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "exception running initializer in worker thread (~A): ~A:\n ~A\n" + name + initializer + exn) + #f) + (lambda () + (with-throw-handler #t + initializer + (lambda args + (backtrace)))) + #:unwind? #t))) + + (if args + args + ;; never give up, just keep retrying + (begin + (sleep 5) + (initializer/safe))))) + + (define (destructor/safe args) + (let ((success? + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "exception running destructor in worker thread (~A): ~A:\n ~A\n" + name + initializer + exn) + #f) + (lambda () + (with-throw-handler #t + (lambda () + (apply destructor args) + #t) + (lambda _ + (backtrace)))) + #:unwind? #t))) + + (or success? + #t + (begin + (sleep 5) + (destructor/safe args))))) + (let ((channel (make-channel))) (for-each - (lambda _ + (lambda (thread-index) (call-with-new-thread (lambda () - (let init ((args (initializer))) - (parameterize ((%worker-thread-args args)) + (catch 'system-error + (lambda () + (set-thread-name + (string-append + name " w t " + (number->string thread-index)))) + (const #t)) + + (let init ((args (initializer/safe))) + (parameterize ((param args)) (let loop ((current-lifetime lifetime)) - (match (get-message channel) - (((? channel? reply) sent-time (? procedure? proc)) - (let ((time-delay - (- (get-internal-real-time) - sent-time))) - (delay-logger (/ time-delay - internal-time-units-per-second)) - (put-message - reply - (let ((start-time (get-internal-real-time))) - (with-exception-handler - (lambda (exn) - (list 'worker-thread-error - (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second) - exn)) - (lambda () - (with-throw-handler #t - (lambda () - (call-with-values - (lambda () - (apply proc args)) - (lambda vals - (cons (/ (- (get-internal-real-time) - start-time) + (let ((exception? + (match (get-message channel) + (((? channel? reply) sent-time (? procedure? proc)) + (let ((time-delay + (- (get-internal-real-time) + sent-time))) + (delay-logger (/ time-delay internal-time-units-per-second) - vals)))) - (lambda args - (when (match args - (('%exception exn) - (log-exception? exn)) - (_ #t)) - (simple-format - (current-error-port) - "worker-thread: exception: ~A\n" args) - (backtrace))))) - #:unwind? #t)))))) - (if (number? current-lifetime) - (unless (< current-lifetime 0) - (loop (if current-lifetime - (- current-lifetime 1) - #f))) - (loop #f)))) + proc) + + (let* ((start-time (get-internal-real-time)) + (response + (with-exception-handler + (lambda (exn) + (list 'worker-thread-error + (/ (- (get-internal-real-time) + start-time) + internal-time-units-per-second) + exn)) + (lambda () + (with-throw-handler #t + (lambda () + (call-with-values + (lambda () + (apply proc args)) + (lambda vals + (cons (/ (- (get-internal-real-time) + start-time) + internal-time-units-per-second) + vals)))) + (lambda args + (when (match args + (('%exception exn) + (log-exception? exn)) + (_ #t)) + (simple-format + (current-error-port) + "worker-thread: exception: ~A\n" args) + (backtrace))))) + #:unwind? #t))) + (put-message reply + response) + + (match response + (('worker-thread-error duration _) + (when duration-logger + (duration-logger duration proc)) + #t) + ((duration . _) + (when duration-logger + (duration-logger duration proc)) + #f)))))))) + + (unless (and expire-on-exception? + exception?) + (if (number? current-lifetime) + (unless (< current-lifetime 0) + (loop (if current-lifetime + (- current-lifetime 1) + #f))) + (loop #f)))))) + (when destructor - (apply destructor args)) - (init (initializer)))))) + (destructor/safe args)) + + (init (initializer/safe)))))) (iota parallelism)) - channel)) -(define* (call-with-worker-thread channel proc #:key duration-logger) + (worker-thread-set channel + param))) + +(define* (call-with-worker-thread record proc #:key duration-logger) "Send PROC to the worker thread through CHANNEL. Return the result of PROC. If already in the worker thread, call PROC immediately." - (let ((args (%worker-thread-args))) + (let ((args ((worker-thread-set-arguments-parameter record)))) (if args (apply proc args) (let ((reply (make-channel))) - (put-message channel (list reply (get-internal-real-time) proc)) + (put-message (worker-thread-set-channel record) + (list reply (get-internal-real-time) proc)) (match (get-message reply) (('worker-thread-error duration exn) (when duration-logger @@ -651,3 +568,221 @@ If already in the worker thread, call PROC immediately." (when duration-logger (duration-logger duration)) (apply values result))))))) + +(define* (open-socket-for-uri* uri + #:key (verify-certificate? #t)) + (define tls-wrap + (@@ (web client) tls-wrap)) + + (define https? + (eq? 'https (uri-scheme uri))) + + (define plain-uri + (if https? + (build-uri + 'http + #:userinfo (uri-userinfo uri) + #:host (uri-host uri) + #:port (or (uri-port uri) 443) + #:path (uri-path uri) + #:query (uri-query uri) + #:fragment (uri-fragment uri)) + uri)) + + (let ((s (open-socket-for-uri plain-uri))) + (values + (if https? + (let ((port + (tls-wrap s (uri-host uri) + #:verify-certificate? verify-certificate?))) + ;; Guile/guile-gnutls don't handle the handshake happening on a non + ;; blocking socket, so change the behavior here. + (let ((flags (fcntl s F_GETFL))) + (fcntl s F_SETFL (logior O_NONBLOCK flags))) + port) + (let ((flags (fcntl s F_GETFL))) + (fcntl s F_SETFL (logior O_NONBLOCK flags)) + s)) + s))) + +;; Copied from (fibers web server) +(define (call-with-sigint thunk cvar) + (let ((handler #f)) + (dynamic-wind + (lambda () + (set! handler + (sigaction SIGINT (lambda (sig) (signal-condition! cvar))))) + thunk + (lambda () + (if handler + ;; restore Scheme handler, SIG_IGN or SIG_DFL. + (sigaction SIGINT (car handler) (cdr handler)) + ;; restore original C handler. + (sigaction SIGINT #f)))))) + +;; This variant of run-server from the fibers library supports running +;; multiple servers within one process. +(define run-server/patched + (let ((fibers-web-server-module + (resolve-module '(fibers web server)))) + + (define set-nonblocking! + (module-ref fibers-web-server-module 'set-nonblocking!)) + + (define make-default-socket + (module-ref fibers-web-server-module 'make-default-socket)) + + (define socket-loop + (module-ref fibers-web-server-module 'socket-loop)) + + (lambda* (handler + #:key + (host #f) + (family AF_INET) + (addr (if host + (inet-pton family host) + INADDR_LOOPBACK)) + (port 8080) + (socket (make-default-socket family addr port))) + ;; We use a large backlog by default. If the server is suddenly hit + ;; with a number of connections on a small backlog, clients won't + ;; receive confirmation for their SYN, leading them to retry -- + ;; probably successfully, but with a large latency. + (listen socket 1024) + (set-nonblocking! socket) + (sigaction SIGPIPE SIG_IGN) + (spawn-fiber (lambda () (socket-loop socket handler)))))) + +;; These procedure are subject to spurious wakeups. + +(define (readable? port) + "Test if PORT is writable." + (match (select (vector port) #() #() 0) + ((#() #() #()) #f) + ((#(_) #() #()) #t))) + +(define (writable? port) + "Test if PORT is writable." + (match (select #() (vector port) #() 0) + ((#() #() #()) #f) + ((#() #(_) #()) #t))) + +(define (make-wait-operation ready? schedule-when-ready port port-ready-fd this-procedure) + (make-base-operation #f + (lambda _ + (and (ready? (port-ready-fd port)) values)) + (lambda (flag sched resume) + (define (commit) + (match (atomic-box-compare-and-swap! flag 'W 'S) + ('W (resume values)) + ('C (commit)) + ('S #f))) + (schedule-when-ready + sched (port-ready-fd port) commit)))) + +(define (wait-until-port-readable-operation port) + "Make an operation that will succeed when PORT is readable." + (unless (input-port? port) + (error "refusing to wait forever for input on non-input port")) + (make-wait-operation readable? schedule-task-when-fd-readable port + port-read-wait-fd + wait-until-port-readable-operation)) + +(define (wait-until-port-writable-operation port) + "Make an operation that will succeed when PORT is writable." + (unless (output-port? port) + (error "refusing to wait forever for output on non-output port")) + (make-wait-operation writable? schedule-task-when-fd-writable port + port-write-wait-fd + wait-until-port-writable-operation)) + + + +(define &port-timeout + (make-exception-type '&port-timeout + &external-error + '(port))) + +(define make-port-timeout-error + (record-constructor &port-timeout)) + +(define port-timeout-error? + (record-predicate &port-timeout)) + +(define &port-read-timeout + (make-exception-type '&port-read-timeout + &port-timeout + '())) + +(define make-port-read-timeout-error + (record-constructor &port-read-timeout)) + +(define port-read-timeout-error? + (record-predicate &port-read-timeout)) + +(define &port-write-timeout + (make-exception-type '&port-write-timeout + &port-timeout + '())) + +(define make-port-write-timeout-error + (record-constructor &port-write-timeout)) + +(define port-write-timeout-error? + (record-predicate &port-write-timeout)) + +(define* (with-port-timeouts thunk + #:key timeout + (read-timeout timeout) + (write-timeout timeout)) + (define (no-fibers-wait port mode timeout) + (define poll-timeout-ms 200) + + ;; When the GC runs, it restarts the poll syscall, but the timeout + ;; remains unchanged! When the timeout is longer than the time + ;; between the syscall restarting, I think this renders the + ;; timeout useless. Therefore, this code uses a short timeout, and + ;; repeatedly calls poll while watching the clock to see if it has + ;; timed out overall. + (let ((timeout-internal + (+ (get-internal-real-time) + (* internal-time-units-per-second + (/ timeout 1000))))) + (let loop ((poll-value + (port-poll port mode poll-timeout-ms))) + (if (= poll-value 0) + (if (> (get-internal-real-time) + timeout-internal) + (raise-exception + (if (string=? mode "r") + (make-port-read-timeout-error port) + (make-port-write-timeout-error port))) + (loop (port-poll port mode poll-timeout-ms))) + poll-value)))) + + (parameterize + ((current-read-waiter + (lambda (port) + (if (current-scheduler) + (perform-operation + (choice-operation + (wait-until-port-readable-operation port) + (wrap-operation + (sleep-operation read-timeout) + (lambda () + (raise-exception + (make-port-read-timeout-error thunk port)))))) + (no-fibers-wait port "r" read-timeout)))) + (current-write-waiter + (lambda (port) + (if (current-scheduler) + (perform-operation + (choice-operation + (wait-until-port-writable-operation port) + (wrap-operation + (sleep-operation write-timeout) + (lambda () + (raise-exception + (make-port-write-timeout-error thunk port)))))) + (no-fibers-wait port "w" write-timeout))))) + (thunk))) |