aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2023-04-03 13:49:03 +0100
committerChristopher Baines <mail@cbaines.net>2023-04-03 13:49:03 +0100
commit60fac8121cc668a01f22a569059aa00d476db566 (patch)
treefa52a18d67a0d97345c4918796f63674b522a97d
parent9fc74074e6e81aa6c9acd368bc4a6ee5eb9b45d2 (diff)
downloadbuild-coordinator-60fac8121cc668a01f22a569059aa00d476db566.tar
build-coordinator-60fac8121cc668a01f22a569059aa00d476db566.tar.gz
Improve event/state id support for events
Support the Last-Event-ID header in the events endpoint, and include the event id's in the responses.
-rw-r--r--guix-build-coordinator/client-communication.scm49
-rw-r--r--guix-build-coordinator/coordinator.scm22
2 files changed, 45 insertions, 26 deletions
diff --git a/guix-build-coordinator/client-communication.scm b/guix-build-coordinator/client-communication.scm
index 7e9fa6c..37fd322 100644
--- a/guix-build-coordinator/client-communication.scm
+++ b/guix-build-coordinator/client-communication.scm
@@ -554,24 +554,37 @@
(assq-ref agent-details 'uuid))))))))
(datastore-list-agents datastore))))))))))
(('GET "events")
- (list (build-response
- #:code 200
- #:headers '((content-type . (text/event-stream))))
- (lambda (port)
- (build-coordinator-listen-for-events
- build-coordinator
- (lambda (event-name data)
- (display
- (simple-format #f "event: ~A\ndata: ~A\n\n"
- event-name
- (scm->json-string data))
- port)
- (force-output port)
- ;; TODO because the chunked output port doesn't call
- ;; force-output on the underlying port, do that here. We
- ;; want this event to be sent now, rather than when some
- ;; buffer fills up.
- (force-output (request-port request)))))))
+ (let ((headers (request-headers request)))
+ (list (build-response
+ #:code 200
+ #:headers '((content-type . (text/event-stream))))
+ (lambda (port)
+ (build-coordinator-listen-for-events
+ build-coordinator
+ (lambda (state-id event-name data)
+ (display
+ (simple-format #f "id: ~A\nevent: ~A\ndata: ~A\n\n"
+ state-id
+ event-name
+ (scm->json-string data))
+ port)
+ (force-output port)
+ ;; TODO because the chunked output port doesn't call
+ ;; force-output on the underlying port, do that here. We
+ ;; want this event to be sent now, rather than when some
+ ;; buffer fills up.
+ (force-output (request-port request)))
+ #:after-state-id
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "error: when processing Last-Event-ID header value: ~A\n"
+ (assoc-ref headers 'last-event-id)))
+ (lambda ()
+ (and=> (assoc-ref headers 'last-event-id)
+ string->number))
+ #:unwind? #t))))))
(_
(render-json
"not-found"
diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm
index b022c76..b19d278 100644
--- a/guix-build-coordinator/coordinator.scm
+++ b/guix-build-coordinator/coordinator.scm
@@ -178,7 +178,7 @@
(when (not (= event-state-id index-state-id))
(error "listener behind"))
- (callback event-name data))))
+ (callback event-state-id event-name data))))
(map (lambda (i)
(modulo i buffer-size))
(iota event-count-to-send
@@ -226,10 +226,20 @@
(lambda ()
(while #t
(match (get-message submission-channel)
- (('new-listener callback after-state-id
+ (('new-listener callback requested-after-state-id
listening-finished-channel)
- (let ((listener-channel (make-channel)))
+ (let ((listener-channel (make-channel))
+ (after-state-id
+ (match (atomic-box-ref
+ current-state-id-and-event-buffer-index-box)
+ ((current-state-id . event-buffer-index)
+ (if requested-after-state-id
+ (if (> requested-after-state-id
+ current-state-id)
+ current-state-id
+ requested-after-state-id)
+ current-state-id)))))
(atomic-box-set!
listener-channels-box
(vhash-consq listener-channel
@@ -237,11 +247,7 @@
(atomic-box-ref listener-channels-box)))
(spawn-fiber-for-listener callback
- (or after-state-id
- (match (atomic-box-ref
- current-state-id-and-event-buffer-index-box)
- ((current-state-id . event-buffer-index)
- current-state-id)))
+ after-state-id
submission-channel
listener-channel
listening-finished-channel)))