blob: 2dfb0872c4419645a5af3a124c87521c0ce97199 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
(define-module (guix-build-coordinator utils fibers)
#:use-module (ice-9 match)
#:use-module (ice-9 threads)
#:use-module (fibers)
#:use-module (fibers channels)
#:use-module (fibers conditions)
#:export (make-worker-thread-channel
call-with-worker-thread
call-with-sigint
run-server/patched))
;; Parameter whose default value is #f.  Worker threads rebind it to the
;; argument list returned by their initializer, and `call-with-worker-thread'
;; reads it to decide whether it is already running on a worker thread.
(define %worker-thread-args (make-parameter #f))
(define* (make-worker-thread-channel initializer
                                     #:key (parallelism 1)
                                     (delay-logger (lambda _ #f)))
  "Return a channel on which work can be offloaded to dedicated threads.

PARALLELISM threads are started.  INITIALIZER is called with no arguments once
per thread, in the calling thread, and the list it returns is applied to every
procedure that thread runs.

Each message read from the channel is expected to be a three element list:
the channel to reply on, the sender's `get-internal-real-time' reading, and
the procedure to run.  DELAY-LOGGER is called with the number of seconds that
elapsed between that reading and the message being picked up.  The value put
on the reply channel is either the list of values returned by the procedure,
or a pair whose car is the symbol worker-thread-error and whose cdr is the
exception that was raised."
  (define (run-job proc args)
    ;; The inner handler runs first, without unwinding, so the backtrace is
    ;; printed from the context of the error.  It re-raises to the outer
    ;; handler, which unwinds and produces the value sent back to the caller.
    (with-exception-handler
        (lambda (exn)
          (cons 'worker-thread-error exn))
      (lambda ()
        (with-exception-handler
            (lambda (exn)
              (simple-format (current-error-port)
                             "worker-thread: exception: ~A\n" exn)
              (backtrace)
              (raise-exception exn))
          (lambda ()
            (call-with-values
                (lambda () (apply proc args))
              list))))
      #:unwind? #t))

  (define (serve requests args)
    ;; Handle one request, then wait for the next one, forever.
    (match (get-message requests)
      (((? channel? reply) sent-time (? procedure? proc))
       (let ((waited (- (get-internal-real-time) sent-time)))
         (delay-logger (/ waited internal-time-units-per-second))
         (put-message reply (run-job proc args)))))
    (serve requests args))

  (define (start-worker! requests)
    ;; The initializer runs here, before the worker thread exists.
    (let ((args (initializer)))
      (call-with-new-thread
       (lambda ()
         (parameterize ((%worker-thread-args args))
           (serve requests args))))))

  ;; The current fiber is bound to #f while the channel and the threads are
  ;; created -- presumably so the new threads do not see the caller's fiber;
  ;; TODO confirm.
  (parameterize (((@@ (fibers internal) current-fiber) #f))
    (let ((channel (make-channel)))
      (for-each (lambda _ (start-worker! channel))
                (iota parallelism))
      channel)))
(define (call-with-worker-thread channel proc)
  "Have a worker thread listening on CHANNEL run PROC, and return the values
it sends back.  If the reply is a pair whose car is the symbol
worker-thread-error, the exception in its cdr is raised in the caller instead.
When the caller is itself a worker thread, PROC is applied directly to that
thread's arguments without going through CHANNEL."
  (define (dispatch)
    ;; Send the request together with a private reply channel and the time
    ;; of sending, then wait for the outcome.
    (let ((reply-channel (make-channel)))
      (put-message channel
                   (list reply-channel (get-internal-real-time) proc))
      (match (get-message reply-channel)
        (('worker-thread-error . exn) (raise-exception exn))
        (vals (apply values vals)))))

  (cond
   ;; Already on a worker thread: run PROC in place.
   ((%worker-thread-args) => (lambda (args) (apply proc args)))
   (else (dispatch))))
;; Copied from (fibers web server)
(define (call-with-sigint thunk cvar)
  "Call THUNK with a SIGINT handler installed that signals the condition CVAR,
and return what THUNK returns.  The handler that was in place beforehand is
reinstalled whenever control leaves THUNK."
  (let ((previous #f))
    (define (install!)
      ;; `sigaction' returns the old handler and flags; keep them for later.
      (set! previous
            (sigaction SIGINT
              (lambda (sig) (signal-condition! cvar)))))
    (define (restore!)
      (if previous
          ;; restore Scheme handler, SIG_IGN or SIG_DFL.
          (sigaction SIGINT (car previous) (cdr previous))
          ;; restore original C handler.
          (sigaction SIGINT #f)))
    (dynamic-wind install! thunk restore!)))
;; This variant of run-server from the fibers library supports running
;; multiple servers within one process.
;;
;; HANDLER is passed on to socket-loop.  The listening socket is either given
;; with #:socket or built by make-default-socket from #:family, #:addr and
;; #:port; #:addr defaults to #:host parsed with inet-pton, or to the loopback
;; address when no host is given.  The result is that of spawn-fiber.
(define run-server/patched
  ;; The helpers are looked up by name in (fibers web server), once, when
  ;; this definition is evaluated.
  (let* ((web-server (resolve-module '(fibers web server)))
         (lookup (lambda (name) (module-ref web-server name)))
         (set-nonblocking! (lookup 'set-nonblocking!))
         (make-default-socket (lookup 'make-default-socket))
         (socket-loop (lookup 'socket-loop)))
    (lambda* (handler
              #:key
              (host #f)
              (family AF_INET)
              (addr (if host
                        (inet-pton family host)
                        INADDR_LOOPBACK))
              (port 8080)
              (socket (make-default-socket family addr port)))
      ;; We use a large backlog by default.  If the server is suddenly hit
      ;; with a number of connections on a small backlog, clients won't
      ;; receive confirmation for their SYN, leading them to retry --
      ;; probably successfully, but with a large latency.
      (listen socket 1024)
      (set-nonblocking! socket)
      ;; Ignore SIGPIPE for the whole process.
      (sigaction SIGPIPE SIG_IGN)
      ;; Serve connections from a new fiber rather than in the caller.
      (spawn-fiber
       (lambda ()
         (socket-loop socket handler))))))
|