aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2023-07-19 13:10:32 +0100
committerChristopher Baines <mail@cbaines.net>2023-07-19 13:10:32 +0100
commita88d154823de3bb7ea8c62e84b1294dfba51ecec (patch)
tree325e1abc510259689e1d39856ffdcb4fd7c44c73
parent3ce4613908bb4a42494323ef0597f6c3ae2dee24 (diff)
downloadbffe-a88d154823de3bb7ea8c62e84b1294dfba51ecec.tar
bffe-a88d154823de3bb7ea8c62e84b1294dfba51ecec.tar.gz
Hack non-blocking support to most http requests
Make the sockets non blocking, and avoid using read-response-body. This commit also moves the state channel in to a fiber.
-rw-r--r--bffe/server.scm214
1 files changed, 127 insertions, 87 deletions
diff --git a/bffe/server.scm b/bffe/server.scm
index e929df9..112ae17 100644
--- a/bffe/server.scm
+++ b/bffe/server.scm
@@ -67,9 +67,72 @@
(lambda (scheduler port)
(display "#<scheduler>" port)))
+;; Returns the port as well as the raw socket
+(define* (open-socket-for-uri* uri
+ #:key (verify-certificate? #t)
+ (non-blocking? #t))
+ (define tls-wrap
+ (@@ (web client) tls-wrap))
+
+ (define https?
+ (eq? 'https (uri-scheme uri)))
+
+ (define plain-uri
+ (if https?
+ (build-uri
+ 'http
+ #:userinfo (uri-userinfo uri)
+ #:host (uri-host uri)
+ #:port (or (uri-port uri) 443)
+ #:path (uri-path uri)
+ #:query (uri-query uri)
+ #:fragment (uri-fragment uri))
+ uri))
+
+ (let ((s (open-socket-for-uri plain-uri)))
+ (values
+ (if https?
+ (tls-wrap s (uri-host uri)
+ #:verify-certificate? verify-certificate?)
+ s)
+ s)))
+
+(define* (http-get* uri
+ #:key
+ (method 'GET)
+ (body #f)
+ (verify-certificate? #t)
+ (port #f)
+ (version '(1 . 1))
+ (keep-alive? #f)
+ (headers '())
+ (decode-body? #t)
+ ;; Default to streaming? #t since read-response-body calls
+ ;; get-bytevector-all, which is implemented in C and
+ ;; therefore can't be suspended
+ (streaming? #t))
+ (let ((port socket (open-socket-for-uri*
+ uri
+ #:verify-certificate? verify-certificate?)))
+ ;; Guile/guile-gnutls don't handle the handshake happening on a non
+ ;; blocking socket, so change the behavior here.
+ (let ((flags (fcntl socket F_GETFL)))
+ (fcntl socket F_SETFL (logior O_NONBLOCK flags)))
+
+ (http-request uri
+ #:method method
+ #:body body
+ #:verify-certificate? verify-certificate?
+ #:port port
+ #:version version
+ #:keep-alive? keep-alive?
+ #:headers headers
+ #:decode-body? decode-body?
+ #:streaming? streaming?)))
+
(define (make-state-channel event-source)
(let ((channel (make-channel)))
- (call-with-new-thread
+ (spawn-fiber
(lambda ()
(let loop ((last-fetch-time #f)
(state #f))
@@ -91,12 +154,11 @@
(lambda ()
(let ((response
body
- (http-get
+ (http-get*
(string->uri
(string-append event-source "/state")))))
(let ((state
- (json-string->scm
- (utf8->string body))))
+ (json->scm body)))
(put-message reply-channel
state)
state)))
@@ -232,7 +294,7 @@
(lambda ()
(let* ((response
remote-port
- (http-get
+ (http-get*
(string->uri
(string-append event-source "/events"))
#:headers
@@ -241,8 +303,7 @@
(- initial-state-id
;; Get some earlier events as well, as clients that
;; are reconnecting might want them
- 100))))
- #:streaming? #t)))
+ 100)))))))
(setvbuf remote-port 'none)
(let loop ((line (get-line remote-port)))
@@ -440,7 +501,7 @@
(('GET "build" uuid)
(let ((response
body
- (http-get
+ (http-get*
(string->uri
(string-append event-source "/build/" uuid
;; ".json"
@@ -450,17 +511,15 @@
mime-types)
((application/json)
(render-json
- (json-string->scm
- (utf8->string body))))
+ (json->scm body)))
(else
(render-html
#:sxml (build title
- (json-string->scm
- (utf8->string body))))))))
+ (json->scm body)))))))
(('GET "agent" agent-id)
(let ((response
body
- (http-get
+ (http-get*
(string->uri
(string-append event-source "/agent/" agent-id))
#:headers '((accept . ((application/json)))))))
@@ -469,24 +528,21 @@
mime-types)
((application/json)
(render-json
- (json-string->scm
- (utf8->string body))))
+ (json->scm body)))
(else
(render-html
#:sxml (agent title
- (json-string->scm
- (utf8->string body))))))))
+ (json->scm body)))))))
(('GET "agent" agent-id "build-allocation-plan")
(let ((response
body
- (http-get
+ (http-get*
(string->uri
(string-append event-source "/agent/" agent-id
"/build-allocation-plan"))
#:headers '((accept . ((application/json)))))))
(render-json
- (json-string->scm
- (utf8->string body)))))
+ (json->scm body))))
(('GET "assets" rest ...)
(or (handle-static-assets (string-join rest "/")
@@ -541,71 +597,55 @@
pid-file
metrics-registry)
- (define state-channel
- (make-state-channel
- event-source))
-
- (call-with-error-handling
- (lambda ()
- (let ((finished? (make-condition)))
- (call-with-sigint
+ (let ((finished? (make-condition)))
+ (call-with-sigint
+ (lambda ()
+ (run-fibers
(lambda ()
- (run-fibers
- (lambda ()
- (let ((initial-state-id
- (retry-on-error
- (lambda ()
- (assoc-ref
- (get-state state-channel)
- "state_id"))
- #:times 6
- #:delay 10)))
- (simple-format #t "Starting from state ~A\n"
- initial-state-id)
-
- (let* ((events-channel
- get-state-id
- (make-events-channel
- event-source
- initial-state-id
- #:metrics-registry metrics-registry))
- (controller
- (apply make-controller assets-directory
- metrics-registry
- events-channel state-channel
- event-source
- controller-args)))
-
- ;; Wait until the events channel catches up
- (while (< (get-state-id) initial-state-id)
- (sleep 1))
-
- (when pid-file
- (call-with-output-file pid-file
- (lambda (port)
- (simple-format port "~A\n" (getpid)))))
-
- (simple-format #t "Starting the server\n")
- (run-server/patched (lambda (request body)
- (apply values
- (handler request body controller)))
- #:host host
- #:port port)))
-
- (wait finished?))
- #:hz 10
- #:parallelism 4))
- finished?)))
- #:on-error 'backtrace
- #:post-error (lambda (key . args)
- (when (eq? key 'system-error)
- (match args
- (("bind" "~A" ("Address already in use") _)
- (simple-format
- (current-error-port)
- "\n
-error: bffe could not start, as it could not bind to port ~A
-
-Check if it's already running, or whether another process is using that
-port. Also, the port used can be changed by passing the --port option.\n"
- port)))))))
+ (let* ((state-channel
+ (make-state-channel
+ event-source))
+ (initial-state-id
+ (retry-on-error
+ (lambda ()
+ (assoc-ref
+ (get-state state-channel)
+ "state_id"))
+ #:times 6
+ #:delay 10)))
+ (simple-format #t "Starting from state ~A\n"
+ initial-state-id)
+
+ (let* ((events-channel
+ get-state-id
+ (make-events-channel
+ event-source
+ initial-state-id
+ #:metrics-registry metrics-registry))
+ (controller
+ (apply make-controller assets-directory
+ metrics-registry
+ events-channel state-channel
+ event-source
+ controller-args)))
+
+ ;; Wait until the events channel catches up
+ (while (< (get-state-id) initial-state-id)
+ (sleep 1))
+
+ (when pid-file
+ (call-with-output-file pid-file
+ (lambda (port)
+ (simple-format port "~A\n" (getpid)))))
+
+ (simple-format #t "Starting the server\n")
+ (run-server/patched (lambda (request body)
+ (apply values
+ (handler request body controller)))
+ #:host host
+ #:port port)))
+
+ (wait finished?))
+ #:hz 10
+ #:parallelism 4))
+ finished?)))