;;; path: root/guix-build-coordinator/utils/fibers.scm
;;; blob: 99941adfa0302ccbb6668f34a782859d548400bd
(define-module (guix-build-coordinator utils fibers)
  #:use-module (ice-9 match)
  #:use-module (ice-9 threads)
  #:use-module (fibers)
  #:use-module (fibers channels)
  #:use-module (fibers conditions)
  #:export (make-worker-thread-channel
            call-with-worker-thread

            call-with-sigint

            run-server/patched))

;; Parameter that is #f by default.  make-worker-thread-channel binds it,
;; inside each worker thread, to the list returned by the initializer;
;; call-with-worker-thread reads it to decide whether it can apply the
;; procedure directly.
(define %worker-thread-args (make-parameter #f))

(define* (make-worker-thread-channel initializer
                                     #:key (parallelism 1)
                                     (delay-logger (lambda _ #f))
                                     destructor
                                     lifetime)
  "Return a channel used to offload work to dedicated worker threads.

INITIALIZER is a thunk called in each worker thread; the list it returns is
applied to every procedure received over the channel, and is what
%worker-thread-args is bound to inside that thread.

PARALLELISM is the number of worker threads started.  DELAY-LOGGER is called
for each request with the time, in seconds, between the request being sent
and a worker picking it up.  If LIFETIME is a number, a worker stops taking
requests after a bounded number of them, DESTRUCTOR (when given) is applied
to the worker's arguments, and the worker starts over with the result of
calling INITIALIZER again.  If LIFETIME is not a number, workers never
restart and DESTRUCTOR is never called.

Each request is a list (REPLY-CHANNEL SENT-TIME PROC).  The reply put on
REPLY-CHANNEL is either the list of values returned by PROC, or a pair
(worker-thread-error . EXN) if PROC raised an exception."

  (define (call-capturing-errors proc args)
    ;; Apply PROC to ARGS.  Return the list of values it returned, or a
    ;; (worker-thread-error . EXN) pair if it raised an exception.
    (with-exception-handler
        (lambda (exn)
          (cons 'worker-thread-error exn))
      (lambda ()
        (with-throw-handler #t
          (lambda ()
            (call-with-values
                (lambda ()
                  (apply proc args))
              (lambda vals vals)))
          ;; Log the exception and a backtrace; the outer handler then
          ;; turns the exception into the reply value.
          (lambda (key . throw-args)
            (simple-format
             (current-error-port)
             "worker-thread: exception: ~A ~A\n" key throw-args)
            (backtrace))))
      #:unwind? #t))

  ;; The fibers-internal `current-fiber' parameter is bound to #f while the
  ;; threads are created.  NOTE(review): presumably so the new threads don't
  ;; consider themselves to be running in the caller's fiber -- confirm.
  (parameterize (((@@ (fibers internal) current-fiber) #f))
    (let ((channel (make-channel)))
      (for-each
       (lambda _
         (call-with-new-thread
          (lambda ()
            (let init ((args (initializer)))
              (parameterize ((%worker-thread-args args))
                (let loop ((current-lifetime lifetime))
                  (match (get-message channel)
                    (((? channel? reply) sent-time (? procedure? proc))
                     (let ((time-delay
                            (- (get-internal-real-time)
                               sent-time)))
                       (delay-logger (/ time-delay
                                        internal-time-units-per-second))
                       (put-message reply
                                    (call-capturing-errors proc args)))))
                  ;; The counter is checked after handling a request and the
                  ;; loop only stops once it is negative, so a numeric
                  ;; LIFETIME of N (N >= 0) handles N + 2 requests.
                  (if (number? current-lifetime)
                      (unless (< current-lifetime 0)
                        (loop (- current-lifetime 1)))
                      (loop #f))))
              ;; Lifetime over: release the old arguments, if a destructor
              ;; was given, then always start again with fresh ones.
              (when destructor
                (apply destructor args))
              (init (initializer))))))
       (iota parallelism))
      channel)))

(define (call-with-worker-thread channel proc)
  "Apply PROC to worker thread arguments and return the values it returns.

If the %worker-thread-args parameter is set, as it is inside a worker thread,
PROC is applied to those arguments immediately.  Otherwise PROC is sent over
CHANNEL together with a fresh reply channel and the current time, and the
caller waits for the reply.  A (worker-thread-error . EXN) reply causes EXN
to be raised here."
  (define (run-remotely)
    (let ((reply-channel (make-channel)))
      (put-message channel
                   (list reply-channel (get-internal-real-time) proc))
      (match (get-message reply-channel)
        (('worker-thread-error . exn)
         (raise-exception exn))
        (vals
         (apply values vals)))))

  (match (%worker-thread-args)
    (#f (run-remotely))
    (local-args (apply proc local-args))))

;; Copied from (fibers web server)
(define (call-with-sigint thunk cvar)
  "Call THUNK with a SIGINT handler installed that signals the condition
CVAR.  The previous SIGINT handler is put back when THUNK's dynamic extent
is left."
  (define previous #f)

  (define (install-handler!)
    ;; sigaction's return value is remembered so it can be restored later.
    (set! previous
      (sigaction SIGINT
        (lambda (sig)
          (signal-condition! cvar)))))

  (define (restore-handler!)
    (if previous
        ;; restore Scheme handler, SIG_IGN or SIG_DFL.
        (sigaction SIGINT (car previous) (cdr previous))
        ;; restore original C handler.
        (sigaction SIGINT #f)))

  (dynamic-wind install-handler! thunk restore-handler!))

;; This variant of run-server from the fibers library supports running
;; multiple servers within one process.
(define run-server/patched
  ;; The helpers below are looked up once, when this definition is
  ;; evaluated, from the (fibers web server) module.
  (let* ((web-server-module (resolve-module '(fibers web server)))
         (lookup (lambda (name)
                   (module-ref web-server-module name)))
         (set-nonblocking! (lookup 'set-nonblocking!))
         (make-default-socket (lookup 'make-default-socket))
         (socket-loop (lookup 'socket-loop)))
    ;; HANDLER is passed to socket-loop along with the listening socket.
    ;; ADDR defaults to HOST parsed for FAMILY, or to the loopback address
    ;; when HOST is #f; SOCKET defaults to a socket made from FAMILY, ADDR
    ;; and PORT.  Returns the result of spawn-fiber.
    (lambda* (handler
              #:key
              (host #f)
              (family AF_INET)
              (addr (if host
                        (inet-pton family host)
                        INADDR_LOOPBACK))
              (port 8080)
              (socket (make-default-socket family addr port)))
      ;; We use a large backlog by default.  If the server is suddenly hit
      ;; with a number of connections on a small backlog, clients won't
      ;; receive confirmation for their SYN, leading them to retry --
      ;; probably successfully, but with a large latency.
      (listen socket 1024)
      (set-nonblocking! socket)
      ;; Ignore SIGPIPE for the whole process.
      (sigaction SIGPIPE SIG_IGN)
      ;; The accept loop runs in its own fiber, so this call returns
      ;; without waiting for the loop.
      (spawn-fiber
       (lambda ()
         (socket-loop socket handler))))))