aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2025-01-13 09:28:57 +0000
committerChristopher Baines <mail@cbaines.net>2025-01-13 09:28:57 +0000
commit69c8f3d26112a97bdf04f2c50634d162bb6eeee3 (patch)
tree5c8bee05934c538a1c37a46fc0174af6b78abad4
parent2d513a2e24a6acd223ca3093c3fb2272865366dc (diff)
downloadnar-herder-69c8f3d26112a97bdf04f2c50634d162bb6eeee3.tar
nar-herder-69c8f3d26112a97bdf04f2c50634d162bb6eeee3.tar.gz
Use Guile Knots
-rw-r--r--guix-dev.scm34
-rw-r--r--nar-herder/cached-compression.scm6
-rw-r--r--nar-herder/database.scm1
-rw-r--r--nar-herder/mirror.scm6
-rw-r--r--nar-herder/server.scm9
-rw-r--r--nar-herder/storage.scm7
-rw-r--r--nar-herder/utils.scm626
7 files changed, 54 insertions, 635 deletions
diff --git a/guix-dev.scm b/guix-dev.scm
index 61e282c..b2e6da3 100644
--- a/guix-dev.scm
+++ b/guix-dev.scm
@@ -45,6 +45,39 @@
(gnu packages web)
(srfi srfi-1))
+(define guile-knots
+ (let ((commit "dcb56ee2c5ac3e283cb46841766e7282f3c2c52e")
+ (revision "1"))
+ (package
+ (name "guile-knots")
+ (version (git-version "0" revision commit))
+ (source (origin
+ (method git-fetch)
+ (uri (git-reference
+ (url "https://git.cbaines.net/git/guile/knots")
+ (commit commit)))
+ (sha256
+ (base32
+ "04z48572canx35hl0kfli3pf3g3m6184zvmnpyg1rbwla6g5z1fk"))
+ (file-name (string-append name "-" version "-checkout"))))
+ (build-system gnu-build-system)
+ (native-inputs
+ (list pkg-config
+ autoconf
+ automake
+ guile-3.0
+ guile-lib
+ guile-fibers))
+ (inputs
+ (list guile-3.0))
+ (propagated-inputs
+ (list guile-fibers))
+ (home-page "https://git.cbaines.net/guile/knots")
+ (synopsis "Patterns and functionality to use with Guile Fibers")
+ (description
+ "")
+ (license license:gpl3+))))
+
(package
(name "nar-herder")
(version "0")
@@ -54,6 +87,7 @@
`(("guix" ,guix)
("guile-json" ,guile-json-4)
("guile-fibers" ,guile-fibers-1.3)
+ ("guile-knots" ,guile-knots)
("guile-gcrypt" ,guile-gcrypt)
("guile-readline" ,guile-readline)
("guile-lzlib" ,guile-lzlib)
diff --git a/nar-herder/cached-compression.scm b/nar-herder/cached-compression.scm
index 5c257dc..1ac6d96 100644
--- a/nar-herder/cached-compression.scm
+++ b/nar-herder/cached-compression.scm
@@ -31,6 +31,9 @@
#:use-module (fibers timers)
#:use-module (fibers channels)
#:use-module (fibers operations)
+ #:use-module (knots timeout)
+ #:use-module (knots non-blocking)
+ #:use-module (knots worker-threads)
#:use-module (web uri)
#:use-module (web client)
#:use-module (web response)
@@ -662,8 +665,7 @@
(call-with-values
(lambda ()
(let ((port
- socket
- (open-socket-for-uri* uri)))
+ (non-blocking-open-socket-for-uri uri)))
(http-get uri
#:port port
#:decode-body? #f
diff --git a/nar-herder/database.scm b/nar-herder/database.scm
index 63eeac0..7c88c8c 100644
--- a/nar-herder/database.scm
+++ b/nar-herder/database.scm
@@ -27,6 +27,7 @@
#:use-module (web uri)
#:use-module (sqlite3)
#:use-module (fibers)
+ #:use-module (knots worker-threads)
#:use-module (prometheus)
#:use-module (guix store)
#:use-module (guix narinfo)
diff --git a/nar-herder/mirror.scm b/nar-herder/mirror.scm
index 14e23a9..67d4c00 100644
--- a/nar-herder/mirror.scm
+++ b/nar-herder/mirror.scm
@@ -32,6 +32,8 @@
#:use-module (json)
#:use-module (fibers)
#:use-module (fibers channels)
+ #:use-module (knots timeout)
+ #:use-module (knots non-blocking)
#:use-module (guix narinfo)
#:use-module ((guix store) #:select (store-path-hash-part))
#:use-module (nar-herder utils)
@@ -77,9 +79,7 @@
latest-recent-change)
(with-port-timeouts
(lambda ()
- (let ((port
- socket
- (open-socket-for-uri* uri)))
+ (let ((port (non-blocking-open-socket-for-uri uri)))
(http-get uri
#:port port
#:streaming? #t)))
diff --git a/nar-herder/server.scm b/nar-herder/server.scm
index 97f0567..b904675 100644
--- a/nar-herder/server.scm
+++ b/nar-herder/server.scm
@@ -38,6 +38,10 @@
#:use-module (fibers scheduler)
#:use-module (fibers conditions)
#:use-module (fibers operations)
+ #:use-module (knots)
+ #:use-module (knots timeout)
+ #:use-module (knots web-server)
+ #:use-module (knots non-blocking)
#:use-module (prometheus)
#:use-module (json)
#:use-module ((system foreign)
@@ -592,8 +596,7 @@
(simple-format (current-error-port)
"starting downloading the database\n")
(let ((port
- socket
- (open-socket-for-uri* database-uri)))
+ (non-blocking-open-socket-for-uri database-uri)))
(http-get database-uri
#:port port
#:streaming? #t)))
@@ -923,7 +926,7 @@
(log-msg 'INFO "starting server (" (getpid) "), listening on "
(assq-ref opts 'host) ":" (assq-ref opts 'port))
- (run-server/patched
+ (run-knots-web-server
(make-request-handler
database
canonical-storage
diff --git a/nar-herder/storage.scm b/nar-herder/storage.scm
index 55e374e..0e7186d 100644
--- a/nar-herder/storage.scm
+++ b/nar-herder/storage.scm
@@ -28,6 +28,8 @@
#:use-module (web response)
#:use-module (fibers)
#:use-module (fibers channels)
+ #:use-module (knots timeout)
+ #:use-module (knots non-blocking)
#:use-module (logging logger)
#:use-module (logging port-log)
#:use-module (prometheus)
@@ -356,7 +358,7 @@ When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates."
;; Open a new connection to URI and evict old entries from
;; CACHE, if any.
(let ((socket
- (open-socket-for-uri*
+ (non-blocking-open-socket-for-uri
uri
#:verify-certificate? verify-certificate?))
(new-cache evicted
@@ -620,8 +622,7 @@ When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates."
(call-with-values
(lambda ()
(let ((port
- socket
- (open-socket-for-uri* uri)))
+ (non-blocking-open-socket-for-uri uri)))
(http-get uri
#:port port
#:decode-body? #f
diff --git a/nar-herder/utils.scm b/nar-herder/utils.scm
index 5bac2da..4155ea0 100644
--- a/nar-herder/utils.scm
+++ b/nar-herder/utils.scm
@@ -31,10 +31,6 @@
#:use-module (ice-9 binary-ports)
#:use-module (ice-9 exceptions)
#:use-module (rnrs bytevectors)
- #:use-module (ice-9 suspendable-ports)
- #:use-module ((ice-9 ports internal) #:select (port-poll
- port-read-wait-fd
- port-write-wait-fd))
#:use-module (web uri)
#:use-module (web http)
#:use-module (web client)
@@ -46,30 +42,12 @@
#:use-module (fibers scheduler)
#:use-module (fibers conditions)
#:use-module (fibers operations)
- #:use-module ((guix build syscalls)
- #:select (set-thread-name))
- #:export (make-worker-thread-set
- call-with-worker-thread
-
- call-with-time-logging
+ #:export (call-with-time-logging
with-time-logging
retry-on-error
- create-work-queue
-
- check-locale!
-
- open-socket-for-uri*
-
- call-with-sigint
- run-server/patched
-
- timeout-error?
-
- port-read-timeout-error?
- port-write-timeout-error?
- with-port-timeouts))
+ check-locale!))
(define* (retry-on-error f #:key times delay ignore error-hook)
(let loop ((attempt 1))
@@ -154,218 +132,6 @@
"Log under NAME the time taken to evaluate EXP."
(call-with-time-logging name (lambda () exp ...)))
-(define* (create-work-queue thread-count-parameter proc
- #:key thread-start-delay
- (thread-stop-delay
- (make-time time-duration 0 0))
- (name "unnamed")
- priority<?)
- (let ((queue (make-q))
- (queue-mutex (make-mutex))
- (job-available (make-condition-variable))
- (running-job-args (make-hash-table)))
-
- (define get-thread-count
- (cond
- ((number? thread-count-parameter)
- (const thread-count-parameter))
- ((eq? thread-count-parameter #f)
- ;; Run one thread per job
- (lambda ()
- (+ (q-length queue)
- (hash-count (lambda (index val)
- (list? val))
- running-job-args))))
- (else
- thread-count-parameter)))
-
- (define process-job
- (if priority<?
- (lambda* (args #:key priority)
- (with-mutex queue-mutex
- (enq! queue (cons priority args))
- (set-car!
- queue
- (stable-sort! (car queue)
- (lambda (a b)
- (priority<?
- (car a)
- (car b)))))
- (sync-q! queue)
- (start-new-threads-if-necessary (get-thread-count))
- (signal-condition-variable job-available)))
- (lambda args
- (with-mutex queue-mutex
- (enq! queue args)
- (start-new-threads-if-necessary (get-thread-count))
- (signal-condition-variable job-available)))))
-
- (define (count-threads)
- (with-mutex queue-mutex
- (hash-count (const #t) running-job-args)))
-
- (define (count-jobs)
- (with-mutex queue-mutex
- (+ (q-length queue)
- (hash-count (lambda (index val)
- (list? val))
- running-job-args))))
-
- (define (list-jobs)
- (with-mutex queue-mutex
- (append (if priority<?
- (map cdr (car queue))
- (list-copy (car queue)))
- (hash-fold (lambda (key val result)
- (if val
- (cons val result)
- result))
- '()
- running-job-args))))
-
- (define (thread-process-job job-args)
- (with-exception-handler
- (lambda (exn)
- (simple-format (current-error-port)
- "~A work queue, job raised exception ~A: ~A\n"
- name job-args exn))
- (lambda ()
- (with-throw-handler #t
- (lambda ()
- (apply proc job-args))
- (lambda (key . args)
- (simple-format
- (current-error-port)
- "~A work queue, exception when handling job: ~A ~A\n"
- name key args)
- (backtrace))))
- #:unwind? #t))
-
- (define (start-thread thread-index)
- (define (too-many-threads?)
- (let ((running-jobs-count
- (hash-count (lambda (index val)
- (list? val))
- running-job-args))
- (desired-thread-count (get-thread-count)))
-
- (>= running-jobs-count
- desired-thread-count)))
-
- (define (thread-idle-for-too-long? last-job-finished-at)
- (time>=?
- (time-difference (current-time time-monotonic)
- last-job-finished-at)
- thread-stop-delay))
-
- (define (stop-thread)
- (hash-remove! running-job-args
- thread-index)
- (unlock-mutex queue-mutex))
-
- (call-with-new-thread
- (lambda ()
- (catch 'system-error
- (lambda ()
- (set-thread-name
- (string-append name " q t "
- (number->string thread-index))))
- (const #t))
-
- (let loop ((last-job-finished-at (current-time time-monotonic)))
- (lock-mutex queue-mutex)
-
- (if (too-many-threads?)
- (stop-thread)
- (let ((job-args
- (if (q-empty? queue)
- ;; #f from wait-condition-variable indicates a timeout
- (if (wait-condition-variable
- job-available
- queue-mutex
- (+ 9 (time-second (current-time))))
- ;; Another thread could have taken
- ;; the job in the mean time
- (if (q-empty? queue)
- #f
- (if priority<?
- (cdr (deq! queue))
- (deq! queue)))
- #f)
- (if priority<?
- (cdr (deq! queue))
- (deq! queue)))))
-
- (if job-args
- (begin
- (hash-set! running-job-args
- thread-index
- job-args)
-
- (unlock-mutex queue-mutex)
- (thread-process-job job-args)
-
- (with-mutex queue-mutex
- (hash-set! running-job-args
- thread-index
- #f))
-
- (loop (current-time time-monotonic)))
- (if (thread-idle-for-too-long? last-job-finished-at)
- (stop-thread)
- (begin
- (unlock-mutex queue-mutex)
-
- (loop last-job-finished-at))))))))))
-
-
- (define start-new-threads-if-necessary
- (let ((previous-thread-started-at (make-time time-monotonic 0 0)))
- (lambda (desired-count)
- (let* ((thread-count
- (hash-count (const #t) running-job-args))
- (threads-to-start
- (- desired-count thread-count)))
- (when (> threads-to-start 0)
- (for-each
- (lambda (thread-index)
- (when (eq? (hash-ref running-job-args
- thread-index
- 'slot-free)
- 'slot-free)
- (let* ((now (current-time time-monotonic))
- (elapsed (time-difference now
- previous-thread-started-at)))
- (when (or (eq? #f thread-start-delay)
- (time>=? elapsed thread-start-delay))
- (set! previous-thread-started-at now)
- (hash-set! running-job-args
- thread-index
- #f)
- (start-thread thread-index)))))
- (iota desired-count)))))))
-
- (if (procedure? thread-count-parameter)
- (call-with-new-thread
- (lambda ()
- (catch 'system-error
- (lambda ()
- (set-thread-name
- (string-append name " q t")))
- (const #t))
-
- (while #t
- (sleep 15)
- (with-mutex queue-mutex
- (let ((idle-threads (hash-count (lambda (index val)
- (eq? #f val))
- running-job-args)))
- (when (= 0 idle-threads)
- (start-new-threads-if-necessary (get-thread-count))))))))
- (start-new-threads-if-necessary (get-thread-count)))
-
- (values process-job count-jobs count-threads list-jobs)))
-
(define (check-locale!)
(with-exception-handler
(lambda (exn)
@@ -393,391 +159,3 @@ falling back to en_US.utf8\n"
(lambda _
(setlocale LC_ALL ""))
#:unwind? #t))
-
-(define-record-type <worker-thread-set>
- (worker-thread-set channel arguments-parameter)
- worker-thread-set?
- (channel worker-thread-set-channel)
- (arguments-parameter worker-thread-set-arguments-parameter))
-
-(define* (make-worker-thread-set initializer
- #:key (parallelism 1)
- (delay-logger (lambda _ #f))
- (duration-logger (const #f))
- destructor
- lifetime
- (log-exception? (const #t))
- (expire-on-exception? #f)
- (name "unnamed"))
- (define param
- (make-parameter #f))
-
- (define (initializer/safe)
- (let ((args
- (with-exception-handler
- (lambda (exn)
- (simple-format
- (current-error-port)
- "exception running initializer in worker thread (~A): ~A:\n ~A\n"
- name
- initializer
- exn)
- #f)
- (lambda ()
- (with-throw-handler #t
- initializer
- (lambda args
- (backtrace))))
- #:unwind? #t)))
-
- (if args
- args
- ;; never give up, just keep retrying
- (begin
- (sleep 5)
- (initializer/safe)))))
-
- (define (destructor/safe args)
- (let ((success?
- (with-exception-handler
- (lambda (exn)
- (simple-format
- (current-error-port)
- "exception running destructor in worker thread (~A): ~A:\n ~A\n"
- name
- initializer
- exn)
- #f)
- (lambda ()
- (with-throw-handler #t
- (lambda ()
- (apply destructor args)
- #t)
- (lambda _
- (backtrace))))
- #:unwind? #t)))
-
- (or success?
- #t
- (begin
- (sleep 5)
- (destructor/safe args)))))
-
- (let ((channel (make-channel)))
- (for-each
- (lambda (thread-index)
- (call-with-new-thread
- (lambda ()
- (catch 'system-error
- (lambda ()
- (set-thread-name
- (string-append
- name " w t "
- (number->string thread-index))))
- (const #t))
-
- (let init ((args (initializer/safe)))
- (parameterize ((param args))
- (let loop ((current-lifetime lifetime))
- (let ((exception?
- (match (get-message channel)
- (((? channel? reply) sent-time (? procedure? proc))
- (let ((time-delay
- (- (get-internal-real-time)
- sent-time)))
- (delay-logger (/ time-delay
- internal-time-units-per-second)
- proc)
-
- (let* ((start-time (get-internal-real-time))
- (response
- (with-exception-handler
- (lambda (exn)
- (list 'worker-thread-error
- (/ (- (get-internal-real-time)
- start-time)
- internal-time-units-per-second)
- exn))
- (lambda ()
- (with-throw-handler #t
- (lambda ()
- (call-with-values
- (lambda ()
- (apply proc args))
- (lambda vals
- (cons (/ (- (get-internal-real-time)
- start-time)
- internal-time-units-per-second)
- vals))))
- (lambda args
- (when (match args
- (('%exception exn)
- (log-exception? exn))
- (_ #t))
- (simple-format
- (current-error-port)
- "worker-thread: exception: ~A\n" args)
- (backtrace)))))
- #:unwind? #t)))
- (put-message reply
- response)
-
- (match response
- (('worker-thread-error duration _)
- (when duration-logger
- (duration-logger duration proc))
- #t)
- ((duration . _)
- (when duration-logger
- (duration-logger duration proc))
- #f))))))))
-
- (unless (and expire-on-exception?
- exception?)
- (if (number? current-lifetime)
- (unless (< current-lifetime 0)
- (loop (if current-lifetime
- (- current-lifetime 1)
- #f)))
- (loop #f))))))
-
- (when destructor
- (destructor/safe args))
-
- (init (initializer/safe))))))
- (iota parallelism))
-
- (worker-thread-set channel
- param)))
-
-(define* (call-with-worker-thread record proc #:key duration-logger)
- "Send PROC to the worker thread through CHANNEL. Return the result of PROC.
-If already in the worker thread, call PROC immediately."
- (let ((args ((worker-thread-set-arguments-parameter record))))
- (if args
- (apply proc args)
- (let ((reply (make-channel)))
- (put-message (worker-thread-set-channel record)
- (list reply (get-internal-real-time) proc))
- (match (get-message reply)
- (('worker-thread-error duration exn)
- (when duration-logger
- (duration-logger duration))
- (raise-exception exn))
- ((duration . result)
- (when duration-logger
- (duration-logger duration))
- (apply values result)))))))
-
-(define* (open-socket-for-uri* uri
- #:key (verify-certificate? #t))
- (define tls-wrap
- (@@ (web client) tls-wrap))
-
- (define https?
- (eq? 'https (uri-scheme uri)))
-
- (define plain-uri
- (if https?
- (build-uri
- 'http
- #:userinfo (uri-userinfo uri)
- #:host (uri-host uri)
- #:port (or (uri-port uri) 443)
- #:path (uri-path uri)
- #:query (uri-query uri)
- #:fragment (uri-fragment uri))
- uri))
-
- (let ((s (open-socket-for-uri plain-uri)))
- (values
- (if https?
- (let ((port
- (tls-wrap s (uri-host uri)
- #:verify-certificate? verify-certificate?)))
- ;; Guile/guile-gnutls don't handle the handshake happening on a non
- ;; blocking socket, so change the behavior here.
- (let ((flags (fcntl s F_GETFL)))
- (fcntl s F_SETFL (logior O_NONBLOCK flags)))
- port)
- (let ((flags (fcntl s F_GETFL)))
- (fcntl s F_SETFL (logior O_NONBLOCK flags))
- s))
- s)))
-
-;; Copied from (fibers web server)
-(define (call-with-sigint thunk cvar)
- (let ((handler #f))
- (dynamic-wind
- (lambda ()
- (set! handler
- (sigaction SIGINT (lambda (sig) (signal-condition! cvar)))))
- thunk
- (lambda ()
- (if handler
- ;; restore Scheme handler, SIG_IGN or SIG_DFL.
- (sigaction SIGINT (car handler) (cdr handler))
- ;; restore original C handler.
- (sigaction SIGINT #f))))))
-
-;; This variant of run-server from the fibers library supports running
-;; multiple servers within one process.
-(define run-server/patched
- (let ((fibers-web-server-module
- (resolve-module '(fibers web server))))
-
- (define set-nonblocking!
- (module-ref fibers-web-server-module 'set-nonblocking!))
-
- (define make-default-socket
- (module-ref fibers-web-server-module 'make-default-socket))
-
- (define socket-loop
- (module-ref fibers-web-server-module 'socket-loop))
-
- (lambda* (handler
- #:key
- (host #f)
- (family AF_INET)
- (addr (if host
- (inet-pton family host)
- INADDR_LOOPBACK))
- (port 8080)
- (socket (make-default-socket family addr port)))
- ;; We use a large backlog by default. If the server is suddenly hit
- ;; with a number of connections on a small backlog, clients won't
- ;; receive confirmation for their SYN, leading them to retry --
- ;; probably successfully, but with a large latency.
- (listen socket 1024)
- (set-nonblocking! socket)
- (sigaction SIGPIPE SIG_IGN)
- (spawn-fiber (lambda () (socket-loop socket handler))))))
-
-;; These procedure are subject to spurious wakeups.
-
-(define (readable? port)
- "Test if PORT is writable."
- (= 1 (port-poll port "r" 0)))
-
-(define (writable? port)
- "Test if PORT is writable."
- (= 1 (port-poll port "w" 0)))
-
-(define (make-wait-operation ready? schedule-when-ready port port-ready-fd this-procedure)
- (make-base-operation #f
- (lambda _
- (and (ready? port) values))
- (lambda (flag sched resume)
- (define (commit)
- (match (atomic-box-compare-and-swap! flag 'W 'S)
- ('W (resume values))
- ('C (commit))
- ('S #f)))
- (schedule-when-ready
- sched (port-ready-fd port) commit))))
-
-(define (wait-until-port-readable-operation port)
- "Make an operation that will succeed when PORT is readable."
- (unless (input-port? port)
- (error "refusing to wait forever for input on non-input port"))
- (make-wait-operation readable? schedule-task-when-fd-readable port
- port-read-wait-fd
- wait-until-port-readable-operation))
-
-(define (wait-until-port-writable-operation port)
- "Make an operation that will succeed when PORT is writable."
- (unless (output-port? port)
- (error "refusing to wait forever for output on non-output port"))
- (make-wait-operation writable? schedule-task-when-fd-writable port
- port-write-wait-fd
- wait-until-port-writable-operation))
-
-
-
-(define &port-timeout
- (make-exception-type '&port-timeout
- &external-error
- '(thunk port)))
-
-(define make-port-timeout-error
- (record-constructor &port-timeout))
-
-(define port-timeout-error?
- (record-predicate &port-timeout))
-
-(define &port-read-timeout
- (make-exception-type '&port-read-timeout
- &port-timeout
- '()))
-
-(define make-port-read-timeout-error
- (record-constructor &port-read-timeout))
-
-(define port-read-timeout-error?
- (record-predicate &port-read-timeout))
-
-(define &port-write-timeout
- (make-exception-type '&port-write-timeout
- &port-timeout
- '()))
-
-(define make-port-write-timeout-error
- (record-constructor &port-write-timeout))
-
-(define port-write-timeout-error?
- (record-predicate &port-write-timeout))
-
-(define* (with-port-timeouts thunk
- #:key timeout
- (read-timeout timeout)
- (write-timeout timeout))
- (define (no-fibers-wait thunk port mode timeout)
- (define poll-timeout-ms 200)
-
- ;; When the GC runs, it restarts the poll syscall, but the timeout
- ;; remains unchanged! When the timeout is longer than the time
- ;; between the syscall restarting, I think this renders the
- ;; timeout useless. Therefore, this code uses a short timeout, and
- ;; repeatedly calls poll while watching the clock to see if it has
- ;; timed out overall.
- (let ((timeout-internal
- (+ (get-internal-real-time)
- (* timeout internal-time-units-per-second))))
- (let loop ((poll-value
- (port-poll port mode poll-timeout-ms)))
- (if (= poll-value 0)
- (if (> (get-internal-real-time)
- timeout-internal)
- (raise-exception
- (if (string=? mode "r")
- (make-port-read-timeout-error thunk port)
- (make-port-write-timeout-error thunk port)))
- (loop (port-poll port mode poll-timeout-ms)))
- poll-value))))
-
- (parameterize
- ((current-read-waiter
- (lambda (port)
- (if (current-scheduler)
- (perform-operation
- (choice-operation
- (wait-until-port-readable-operation port)
- (wrap-operation
- (sleep-operation read-timeout)
- (lambda ()
- (raise-exception
- (make-port-read-timeout-error thunk port))))))
- (no-fibers-wait thunk port "r" read-timeout))))
- (current-write-waiter
- (lambda (port)
- (if (current-scheduler)
- (perform-operation
- (choice-operation
- (wait-until-port-writable-operation port)
- (wrap-operation
- (sleep-operation write-timeout)
- (lambda ()
- (raise-exception
- (make-port-write-timeout-error thunk port))))))
- (no-fibers-wait thunk port "w" write-timeout)))))
- (thunk)))