From 768c5e6d56569eaee46e98b41f5e28f43e424b64 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Fri, 19 May 2023 13:47:21 +0100 Subject: Change listing builds to work as a stream Both in terms of getting the data from the database, and sending it to the client. This avoids the use of the after-id and ordering by id when listing builds, which makes listing builds faster. It does mean that the database reads may last for a while (which can be a problem), but maybe that can be addressed in other ways. --- guix-build-coordinator/client-communication.scm | 228 +++++++++++++----------- guix-build-coordinator/datastore.scm | 2 +- guix-build-coordinator/datastore/sqlite.scm | 101 ++++++----- scripts/guix-build-coordinator.in | 144 ++++++--------- 4 files changed, 236 insertions(+), 239 deletions(-) diff --git a/guix-build-coordinator/client-communication.scm b/guix-build-coordinator/client-communication.scm index 67d8f65..2df9b32 100644 --- a/guix-build-coordinator/client-communication.scm +++ b/guix-build-coordinator/client-communication.scm @@ -25,6 +25,7 @@ #:use-module (srfi srfi-43) #:use-module (ice-9 format) #:use-module (ice-9 match) + #:use-module (ice-9 streams) #:use-module (ice-9 exceptions) #:use-module (rnrs bytevectors) #:use-module (json) @@ -327,105 +328,109 @@ `((error . 404)) #:code 404)))) (('GET "builds") - (let ((query-parameters (request-query-parameters request))) - (render-json - `((builds - . ,(list->vector - (map - (lambda (build-details) - `(,@(alist-delete - 'created-at - (alist-delete - 'end-time - build-details)) - (created-at . ,(or - (and=> - (assq-ref build-details 'created-at) - (lambda (time) - (strftime "%F %T" time))) - 'null)) - (end-time . ,(or (and=> - (assq-ref build-details 'end-time) + (let* ((query-parameters (request-query-parameters request)) + (fold-builds-args + (list + #:tags + (filter-map (match-lambda + ((key . value) + (if (eq? key 'tag) + (match (string-split value #\:) + ((tag-key tag-value) + (cons tag-key tag-value)) + ((tag-key) tag-key)) + #f))) + query-parameters) + #:not-tags + (filter-map (match-lambda + ((key . value) + (if (eq? key 'not_tag) + (match (string-split value #\:) + ((tag-key tag-value) + (cons tag-key tag-value)) + ((tag_key) tag_key)) + #f))) + query-parameters) + #:systems + (filter-map (match-lambda + ((key . value) + (if (eq? key 'system) + value + #f))) + query-parameters) + #:not-systems + (filter-map (match-lambda + ((key . value) + (if (eq? key 'not-system) + value + #f))) + query-parameters) + #:processed + (match (assq 'processed query-parameters) + ((_ . val) + (string=? val "true")) + (#f 'unset)) + #:canceled + (match (assq 'canceled query-parameters) + ((_ . val) + (string=? val "true")) + (#f 'unset)) + #:priority-> + (or (and=> (assq-ref query-parameters 'priority_gt) + string->number) + 'unset) + #:priority-< + (or (and=> (assq-ref query-parameters 'priority_lt) + string->number) + 'unset) + #:relationship + (or (and=> (assq-ref query-parameters 'relationship) + string->symbol) + 'unset) + #:after-id + (assq-ref query-parameters 'after_id) + #:limit + (and=> (assq-ref query-parameters 'limit) + string->number)))) + + (list + (build-response + #:code 200 + #:headers '((content-type . (application/json-seq)))) + (lambda (port) + (apply datastore-fold-builds + datastore + (lambda (build-details _) + (scm->json-seq + `((,@(alist-delete + 'created-at + (alist-delete + 'end-time + build-details)) + (created-at . ,(or + (and=> + (assq-ref build-details 'created-at) (lambda (time) (strftime "%F %T" time))) 'null)) - (tags . ,(vector-map - (lambda (_ tag) - (match tag - ((key . value) - `((key . ,key) - (value . ,value))))) - (datastore-fetch-build-tags - datastore - (assq-ref build-details 'uuid)))))) - (datastore-list-builds - datastore - #:tags - (filter-map (match-lambda - ((key . value) - (if (eq? key 'tag) - (match (string-split value #\:) - ((tag-key tag-value) - (cons tag-key tag-value)) - ((tag-key) tag-key)) - #f))) - query-parameters) - #:not-tags - (filter-map (match-lambda - ((key . value) - (if (eq? key 'not_tag) - (match (string-split value #\:) - ((tag-key tag-value) - (cons tag-key tag-value)) - ((tag_key) tag_key)) - #f))) - query-parameters) - #:systems - (filter-map (match-lambda - ((key . value) - (if (eq? key 'system) - value - #f))) - query-parameters) - #:not-systems - (filter-map (match-lambda - ((key . value) - (if (eq? key 'not-system) - value - #f))) - query-parameters) - #:processed - (match (assq 'processed query-parameters) - ((_ . val) - (string=? val "true")) - (#f 'unset)) - #:canceled - (match (assq 'canceled query-parameters) - ((_ . val) - (string=? val "true")) - (#f 'unset)) - #:priority-> - (or (and=> (assq-ref query-parameters 'priority_gt) - string->number) - 'unset) - #:priority-< - (or (and=> (assq-ref query-parameters 'priority_lt) - string->number) - 'unset) - #:relationship - (or (and=> (assq-ref query-parameters 'relationship) - string->symbol) - 'unset) - #:after-id - (assq-ref query-parameters 'after_id) - #:limit - (or (and=> (assq-ref query-parameters 'limit) - (lambda (val) - ;; Don't allow a high limit, as that could - ;; cause the query to run for a long time - (min (string->number val) - 100))) - 100))))))))) + (end-time . ,(or (and=> + (assq-ref build-details 'end-time) + (lambda (time) + (strftime "%F %T" time))) + 'null)) + (tags . ,(vector-map + (lambda (_ tag) + (match tag + ((key . value) + `((key . ,key) + (value . ,value))))) + (datastore-fetch-build-tags + datastore + (assq-ref build-details 'uuid)))))) + port) + #t) + #t + fold-builds-args))))) (('POST "builds") (let ((derivation-file (assoc-ref body "derivation")) (substitute-urls @@ -653,7 +658,8 @@ (string-append coordinator-uri path)) #:method method #:body (and=> request-body scm->json-string) - #:decode-body? #f))) + #:decode-body? #f + #:streaming? #t))) (if (>= (response-code response) 400) (begin (simple-format @@ -665,8 +671,11 @@ (lambda () (if (equal? '(application/json (charset . "utf-8")) (response-content-type response)) - (json-string->scm (utf8->string body)) - (utf8->string body))) + (json-string->scm + (utf8->string + (read-response-body response))) + (utf8->string + (read-response-body response)))) (lambda (key . args) (simple-format (current-error-port) @@ -676,9 +685,22 @@ (raise-exception (make-exception-with-message body)))) - (values - (json-string->scm (utf8->string body)) - response)))) + + (begin + (set-port-encoding! body "UTF-8") + + (values + (if (equal? '(application/json-seq) + (response-content-type response)) + (json-seq->scm + body + ;; TODO I would like to use 'throw, but it always raises an + ;; exception, so this needs fixing upstream first + #:handle-truncate 'replace) + (json-string->scm + (utf8->string + (read-response-body response)))) + response))))) (define* (send-submit-build-request coordinator-uri @@ -760,7 +782,7 @@ (priority-< 'unset) (relationship 'unset) (after-id #f) - (limit 100)) + (limit #f)) (let ((query-parameters `(,@(if (null? tags) '() diff --git a/guix-build-coordinator/datastore.scm b/guix-build-coordinator/datastore.scm index 4af90e0..3fec2b5 100644 --- a/guix-build-coordinator/datastore.scm +++ b/guix-build-coordinator/datastore.scm @@ -59,7 +59,7 @@ (re-export datastore-count-builds) (re-export datastore-for-each-build) (re-export datastore-find-build) -(re-export datastore-list-builds) +(re-export datastore-fold-builds) (re-export datastore-insert-build-tags) (re-export datastore-fetch-build-tags) (re-export datastore-find-build-result) diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm index db9508b..824af59 100644 --- a/guix-build-coordinator/datastore/sqlite.scm +++ b/guix-build-coordinator/datastore/sqlite.scm @@ -38,7 +38,7 @@ datastore-count-builds datastore-for-each-build datastore-find-build - datastore-list-builds + datastore-fold-builds datastore-insert-build-tags datastore-fetch-build-tags datastore-find-build-result @@ -2329,11 +2329,13 @@ WHERE uuid = :uuid" #f)))) (#f #f)))))) -(define-method (datastore-list-builds +(define-method (datastore-fold-builds (datastore ) . rest) - (define* (list-builds #:key + (define* (fold-builds proc + initial + #:key (tags '()) (not-tags '()) (systems '()) @@ -2343,7 +2345,7 @@ WHERE uuid = :uuid" (priority-> 'unset) (priority-< 'unset) (after-id #f) - (limit 1000) + (limit #f) ;; other-builds-dependent or no-dependent-builds (relationship 'unset)) (call-with-worker-thread @@ -2514,60 +2516,65 @@ WHERE derivation_outputs.derivation_id = builds.derivation_id)")) (statement (sqlite-prepare db query #:cache? #t))) + (define row->alist + (match-lambda + (#(uuid derivation_name priority processed canceled + created_at end_time) + `((uuid . ,uuid) + (derivation-name . ,derivation_name) + (priority . ,priority) + (processed . ,(cond + ((= 0 processed) #f) + ((= 1 processed) #t) + (else + (error + "unknown processed value")))) + (canceled . ,(cond + ((= 0 canceled) #f) + ((= 1 canceled) #t) + (else + (error "unknown canceled value")))) + (created-at . ,(if (string? created_at) + (match (strptime "%F %T" + created_at) + ((parts . _) parts) + (#f + (error + (simple-format + #f + "error parsing created_at ~A (~A)" + created_at + uuid)))) + #f)) + (end-time . ,(if (string? end_time) + (match (strptime "%F %T" + end_time) + ((parts . _) parts) + (#f + (error + (simple-format + #f + "error parsing end_time ~A (~A)" + end_time + uuid)))) + #f)))))) + (when after-id (sqlite-bind-arguments statement #:after_id after-id)) (let ((result - (sqlite-map - (match-lambda - (#(uuid derivation_name priority processed canceled - created_at end_time) - `((uuid . ,uuid) - (derivation-name . ,derivation_name) - (priority . ,priority) - (processed . ,(cond - ((= 0 processed) #f) - ((= 1 processed) #t) - (else - (error - "unknown processed value")))) - (canceled . ,(cond - ((= 0 canceled) #f) - ((= 1 canceled) #t) - (else - (error "unknown canceled value")))) - (created-at . ,(if (string? created_at) - (match (strptime "%F %T" - created_at) - ((parts . _) parts) - (#f - (error - (simple-format - #f - "error parsing created_at ~A (~A)" - created_at - uuid)))) - #f)) - (end-time . ,(if (string? end_time) - (match (strptime "%F %T" - end_time) - ((parts . _) parts) - (#f - (error - (simple-format - #f - "error parsing end_time ~A (~A)" - end_time - uuid)))) - #f))))) + (sqlite-fold + (lambda (row result) + (proc (row->alist row) result)) + initial statement))) (sqlite-reset statement) result))))))) - (apply list-builds rest)) + (apply fold-builds rest)) (define-method (datastore-fetch-build-tags (datastore ) diff --git a/scripts/guix-build-coordinator.in b/scripts/guix-build-coordinator.in index daa0873..787542c 100644 --- a/scripts/guix-build-coordinator.in +++ b/scripts/guix-build-coordinator.in @@ -38,6 +38,7 @@ (srfi srfi-37) (srfi srfi-43) (ice-9 match) + (ice-9 streams) (ice-9 suspendable-ports) (web uri) (fibers) @@ -242,8 +243,7 @@ %common-build-filtering-options)) (define %builds-list-option-defaults - `(,@%common-build-filtering-option-defaults - (limit . 1000))) + `(,@%common-build-filtering-option-defaults)) (define %build-cancel-options (list (option '("tag") #t #f @@ -529,24 +529,23 @@ canceled?: ~A %client-option-defaults %builds-list-option-defaults) rest))) - (let loop ((after-id #f)) - (let ((response (request-builds-list - (assq-ref opts 'coordinator) - #:tags (assq-ref opts 'tags) - #:not-tags (assq-ref opts 'not-tags) - #:systems (assq-ref opts 'systems) - #:not-systems (assq-ref opts 'not-systems) - #:processed (assq-ref opts 'processed) - #:canceled (assq-ref opts 'canceled) - #:priority-> (assq-ref opts 'priority->) - #:priority-< (assq-ref opts 'priority-<) - #:relationship (assq-ref opts 'relationship) - #:after-id (or after-id (assq-ref opts 'after-id)) - #:limit (assq-ref opts 'limit)))) - (for-each - (lambda (build-details) - (simple-format (current-output-port) - "id: ~A + (let ((response (request-builds-list + (assq-ref opts 'coordinator) + #:tags (assq-ref opts 'tags) + #:not-tags (assq-ref opts 'not-tags) + #:systems (assq-ref opts 'systems) + #:not-systems (assq-ref opts 'not-systems) + #:processed (assq-ref opts 'processed) + #:canceled (assq-ref opts 'canceled) + #:priority-> (assq-ref opts 'priority->) + #:priority-< (assq-ref opts 'priority-<) + #:relationship (assq-ref opts 'relationship) + #:after-id (assq-ref opts 'after-id) + #:limit (assq-ref opts 'limit)))) + (stream-for-each + (lambda (build-details) + (simple-format (current-output-port) + "id: ~A derivation: ~A processed: ~A canceled: ~A @@ -554,31 +553,25 @@ priority: ~A tags: ~A \n" - (assoc-ref build-details "uuid") - (assoc-ref build-details "derivation-name") - (if (assoc-ref build-details "processed") - "true" - "false") - (if (assoc-ref build-details "canceled") - "true" - "false") - (assoc-ref build-details "priority") - (string-join - (map (lambda (tag) - (let ((key (assoc-ref tag "key")) - (val (assoc-ref tag "value"))) - (string-append " " key ": " val))) - (vector->list - (assoc-ref build-details "tags"))) - "\n"))) - (vector->list (assoc-ref response "builds"))) + (assoc-ref build-details "uuid") + (assoc-ref build-details "derivation-name") + (if (assoc-ref build-details "processed") + "true" + "false") + (if (assoc-ref build-details "canceled") + "true" + "false") + (assoc-ref build-details "priority") + (string-join + (map (lambda (tag) + (let ((key (assoc-ref tag "key")) + (val (assoc-ref tag "value"))) + (string-append " " key ": " val))) + (vector->list + (assoc-ref build-details "tags"))) + "\n"))) + response)))) - (unless (= (vector-length (assoc-ref response "builds")) 0) - (loop - (assoc-ref (vector-ref - (assoc-ref response "builds") - (- (vector-length (assoc-ref response "builds")) 1)) - "uuid"))))))) (("build" "show-blocking" rest ...) (let ((opts (parse-options (append %base-options %client-options @@ -608,8 +601,6 @@ tags: %build-cancel-option-defaults) rest))) (define (get-batch) - (define limit 1000) - (simple-format (current-error-port) "requesting matching builds\n") (force-output (current-error-port)) @@ -621,17 +612,12 @@ tags: #:not-systems (assq-ref opts 'not-systems) #:processed #f #:canceled #f - #:relationship 'no-dependent-builds - #:limit 1000)) - (received-builds - (vector-length (assoc-ref response "builds")))) + #:relationship 'no-dependent-builds))) - (fold - (lambda (build-details result) - (cons (assoc-ref build-details "uuid") - result)) - '() - (vector->list (assoc-ref response "builds"))))) + (stream-map + (lambda (build-details) + (assoc-ref build-details "uuid")) + response))) (match (assq-ref opts 'arguments) (#f @@ -689,39 +675,21 @@ tags: %build-update-priority-option-defaults) rest))) (define (find-matching-builds) - (define limit 1000) + (let* ((response (request-builds-list + (assq-ref opts 'coordinator) + #:tags (assq-ref opts 'tags) + #:not-tags (assq-ref opts 'not-tags) + #:systems (assq-ref opts 'systems) + #:not-systems (assq-ref opts 'not-systems) + #:processed #f + #:canceled #f + #:priority-> (assq-ref opts 'priority->) + #:priority-< (assq-ref opts 'priority-<)))) - (let loop ((after-id #f) - (result '())) - (let* ((response (request-builds-list - (assq-ref opts 'coordinator) - #:tags (assq-ref opts 'tags) - #:not-tags (assq-ref opts 'not-tags) - #:systems (assq-ref opts 'systems) - #:not-systems (assq-ref opts 'not-systems) - #:processed #f - #:canceled #f - #:priority-> (assq-ref opts 'priority->) - #:priority-< (assq-ref opts 'priority-<) - #:after-id after-id - #:limit 1000)) - (received-builds - (vector-length (assoc-ref response "builds"))) - (new-result - (fold - (lambda (build-details result) - (cons (assoc-ref build-details "uuid") - result)) - result - (vector->list (assoc-ref response "builds"))))) - (display "." (current-error-port)) - (force-output (current-error-port)) - (if (< received-builds limit) - new-result - (loop (assoc-ref (vector-ref (assoc-ref response "builds") - (- received-builds 1)) - "uuid") - new-result))))) + (stream-map + (lambda (build-details) + (assoc-ref build-details "uuid")) + response))) (match (assq-ref opts 'arguments) (#f -- cgit v1.2.3