From 60fac8121cc668a01f22a569059aa00d476db566 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Mon, 3 Apr 2023 13:49:03 +0100 Subject: Improve event/state id support for events Support the Last-Event-ID header in the events endpoint, and include the event id's in the responses. --- guix-build-coordinator/client-communication.scm | 49 ++++++++++++++++--------- guix-build-coordinator/coordinator.scm | 22 +++++++---- 2 files changed, 45 insertions(+), 26 deletions(-) diff --git a/guix-build-coordinator/client-communication.scm b/guix-build-coordinator/client-communication.scm index 7e9fa6c..37fd322 100644 --- a/guix-build-coordinator/client-communication.scm +++ b/guix-build-coordinator/client-communication.scm @@ -554,24 +554,37 @@ (assq-ref agent-details 'uuid)))))))) (datastore-list-agents datastore)))))))))) (('GET "events") - (list (build-response - #:code 200 - #:headers '((content-type . (text/event-stream)))) - (lambda (port) - (build-coordinator-listen-for-events - build-coordinator - (lambda (event-name data) - (display - (simple-format #f "event: ~A\ndata: ~A\n\n" - event-name - (scm->json-string data)) - port) - (force-output port) - ;; TODO because the chunked output port doesn't call - ;; force-output on the underlying port, do that here. We - ;; want this event to be sent now, rather than when some - ;; buffer fills up. - (force-output (request-port request))))))) + (let ((headers (request-headers request))) + (list (build-response + #:code 200 + #:headers '((content-type . (text/event-stream)))) + (lambda (port) + (build-coordinator-listen-for-events + build-coordinator + (lambda (state-id event-name data) + (display + (simple-format #f "id: ~A\nevent: ~A\ndata: ~A\n\n" + state-id + event-name + (scm->json-string data)) + port) + (force-output port) + ;; TODO because the chunked output port doesn't call + ;; force-output on the underlying port, do that here. We + ;; want this event to be sent now, rather than when some + ;; buffer fills up. + (force-output (request-port request))) + #:after-state-id + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "error: when processing Last-Event-ID header value: ~A\n" + (assoc-ref headers 'last-event-id))) + (lambda () + (and=> (assoc-ref headers 'last-event-id) + string->number)) + #:unwind? #t)))))) (_ (render-json "not-found" diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm index b022c76..b19d278 100644 --- a/guix-build-coordinator/coordinator.scm +++ b/guix-build-coordinator/coordinator.scm @@ -178,7 +178,7 @@ (when (not (= event-state-id index-state-id)) (error "listener behind")) - (callback event-name data)))) + (callback event-state-id event-name data)))) (map (lambda (i) (modulo i buffer-size)) (iota event-count-to-send @@ -226,10 +226,20 @@ (lambda () (while #t (match (get-message submission-channel) - (('new-listener callback after-state-id + (('new-listener callback requested-after-state-id listening-finished-channel) - (let ((listener-channel (make-channel))) + (let ((listener-channel (make-channel)) + (after-state-id + (match (atomic-box-ref + current-state-id-and-event-buffer-index-box) + ((current-state-id . event-buffer-index) + (if requested-after-state-id + (if (> requested-after-state-id + current-state-id) + current-state-id + requested-after-state-id) + current-state-id))))) (atomic-box-set! listener-channels-box (vhash-consq listener-channel @@ -237,11 +247,7 @@ (atomic-box-ref listener-channels-box))) (spawn-fiber-for-listener callback - (or after-state-id - (match (atomic-box-ref - current-state-id-and-event-buffer-index-box) - ((current-state-id . event-buffer-index) - current-state-id))) + after-state-id submission-channel listener-channel listening-finished-channel))) -- cgit v1.2.3