aboutsummaryrefslogtreecommitdiff
path: root/nar-herder/utils.scm
diff options
context:
space:
mode:
Diffstat (limited to 'nar-herder/utils.scm')
-rw-r--r--nar-herder/utils.scm837
1 files changed, 486 insertions, 351 deletions
diff --git a/nar-herder/utils.scm b/nar-herder/utils.scm
index 2d62360..4755d33 100644
--- a/nar-herder/utils.scm
+++ b/nar-herder/utils.scm
@@ -18,44 +18,37 @@
(define-module (nar-herder utils)
#:use-module (srfi srfi-1)
+ #:use-module (srfi srfi-9)
#:use-module (srfi srfi-19) ; time
#:use-module (ice-9 q)
- ;; #:use-module (ice-9 ftw)
- ;; #:use-module (ice-9 popen)
#:use-module (ice-9 iconv)
#:use-module (ice-9 match)
#:use-module (ice-9 format)
#:use-module (ice-9 threads)
+ #:use-module (ice-9 atomic)
#:use-module (ice-9 textual-ports)
#:use-module (ice-9 rdelim)
#:use-module (ice-9 binary-ports)
#:use-module (ice-9 exceptions)
#:use-module (rnrs bytevectors)
+ #:use-module (ice-9 suspendable-ports)
+ #:use-module ((ice-9 ports internal) #:select (port-poll
+ port-read-wait-fd
+ port-write-wait-fd))
#:use-module (web uri)
#:use-module (web http)
#:use-module (web client)
#:use-module (web request)
#:use-module (web response)
#:use-module (fibers)
+ #:use-module (fibers timers)
#:use-module (fibers channels)
+ #:use-module (fibers scheduler)
#:use-module (fibers conditions)
- ;; #:use-module (gcrypt pk-crypto)
- ;; #:use-module (gcrypt hash)
- ;; #:use-module (gcrypt random)
- ;; #:use-module (json)
- ;; #:use-module (guix pki)
- ;; #:use-module (guix utils)
- ;; #:use-module (guix config)
- ;; #:use-module (guix store)
- ;; #:use-module (guix status)
- ;; #:use-module (guix base64)
- ;; #:use-module (guix scripts substitute)
- #:export (call-with-streaming-http-request
- &chunked-input-ended-prematurely
- chunked-input-ended-prematurely-error?
- make-chunked-input-port*
-
- make-worker-thread-channel
+ #:use-module (fibers operations)
+ #:use-module ((guix build syscalls)
+ #:select (set-thread-name))
+ #:export (make-worker-thread-set
call-with-worker-thread
call-with-time-logging
@@ -65,171 +58,20 @@
create-work-queue
- check-locale!))
-
-;; Chunked Responses
-(define (read-chunk-header port)
- "Read a chunk header from PORT and return the size in bytes of the
-upcoming chunk."
- (match (read-line port)
- ((? eof-object?)
- ;; Connection closed prematurely: there's nothing left to read.
- (error "chunked input ended prematurely"))
- (str
- (let ((extension-start (string-index str
- (lambda (c)
- (or (char=? c #\;)
- (char=? c #\return))))))
- (string->number (if extension-start ; unnecessary?
- (substring str 0 extension-start)
- str)
- 16)))))
-
-(define &chunked-input-ended-prematurely
- (make-exception-type '&chunked-input-error-prematurely
- &external-error
- '()))
+ check-locale!
-(define make-chunked-input-ended-prematurely-error
- (record-constructor &chunked-input-ended-prematurely))
-
-(define chunked-input-ended-prematurely-error?
- (record-predicate &chunked-input-ended-prematurely))
-
-(define* (make-chunked-input-port* port #:key (keep-alive? #f))
- (define (close)
- (unless keep-alive?
- (close-port port)))
-
- (define chunk-size 0) ;size of the current chunk
- (define remaining 0) ;number of bytes left from the current chunk
- (define finished? #f) ;did we get all the chunks?
-
- (define (read! bv idx to-read)
- (define (loop to-read num-read)
- (cond ((or finished? (zero? to-read))
- num-read)
- ((zero? remaining) ;get a new chunk
- (let ((size (read-chunk-header port)))
- (set! chunk-size size)
- (set! remaining size)
- (cond
- ((zero? size)
- (set! finished? #t)
- (get-bytevector-n port 2) ; \r\n follows the last chunk
- num-read)
- (else
- (loop to-read num-read)))))
- (else ;read from the current chunk
- (let* ((ask-for (min to-read remaining))
- (read (get-bytevector-n! port bv (+ idx num-read)
- ask-for)))
- (cond
- ((eof-object? read) ;premature termination
- (raise-exception
- (make-chunked-input-ended-prematurely-error)))
- (else
- (let ((left (- remaining read)))
- (set! remaining left)
- (when (zero? left)
- ;; We're done with this chunk; read CR and LF.
- (get-u8 port) (get-u8 port))
- (loop (- to-read read)
- (+ num-read read)))))))))
- (loop to-read 0))
-
- (make-custom-binary-input-port "chunked input port" read! #f #f close))
-
-(define* (make-chunked-output-port* port #:key (keep-alive? #f)
- (buffering 1200)
- report-bytes-sent)
- (define heap-allocated-limit
- (expt 2 20)) ;; 1MiB
-
- (define (%put-string s)
- (unless (string-null? s)
- (let* ((bv (string->bytevector s "ISO-8859-1"))
- (length (bytevector-length bv)))
- (put-string port (number->string length 16))
- (put-string port "\r\n")
- (put-bytevector port bv)
- (put-string port "\r\n")
-
- (when report-bytes-sent
- (report-bytes-sent length))
- (let* ((stats (gc-stats))
- (initial-gc-times
- (assq-ref stats 'gc-times)))
- (when (> (assq-ref stats 'heap-allocated-since-gc)
- heap-allocated-limit)
- (while (let ((updated-stats (gc-stats)))
- (= (assq-ref updated-stats 'gc-times)
- initial-gc-times))
- (gc)
- (usleep 50)))))))
-
- (define (%put-char c)
- (%put-string (list->string (list c))))
-
- (define (flush) #t)
- (define (close)
- (put-string port "0\r\n\r\n")
- (force-output port)
- (unless keep-alive?
- (close-port port)))
- (let ((ret (make-soft-port
- (vector %put-char %put-string flush #f close) "w")))
- (setvbuf ret 'block buffering)
- ret))
-
-(define* (call-with-streaming-http-request uri callback
- #:key (headers '())
- (method 'PUT)
- report-bytes-sent)
- (let* ((port (open-socket-for-uri uri))
- (request
- (build-request
- uri
- #:method method
- #:version '(1 . 1)
- #:headers `((connection close)
- (Transfer-Encoding . "chunked")
- (Content-Type . "application/octet-stream")
- ,@headers)
- #:port port)))
-
- (set-port-encoding! port "ISO-8859-1")
- (setvbuf port 'block (expt 2 13))
- (with-exception-handler
- (lambda (exp)
- (simple-format #t "error: PUT ~A: ~A\n" (uri-path uri) exp)
- (close-port port)
- (raise-exception exp))
- (lambda ()
- (let ((request (write-request request port)))
- (let* ((chunked-output-port
- (make-chunked-output-port*
- port
- #:buffering (expt 2 12)
- #:keep-alive? #t
- #:report-bytes-sent report-bytes-sent)))
-
- ;; A SIGPIPE will kill Guile, so ignore it
- (sigaction SIGPIPE
- (lambda (arg)
- (simple-format (current-error-port) "warning: SIGPIPE\n")))
-
- (set-port-encoding! chunked-output-port "ISO-8859-1")
- (callback chunked-output-port)
- (close-port chunked-output-port)
-
- (let ((response (read-response port)))
- (let ((body (read-response-body response)))
- (close-port port)
- (values response
- body)))))))))
-
-(define* (retry-on-error f #:key times delay ignore)
+ open-socket-for-uri*
+
+ call-with-sigint
+ run-server/patched
+
+ timeout-error?
+
+ port-read-timeout-error?
+ port-write-timeout-error?
+ with-port-timeouts))
+
+(define* (retry-on-error f #:key times delay ignore error-hook)
(let loop ((attempt 1))
(match (with-exception-handler
(lambda (exn)
@@ -259,15 +101,26 @@ upcoming chunk."
times))
(apply values return-values))
((#f . exn)
- (if (>= attempt times)
+ (if (>= attempt
+ (- times 1))
(begin
(simple-format
(current-error-port)
- "error: ~A:\n ~A,\n giving up after ~A attempts\n"
+ "error: ~A:\n ~A,\n attempt ~A of ~A, last retry in ~A\n"
f
exn
- times)
- (raise-exception exn))
+ attempt
+ times
+ delay)
+ (when error-hook
+ (error-hook attempt exn))
+ (sleep delay)
+ (simple-format
+ (current-error-port)
+ "running last retry of ~A after ~A failed attempts\n"
+ f
+ attempt)
+ (f))
(begin
(simple-format
(current-error-port)
@@ -277,71 +130,11 @@ upcoming chunk."
attempt
times
delay)
+ (when error-hook
+ (error-hook attempt exn))
(sleep delay)
(loop (+ 1 attempt))))))))
-(define delay-logging-fluid
- (make-thread-local-fluid))
-(define delay-logging-depth-fluid
- (make-thread-local-fluid 0))
-
-(define (log-delay proc duration)
- (and=> (fluid-ref delay-logging-fluid)
- (lambda (recorder)
- (recorder proc duration))))
-
-(define* (call-with-delay-logging proc #:key (threshold 1) (args '()))
- (let ((start (get-internal-real-time))
- (trace '())
- (root-logger? (eq? #f (fluid-ref delay-logging-fluid))))
-
- (define (format-seconds seconds)
- (format #f "~4f" seconds))
-
- (call-with-values
- (lambda ()
- (with-fluid* delay-logging-depth-fluid
- (+ 1 (fluid-ref delay-logging-depth-fluid))
- (lambda ()
- (if root-logger?
- (with-fluid* delay-logging-fluid
- (lambda (proc duration)
- (set! trace
- (cons (list proc
- duration
- (fluid-ref delay-logging-depth-fluid))
- trace))
- #t)
- (lambda ()
- (apply proc args)))
- (apply proc args)))))
- (lambda vals
- (let ((elapsed-seconds
- (/ (- (get-internal-real-time)
- start)
- internal-time-units-per-second)))
- (if (and (> elapsed-seconds threshold)
- root-logger?)
- (let ((lines
- (cons
- (simple-format #f "warning: delay of ~A seconds: ~A"
- (format-seconds elapsed-seconds)
- proc)
- (map (match-lambda
- ((proc duration depth)
- (string-append
- (make-string (* 2 depth) #\space)
- (simple-format #f "~A: ~A"
- (format-seconds duration)
- proc))))
- trace))))
- (display (string-append
- (string-join lines "\n")
- "\n")))
- (unless root-logger?
- ((fluid-ref delay-logging-fluid) proc elapsed-seconds))))
- (apply values vals)))))
-
(define (call-with-time-logging name thunk)
(let ((start (current-time time-utc)))
(call-with-values
@@ -364,7 +157,9 @@ upcoming chunk."
(define* (create-work-queue thread-count-parameter proc
#:key thread-start-delay
(thread-stop-delay
- (make-time time-duration 0 0)))
+ (make-time time-duration 0 0))
+ (name "unnamed")
+ priority<?)
(let ((queue (make-q))
(queue-mutex (make-mutex))
(job-available (make-condition-variable))
@@ -384,11 +179,26 @@ upcoming chunk."
(else
thread-count-parameter)))
- (define (process-job . args)
- (with-mutex queue-mutex
- (enq! queue args)
- (start-new-threads-if-necessary (get-thread-count))
- (signal-condition-variable job-available)))
+ (define process-job
+ (if priority<?
+ (lambda* (args #:key priority)
+ (with-mutex queue-mutex
+ (enq! queue (cons priority args))
+ (set-car!
+ queue
+ (stable-sort! (car queue)
+ (lambda (a b)
+ (priority<?
+ (car a)
+ (car b)))))
+ (sync-q! queue)
+ (start-new-threads-if-necessary (get-thread-count))
+ (signal-condition-variable job-available)))
+ (lambda args
+ (with-mutex queue-mutex
+ (enq! queue args)
+ (start-new-threads-if-necessary (get-thread-count))
+ (signal-condition-variable job-available)))))
(define (count-threads)
(with-mutex queue-mutex
@@ -403,11 +213,12 @@ upcoming chunk."
(define (list-jobs)
(with-mutex queue-mutex
- (append (list-copy
- (car queue))
+ (append (if priority<?
+ (map cdr (car queue))
+ (list-copy (car queue)))
(hash-fold (lambda (key val result)
- (or (and val
- (cons val result))
+ (if val
+ (cons val result)
result))
'()
running-job-args))))
@@ -416,16 +227,17 @@ upcoming chunk."
(with-exception-handler
(lambda (exn)
(simple-format (current-error-port)
- "job raised exception: ~A\n"
- job-args))
+ "~A work queue, job raised exception ~A: ~A\n"
+ name job-args exn))
(lambda ()
(with-throw-handler #t
(lambda ()
(apply proc job-args))
(lambda (key . args)
- (simple-format (current-error-port)
- "exception when handling job: ~A ~A\n"
- key args)
+ (simple-format
+ (current-error-port)
+ "~A work queue, exception when handling job: ~A ~A\n"
+ name key args)
(backtrace))))
#:unwind? #t))
@@ -453,6 +265,13 @@ upcoming chunk."
(call-with-new-thread
(lambda ()
+ (catch 'system-error
+ (lambda ()
+ (set-thread-name
+ (string-append name " q t "
+ (number->string thread-index))))
+ (const #t))
+
(let loop ((last-job-finished-at (current-time time-monotonic)))
(lock-mutex queue-mutex)
@@ -469,9 +288,13 @@ upcoming chunk."
;; the job in the mean time
(if (q-empty? queue)
#f
- (deq! queue))
+ (if priority<?
+ (cdr (deq! queue))
+ (deq! queue)))
#f)
- (deq! queue))))
+ (if priority<?
+ (cdr (deq! queue))
+ (deq! queue)))))
(if job-args
(begin
@@ -499,32 +322,38 @@ upcoming chunk."
(define start-new-threads-if-necessary
(let ((previous-thread-started-at (make-time time-monotonic 0 0)))
(lambda (desired-count)
- (let* ((thread-count
- (hash-count (const #t) running-job-args))
- (threads-to-start
- (- desired-count thread-count)))
- (when (> threads-to-start 0)
- (for-each
- (lambda (thread-index)
- (when (eq? (hash-ref running-job-args
- thread-index
- 'slot-free)
- 'slot-free)
- (let* ((now (current-time time-monotonic))
- (elapsed (time-difference now
- previous-thread-started-at)))
- (when (or (eq? #f thread-start-delay)
- (time>=? elapsed thread-start-delay))
- (set! previous-thread-started-at now)
- (hash-set! running-job-args
- thread-index
- #f)
- (start-thread thread-index)))))
- (iota desired-count)))))))
+ (let* ((thread-count
+ (hash-count (const #t) running-job-args))
+ (threads-to-start
+ (- desired-count thread-count)))
+ (when (> threads-to-start 0)
+ (for-each
+ (lambda (thread-index)
+ (when (eq? (hash-ref running-job-args
+ thread-index
+ 'slot-free)
+ 'slot-free)
+ (let* ((now (current-time time-monotonic))
+ (elapsed (time-difference now
+ previous-thread-started-at)))
+ (when (or (eq? #f thread-start-delay)
+ (time>=? elapsed thread-start-delay))
+ (set! previous-thread-started-at now)
+ (hash-set! running-job-args
+ thread-index
+ #f)
+ (start-thread thread-index)))))
+ (iota desired-count)))))))
(if (procedure? thread-count-parameter)
(call-with-new-thread
(lambda ()
+ (catch 'system-error
+ (lambda ()
+ (set-thread-name
+ (string-append name " q t")))
+ (const #t))
+
(while #t
(sleep 15)
(with-mutex queue-mutex
@@ -565,83 +394,171 @@ falling back to en_US.utf8\n"
(setlocale LC_ALL ""))
#:unwind? #t))
-(define %worker-thread-args
- (make-parameter #f))
-
-(define* (make-worker-thread-channel initializer
- #:key (parallelism 1)
- (delay-logger (lambda _ #f))
- destructor
- lifetime
- (log-exception? (const #t)))
- "Return a channel used to offload work to a dedicated thread. ARGS are the
-arguments of the worker thread procedure."
+(define-record-type <worker-thread-set>
+ (worker-thread-set channel arguments-parameter)
+ worker-thread-set?
+ (channel worker-thread-set-channel)
+ (arguments-parameter worker-thread-set-arguments-parameter))
+
+(define* (make-worker-thread-set initializer
+ #:key (parallelism 1)
+ (delay-logger (lambda _ #f))
+ (duration-logger (const #f))
+ destructor
+ lifetime
+ (log-exception? (const #t))
+ (expire-on-exception? #f)
+ (name "unnamed"))
+ (define param
+ (make-parameter #f))
+
+ (define (initializer/safe)
+ (let ((args
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "exception running initializer in worker thread (~A): ~A:\n ~A\n"
+ name
+ initializer
+ exn)
+ #f)
+ (lambda ()
+ (with-throw-handler #t
+ initializer
+ (lambda args
+ (backtrace))))
+ #:unwind? #t)))
+
+ (if args
+ args
+ ;; never give up, just keep retrying
+ (begin
+ (sleep 5)
+ (initializer/safe)))))
+
+ (define (destructor/safe args)
+ (let ((success?
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "exception running destructor in worker thread (~A): ~A:\n ~A\n"
+ name
+ initializer
+ exn)
+ #f)
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (apply destructor args)
+ #t)
+ (lambda _
+ (backtrace))))
+ #:unwind? #t)))
+
+ (or success?
+ #t
+ (begin
+ (sleep 5)
+ (destructor/safe args)))))
+
(let ((channel (make-channel)))
(for-each
- (lambda _
+ (lambda (thread-index)
(call-with-new-thread
(lambda ()
- (let init ((args (initializer)))
- (parameterize ((%worker-thread-args args))
+ (catch 'system-error
+ (lambda ()
+ (set-thread-name
+ (string-append
+ name " w t "
+ (number->string thread-index))))
+ (const #t))
+
+ (let init ((args (initializer/safe)))
+ (parameterize ((param args))
(let loop ((current-lifetime lifetime))
- (match (get-message channel)
- (((? channel? reply) sent-time (? procedure? proc))
- (let ((time-delay
- (- (get-internal-real-time)
- sent-time)))
- (delay-logger (/ time-delay
- internal-time-units-per-second))
- (put-message
- reply
- (let ((start-time (get-internal-real-time)))
- (with-exception-handler
- (lambda (exn)
- (list 'worker-thread-error
- (/ (- (get-internal-real-time)
- start-time)
- internal-time-units-per-second)
- exn))
- (lambda ()
- (with-throw-handler #t
- (lambda ()
- (call-with-values
- (lambda ()
- (apply proc args))
- (lambda vals
- (cons (/ (- (get-internal-real-time)
- start-time)
+ (let ((exception?
+ (match (get-message channel)
+ (((? channel? reply) sent-time (? procedure? proc))
+ (let ((time-delay
+ (- (get-internal-real-time)
+ sent-time)))
+ (delay-logger (/ time-delay
internal-time-units-per-second)
- vals))))
- (lambda args
- (when (match args
- (('%exception exn)
- (log-exception? exn))
- (_ #t))
- (simple-format
- (current-error-port)
- "worker-thread: exception: ~A\n" args)
- (backtrace)))))
- #:unwind? #t))))))
- (if (number? current-lifetime)
- (unless (< current-lifetime 0)
- (loop (if current-lifetime
- (- current-lifetime 1)
- #f)))
- (loop #f))))
+ proc)
+
+ (let* ((start-time (get-internal-real-time))
+ (response
+ (with-exception-handler
+ (lambda (exn)
+ (list 'worker-thread-error
+ (/ (- (get-internal-real-time)
+ start-time)
+ internal-time-units-per-second)
+ exn))
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (call-with-values
+ (lambda ()
+ (apply proc args))
+ (lambda vals
+ (cons (/ (- (get-internal-real-time)
+ start-time)
+ internal-time-units-per-second)
+ vals))))
+ (lambda args
+ (when (match args
+ (('%exception exn)
+ (log-exception? exn))
+ (_ #t))
+ (simple-format
+ (current-error-port)
+ "worker-thread: exception: ~A\n" args)
+ (backtrace)))))
+ #:unwind? #t)))
+ (put-message reply
+ response)
+
+ (match response
+ (('worker-thread-error duration _)
+ (when duration-logger
+ (duration-logger duration proc))
+ #t)
+ ((duration . _)
+ (when duration-logger
+ (duration-logger duration proc))
+ #f))))))))
+
+ (unless (and expire-on-exception?
+ exception?)
+ (if (number? current-lifetime)
+ (unless (< current-lifetime 0)
+ (loop (if current-lifetime
+ (- current-lifetime 1)
+ #f)))
+ (loop #f))))))
+
(when destructor
- (apply destructor args))
- (init (initializer))))))
+ (destructor/safe args))
+
+ (init (initializer/safe))))))
(iota parallelism))
- channel))
-(define* (call-with-worker-thread channel proc #:key duration-logger)
+ (worker-thread-set channel
+ param)))
+
+(define* (call-with-worker-thread record proc #:key duration-logger)
"Send PROC to the worker thread through CHANNEL. Return the result of PROC.
If already in the worker thread, call PROC immediately."
- (let ((args (%worker-thread-args)))
+ (let ((args ((worker-thread-set-arguments-parameter record))))
(if args
(apply proc args)
(let ((reply (make-channel)))
- (put-message channel (list reply (get-internal-real-time) proc))
+ (put-message (worker-thread-set-channel record)
+ (list reply (get-internal-real-time) proc))
(match (get-message reply)
(('worker-thread-error duration exn)
(when duration-logger
@@ -651,3 +568,221 @@ If already in the worker thread, call PROC immediately."
(when duration-logger
(duration-logger duration))
(apply values result)))))))
+
+(define* (open-socket-for-uri* uri
+ #:key (verify-certificate? #t))
+ (define tls-wrap
+ (@@ (web client) tls-wrap))
+
+ (define https?
+ (eq? 'https (uri-scheme uri)))
+
+ (define plain-uri
+ (if https?
+ (build-uri
+ 'http
+ #:userinfo (uri-userinfo uri)
+ #:host (uri-host uri)
+ #:port (or (uri-port uri) 443)
+ #:path (uri-path uri)
+ #:query (uri-query uri)
+ #:fragment (uri-fragment uri))
+ uri))
+
+ (let ((s (open-socket-for-uri plain-uri)))
+ (values
+ (if https?
+ (let ((port
+ (tls-wrap s (uri-host uri)
+ #:verify-certificate? verify-certificate?)))
+ ;; Guile/guile-gnutls don't handle the handshake happening on a non
+ ;; blocking socket, so change the behavior here.
+ (let ((flags (fcntl s F_GETFL)))
+ (fcntl s F_SETFL (logior O_NONBLOCK flags)))
+ port)
+ (let ((flags (fcntl s F_GETFL)))
+ (fcntl s F_SETFL (logior O_NONBLOCK flags))
+ s))
+ s)))
+
+;; Copied from (fibers web server)
+(define (call-with-sigint thunk cvar)
+ (let ((handler #f))
+ (dynamic-wind
+ (lambda ()
+ (set! handler
+ (sigaction SIGINT (lambda (sig) (signal-condition! cvar)))))
+ thunk
+ (lambda ()
+ (if handler
+ ;; restore Scheme handler, SIG_IGN or SIG_DFL.
+ (sigaction SIGINT (car handler) (cdr handler))
+ ;; restore original C handler.
+ (sigaction SIGINT #f))))))
+
+;; This variant of run-server from the fibers library supports running
+;; multiple servers within one process.
+(define run-server/patched
+ (let ((fibers-web-server-module
+ (resolve-module '(fibers web server))))
+
+ (define set-nonblocking!
+ (module-ref fibers-web-server-module 'set-nonblocking!))
+
+ (define make-default-socket
+ (module-ref fibers-web-server-module 'make-default-socket))
+
+ (define socket-loop
+ (module-ref fibers-web-server-module 'socket-loop))
+
+ (lambda* (handler
+ #:key
+ (host #f)
+ (family AF_INET)
+ (addr (if host
+ (inet-pton family host)
+ INADDR_LOOPBACK))
+ (port 8080)
+ (socket (make-default-socket family addr port)))
+ ;; We use a large backlog by default. If the server is suddenly hit
+ ;; with a number of connections on a small backlog, clients won't
+ ;; receive confirmation for their SYN, leading them to retry --
+ ;; probably successfully, but with a large latency.
+ (listen socket 1024)
+ (set-nonblocking! socket)
+ (sigaction SIGPIPE SIG_IGN)
+ (spawn-fiber (lambda () (socket-loop socket handler))))))
+
+;; These procedure are subject to spurious wakeups.
+
+(define (readable? port)
+ "Test if PORT is writable."
+ (match (select (vector port) #() #() 0)
+ ((#() #() #()) #f)
+ ((#(_) #() #()) #t)))
+
+(define (writable? port)
+ "Test if PORT is writable."
+ (match (select #() (vector port) #() 0)
+ ((#() #() #()) #f)
+ ((#() #(_) #()) #t)))
+
+(define (make-wait-operation ready? schedule-when-ready port port-ready-fd this-procedure)
+ (make-base-operation #f
+ (lambda _
+ (and (ready? (port-ready-fd port)) values))
+ (lambda (flag sched resume)
+ (define (commit)
+ (match (atomic-box-compare-and-swap! flag 'W 'S)
+ ('W (resume values))
+ ('C (commit))
+ ('S #f)))
+ (schedule-when-ready
+ sched (port-ready-fd port) commit))))
+
+(define (wait-until-port-readable-operation port)
+ "Make an operation that will succeed when PORT is readable."
+ (unless (input-port? port)
+ (error "refusing to wait forever for input on non-input port"))
+ (make-wait-operation readable? schedule-task-when-fd-readable port
+ port-read-wait-fd
+ wait-until-port-readable-operation))
+
+(define (wait-until-port-writable-operation port)
+ "Make an operation that will succeed when PORT is writable."
+ (unless (output-port? port)
+ (error "refusing to wait forever for output on non-output port"))
+ (make-wait-operation writable? schedule-task-when-fd-writable port
+ port-write-wait-fd
+ wait-until-port-writable-operation))
+
+
+
+(define &port-timeout
+ (make-exception-type '&port-timeout
+ &external-error
+ '(port)))
+
+(define make-port-timeout-error
+ (record-constructor &port-timeout))
+
+(define port-timeout-error?
+ (record-predicate &port-timeout))
+
+(define &port-read-timeout
+ (make-exception-type '&port-read-timeout
+ &port-timeout
+ '()))
+
+(define make-port-read-timeout-error
+ (record-constructor &port-read-timeout))
+
+(define port-read-timeout-error?
+ (record-predicate &port-read-timeout))
+
+(define &port-write-timeout
+ (make-exception-type '&port-write-timeout
+ &port-timeout
+ '()))
+
+(define make-port-write-timeout-error
+ (record-constructor &port-write-timeout))
+
+(define port-write-timeout-error?
+ (record-predicate &port-write-timeout))
+
+(define* (with-port-timeouts thunk
+ #:key timeout
+ (read-timeout timeout)
+ (write-timeout timeout))
+ (define (no-fibers-wait port mode timeout)
+ (define poll-timeout-ms 200)
+
+ ;; When the GC runs, it restarts the poll syscall, but the timeout
+ ;; remains unchanged! When the timeout is longer than the time
+ ;; between the syscall restarting, I think this renders the
+ ;; timeout useless. Therefore, this code uses a short timeout, and
+ ;; repeatedly calls poll while watching the clock to see if it has
+ ;; timed out overall.
+ (let ((timeout-internal
+ (+ (get-internal-real-time)
+ (* internal-time-units-per-second
+ (/ timeout 1000)))))
+ (let loop ((poll-value
+ (port-poll port mode poll-timeout-ms)))
+ (if (= poll-value 0)
+ (if (> (get-internal-real-time)
+ timeout-internal)
+ (raise-exception
+ (if (string=? mode "r")
+ (make-port-read-timeout-error port)
+ (make-port-write-timeout-error port)))
+ (loop (port-poll port mode poll-timeout-ms)))
+ poll-value))))
+
+ (parameterize
+ ((current-read-waiter
+ (lambda (port)
+ (if (current-scheduler)
+ (perform-operation
+ (choice-operation
+ (wait-until-port-readable-operation port)
+ (wrap-operation
+ (sleep-operation read-timeout)
+ (lambda ()
+ (raise-exception
+ (make-port-read-timeout-error thunk port))))))
+ (no-fibers-wait port "r" read-timeout))))
+ (current-write-waiter
+ (lambda (port)
+ (if (current-scheduler)
+ (perform-operation
+ (choice-operation
+ (wait-until-port-writable-operation port)
+ (wrap-operation
+ (sleep-operation write-timeout)
+ (lambda ()
+ (raise-exception
+ (make-port-write-timeout-error thunk port))))))
+ (no-fibers-wait port "w" write-timeout)))))
+ (thunk)))