aboutsummaryrefslogtreecommitdiff
path: root/guix-build-coordinator/utils/fibers.scm
blob: a39b3bbd3da591edf8139129679331cc072a8d4d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
(define-module (guix-build-coordinator utils fibers)
  #:use-module (ice-9 match)
  #:use-module (ice-9 threads)
  #:use-module (fibers)
  #:use-module (fibers channels)
  #:use-module (fibers conditions)
  #:export (make-worker-thread-channel
            call-with-worker-thread

            call-with-sigint

            run-server/patched))

(define %worker-thread-args
  (make-parameter #f))

(define* (make-worker-thread-channel initializer
                                     #:key (parallelism 1))
  "Return a channel used to offload work to a dedicated thread.  ARGS are the
arguments of the worker thread procedure."
  (parameterize (((@@ (fibers internal) current-fiber) #f))
    (let ((channel (make-channel)))
      (for-each
       (lambda _
         (let ((args (initializer)))
           (call-with-new-thread
            (lambda ()
              (parameterize ((%worker-thread-args args))
                (let loop ()
                  (match (get-message channel)
                    (((? channel? reply) . (? procedure? proc))
                     (put-message
                      reply
                      (with-exception-handler
                          (lambda (exn)
                            (cons 'worker-thread-error exn))
                        (lambda ()
                          (with-exception-handler
                              (lambda (exn)
                                (simple-format
                                 (current-error-port)
                                 "worker-thread: exception: ~A\n" exn)
                                (backtrace)
                                (raise-exception exn))
                            (lambda ()
                              (call-with-values
                                  (lambda ()
                                    (apply proc args))
                                (lambda vals vals)))))
                        #:unwind? #t))))
                  (loop)))))))
       (iota parallelism))
      channel)))

(define (call-with-worker-thread channel proc)
  "Send PROC to the worker thread through CHANNEL.  Return the result of PROC.
If already in the worker thread, call PROC immediately."
  (let ((args (%worker-thread-args)))
    (if args
        (apply proc args)
        (let ((reply (make-channel)))
          (put-message channel (cons reply proc))
          (match (get-message reply)
            (('worker-thread-error . exn)
             (raise-exception exn))
            (result
             (apply values result)))))))

;; Copied from (fibers web server)
(define (call-with-sigint thunk cvar)
  (let ((handler #f))
    (dynamic-wind
      (lambda ()
        (set! handler
          (sigaction SIGINT (lambda (sig) (signal-condition! cvar)))))
      thunk
      (lambda ()
        (if handler
            ;; restore Scheme handler, SIG_IGN or SIG_DFL.
            (sigaction SIGINT (car handler) (cdr handler))
            ;; restore original C handler.
            (sigaction SIGINT #f))))))

;; This variant of run-server from the fibers library supports running
;; multiple servers within one process.
(define run-server/patched
  (let ((fibers-web-server-module
         (resolve-module '(fibers web server))))

    (define set-nonblocking!
      (module-ref fibers-web-server-module 'set-nonblocking!))

    (define make-default-socket
      (module-ref fibers-web-server-module 'make-default-socket))

    (define socket-loop
      (module-ref fibers-web-server-module 'socket-loop))

    (lambda* (handler
              #:key
              (host #f)
              (family AF_INET)
              (addr (if host
                        (inet-pton family host)
                        INADDR_LOOPBACK))
              (port 8080)
              (socket (make-default-socket family addr port)))
      ;; We use a large backlog by default.  If the server is suddenly hit
      ;; with a number of connections on a small backlog, clients won't
      ;; receive confirmation for their SYN, leading them to retry --
      ;; probably successfully, but with a large latency.
      (listen socket 1024)
      (set-nonblocking! socket)
      (sigaction SIGPIPE SIG_IGN)
      (spawn-fiber (lambda () (socket-loop socket handler))))))