diff options
-rw-r--r-- | Makefile.am | 3 | ||||
-rw-r--r-- | guix-build-coordinator/agent-messaging/http.scm | 3 | ||||
-rw-r--r-- | guix-build-coordinator/client-communication.scm | 2 | ||||
-rw-r--r-- | guix-build-coordinator/coordinator.scm | 1 | ||||
-rw-r--r-- | guix-build-coordinator/datastore/sqlite.scm | 1 | ||||
-rw-r--r-- | guix-build-coordinator/utils.scm | 118 | ||||
-rw-r--r-- | guix-build-coordinator/utils/fibers.scm | 115 |
7 files changed, 123 insertions, 120 deletions
diff --git a/Makefile.am b/Makefile.am index 72cc2d7..f5a8150 100644 --- a/Makefile.am +++ b/Makefile.am @@ -18,7 +18,8 @@ SOURCES = \ guix-build-coordinator/datastore/sqlite.scm \ guix-build-coordinator/guix-data-service.scm \ guix-build-coordinator/hooks.scm \ - guix-build-coordinator/utils.scm + guix-build-coordinator/utils.scm \ + guix-build-coordinator/utils/fibers.scm install-data-local: mkdir -p "$(DESTDIR)$(pkgdatadir)" || exit 1; diff --git a/guix-build-coordinator/agent-messaging/http.scm b/guix-build-coordinator/agent-messaging/http.scm index e93a909..1f0f1a9 100644 --- a/guix-build-coordinator/agent-messaging/http.scm +++ b/guix-build-coordinator/agent-messaging/http.scm @@ -28,7 +28,6 @@ #:use-module (ice-9 textual-ports) #:use-module (ice-9 binary-ports) #:use-module (system repl error-handling) - #:use-module (fibers web server) #:use-module (rnrs bytevectors) #:use-module (json) #:use-module (web http) @@ -36,7 +35,6 @@ #:use-module (web request) #:use-module (web response) #:use-module (web uri) - #:use-module (fibers channels) #:use-module (lzlib) #:use-module (prometheus) #:use-module (guix store) @@ -44,6 +42,7 @@ #:use-module (guix serialization) #:use-module (guix build utils) #:use-module (guix-build-coordinator utils) + #:use-module (guix-build-coordinator utils fibers) #:use-module (guix-build-coordinator datastore) #:use-module (guix-build-coordinator coordinator) #:export (http-agent-messaging-start-server diff --git a/guix-build-coordinator/client-communication.scm b/guix-build-coordinator/client-communication.scm index a53ce3a..902b3e9 100644 --- a/guix-build-coordinator/client-communication.scm +++ b/guix-build-coordinator/client-communication.scm @@ -29,10 +29,10 @@ #:use-module (web client) #:use-module (web request) #:use-module (web response) - #:use-module (fibers web server) #:use-module (system repl error-handling) #:use-module (guix derivations) #:use-module (guix-build-coordinator utils) + #:use-module (guix-build-coordinator utils fibers) #:use-module (guix-build-coordinator datastore) #:use-module (guix-build-coordinator coordinator) #:export (start-client-request-server diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm index 3a3d76f..308aa18 100644 --- a/guix-build-coordinator/coordinator.scm +++ b/guix-build-coordinator/coordinator.scm @@ -36,6 +36,7 @@ #:use-module (guix derivations) #:use-module (guix build utils) #:use-module (guix-build-coordinator utils) + #:use-module (guix-build-coordinator utils fibers) #:use-module (guix-build-coordinator config) #:use-module (guix-build-coordinator datastore) #:use-module (guix-build-coordinator build-allocator) diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm index 79877ed..03eea76 100644 --- a/guix-build-coordinator/datastore/sqlite.scm +++ b/guix-build-coordinator/datastore/sqlite.scm @@ -7,6 +7,7 @@ #:use-module (prometheus) #:use-module (guix derivations) #:use-module (guix-build-coordinator utils) + #:use-module (guix-build-coordinator utils fibers) #:use-module (guix-build-coordinator config) #:use-module (guix-build-coordinator datastore abstract) #:export (sqlite-datastore diff --git a/guix-build-coordinator/utils.scm b/guix-build-coordinator/utils.scm index e9b7ad8..ad0df90 100644 --- a/guix-build-coordinator/utils.scm +++ b/guix-build-coordinator/utils.scm @@ -22,9 +22,6 @@ #:use-module (gcrypt pk-crypto) #:use-module (gcrypt hash) #:use-module (gcrypt random) - #:use-module (fibers) - #:use-module (fibers channels) - #:use-module (fibers conditions) #:use-module (json) #:use-module (guix pki) #:use-module (guix utils) @@ -33,10 +30,7 @@ #:use-module (guix status) #:use-module (guix base64) #:use-module (guix scripts substitute) - #:export (make-worker-thread-channel - call-with-worker-thread - - random-v4-uuid + #:export (random-v4-uuid make-base64-output-port call-with-streaming-http-request @@ -57,67 +51,7 @@ create-work-queue - with-timeout - - call-with-sigint - - run-server/patched)) - - -(define %worker-thread-args - (make-parameter #f)) - -(define* (make-worker-thread-channel initializer - #:key (parallelism 1)) - "Return a channel used to offload work to a dedicated thread. ARGS are the -arguments of the worker thread procedure." - (parameterize (((@@ (fibers internal) current-fiber) #f)) - (let ((channel (make-channel))) - (for-each - (lambda _ - (let ((args (initializer))) - (call-with-new-thread - (lambda () - (parameterize ((%worker-thread-args args)) - (let loop () - (match (get-message channel) - (((? channel? reply) . (? procedure? proc)) - (put-message - reply - (with-exception-handler - (lambda (exn) - (cons 'worker-thread-error exn)) - (lambda () - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "worker-thread: exception: ~A\n" exn) - (backtrace) - (raise-exception exn)) - (lambda () - (call-with-values - (lambda () - (apply proc args)) - (lambda vals vals))))) - #:unwind? #t)))) - (loop))))))) - (iota parallelism)) - channel))) - -(define (call-with-worker-thread channel proc) - "Send PROC to the worker thread through CHANNEL. Return the result of PROC. -If already in the worker thread, call PROC immediately." - (let ((args (%worker-thread-args))) - (if args - (apply proc args) - (let ((reply (make-channel))) - (put-message channel (cons reply proc)) - (match (get-message reply) - (('worker-thread-error . exn) - (raise-exception exn)) - (result - (apply values result))))))) + with-timeout)) (define (random-v4-uuid) ;; https://tools.ietf.org/html/rfc4122#page-14 @@ -639,51 +573,3 @@ again." (alarm 0) (sigaction SIGALRM SIG_DFL) (apply values result))))) - -;; Copied from (fibers web server) -(define (call-with-sigint thunk cvar) - (let ((handler #f)) - (dynamic-wind - (lambda () - (set! handler - (sigaction SIGINT (lambda (sig) (signal-condition! cvar))))) - thunk - (lambda () - (if handler - ;; restore Scheme handler, SIG_IGN or SIG_DFL. - (sigaction SIGINT (car handler) (cdr handler)) - ;; restore original C handler. - (sigaction SIGINT #f)))))) - -;; This variant of run-server from the fibers library supports running -;; multiple servers within one process. -(define run-server/patched - (let ((fibers-web-server-module - (resolve-module '(fibers web server)))) - - (define set-nonblocking! - (module-ref fibers-web-server-module 'set-nonblocking!)) - - (define make-default-socket - (module-ref fibers-web-server-module 'make-default-socket)) - - (define socket-loop - (module-ref fibers-web-server-module 'socket-loop)) - - (lambda* (handler - #:key - (host #f) - (family AF_INET) - (addr (if host - (inet-pton family host) - INADDR_LOOPBACK)) - (port 8080) - (socket (make-default-socket family addr port))) - ;; We use a large backlog by default. If the server is suddenly hit - ;; with a number of connections on a small backlog, clients won't - ;; receive confirmation for their SYN, leading them to retry -- - ;; probably successfully, but with a large latency. - (listen socket 1024) - (set-nonblocking! socket) - (sigaction SIGPIPE SIG_IGN) - (spawn-fiber (lambda () (socket-loop socket handler)))))) diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm new file mode 100644 index 0000000..a39b3bb --- /dev/null +++ b/guix-build-coordinator/utils/fibers.scm @@ -0,0 +1,115 @@ +(define-module (guix-build-coordinator utils fibers) + #:use-module (ice-9 match) + #:use-module (ice-9 threads) + #:use-module (fibers) + #:use-module (fibers channels) + #:use-module (fibers conditions) + #:export (make-worker-thread-channel + call-with-worker-thread + + call-with-sigint + + run-server/patched)) + +(define %worker-thread-args + (make-parameter #f)) + +(define* (make-worker-thread-channel initializer + #:key (parallelism 1)) + "Return a channel used to offload work to a dedicated thread. ARGS are the +arguments of the worker thread procedure." + (parameterize (((@@ (fibers internal) current-fiber) #f)) + (let ((channel (make-channel))) + (for-each + (lambda _ + (let ((args (initializer))) + (call-with-new-thread + (lambda () + (parameterize ((%worker-thread-args args)) + (let loop () + (match (get-message channel) + (((? channel? reply) . (? procedure? proc)) + (put-message + reply + (with-exception-handler + (lambda (exn) + (cons 'worker-thread-error exn)) + (lambda () + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "worker-thread: exception: ~A\n" exn) + (backtrace) + (raise-exception exn)) + (lambda () + (call-with-values + (lambda () + (apply proc args)) + (lambda vals vals))))) + #:unwind? #t)))) + (loop))))))) + (iota parallelism)) + channel))) + +(define (call-with-worker-thread channel proc) + "Send PROC to the worker thread through CHANNEL. Return the result of PROC. +If already in the worker thread, call PROC immediately." + (let ((args (%worker-thread-args))) + (if args + (apply proc args) + (let ((reply (make-channel))) + (put-message channel (cons reply proc)) + (match (get-message reply) + (('worker-thread-error . exn) + (raise-exception exn)) + (result + (apply values result))))))) + +;; Copied from (fibers web server) +(define (call-with-sigint thunk cvar) + (let ((handler #f)) + (dynamic-wind + (lambda () + (set! handler + (sigaction SIGINT (lambda (sig) (signal-condition! cvar))))) + thunk + (lambda () + (if handler + ;; restore Scheme handler, SIG_IGN or SIG_DFL. + (sigaction SIGINT (car handler) (cdr handler)) + ;; restore original C handler. + (sigaction SIGINT #f)))))) + +;; This variant of run-server from the fibers library supports running +;; multiple servers within one process. +(define run-server/patched + (let ((fibers-web-server-module + (resolve-module '(fibers web server)))) + + (define set-nonblocking! + (module-ref fibers-web-server-module 'set-nonblocking!)) + + (define make-default-socket + (module-ref fibers-web-server-module 'make-default-socket)) + + (define socket-loop + (module-ref fibers-web-server-module 'socket-loop)) + + (lambda* (handler + #:key + (host #f) + (family AF_INET) + (addr (if host + (inet-pton family host) + INADDR_LOOPBACK)) + (port 8080) + (socket (make-default-socket family addr port))) + ;; We use a large backlog by default. If the server is suddenly hit + ;; with a number of connections on a small backlog, clients won't + ;; receive confirmation for their SYN, leading them to retry -- + ;; probably successfully, but with a large latency. + (listen socket 1024) + (set-nonblocking! socket) + (sigaction SIGPIPE SIG_IGN) + (spawn-fiber (lambda () (socket-loop socket handler)))))) |