From 00e2e64337dd6deb62f05b409391cf69ab15fe41 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Wed, 7 Oct 2020 17:22:56 +0100 Subject: Split the fibers utils from the main utils module To start making it possible to use the agent, without having to load anything related to fibers (as it doesn't work on the hurd yet). --- Makefile.am | 3 +- guix-build-coordinator/agent-messaging/http.scm | 3 +- guix-build-coordinator/client-communication.scm | 2 +- guix-build-coordinator/coordinator.scm | 1 + guix-build-coordinator/datastore/sqlite.scm | 1 + guix-build-coordinator/utils.scm | 118 +----------------------- guix-build-coordinator/utils/fibers.scm | 115 +++++++++++++++++++++++ 7 files changed, 123 insertions(+), 120 deletions(-) create mode 100644 guix-build-coordinator/utils/fibers.scm diff --git a/Makefile.am b/Makefile.am index 72cc2d7..f5a8150 100644 --- a/Makefile.am +++ b/Makefile.am @@ -18,7 +18,8 @@ SOURCES = \ guix-build-coordinator/datastore/sqlite.scm \ guix-build-coordinator/guix-data-service.scm \ guix-build-coordinator/hooks.scm \ - guix-build-coordinator/utils.scm + guix-build-coordinator/utils.scm \ + guix-build-coordinator/utils/fibers.scm install-data-local: mkdir -p "$(DESTDIR)$(pkgdatadir)" || exit 1; diff --git a/guix-build-coordinator/agent-messaging/http.scm b/guix-build-coordinator/agent-messaging/http.scm index e93a909..1f0f1a9 100644 --- a/guix-build-coordinator/agent-messaging/http.scm +++ b/guix-build-coordinator/agent-messaging/http.scm @@ -28,7 +28,6 @@ #:use-module (ice-9 textual-ports) #:use-module (ice-9 binary-ports) #:use-module (system repl error-handling) - #:use-module (fibers web server) #:use-module (rnrs bytevectors) #:use-module (json) #:use-module (web http) @@ -36,7 +35,6 @@ #:use-module (web request) #:use-module (web response) #:use-module (web uri) - #:use-module (fibers channels) #:use-module (lzlib) #:use-module (prometheus) #:use-module (guix store) @@ -44,6 +42,7 @@ #:use-module (guix serialization) #:use-module (guix build utils) #:use-module (guix-build-coordinator utils) + #:use-module (guix-build-coordinator utils fibers) #:use-module (guix-build-coordinator datastore) #:use-module (guix-build-coordinator coordinator) #:export (http-agent-messaging-start-server diff --git a/guix-build-coordinator/client-communication.scm b/guix-build-coordinator/client-communication.scm index a53ce3a..902b3e9 100644 --- a/guix-build-coordinator/client-communication.scm +++ b/guix-build-coordinator/client-communication.scm @@ -29,10 +29,10 @@ #:use-module (web client) #:use-module (web request) #:use-module (web response) - #:use-module (fibers web server) #:use-module (system repl error-handling) #:use-module (guix derivations) #:use-module (guix-build-coordinator utils) + #:use-module (guix-build-coordinator utils fibers) #:use-module (guix-build-coordinator datastore) #:use-module (guix-build-coordinator coordinator) #:export (start-client-request-server diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm index 3a3d76f..308aa18 100644 --- a/guix-build-coordinator/coordinator.scm +++ b/guix-build-coordinator/coordinator.scm @@ -36,6 +36,7 @@ #:use-module (guix derivations) #:use-module (guix build utils) #:use-module (guix-build-coordinator utils) + #:use-module (guix-build-coordinator utils fibers) #:use-module (guix-build-coordinator config) #:use-module (guix-build-coordinator datastore) #:use-module (guix-build-coordinator build-allocator) diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm index 79877ed..03eea76 100644 --- a/guix-build-coordinator/datastore/sqlite.scm +++ b/guix-build-coordinator/datastore/sqlite.scm @@ -7,6 +7,7 @@ #:use-module (prometheus) #:use-module (guix derivations) #:use-module (guix-build-coordinator utils) + #:use-module (guix-build-coordinator utils fibers) #:use-module (guix-build-coordinator config) #:use-module (guix-build-coordinator datastore abstract) #:export (sqlite-datastore diff --git a/guix-build-coordinator/utils.scm b/guix-build-coordinator/utils.scm index e9b7ad8..ad0df90 100644 --- a/guix-build-coordinator/utils.scm +++ b/guix-build-coordinator/utils.scm @@ -22,9 +22,6 @@ #:use-module (gcrypt pk-crypto) #:use-module (gcrypt hash) #:use-module (gcrypt random) - #:use-module (fibers) - #:use-module (fibers channels) - #:use-module (fibers conditions) #:use-module (json) #:use-module (guix pki) #:use-module (guix utils) @@ -33,10 +30,7 @@ #:use-module (guix status) #:use-module (guix base64) #:use-module (guix scripts substitute) - #:export (make-worker-thread-channel - call-with-worker-thread - - random-v4-uuid + #:export (random-v4-uuid make-base64-output-port call-with-streaming-http-request @@ -57,67 +51,7 @@ create-work-queue - with-timeout - - call-with-sigint - - run-server/patched)) - - -(define %worker-thread-args - (make-parameter #f)) - -(define* (make-worker-thread-channel initializer - #:key (parallelism 1)) - "Return a channel used to offload work to a dedicated thread. ARGS are the -arguments of the worker thread procedure." - (parameterize (((@@ (fibers internal) current-fiber) #f)) - (let ((channel (make-channel))) - (for-each - (lambda _ - (let ((args (initializer))) - (call-with-new-thread - (lambda () - (parameterize ((%worker-thread-args args)) - (let loop () - (match (get-message channel) - (((? channel? reply) . (? procedure? proc)) - (put-message - reply - (with-exception-handler - (lambda (exn) - (cons 'worker-thread-error exn)) - (lambda () - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "worker-thread: exception: ~A\n" exn) - (backtrace) - (raise-exception exn)) - (lambda () - (call-with-values - (lambda () - (apply proc args)) - (lambda vals vals))))) - #:unwind? #t)))) - (loop))))))) - (iota parallelism)) - channel))) - -(define (call-with-worker-thread channel proc) - "Send PROC to the worker thread through CHANNEL. Return the result of PROC. -If already in the worker thread, call PROC immediately." - (let ((args (%worker-thread-args))) - (if args - (apply proc args) - (let ((reply (make-channel))) - (put-message channel (cons reply proc)) - (match (get-message reply) - (('worker-thread-error . exn) - (raise-exception exn)) - (result - (apply values result))))))) + with-timeout)) (define (random-v4-uuid) ;; https://tools.ietf.org/html/rfc4122#page-14 @@ -639,51 +573,3 @@ again." (alarm 0) (sigaction SIGALRM SIG_DFL) (apply values result))))) - -;; Copied from (fibers web server) -(define (call-with-sigint thunk cvar) - (let ((handler #f)) - (dynamic-wind - (lambda () - (set! handler - (sigaction SIGINT (lambda (sig) (signal-condition! cvar))))) - thunk - (lambda () - (if handler - ;; restore Scheme handler, SIG_IGN or SIG_DFL. - (sigaction SIGINT (car handler) (cdr handler)) - ;; restore original C handler. - (sigaction SIGINT #f)))))) - -;; This variant of run-server from the fibers library supports running -;; multiple servers within one process. -(define run-server/patched - (let ((fibers-web-server-module - (resolve-module '(fibers web server)))) - - (define set-nonblocking! - (module-ref fibers-web-server-module 'set-nonblocking!)) - - (define make-default-socket - (module-ref fibers-web-server-module 'make-default-socket)) - - (define socket-loop - (module-ref fibers-web-server-module 'socket-loop)) - - (lambda* (handler - #:key - (host #f) - (family AF_INET) - (addr (if host - (inet-pton family host) - INADDR_LOOPBACK)) - (port 8080) - (socket (make-default-socket family addr port))) - ;; We use a large backlog by default. If the server is suddenly hit - ;; with a number of connections on a small backlog, clients won't - ;; receive confirmation for their SYN, leading them to retry -- - ;; probably successfully, but with a large latency. - (listen socket 1024) - (set-nonblocking! socket) - (sigaction SIGPIPE SIG_IGN) - (spawn-fiber (lambda () (socket-loop socket handler)))))) diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm new file mode 100644 index 0000000..a39b3bb --- /dev/null +++ b/guix-build-coordinator/utils/fibers.scm @@ -0,0 +1,115 @@ +(define-module (guix-build-coordinator utils fibers) + #:use-module (ice-9 match) + #:use-module (ice-9 threads) + #:use-module (fibers) + #:use-module (fibers channels) + #:use-module (fibers conditions) + #:export (make-worker-thread-channel + call-with-worker-thread + + call-with-sigint + + run-server/patched)) + +(define %worker-thread-args + (make-parameter #f)) + +(define* (make-worker-thread-channel initializer + #:key (parallelism 1)) + "Return a channel used to offload work to a dedicated thread. ARGS are the +arguments of the worker thread procedure." + (parameterize (((@@ (fibers internal) current-fiber) #f)) + (let ((channel (make-channel))) + (for-each + (lambda _ + (let ((args (initializer))) + (call-with-new-thread + (lambda () + (parameterize ((%worker-thread-args args)) + (let loop () + (match (get-message channel) + (((? channel? reply) . (? procedure? proc)) + (put-message + reply + (with-exception-handler + (lambda (exn) + (cons 'worker-thread-error exn)) + (lambda () + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "worker-thread: exception: ~A\n" exn) + (backtrace) + (raise-exception exn)) + (lambda () + (call-with-values + (lambda () + (apply proc args)) + (lambda vals vals))))) + #:unwind? #t)))) + (loop))))))) + (iota parallelism)) + channel))) + +(define (call-with-worker-thread channel proc) + "Send PROC to the worker thread through CHANNEL. Return the result of PROC. +If already in the worker thread, call PROC immediately." + (let ((args (%worker-thread-args))) + (if args + (apply proc args) + (let ((reply (make-channel))) + (put-message channel (cons reply proc)) + (match (get-message reply) + (('worker-thread-error . exn) + (raise-exception exn)) + (result + (apply values result))))))) + +;; Copied from (fibers web server) +(define (call-with-sigint thunk cvar) + (let ((handler #f)) + (dynamic-wind + (lambda () + (set! handler + (sigaction SIGINT (lambda (sig) (signal-condition! cvar))))) + thunk + (lambda () + (if handler + ;; restore Scheme handler, SIG_IGN or SIG_DFL. + (sigaction SIGINT (car handler) (cdr handler)) + ;; restore original C handler. + (sigaction SIGINT #f)))))) + +;; This variant of run-server from the fibers library supports running +;; multiple servers within one process. +(define run-server/patched + (let ((fibers-web-server-module + (resolve-module '(fibers web server)))) + + (define set-nonblocking! + (module-ref fibers-web-server-module 'set-nonblocking!)) + + (define make-default-socket + (module-ref fibers-web-server-module 'make-default-socket)) + + (define socket-loop + (module-ref fibers-web-server-module 'socket-loop)) + + (lambda* (handler + #:key + (host #f) + (family AF_INET) + (addr (if host + (inet-pton family host) + INADDR_LOOPBACK)) + (port 8080) + (socket (make-default-socket family addr port))) + ;; We use a large backlog by default. If the server is suddenly hit + ;; with a number of connections on a small backlog, clients won't + ;; receive confirmation for their SYN, leading them to retry -- + ;; probably successfully, but with a large latency. + (listen socket 1024) + (set-nonblocking! socket) + (sigaction SIGPIPE SIG_IGN) + (spawn-fiber (lambda () (socket-loop socket handler)))))) -- cgit v1.2.3