diff options
author | Christopher Baines <mail@cbaines.net> | 2023-07-19 13:10:32 +0100 |
---|---|---|
committer | Christopher Baines <mail@cbaines.net> | 2023-07-19 13:10:32 +0100 |
commit | a88d154823de3bb7ea8c62e84b1294dfba51ecec (patch) | |
tree | 325e1abc510259689e1d39856ffdcb4fd7c44c73 | |
parent | 3ce4613908bb4a42494323ef0597f6c3ae2dee24 (diff) | |
download | bffe-a88d154823de3bb7ea8c62e84b1294dfba51ecec.tar bffe-a88d154823de3bb7ea8c62e84b1294dfba51ecec.tar.gz |
Hack non-blocking support to most http requests
Make the sockets non blocking, and avoid using read-response-body. This commit
also moves the state channel in to a fiber.
-rw-r--r-- | bffe/server.scm | 214 |
1 files changed, 127 insertions, 87 deletions
diff --git a/bffe/server.scm b/bffe/server.scm index e929df9..112ae17 100644 --- a/bffe/server.scm +++ b/bffe/server.scm @@ -67,9 +67,72 @@ (lambda (scheduler port) (display "#<scheduler>" port))) +;; Returns the port as well as the raw socket +(define* (open-socket-for-uri* uri + #:key (verify-certificate? #t) + (non-blocking? #t)) + (define tls-wrap + (@@ (web client) tls-wrap)) + + (define https? + (eq? 'https (uri-scheme uri))) + + (define plain-uri + (if https? + (build-uri + 'http + #:userinfo (uri-userinfo uri) + #:host (uri-host uri) + #:port (or (uri-port uri) 443) + #:path (uri-path uri) + #:query (uri-query uri) + #:fragment (uri-fragment uri)) + uri)) + + (let ((s (open-socket-for-uri plain-uri))) + (values + (if https? + (tls-wrap s (uri-host uri) + #:verify-certificate? verify-certificate?) + s) + s))) + +(define* (http-get* uri + #:key + (method 'GET) + (body #f) + (verify-certificate? #t) + (port #f) + (version '(1 . 1)) + (keep-alive? #f) + (headers '()) + (decode-body? #t) + ;; Default to streaming? #t since read-response-body calls + ;; get-bytevector-all, which is implemented in C and + ;; therefore can't be suspended + (streaming? #t)) + (let ((port socket (open-socket-for-uri* + uri + #:verify-certificate? verify-certificate?))) + ;; Guile/guile-gnutls don't handle the handshake happening on a non + ;; blocking socket, so change the behavior here. + (let ((flags (fcntl socket F_GETFL))) + (fcntl socket F_SETFL (logior O_NONBLOCK flags))) + + (http-request uri + #:method method + #:body body + #:verify-certificate? verify-certificate? + #:port port + #:version version + #:keep-alive? keep-alive? + #:headers headers + #:decode-body? decode-body? + #:streaming? streaming?))) + (define (make-state-channel event-source) (let ((channel (make-channel))) - (call-with-new-thread + (spawn-fiber (lambda () (let loop ((last-fetch-time #f) (state #f)) @@ -91,12 +154,11 @@ (lambda () (let ((response body - (http-get + (http-get* (string->uri (string-append event-source "/state"))))) (let ((state - (json-string->scm - (utf8->string body)))) + (json->scm body))) (put-message reply-channel state) state))) @@ -232,7 +294,7 @@ (lambda () (let* ((response remote-port - (http-get + (http-get* (string->uri (string-append event-source "/events")) #:headers @@ -241,8 +303,7 @@ (- initial-state-id ;; Get some earlier events as well, as clients that ;; are reconnecting might want them - 100)))) - #:streaming? #t))) + 100))))))) (setvbuf remote-port 'none) (let loop ((line (get-line remote-port))) @@ -440,7 +501,7 @@ (('GET "build" uuid) (let ((response body - (http-get + (http-get* (string->uri (string-append event-source "/build/" uuid ;; ".json" @@ -450,17 +511,15 @@ mime-types) ((application/json) (render-json - (json-string->scm - (utf8->string body)))) + (json->scm body))) (else (render-html #:sxml (build title - (json-string->scm - (utf8->string body)))))))) + (json->scm body))))))) (('GET "agent" agent-id) (let ((response body - (http-get + (http-get* (string->uri (string-append event-source "/agent/" agent-id)) #:headers '((accept . ((application/json))))))) @@ -469,24 +528,21 @@ mime-types) ((application/json) (render-json - (json-string->scm - (utf8->string body)))) + (json->scm body))) (else (render-html #:sxml (agent title - (json-string->scm - (utf8->string body)))))))) + (json->scm body))))))) (('GET "agent" agent-id "build-allocation-plan") (let ((response body - (http-get + (http-get* (string->uri (string-append event-source "/agent/" agent-id "/build-allocation-plan")) #:headers '((accept . ((application/json))))))) (render-json - (json-string->scm - (utf8->string body))))) + (json->scm body)))) (('GET "assets" rest ...) (or (handle-static-assets (string-join rest "/") @@ -541,71 +597,55 @@ pid-file metrics-registry) - (define state-channel - (make-state-channel - event-source)) - - (call-with-error-handling - (lambda () - (let ((finished? (make-condition))) - (call-with-sigint + (let ((finished? (make-condition))) + (call-with-sigint + (lambda () + (run-fibers (lambda () - (run-fibers - (lambda () - (let ((initial-state-id - (retry-on-error - (lambda () - (assoc-ref - (get-state state-channel) - "state_id")) - #:times 6 - #:delay 10))) - (simple-format #t "Starting from state ~A\n" - initial-state-id) - - (let* ((events-channel - get-state-id - (make-events-channel - event-source - initial-state-id - #:metrics-registry metrics-registry)) - (controller - (apply make-controller assets-directory - metrics-registry - events-channel state-channel - event-source - controller-args))) - - ;; Wait until the events channel catches up - (while (< (get-state-id) initial-state-id) - (sleep 1)) - - (when pid-file - (call-with-output-file pid-file - (lambda (port) - (simple-format port "~A\n" (getpid))))) - - (simple-format #t "Starting the server\n") - (run-server/patched (lambda (request body) - (apply values - (handler request body controller))) - #:host host - #:port port))) - - (wait finished?)) - #:hz 10 - #:parallelism 4)) - finished?))) - #:on-error 'backtrace - #:post-error (lambda (key . args) - (when (eq? key 'system-error) - (match args - (("bind" "~A" ("Address already in use") _) - (simple-format - (current-error-port) - "\n -error: bffe could not start, as it could not bind to port ~A - -Check if it's already running, or whether another process is using that -port. Also, the port used can be changed by passing the --port option.\n" - port))))))) + (let* ((state-channel + (make-state-channel + event-source)) + (initial-state-id + (retry-on-error + (lambda () + (assoc-ref + (get-state state-channel) + "state_id")) + #:times 6 + #:delay 10))) + (simple-format #t "Starting from state ~A\n" + initial-state-id) + + (let* ((events-channel + get-state-id + (make-events-channel + event-source + initial-state-id + #:metrics-registry metrics-registry)) + (controller + (apply make-controller assets-directory + metrics-registry + events-channel state-channel + event-source + controller-args))) + + ;; Wait until the events channel catches up + (while (< (get-state-id) initial-state-id) + (sleep 1)) + + (when pid-file + (call-with-output-file pid-file + (lambda (port) + (simple-format port "~A\n" (getpid))))) + + (simple-format #t "Starting the server\n") + (run-server/patched (lambda (request body) + (apply values + (handler request body controller))) + #:host host + #:port port))) + + (wait finished?)) + #:hz 10 + #:parallelism 4)) + finished?))) |