36 files changed, 2172 insertions, 1721 deletions
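Two patterns recur throughout the diff below. First, error reporting: uses of with-throw-handler that printed a backtrace from a (lambda (key . args) ...) handler are replaced with with-exception-handler handlers that report the exception and then re-raise it, so an outer unwinding handler can still produce a response. A minimal sketch of that shape, assuming print-backtrace-and-exception/knots is exported by the (knots) module imported in this patch (the error call is just a stand-in for real work that may throw):

    (use-modules (knots))

    (with-exception-handler
     (lambda (exn) 'handled)                ; outer handler unwinds
     (lambda ()
       (with-exception-handler
        (lambda (exn)
          ;; Report the exception with a backtrace, then re-raise so
          ;; the outer handler still runs.
          (print-backtrace-and-exception/knots exn)
          (raise-exception exn))
        (lambda ()
          (error "something went wrong"))))
     #:unwind? #t)

Second, blocking work moves from the coordinator's own worker-thread channels to thread pools from the knots library (make-thread-pool, call-with-thread, thread-pool-timeout-error?). A rough sketch of that usage; the calling conventions follow what appears in this patch rather than authoritative knots documentation, and the usleep call is a placeholder for real blocking work:

    (use-modules (knots thread-pool)
                 (fibers))

    ;; A named pool of OS threads for blocking operations, created
    ;; before run-fibers; the patch uses 18 threads named "utility".
    (define pool
      (make-thread-pool 4 #:name "utility"))

    (run-fibers
     (lambda ()
       ;; Only the calling fiber waits while the thunk runs on a pool
       ;; thread, so the fibers scheduler stays responsive; the value
       ;; of the thunk is returned to the caller.
       (call-with-thread
        pool
        (lambda ()
          (usleep 100000)                  ; stand-in for blocking work
          'done))))
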
diff --git a/.dir-locals.el b/.dir-locals.el index 60601df..1c73c7a 100644 --- a/.dir-locals.el +++ b/.dir-locals.el @@ -10,6 +10,6 @@ (eval put 'with-db-worker-thread 'scheme-indent-function 1) (eval put 'with-time-logging 'scheme-indent-function 1) (eval put 'with-timeout 'scheme-indent-function 1) - (eval put 'letpar& 'scheme-indent-function 1) + (eval put 'fibers-let 'scheme-indent-function 1) (eval . (put 'call-with-lzip-output-port 'scheme-indent-function 1)) (eval . (put 'with-store 'scheme-indent-function 1)))) diff --git a/Makefile.am b/Makefile.am index 5081695..8a9af41 100644 --- a/Makefile.am +++ b/Makefile.am @@ -10,7 +10,8 @@ MINIMALSOURCES = \ guix-build-coordinator/agent-messaging/http.scm \ guix-build-coordinator/agent.scm \ guix-build-coordinator/config.scm \ - guix-build-coordinator/utils.scm + guix-build-coordinator/utils.scm \ + guix-build-coordinator/utils/timeout.scm ALLSOURCES = \ $(MINIMALSOURCES) \ diff --git a/guix-build-coordinator/agent-messaging/http.scm b/guix-build-coordinator/agent-messaging/http.scm index 0baa75b..33fb717 100644 --- a/guix-build-coordinator/agent-messaging/http.scm +++ b/guix-build-coordinator/agent-messaging/http.scm @@ -50,6 +50,7 @@ #:use-module (guix base64) #:use-module (guix build utils) #:use-module (guix-build-coordinator utils) + #:use-module (guix-build-coordinator utils timeout) #:use-module (guix-build-coordinator agent-messaging abstract) #:export (make-http-agent-interface @@ -73,10 +74,10 @@ password) (let* ((gnutls-ver (gnutls-version)) (guix-ver %guix-version)) - (simple-format (current-error-port) - "(gnutls version: ~A, guix version: ~A)\n" - gnutls-ver - guix-ver)) + (simple-format/safe (current-error-port) + "(gnutls version: ~A, guix version: ~A)\n" + gnutls-ver + guix-ver)) (make <http-agent-interface> #:coordinator-uri coordinator-uri @@ -204,7 +205,8 @@ response))))) (retry-on-error (lambda () - (with-port-timeouts make-request)) + (with-port-timeouts make-request + #:timeout 120)) #:times retry-times #:delay 10 #:no-retry agent-error-from-coordinator?)) @@ -420,10 +422,11 @@ (lambda (uploaded-bytes) (= uploaded-bytes file-size))) (retry-on-error (lambda () - (with-throw-handler #t - perform-upload - (lambda _ - (backtrace)))) + (with-exception-handler + (lambda (exn) + (backtrace) + (raise-exception exn)) + perform-upload)) #:times 100 #:delay 60 #:error-hook diff --git a/guix-build-coordinator/agent-messaging/http/server.scm b/guix-build-coordinator/agent-messaging/http/server.scm index 2b325d7..fa227da 100644 --- a/guix-build-coordinator/agent-messaging/http/server.scm +++ b/guix-build-coordinator/agent-messaging/http/server.scm @@ -46,13 +46,15 @@ #:use-module (fibers timers) #:use-module (fibers channels) #:use-module (fibers operations) + #:use-module (knots) + #:use-module (knots timeout) + #:use-module (knots web-server) + #:use-module (knots thread-pool) #:use-module (prometheus) #:use-module (guix base32) #:use-module (guix base64) #:use-module (guix progress) #:use-module (guix build utils) - #:use-module ((guix build syscalls) - #:select (set-thread-name)) #:use-module (guix-build-coordinator utils) #:use-module (guix-build-coordinator utils fibers) #:use-module (guix-build-coordinator datastore) @@ -145,18 +147,15 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." 
(define process-metrics-updater (get-process-metrics-updater plain-metrics-registry)) - (define thread-metric - (make-gauge-metric - (build-coordinator-metrics-registry build-coordinator) - "guile_threads_total")) - (define datastore-metrics-updater (base-datastore-metrics-updater build-coordinator)) + (define (build-coordinator-metrics-updater) + (build-coordinator-update-metrics build-coordinator)) + (define (update-managed-metrics!) + (call-with-delay-logging build-coordinator-metrics-updater) (call-with-delay-logging gc-metrics-updater) - (metric-set thread-metric - (length (all-threads))) (call-with-delay-logging process-metrics-updater) (call-with-delay-logging datastore-metrics-updater)) @@ -167,8 +166,8 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." "exception when starting: ~A\n" exn) (primitive-exit 1)) (lambda () - (run-server/patched - (lambda (request body) + (run-knots-web-server + (lambda (request) (log-msg (build-coordinator-logger build-coordinator) 'INFO (format #f "~4a ~a\n" @@ -179,7 +178,6 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." (cons (request-method request) (split-and-decode-uri-path (uri-path (request-uri request)))) - body secret-key-base build-coordinator output-hash-channel @@ -609,7 +607,6 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." (define (controller request method-and-path-components - body secret-key-base build-coordinator output-hash-channel @@ -639,6 +636,10 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." (define logger (build-coordinator-logger build-coordinator)) + ;; TODO Handle this in the controller + (define body + (read-request-body request)) + (define (controller-thunk) (match method-and-path-components (('GET "agent" uuid) @@ -707,14 +708,11 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." (('POST "agent" uuid "fetch-builds") (if (authenticated? uuid request) (let* ((json-body (json-string->scm (utf8->string body))) - ;; count is deprecated, use target_count instead - (count (assoc-ref json-body "count")) (target-count (assoc-ref json-body "target_count")) (systems (assoc-ref json-body "systems")) (builds (fetch-builds build-coordinator uuid (vector->list systems) - target-count - count))) + target-count))) (render-json `((builds . ,(list->vector builds))))) (render-json @@ -1009,7 +1007,7 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." (render-json `((error . chunked-input-ended-prematurely)) #:code 400)) - ((worker-thread-timeout-error? exn) + ((thread-pool-timeout-error? exn) (render-json `((error . ,(simple-format #f "~A" exn))) #:code 503)) @@ -1018,30 +1016,20 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." `((error . ,(simple-format #f "~A" exn))) #:code 500)))) (lambda () - (with-throw-handler #t - controller-thunk - (lambda (key . args) - (unless (and (eq? '%exception key) - (or (agent-error? (car args)) - (worker-thread-timeout-error? (car args)) - (chunked-input-ended-prematurely-error? (car args)))) + (with-exception-handler + (lambda (exn) + (unless (or (agent-error? exn) + (thread-pool-timeout-error? exn) + (chunked-input-ended-prematurely-error? exn)) (match method-and-path-components ((method path-components ...) 
(simple-format (current-error-port) - "error: when processing: /~A ~A\n ~A ~A\n" - method (string-join path-components "/") - key args))) - - (let* ((stack (make-stack #t 4)) - (backtrace - (call-with-output-string - (lambda (port) - (display "\nBacktrace:\n" port) - (display-backtrace stack port) - (newline port) - (newline port))))) - (display - backtrace - (current-error-port))))))) + "error: when processing: /~A ~A\n" + method (string-join path-components "/")))) + + (print-backtrace-and-exception/knots exn)) + + (raise-exception exn)) + controller-thunk)) #:unwind? #t)) diff --git a/guix-build-coordinator/agent.scm b/guix-build-coordinator/agent.scm index 7afd549..47a7c5f 100644 --- a/guix-build-coordinator/agent.scm +++ b/guix-build-coordinator/agent.scm @@ -48,6 +48,7 @@ #:use-module ((guix build syscalls) #:select (set-thread-name free-disk-space)) #:use-module (guix-build-coordinator utils) + #:use-module (guix-build-coordinator utils timeout) #:use-module (guix-build-coordinator agent-messaging) #:use-module (guix-build-coordinator agent-messaging abstract) #:export (run-agent)) @@ -80,7 +81,7 @@ max-1min-load-average timestamp-log-output?) (define lgr (make <logger>)) - (define port-log (make <port-log> + (define port-log (make <custom-port-log> #:port (current-output-port) #:formatter ;; In guile-lib v0.2.8 onwards, the formatter is @@ -99,13 +100,13 @@ (let ((directory (dirname metrics-file))) (or (file-exists? directory) (begin - (simple-format (current-error-port) - "skipping writing metrics as ~A does not exist\n" - directory) + (simple-format/safe (current-error-port) + "skipping writing metrics as ~A does not exist\n" + directory) #f))) (with-exception-handler (lambda (exn) - (simple-format + (simple-format/safe (current-error-port) "skipping writing metrics, encountered exception ~A\n" exn) @@ -381,9 +382,16 @@ list-jobs (create-work-queue current-max-builds (lambda (build) - (process-job build - perform-post-build-actions - uploads-updater)) + ;; This poorly handles cases where the + ;; connection to the daemon fails, or other + ;; errors occur + (retry-on-error + (lambda () + (process-job build + perform-post-build-actions + uploads-updater)) + #:times 3 + #:delay 10)) #:thread-start-delay (make-time time-duration @@ -399,48 +407,47 @@ 20) #:name "job"))) (define (display-info) - (display - (simple-format - #f "current threads: ~A current jobs: ~A\n~A\n" - (count-threads) - (+ (count-jobs) (count-post-build-jobs)) - (string-append - (string-join - (map (match-lambda - ((build-details) - (simple-format - #f " - ~A (derived priority: ~A) + (simple-format/safe + (current-error-port) + "current threads: ~A current jobs: ~A\n~A\n" + (count-threads) + (+ (count-jobs) (count-post-build-jobs)) + (string-append + (string-join + (map (match-lambda + ((build-details) + (simple-format + #f " - ~A (derived priority: ~A) ~A" - (assoc-ref build-details "uuid") - (assoc-ref build-details "derived_priority") - (or - (assoc-ref build-details "derivation_name") - (assoc-ref build-details "derivation-name"))))) - (list-jobs)) - "\n") - "\n" - (string-join - (map (match-lambda - ((build-details upload-progress _) - (simple-format - #f " - ~A (derived priority: ~A) + (assoc-ref build-details "uuid") + (assoc-ref build-details "derived_priority") + (or + (assoc-ref build-details "derivation_name") + (assoc-ref build-details "derivation-name"))))) + (list-jobs)) + "\n") + "\n" + (string-join + (map (match-lambda + ((build-details upload-progress _) + (simple-format + #f " - ~A (derived 
priority: ~A) ~A~A" - (assoc-ref build-details "uuid") - (assoc-ref build-details "derived_priority") - (or - (assoc-ref build-details "derivation_name") - (assoc-ref build-details "derivation-name")) - (if (upload-progress-file upload-progress) - (simple-format - #f " + (assoc-ref build-details "uuid") + (assoc-ref build-details "derived_priority") + (or + (assoc-ref build-details "derivation_name") + (assoc-ref build-details "derivation-name")) + (if (upload-progress-file upload-progress) + (simple-format/safe + #f " uploading ~A (~A/~A)" - (upload-progress-file upload-progress) - (upload-progress-bytes-sent upload-progress) - (upload-progress-total-bytes upload-progress)) - "")))) - (list-post-build-jobs)) - "\n"))) - (current-error-port))) + (upload-progress-file upload-progress) + (upload-progress-bytes-sent upload-progress) + (upload-progress-total-bytes upload-progress)) + "")))) + (list-post-build-jobs)) + "\n")))) (let ((details (submit-status coordinator-interface 'idle @@ -699,7 +706,15 @@ but the guix-daemon claims it's unavailable" #:timeout ,(* 60 60))) (let ((log-port (open-output-string))) - (with-throw-handler #t + (with-exception-handler + (lambda (exn) + (log-msg lgr 'ERROR + "exception when fetching missing paths: " + exn) + (display/safe (get-output-string log-port)) + (display/safe "\n") + (close-output-port log-port) + (raise-exception exn)) (lambda () (with-port-timeouts (lambda () @@ -707,14 +722,7 @@ but the guix-daemon claims it's unavailable" ((current-build-output-port log-port)) (build-things fetch-substitute-store missing-paths))) - #:timeout (* 60 10))) - (lambda (key . args) - (log-msg lgr 'ERROR - "exception when fetching missing paths " - key ": " args) - (display (get-output-string log-port)) - (display (newline)) - (close-output-port log-port))))) + #:timeout (* 60 10)))))) (for-each (lambda (missing-path) (add-temp-root store missing-path)) missing-paths)) @@ -776,7 +784,7 @@ but the guix-daemon claims it's unavailable" (delete-paths store output-file-names))) #t) (lambda (key args) - (display (get-output-string log-port)) + (display/safe (get-output-string log-port)) (log-msg lgr 'ERROR "delete-outputs: " key args) @@ -865,25 +873,30 @@ but the guix-daemon claims it's unavailable" (raise-exception exn))) #f) (lambda () - (with-throw-handler #t + (with-exception-handler + (lambda (exn) + (unless (and (store-protocol-error? exn) + (let ((status (store-protocol-error-status exn))) + (or (= status 1) + (= status 100) + (= status 101)))) + (simple-format/safe (current-error-port) + "exception when performing build: ~A\n" + exn) + (let* ((stack (make-stack #t)) + (backtrace + (call-with-output-string + (lambda (port) + (display-backtrace stack port) + (newline port))))) + (display/safe backtrace))) + (raise-exception exn)) (lambda () (build-things store (list (derivation-file-name derivation))) (for-each (lambda (output) (add-temp-root store output)) (map derivation-output-path - (map cdr (derivation-outputs derivation))))) - (lambda (key . args) - (unless (and (eq? key '%exception) - (store-protocol-error? (car args)) - (let ((status (store-protocol-error-status - (car args)))) - (or (= status 1) - (= status 100) - (= status 101)))) - (simple-format (current-error-port) - "exception when performing build: ~A ~A\n" - key args) - (backtrace)))) + (map cdr (derivation-outputs derivation)))))) #t) #:unwind? 
#t))) diff --git a/guix-build-coordinator/build-allocator.scm b/guix-build-coordinator/build-allocator.scm index 588f213..8c08144 100644 --- a/guix-build-coordinator/build-allocator.scm +++ b/guix-build-coordinator/build-allocator.scm @@ -492,18 +492,6 @@ (let ((prioritised-builds (datastore-fetch-prioritised-unprocessed-builds datastore))) - (define systems-for-builds - ;; TODO Should be one query - (let ((table (make-hash-table))) - (for-each (lambda (build-id) - (hash-set! table - build-id - (datastore-find-build-derivation-system - datastore - build-id))) - prioritised-builds) - table)) - (define tags-for-build (let ((build-tags (make-hash-table))) (lambda (build-id) @@ -545,40 +533,41 @@ (else (error "Unknown setup failure " failure-reason))))) - (lambda (build-id) - (log "build:" build-id) - (and - (or (null? requested-systems) - (let ((build-system (hash-ref systems-for-builds build-id))) - (member build-system requested-systems))) - (agent-tags-match-build-tags agent-tags tags-for-build - agent-id build-id) - (let* ((setup-failures-for-build - (or (hash-ref setup-failures-hash build-id) - '())) - (relevant-setup-failures - (filter relevant-setup-failure? - setup-failures-for-build))) - (log "relevant setup failures:" relevant-setup-failures) - (if (null? relevant-setup-failures) - #t - #f))))) - - (when metrics-registry - (let ((counts - (hash-fold - (lambda (_ system result) - `(,@(alist-delete system result) - (,system . ,(+ 1 (or (assoc-ref result system) 0))))) - '() - systems-for-builds))) - (for-each - (match-lambda - ((system . count) - (metric-set allocator-considered-builds-metric - count - #:label-values `((system . ,system))))) - counts))) + (match-lambda + (#(build-id build-system) + (log "build:" build-id) + (and + (or (null? requested-systems) + (member build-system requested-systems)) + (agent-tags-match-build-tags agent-tags tags-for-build + agent-id build-id) + (let* ((setup-failures-for-build + (or (hash-ref setup-failures-hash build-id) + '())) + (relevant-setup-failures + (filter relevant-setup-failure? + setup-failures-for-build))) + (log "relevant setup failures:" relevant-setup-failures) + (if (null? relevant-setup-failures) + #t + #f)))))) + + ;; TODO Restore this in a more performant way + ;; (when metrics-registry + ;; (let ((counts + ;; (hash-fold + ;; (lambda (_ system result) + ;; `(,@(alist-delete system result) + ;; (,system . ,(+ 1 (or (assoc-ref result system) 0))))) + ;; '() + ;; systems-for-builds))) + ;; (for-each + ;; (match-lambda + ;; ((system . count) + ;; (metric-set allocator-considered-builds-metric + ;; count + ;; #:label-values `((system . ,system))))) + ;; counts))) (let ((result (map @@ -589,21 +578,23 @@ (build-ids (let loop ((count 0) (build-ids '()) - (potential-build-ids prioritised-builds)) + (potential-builds prioritised-builds)) (if (or (and planned-builds-for-agent-limit (>= count planned-builds-for-agent-limit)) - (null? potential-build-ids)) + (null? 
potential-builds)) (reverse build-ids) ;; highest priority last, so ;; reverse - (let ((potential-build (first potential-build-ids))) - (if (filter-proc potential-build) + (let ((potential-build-details (first potential-builds))) + (if (filter-proc potential-build-details) (loop (+ 1 count) - (cons potential-build + (cons (vector-ref + potential-build-details + 0) build-ids) - (cdr potential-build-ids)) + (cdr potential-builds)) (loop count build-ids - (cdr potential-build-ids)))))))) + (cdr potential-builds)))))))) (cons agent-id build-ids))) (map (lambda (agent) (assq-ref agent 'uuid)) diff --git a/guix-build-coordinator/client-communication.scm b/guix-build-coordinator/client-communication.scm index 47a289d..6ec6578 100644 --- a/guix-build-coordinator/client-communication.scm +++ b/guix-build-coordinator/client-communication.scm @@ -32,6 +32,10 @@ #:use-module (json) #:use-module (logging logger) #:use-module (gcrypt random) + #:use-module (knots) + #:use-module (knots timeout) + #:use-module (knots web-server) + #:use-module (knots thread-pool) #:use-module (prometheus) #:use-module (web uri) #:use-module (web client) @@ -67,9 +71,9 @@ host port build-coordinator - utility-thread-pool-channel) - (run-server/patched - (lambda (request body) + utility-thread-pool) + (run-knots-web-server + (lambda (request) (log-msg (build-coordinator-logger build-coordinator) 'INFO (format #f "~4a ~a\n" @@ -80,10 +84,10 @@ (cons (request-method request) (split-and-decode-uri-path (uri-path (request-uri request)))) - body + (read-request-body request) secret-key-base build-coordinator - utility-thread-pool-channel))) + utility-thread-pool))) #:host host #:port port)) @@ -92,7 +96,7 @@ raw-body secret-key-base build-coordinator - utility-thread-pool-channel) + utility-thread-pool) (define datastore (build-coordinator-datastore build-coordinator)) @@ -414,6 +418,14 @@ (or (and=> (assq-ref query-parameters 'priority_lt) string->number) 'unset) + #:created-at-> + (or (and=> (assq-ref query-parameters 'created_at_gt) + datastore-validate-datetime-string) + 'unset) + #:created-at-< + (or (and=> (assq-ref query-parameters 'created_at_lt) + datastore-validate-datetime-string) + 'unset) #:relationship (or (and=> (assq-ref query-parameters 'relationship) string->symbol) @@ -490,16 +502,25 @@ (store-path-base derivation-file)) (define (read-drv/substitute derivation-file) - (with-store/non-blocking store - (unless (valid-path? store derivation-file) - (substitute-derivation store - derivation-file - #:substitute-urls substitute-urls))) + (call-with-delay-logging + (lambda () + (with-store/non-blocking store + (unless (valid-path? 
store derivation-file) + (with-port-timeouts + (lambda () + (substitute-derivation store + derivation-file + #:substitute-urls substitute-urls)) + #:timeout 60))))) ;; Read the derivation in a thread to avoid blocking fibers - (call-with-worker-thread - utility-thread-pool-channel + (call-with-thread + utility-thread-pool (lambda () - (read-derivation-from-file* derivation-file)))) + (read-derivation-from-file* derivation-file)) + #:duration-logger + (lambda (duration) + (log-delay read-derivation-from-file* + duration)))) (let ((submit-build-result (call-with-delay-logging @@ -508,29 +529,36 @@ `(,build-coordinator ,derivation-file #:read-drv - ,(lambda (derivation-file) - (with-exception-handler - (lambda (exn) - (log-msg - (build-coordinator-logger build-coordinator) - 'WARN - "exception substituting derivation " derivation-file - ": " exn) - (raise-exception exn)) - (lambda () - (with-throw-handler #t - (lambda () - (retry-on-error + ,(let ((memoized-drv #f)) + (lambda (derivation-file) + (or + memoized-drv + (with-exception-handler + (lambda (exn) + (log-msg + (build-coordinator-logger build-coordinator) + 'WARN + "exception substituting derivation " derivation-file + ": " exn) + (raise-exception exn)) + (lambda () + (with-exception-handler + (lambda (exn) + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) (lambda () - (read-drv/substitute derivation-file)) - #:times 2 - #:delay 3 - #:error-hook - (lambda _ - (metric-increment read-drv-error-count-metric)))) - (lambda args - (backtrace)))) - #:unwind? #t)) + (let ((result + (retry-on-error + (lambda () + (read-drv/substitute derivation-file)) + #:times 2 + #:delay 3 + #:error-hook + (lambda _ + (metric-increment read-drv-error-count-metric))))) + (set! memoized-drv result) + result)))) + #:unwind? #t)))) ,@(let ((priority (assoc-ref body "priority"))) (if priority `(#:priority ,priority) @@ -622,7 +650,9 @@ (datastore-list-agent-builds datastore (assq-ref agent-details 'uuid)))))))) - (datastore-list-agents datastore)))))))))) + (datastore-list-agents datastore))))))) + #:priority 'high + #:duration-metric-name "get_state"))) (('GET "events") (let ((headers (request-headers request))) (list (build-response @@ -667,7 +697,7 @@ (render-json `((error . ,(client-error-details exn))) #:code 400)) - ((worker-thread-timeout-error? exn) + ((thread-pool-timeout-error? exn) (render-json `((error . ,(simple-format #f "~A" exn))) #:code 503)) @@ -676,20 +706,18 @@ `((error . 500)) #:code 500)))) (lambda () - (with-throw-handler #t - controller-thunk - (lambda (key . args) - (unless (and (eq? '%exception key) - (or - (worker-thread-timeout-error? (car args)) - (client-error? (car args)))) + (with-exception-handler + (lambda (exn) + (unless (or + (thread-pool-timeout-error? exn) + (client-error? exn)) (match method-and-path-components ((method path-components ...) (simple-format (current-error-port) - "error: when processing client request: /~A ~A\n ~A ~A\n" + "error: when processing client request: /~A ~A\n ~A\n" method (string-join path-components "/") - key args))) + exn))) (let* ((stack (make-stack #t 4)) (backtrace @@ -701,7 +729,9 @@ (newline port))))) (display backtrace - (current-error-port))))))) + (current-error-port)))) + (raise-exception exn)) + controller-thunk)) #:unwind? 
#t)) (define* (render-json json #:key (extra-headers '()) @@ -871,6 +901,8 @@ (canceled 'unset) (priority-> 'unset) (priority-< 'unset) + (created-at-> 'unset) + (created-at-< 'unset) (relationship 'unset) (after-id #f) (limit #f)) @@ -919,6 +951,12 @@ ,@(if (number? priority-<) (list (simple-format #f "priority_lt=~A" priority-<)) '()) + ,@(if (string? created-at->) + (list (simple-format #f "created_at_gt=~A" created-at->)) + '()) + ,@(if (string? created-at-<) + (list (simple-format #f "created_at_lt=~A" created-at-<)) + '()) ,@(if (and relationship (not (eq? 'unset relationship))) (list (simple-format #f "relationship=~A" relationship)) '()) diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm index a7fb664..adb7575 100644 --- a/guix-build-coordinator/coordinator.scm +++ b/guix-build-coordinator/coordinator.scm @@ -32,12 +32,16 @@ #:use-module (ice-9 match) #:use-module (ice-9 rdelim) #:use-module (ice-9 binary-ports) + #:use-module (ice-9 suspendable-ports) #:use-module (ice-9 format) #:use-module (ice-9 atomic) #:use-module (ice-9 control) #:use-module (ice-9 threads) #:use-module (ice-9 exceptions) #:use-module (rnrs bytevectors) + #:use-module (system repl server) + #:use-module (system repl command) + #:use-module (system repl debug) #:use-module (web uri) #:use-module (web http) #:use-module (oop goops) @@ -45,13 +49,18 @@ #:use-module (logging port-log) #:use-module (gcrypt random) #:use-module (fibers) + #:use-module (fibers timers) #:use-module (fibers channels) #:use-module (fibers scheduler) #:use-module (fibers conditions) + #:use-module (fibers operations) + #:use-module (knots) + #:use-module (knots timeout) + #:use-module (knots parallelism) + #:use-module (knots thread-pool) #:use-module (prometheus) - #:use-module ((guix build syscalls) - #:select (set-thread-name)) #:use-module (guix store) + #:use-module (guix progress) #:use-module (guix derivations) #:use-module (guix build utils) #:use-module (guix-build-coordinator utils) @@ -68,6 +77,8 @@ client-error? client-error-details + %build-coordinator + make-build-coordinator build-coordinator-datastore build-coordinator-hooks @@ -99,6 +110,8 @@ build-coordinator-prompt-hook-processing-for-event start-hook-processing-threads + build-coordinator-update-metrics + build-coordinator-allocation-plan-stats build-coordinator-trigger-build-allocation build-coordinator-list-allocation-plan-builds @@ -108,7 +121,9 @@ build-log-file-location handle-build-start-report handle-build-result - handle-setup-failure-report)) + handle-setup-failure-report + + build-coordinator-check-and-correct-unprocessed-builds-all-inputs-built)) (define-exception-type &agent-error &error make-agent-error @@ -120,6 +135,9 @@ client-error? (details client-error-details)) +(define %build-coordinator + (make-parameter #f)) + (define-record-type <build-coordinator> (make-build-coordinator-record datastore hooks metrics-registry allocation-strategy allocator-channel @@ -129,6 +147,9 @@ (hooks build-coordinator-hooks) (hook-condvars build-coordinator-hook-condvars set-build-coordinator-hook-condvars!) + (background-job-conditions + build-coordinator-background-job-conditions + set-build-coordinator-background-job-conditions!) (metrics-registry build-coordinator-metrics-registry) (allocation-strategy build-coordinator-allocation-strategy) (trigger-build-allocation @@ -141,30 +162,17 @@ (get-state-id build-coordinator-get-state-id-proc set-build-coordinator-get-state-id-proc!) 
(scheduler build-coordinator-scheduler - set-build-coordinator-scheduler!)) + set-build-coordinator-scheduler!) + (utility-thread-pool build-coordinator-utility-thread-pool + set-build-coordinator-utility-thread-pool!)) (set-record-type-printer! <build-coordinator> (lambda (build-coordinator port) (display "#<build-coordinator>" port))) -(define-class <custom-port-log> (<log-handler>) - (port #:init-value #f #:accessor port #:init-keyword #:port)) - -(define-method (emit-log (self <custom-port-log>) str) - (when (port self) - (put-bytevector (port self) - (string->utf8 str)) - ;; Even though the port is line buffered, writing to it with - ;; put-bytevector doesn't cause the buffer to be flushed. - (force-output (port self)))) - -(define-method (flush-log (self <custom-port-log>)) - (and=> (port self) force-output)) - -(define-method (close-log! (self <custom-port-log>)) - (and=> (port self) close-port) - (set! (port self) #f)) +(define %command-duration-histogram-buckets + (list 0.1 0.25 0.5 1 2.5 5 10 15 30 45 60 120 240 (inf))) (define %known-hooks '(build-submitted @@ -210,7 +218,15 @@ #f) (lambda () - (with-throw-handler #t + (with-exception-handler + (lambda (exn) + (if (and + (exception-with-origin? exn) + (string=? (exception-origin exn) + "fport_write")) + #f + (print-backtrace-and-exception/knots exn)) + (raise-exception exn)) (lambda () (match (atomic-box-ref current-state-id-and-event-buffer-index-box) @@ -234,16 +250,7 @@ (iota event-count-to-send (+ 1 last-sent-state-id)))) - current-state-id))) - (lambda (key . args) - (if (and - (eq? key 'system-error) - (match args - (("fport_write" "~A" ("Broken pipe") rest ...) - #t) - (_ #f))) - #f - (backtrace))))) + current-state-id))))) #:unwind? #t))) (unless (eq? #f new-state-id) @@ -285,7 +292,8 @@ (if (> requested-after-state-id current-state-id) current-state-id - requested-after-state-id) + (max 0 + requested-after-state-id)) current-state-id))))) (atomic-box-set! listener-channels-box @@ -355,6 +363,27 @@ (define (build-coordinator-get-state-id build-coordinator) ((build-coordinator-get-state-id-proc build-coordinator))) +(define (build-coordinator-update-metrics build-coordinator) + (define metrics-registry + (build-coordinator-metrics-registry build-coordinator)) + + (let ((utility-thread-pool-used-thread-metric + (or (metrics-registry-fetch-metric + metrics-registry + "utility_thread_pool_used_thread_total") + (make-gauge-metric + metrics-registry + "utility_thread_pool_used_thread_total")))) + + (and=> (build-coordinator-utility-thread-pool build-coordinator) + (lambda (utility-thread-pool) + (metric-set + utility-thread-pool-used-thread-metric + (vector-count + (lambda (_ proc) + (->bool proc)) + (thread-pool-proc-vector utility-thread-pool))))))) + (define* (make-build-coordinator #:key database-uri-string @@ -366,7 +395,7 @@ (database-uri->datastore database-uri-string #:metrics-registry metrics-registry - #:worker-thread-log-exception? + #:thread-pool-log-exception? (lambda (exn) (and (not (agent-error? exn)) (not (client-error? exn)))))) @@ -443,6 +472,16 @@ (setrlimit 'core #f #f)) #:unwind? #t) + (let ((core-file + (string-append (getcwd) "/core")) + (metric + (make-gauge-metric (build-coordinator-metrics-registry + build-coordinator) + "core_dump_file_last_modified_seconds"))) + (when (file-exists? 
core-file) + (metric-set metric + (stat:mtime (stat core-file))))) + (with-exception-handler (lambda (exn) (simple-format #t "failed increasing open file limit: ~A\n" exn)) @@ -457,10 +496,30 @@ (lambda (scheduler port) (display "#<scheduler>" port))) - (when pid-file - (call-with-output-file pid-file - (lambda (port) - (simple-format port "~A\n" (getpid))))) + (call-with-new-thread + (lambda () + (set-thread-name + (string-append "gc watcher")) + + (add-hook! + after-gc-hook + (let ((last-gc-time-taken + (assq-ref (gc-stats) 'gc-time-taken))) + (lambda () + (let* ((gc-time-taken + (assq-ref (gc-stats) 'gc-time-taken)) + (time-since-last + (/ (- gc-time-taken + last-gc-time-taken) + internal-time-units-per-second))) + (when (> time-since-last 0.1) + (format (current-error-port) + "after gc (additional time taken: ~f)\n" + time-since-last)) + (set! last-gc-time-taken + (assq-ref (gc-stats) 'gc-time-taken)))))) + (while #t + (sleep 0.1)))) (when update-datastore? (datastore-update (build-coordinator-datastore build-coordinator))) @@ -469,17 +528,17 @@ build-coordinator (make-build-allocator-thread build-coordinator)) - (set-build-coordinator-hook-condvars! - build-coordinator - (start-hook-processing-threads build-coordinator - parallel-hooks)) - (when trigger-build-allocation? (trigger-build-allocation build-coordinator))) (define %default-agent-uri (string->uri "http://0.0.0.0:8745")) (define %default-client-uri (string->uri "http://127.0.0.1:8746")) +(define %default-repl-server-port + ;; Default port to run REPL server on, if --listen-repl is provided + ;; but no port is mentioned + 37146) + (define* (run-coordinator-service build-coordinator #:key (update-datastore? #t) @@ -487,7 +546,10 @@ (agent-communication-uri %default-agent-uri) (client-communication-uri %default-client-uri) secret-key-base - (parallel-hooks '())) + (parallel-hooks '()) + listen-repl) + (install-suspendable-ports!) + (with-fluids ((%file-port-name-canonicalization 'none)) (perform-coordinator-service-startup build-coordinator @@ -495,23 +557,45 @@ #:pid-file pid-file #:parallel-hooks parallel-hooks) + (when listen-repl + (parameterize ((%build-coordinator build-coordinator)) + (cond + ((or (eq? #t listen-repl) + (number? listen-repl)) + (let ((listen-repl + (if (eq? #t listen-repl) + %default-repl-server-port + listen-repl))) + (format (current-error-port) + "REPL server listening on port ~a~%" + listen-repl) + (spawn-server (make-tcp-server-socket + #:port + (if (eq? #t listen-repl) + %default-repl-server-port + listen-repl))))) + (else + (format (current-error-port) + "REPL server listening on ~a~%" + listen-repl) + (spawn-server (make-unix-domain-server-socket #:path listen-repl)))))) + ;; Create some worker thread channels, which need to be created prior ;; to run-fibers being called. 
(let ((output-hash-channel (make-output-hash-channel build-coordinator)) - (utility-thread-pool-channel - (make-worker-thread-channel - (const '()) + (utility-thread-pool + (make-thread-pool + 18 #:name "utility" - #:parallelism 10 #:delay-logger (let ((delay-metric (make-histogram-metric (build-coordinator-metrics-registry build-coordinator) "utility_thread_pool_delay_seconds"))) - (lambda (seconds-delayed) + (lambda (seconds-delayed proc) (log-delay "utility thread channel" seconds-delayed) (metric-observe delay-metric seconds-delayed) @@ -521,6 +605,18 @@ "warning: utility thread channel delayed by ~1,2f seconds~%" seconds-delayed))))))) + (let ((metric + (make-gauge-metric + (build-coordinator-metrics-registry build-coordinator) + "utility_thread_pool_thread_total"))) + (metric-set metric + (vector-length + (thread-pool-proc-vector utility-thread-pool)))) + + (set-build-coordinator-utility-thread-pool! + build-coordinator + utility-thread-pool) + (let ((finished? (make-condition))) (call-with-sigint (lambda () @@ -542,6 +638,9 @@ (iota (length schedulers)) schedulers)) + (set-build-coordinator-scheduler! build-coordinator + (current-scheduler)) + (log-msg (build-coordinator-logger build-coordinator) 'INFO "initialising metrics") @@ -553,13 +652,19 @@ (datastore-spawn-fibers (build-coordinator-datastore build-coordinator)) + (set-build-coordinator-hook-condvars! + build-coordinator + (start-hook-processing-threads build-coordinator + parallel-hooks)) + + (set-build-coordinator-background-job-conditions! + build-coordinator + (start-background-job-processing-fibers build-coordinator)) + (spawn-fiber-to-watch-for-deferred-builds build-coordinator) (spawn-build-allocation-plan-management-fiber build-coordinator) - (set-build-coordinator-scheduler! build-coordinator - (current-scheduler)) - (let ((events-channel get-state-id (make-events-channel @@ -591,7 +696,12 @@ (uri-host client-communication-uri) (uri-port client-communication-uri) build-coordinator - utility-thread-pool-channel) + utility-thread-pool) + + (when pid-file + (call-with-output-file pid-file + (lambda (port) + (simple-format port "~A\n" (getpid))))) ;; Guile seems to just stop listening on ports, so try to ;; monitor that internally and just quit if it happens @@ -650,12 +760,9 @@ derivation-file))) (if (eq? #f system) ; derivation does not exist in database? (build-for-output-already-exists/with-derivation? - (with-fibers-port-timeouts - (lambda () - (call-with-delay-logging read-drv - #:threshold 10 - #:args (list derivation-file))) - #:timeout 240)) + (call-with-delay-logging read-drv + #:threshold 10 + #:args (list derivation-file))) (any (lambda (output-details) (let ((builds-for-output @@ -780,12 +887,9 @@ ;; derivations with no builds works (if (datastore-find-derivation datastore derivation-file) #f - (with-fibers-port-timeouts - (lambda () - (call-with-delay-logging read-drv - #:threshold 10 - #:args (list derivation-file))) - #:timeout 30)))) + (call-with-delay-logging read-drv + #:threshold 10 + #:args (list derivation-file))))) (when drv (datastore-store-derivation datastore drv)) @@ -806,8 +910,9 @@ ;; time too. 
(cons related-drv (random-v4-uuid))) related-derivations-lacking-builds)) - #:duration-metric-name - "store_build") + #:priority 'low + #:duration-metric-name "store_build" + #:duration-metric-buckets %command-duration-histogram-buckets) (#t ; build submitted (build-coordinator-prompt-hook-processing-for-event build-coordinator @@ -837,7 +942,8 @@ (stop-condition stop-condition))))) (stop-condition - stop-condition))))) + stop-condition))) + #:buckets %command-duration-histogram-buckets)) (define* (cancel-build build-coordinator uuid #:key (ignore-if-build-required-by-another? #t) @@ -850,6 +956,10 @@ datastore (lambda (db) (let ((build-details (datastore-find-build datastore uuid))) + (unless build-details + (raise-exception + (make-client-error 'build-unknown))) + (when (assq-ref build-details 'canceled) (raise-exception (make-client-error 'build-already-canceled))) @@ -872,7 +982,10 @@ (datastore-insert-unprocessed-hook-event datastore "build-canceled" (list uuid)) - 'build-canceled)))) + 'build-canceled) + #:priority 'low + #:duration-metric-name "cancel_build" + #:duration-metric-buckets %command-duration-histogram-buckets))) (when (eq? val 'build-canceled) (unless skip-updating-derived-priorities? @@ -893,6 +1006,10 @@ val)) + (unless (datastore-find-build datastore uuid) + (raise-exception + (make-client-error 'build-unknown))) + (if ignore-if-build-required-by-another? (let ((build-required ;; Do this check here outside the transaction to avoid having to @@ -915,6 +1032,10 @@ datastore (lambda (db) (let ((build-details (datastore-find-build datastore uuid))) + (unless build-details + (raise-exception + (make-client-error 'build-unknown))) + (when (assq-ref build-details 'canceled) (raise-exception (make-client-error 'build-already-canceled))) @@ -929,7 +1050,10 @@ new-priority #:skip-updating-derived-priorities? skip-updating-derived-priorities? - #:override-derived-priority override-derived-priority))) + #:override-derived-priority override-derived-priority)) + #:priority 'low + #:duration-metric-name "update_build_priority" + #:duration-metric-buckets %command-duration-histogram-buckets) (trigger-build-allocation build-coordinator) @@ -1012,7 +1136,17 @@ (and=> (assoc-ref (build-coordinator-hook-condvars build-coordinator) event-name) (lambda (condvar) - (signal-condition-variable condvar) + (cond + ((condition-variable? condvar) + (signal-condition-variable condvar)) + ((reusable-condition? condvar) + (signal-reusable-condition! + condvar + (build-coordinator-scheduler build-coordinator))) + (else + (error + (simple-format #f "unrecognised condvar ~A" + condvar)))) #t))) (define (update-build-allocation-plan build-coordinator) @@ -1074,24 +1208,19 @@ (lambda () (with-exception-handler (lambda (exn) - (simple-format - (current-error-port) - "build-allocator-thread: exception: ~A\n" - exn) (metric-increment failure-counter-metric) (atomic-box-set! allocation-needed #t)) (lambda () - (with-throw-handler #t + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "error in build allocator thread\n") + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) (lambda () (update-build-allocation-plan build-coordinator) - (metric-increment success-counter-metric)) - (lambda (key . args) - (simple-format - (current-error-port) - "error in build allocator thread: ~A ~A\n" - key - args) - (backtrace)))) + (metric-increment success-counter-metric)))) #:unwind? 
#t)) #:buckets ((@@ (prometheus) exponential-histogram-buckets) ; TODO #:start 1 @@ -1139,12 +1268,14 @@ (lambda () (while #t (with-exception-handler - (lambda (exn) - (simple-format (current-error-port) - "exception in allocation plan fiber: ~A\n" - exn)) + (lambda _ #f) (lambda () - (with-throw-handler #t + (with-exception-handler + (lambda (exn) + (simple-format (current-error-port) + "exception in allocation plan fiber\n") + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) (lambda () (match (get-message (build-coordinator-allocator-channel coordinator)) (('stats reply) @@ -1187,9 +1318,7 @@ (update-build-allocation-plan-metrics!) - (put-message reply #t)))) - (lambda _ - (backtrace)))) + (put-message reply #t)))))) #:unwind? #t))))) (define (build-coordinator-allocation-plan-stats coordinator) @@ -1198,6 +1327,11 @@ (list 'stats reply)) (get-message reply))) +(define (build-coordinator-count-allocation-plan-builds coordinator agent-id) + (or (assoc-ref (build-coordinator-allocation-plan-stats coordinator) + agent-id) + 0)) + (define (build-coordinator-fetch-agent-allocation-plan coordinator agent-id) (let ((reply (make-channel))) (put-message (build-coordinator-allocator-channel coordinator) @@ -1275,7 +1409,8 @@ (define* (build-coordinator-list-allocation-plan-builds coordinator agent-id - #:key limit) + #:key limit + filter?) (define (take* lst i) (if (< (length lst) i) lst @@ -1284,31 +1419,51 @@ (define datastore (build-coordinator-datastore coordinator)) + (define (build-data uuid + derivation-name + derived-priority + build-details) + `((uuid . ,uuid) + (derivation_name . ,derivation-name) + (system . ,(datastore-find-build-derivation-system + datastore + uuid)) + (priority . ,(assq-ref build-details 'priority)) + (derived_priority . ,derived-priority) + (tags . ,(vector-map + (lambda (_ tag) + (match tag + ((key . value) + `((key . ,key) + (value . ,value))))) + (datastore-fetch-build-tags + datastore + uuid))))) + (let ((build-ids (build-coordinator-fetch-agent-allocation-plan coordinator agent-id))) (filter-map (lambda (build-id) - (match (datastore-fetch-build-to-allocate datastore build-id) - (#(uuid derivation_id derivation_name derived_priority) - (let ((build-details (datastore-find-build datastore uuid))) - `((uuid . ,uuid) - (derivation_name . ,derivation_name) - (system . ,(datastore-find-build-derivation-system - datastore - uuid)) - (priority . ,(assq-ref build-details 'priority)) - (derived_priority . ,derived_priority) - (tags . ,(vector-map - (lambda (_ tag) - (match tag - ((key . value) - `((key . ,key) - (value . ,value))))) - (datastore-fetch-build-tags - datastore - uuid)))))) - (#f #f))) + (if filter? 
+ (match (datastore-fetch-build-to-allocate datastore build-id) + (#(uuid derivation_id derivation_name derived_priority) + (let ((build-details (datastore-find-build datastore uuid))) + (build-data uuid + derivation_name + derived_priority + build-details))) + (#f #f)) + (let ((build-details (datastore-find-build datastore build-id)) + (unprocessed-builds-entry + (datastore-find-unprocessed-build-entry + datastore + build-id))) + (build-data build-id + (assq-ref build-details 'derivation-name) + (assq-ref unprocessed-builds-entry + 'derived-priority) + build-details)))) (if limit (take* build-ids limit) build-ids)))) @@ -1359,6 +1514,18 @@ "hook_failure_total" #:labels '(event))) + (define process-events-thread-pool-thread-total-metric + (make-gauge-metric + (build-coordinator-metrics-registry build-coordinator) + "hook_thread_pool_thread_total" + #:labels '(event))) + + (define process-events-thread-pool-used-thread-total-metric + (make-gauge-metric + (build-coordinator-metrics-registry build-coordinator) + "hook_thread_pool_used_thread_total" + #:labels '(event))) + (define (process-event id event arguments handler) (log-msg (build-coordinator-logger build-coordinator) 'DEBUG @@ -1367,10 +1534,6 @@ (and (with-exception-handler (lambda (exn) - (log-msg (build-coordinator-logger build-coordinator) - 'ERROR - "error running " event " (" id ") hook: " - exn) (metric-increment failure-counter-metric #:label-values `((event . ,event))) @@ -1381,25 +1544,34 @@ (build-coordinator-metrics-registry build-coordinator) "hook_duration_seconds" (lambda () - (with-throw-handler #t + (with-exception-handler + (lambda (exn) + (let* ((stack + (match (fluid-ref %stacks) + ((stack-tag . prompt-tag) + (make-stack #t + 0 prompt-tag + 0 (and prompt-tag 1))))) + (backtrace + (call-with-output-string + (lambda (port) + (print-frames (stack->vector stack) + port + #:count (stack-length stack)) + (print-exception + port + (stack-ref stack 4) + '%exception + (list exn)))))) + (log-msg (build-coordinator-logger build-coordinator) + 'ERROR + "error running " event " (" id ") hook\n" + backtrace)) + (raise-exception exn)) (lambda () (start-stack - 'hook - (apply handler build-coordinator arguments))) - (lambda (key . args) - (log-msg (build-coordinator-logger build-coordinator) - 'ERROR - "error running " event " (" id ") hook: " - key " " args) - (let* ((stack (make-stack #t 3)) - (backtrace - (call-with-output-string - (lambda (port) - (display-backtrace stack port) - (newline port))))) - (display - backtrace - (current-output-port)))))) + #t + (apply handler build-coordinator arguments))))) #:labels '(event) #:label-values `((event . 
,event))) #t) @@ -1425,84 +1597,116 @@ (define (single-thread-process-events event-name handler) (let ((mtx (make-mutex)) (condvar (make-condition-variable))) - (call-with-new-thread + (call-with-default-io-waiters (lambda () - (catch 'system-error - (lambda () - (set-thread-name - (symbol->string event-name))) - (const #t)) + (call-with-new-thread + (lambda () + (catch 'system-error + (lambda () + (set-thread-name + (symbol->string event-name))) + (const #t)) + + (lock-mutex mtx) + (while #t + (with-exception-handler + (lambda (exn) + (sleep 10)) + (lambda () + (with-exception-handler + (lambda (exn) + (log-msg + (build-coordinator-logger build-coordinator) + 'CRITICAL + "error in " event-name " hook processing thread") + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) + (lambda () + (while #t + (match (datastore-list-unprocessed-hook-events datastore event-name 1) + (() + (wait-condition-variable condvar mtx)) + (((id event arguments)) + (process-event id event arguments handler))))))) + #:unwind? #t)))))) + condvar)) - (lock-mutex mtx) - (while #t - (with-exception-handler - (lambda (exn) - (with-exception-handler - (lambda _ - ;; Things are really going wrong if logging about - ;; the hook processing thread crashing, also raises - ;; an exception, so just try and sleep and hope - ;; things go better next time - (sleep 10)) - (lambda () - (log-msg (build-coordinator-logger build-coordinator) - 'CRITICAL - "hook processing thread " event-name - " exception: " exn)) - #:unwind? #t) - (sleep 10)) - (lambda () - (with-throw-handler #t + (define (thread-pool-process-events event-name handler thread-count) + (let ((thread-pool + (make-thread-pool + thread-count + #:name (simple-format #f "~A" event-name))) + (reusable-condition + (make-reusable-condition)) + (coordination-channel + (make-channel))) + + (metric-set process-events-thread-pool-thread-total-metric + thread-count + #:label-values `((event . ,event-name))) + + (spawn-fiber + (lambda () + (let loop ((running-ids '())) + (metric-set process-events-thread-pool-used-thread-total-metric + (vector-count + (lambda (_ proc) + (->bool proc)) + (thread-pool-proc-vector thread-pool)) + #:label-values `((event . ,event-name))) + (match (get-message coordination-channel) + (('process id event arguments) + (if (member id running-ids) + ;; Ignore already running jobs + (loop running-ids) + (begin + (spawn-fiber + (lambda () + (call-with-thread + thread-pool + (lambda () + (process-event id event arguments handler)) + ;; TODO Make this the default through knots + #:timeout #f) + (put-message coordination-channel + (list 'finished id)))) + (loop (cons id running-ids))))) + (('finished id) + (when (< (length running-ids) + (* 2 thread-count)) + (signal-reusable-condition! reusable-condition)) + (loop (delete id running-ids))) + (('count-running reply) + (let ((count (length running-ids))) + (spawn-fiber (lambda () - (while #t - (match (datastore-list-unprocessed-hook-events datastore event-name 1) - (() - (wait-condition-variable condvar mtx)) - (((id event arguments)) - (process-event id event arguments handler))))) - (lambda (key . args) - (log-msg - (build-coordinator-logger build-coordinator) - 'CRITICAL - "error in " event-name " hook processing thread: " key " " args) - (backtrace)))) - #:unwind? 
#t)))) - condvar)) + (put-message reply count)))) + (loop running-ids)))))) - (define (work-queue-process-events event-name handler thread-count) - (let-values (((pool-mutex job-available count-threads list-jobs) - (create-thread-pool - (lambda () - (max - 1 - (length - (datastore-list-unprocessed-hook-events - datastore - event-name - thread-count)))) - (lambda (running-jobs) - (let* ((in-progress-ids - (map car running-jobs)) - (potential-jobs - (datastore-list-unprocessed-hook-events - datastore - event-name - (+ 1 (length in-progress-ids)))) - (job - (find - (match-lambda - ((id rest ...) - (not (member id in-progress-ids)))) - potential-jobs))) - (log-msg - (build-coordinator-logger build-coordinator) - 'DEBUG - event-name " work queue, got job " job) - job)) - (lambda (id event arguments) - (process-event id event arguments handler)) - #:name (symbol->string event-name)))) - job-available)) + (spawn-fiber + (lambda () + (while #t + (let ((count + (let ((channel (make-channel))) + (put-message coordination-channel + (list 'count-running channel)) + (get-message channel)))) + (when (< count (* 2 thread-count)) + (log-msg (build-coordinator-logger build-coordinator) + 'DEBUG "submitting batch of " event-name " hook events") + (for-each + (match-lambda + ((id event arguments) + (put-message coordination-channel + (list 'process id event arguments)))) + (datastore-list-unprocessed-hook-events + datastore + event-name + (* 20 thread-count))))) + (reusable-condition-wait reusable-condition + #:timeout 60)))) + + reusable-condition)) (map (match-lambda @@ -1511,15 +1715,14 @@ (or (and=> (assq-ref parallel-hooks event-name) (lambda (thread-count) - (work-queue-process-events event-name - handler - thread-count))) + (thread-pool-process-events event-name + handler + thread-count))) (single-thread-process-events event-name handler))))) (build-coordinator-hooks build-coordinator))) -(define (fetch-builds build-coordinator agent systems - max-builds deprecated-requested-count) +(define (fetch-builds build-coordinator agent systems max-builds) (define datastore (build-coordinator-datastore build-coordinator)) @@ -1528,10 +1731,14 @@ build-coordinator agent-id))) (if build-details (let ((build-id (assq-ref build-details 'uuid))) - (datastore-insert-to-allocated-builds datastore agent-id (list build-id)) + (datastore-insert-to-allocated-builds + datastore + agent-id + build-id) (build-coordinator-remove-build-from-allocation-plan build-coordinator build-id) - build-details) + `(,@build-details + (submit_outputs . null))) #f))) (define (allocate-several-builds agent-id count) @@ -1555,28 +1762,68 @@ (datastore-list-agent-builds datastore agent)) (start-count (length initially-allocated-builds)) - (target-count (or max-builds - (+ start-count - deprecated-requested-count)))) + (target-count max-builds)) (if (< start-count target-count) (let ((new-builds (allocate-several-builds agent (- target-count start-count)))) - ;; Previously allocate builds just returned newly allocated - ;; builds, but if max-builds is provided, return all the - ;; builds. This means the agent can handle this in a idempotent - ;; manor. - (if max-builds - (append initially-allocated-builds - new-builds) - new-builds)) - ;; Previously allocate builds just returned newly allocated builds, - ;; but if max-builds is provided, return all the builds. This means - ;; the agent can handle this in a idempotent manor. 
- (if max-builds - initially-allocated-builds - '())))) - #:duration-metric-name "allocate_builds_to_agent")) + (if (null? new-builds) + (values initially-allocated-builds + #f) + (values (append initially-allocated-builds + new-builds) + #t))) + (values initially-allocated-builds + #f)))) + #:duration-metric-name "allocate_builds_to_agent" + #:duration-metric-buckets %command-duration-histogram-buckets)) + + (define (send-agent-builds-allocated-event builds) + (build-coordinator-send-event + build-coordinator + "agent-builds-allocated" + `((agent_id . ,agent) + (builds . ,(list->vector + (map + (lambda (build) + `(,@build + (tags + . ,(list->vector + (map + (match-lambda + ((key . value) + `((key . ,key) + (value . ,value)))) + (vector->list + (datastore-fetch-build-tags + datastore + (assq-ref build 'uuid)))))))) + builds)))))) + + (define (submit-outputs? build) + (with-exception-handler + (lambda (exn) + (log-msg (build-coordinator-logger build-coordinator) + 'CRITICAL + "build-submit-outputs hook raised exception") + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) + (lambda () + (let ((hook-result + (call-with-delay-logging + (lambda () + (build-submit-outputs-hook + build-coordinator + (assq-ref build 'uuid)))))) + (if (boolean? hook-result) + hook-result + (begin + (log-msg + (build-coordinator-logger build-coordinator) + 'CRITICAL + "build-submit-outputs hook returned non boolean: " + hook-result) + #t)))))) (call-with-duration-metric (build-coordinator-metrics-registry build-coordinator) @@ -1592,111 +1839,42 @@ (trigger-build-allocation build-coordinator))) (let ((builds - (get-builds))) - - (build-coordinator-send-event - build-coordinator - "agent-builds-allocated" - `((agent_id . ,agent) - (builds . ,(list->vector - (map - (lambda (build) - `(,@build - (tags - . ,(list->vector - (map - (match-lambda - ((key . value) - `((key . ,key) - (value . ,value)))) - (vector->list - (datastore-fetch-build-tags - datastore - (assq-ref build 'uuid)))))))) - builds))))) - - (map (lambda (build) - (define submit-outputs? - (with-exception-handler - (lambda (exn) - (log-msg (build-coordinator-logger build-coordinator) - 'CRITICAL - "build-submit-outputs hook raised exception: " - exn)) - (lambda () - (with-throw-handler #t - (lambda () - (let ((hook-result - (call-with-delay-logging - (lambda () - (build-submit-outputs-hook - build-coordinator - (assq-ref build 'uuid)))))) - (if (boolean? hook-result) - hook-result - (begin - (log-msg - (build-coordinator-logger build-coordinator) - 'CRITICAL - "build-submit-outputs hook returned non boolean: " - hook-result) - #t)))) - (lambda (key . args) - (backtrace)))) - #:unwind? #t)) - - `(,@build - ;; TODO This needs reconsidering when things having been built in - ;; the past doesn't necessarily mean they're still available. - (submit_outputs . ,submit-outputs?))) - builds))))))) + new-builds-allocated? + (if (= 0 + (build-coordinator-count-allocation-plan-builds + build-coordinator + agent)) + (values + (datastore-list-agent-builds datastore agent) + #f) + (get-builds)))) + + (when new-builds-allocated? + (send-agent-builds-allocated-event builds)) + + (map + (lambda (build) + (if (eq? 'null (assq-ref build 'submit_outputs)) + (let ((submit-outputs? (submit-outputs? build))) + (datastore-update-allocated-build-submit-outputs + (build-coordinator-datastore build-coordinator) + (assq-ref build 'uuid) + submit-outputs?) + + `(,@(alist-delete 'submit_outputs build) + (submit_outputs . 
,submit-outputs?))) + build)) + builds))))))) (define (agent-details build-coordinator agent-id) (define datastore (build-coordinator-datastore build-coordinator)) - (define build-submit-outputs-hook - (assq-ref (build-coordinator-hooks build-coordinator) - 'build-submit-outputs)) - - (define (submit-outputs? build) - (with-exception-handler - (lambda (exn) - (log-msg (build-coordinator-logger build-coordinator) - 'CRITICAL - "build-submit-outputs hook raised exception: " - exn)) - (lambda () - (with-throw-handler #t - (lambda () - (let ((hook-result - (call-with-delay-logging - (lambda () - (build-submit-outputs-hook - build-coordinator - (assq-ref build 'uuid)))))) - (if (boolean? hook-result) - hook-result - (begin - (log-msg - (build-coordinator-logger build-coordinator) - 'CRITICAL - "build-submit-outputs hook returned non boolean: " - hook-result) - #t)))) - (lambda (key . args) - (backtrace)))) - #:unwind? #t)) - (let ((agent (datastore-find-agent datastore agent-id)) (allocated-builds (datastore-list-agent-builds datastore agent-id))) `(,@agent ; description - (builds . ,(list->vector - (map (lambda (build) - `(,@build - (submit_outputs . ,(submit-outputs? build)))) - allocated-builds)))))) + (builds . ,(list->vector allocated-builds))))) (define (build-data-location build-id ) (string-append (%config 'builds-dir) "/" @@ -1845,10 +2023,11 @@ (list build-id)) (when success? - (datastore-delete-relevant-outputs-from-unbuilt-outputs + (datastore-insert-background-job datastore - build-id) - (datastore-update-unprocessed-builds-for-build-success + 'build-success + (list build-id)) + (datastore-delete-relevant-outputs-from-unbuilt-outputs datastore build-id) (datastore-store-output-metadata @@ -1858,7 +2037,8 @@ (assoc-ref result-json "outputs")))) #f)))) - #:duration-metric-name "store_build_result"))) + #:duration-metric-name "store_build_result" + #:duration-metric-buckets %command-duration-histogram-buckets))) (when exception ;; Raise the exception here to avoid aborting the transaction (raise-exception exception))) @@ -1872,6 +2052,11 @@ 'build-success 'build-failure)) + (when success? + (build-coordinator-trigger-background-job-processing + build-coordinator + 'build-success)) + (build-coordinator-send-event build-coordinator (if success? @@ -1884,7 +2069,8 @@ ;; could change the allocation (trigger-build-allocation build-coordinator) - #t)))) + #t)) + #:buckets %command-duration-histogram-buckets)) (define (handle-build-start-report build-coordinator agent-id @@ -1903,7 +2089,8 @@ build-coordinator 'build-started `((build_id . ,build-id) - (agent_id . ,agent-id)))))) + (agent_id . ,agent-id)))) + #:buckets %command-duration-histogram-buckets)) (define (handle-setup-failure-report build-coordinator agent-id build-id report-json) @@ -1939,3 +2126,142 @@ ;; Trigger build allocation, so that the allocator can handle this setup ;; failure (trigger-build-allocation build-coordinator)) + +(define (build-coordinator-trigger-background-job-processing + build-coordinator + type) + (let ((condition + (assq-ref (build-coordinator-background-job-conditions + build-coordinator) + type))) + (unless condition + (error + (simple-format #f "unknown condition ~A" type))) + (signal-reusable-condition! 
condition))) + +(define (start-background-job-processing-fibers build-coordinator) + (define %background-job-duration-histogram-buckets + (list 0.1 0.25 0.5 1 2.5 5 10 15 30 45 60 120 240 (inf))) + + (define* (start-job-fibers type proc #:key (parallelism 1)) + (let ((coordination-channel + (make-channel)) + (condition + (make-reusable-condition)) + (process-in-fiber + (fiberize + (lambda args + (call-with-duration-metric + (build-coordinator-metrics-registry build-coordinator) + "coordinator_background_job_duration_seconds" + (lambda () + (call-with-delay-logging proc #:args args)) + #:labels '(name) + #:label-values `((name . ,type)) + #:buckets %background-job-duration-histogram-buckets)) + #:parallelism parallelism)) + (job-exception-counter-metric + (make-counter-metric + (build-coordinator-metrics-registry build-coordinator) + "coordinator_background_job_failures_total" + #:labels '(name)))) + + (define (process id . args) + (spawn-fiber + (lambda () + (let loop ((retry 0)) + (log-msg (build-coordinator-logger build-coordinator) + 'DEBUG + "processing " type " background job (id: " + id ", args: " args ", retry: " retry ")") + (let ((success? + (with-exception-handler + (lambda (exn) + (log-msg (build-coordinator-logger build-coordinator) + 'WARN + type " background job error (id: " + id "): " exn) + #f) + (lambda () + (apply process-in-fiber args)) + #:unwind? #t))) + (if success? + (begin + (datastore-delete-background-job + (build-coordinator-datastore build-coordinator) + id) + (put-message coordination-channel + (list 'job-finished id))) + (begin + (metric-increment job-exception-counter-metric + #:label-values `((name . ,type))) + (sleep 30) + (loop (+ 1 retry))))))))) + + (spawn-fiber + (lambda () + (while #t + (let ((job-details + (datastore-select-background-jobs + (build-coordinator-datastore build-coordinator) + type + #:limit (* 2 parallelism)))) + + (unless (null? job-details) + (put-message coordination-channel + (list 'process job-details))) + + (reusable-condition-wait condition + #:timeout 30))))) + + (spawn-fiber + (lambda () + (let loop ((running-job-ids '())) + (match (get-message coordination-channel) + (('process jobs) + (let* ((job-ids (map (lambda (job) + (assq-ref job 'id)) + jobs)) + (new-ids + (lset-difference = job-ids running-job-ids)) + (jobs-to-start + (take new-ids + (min + (- parallelism + (length running-job-ids)) + (length new-ids))))) + (for-each (lambda (job) + (apply process + (assq-ref job 'id) + (assq-ref job 'args))) + (filter + (lambda (job-details) + (member (assq-ref job-details 'id) + jobs-to-start)) + jobs)) + (loop (append running-job-ids + jobs-to-start)))) + (('job-finished id) + ;; Maybe not very efficient, but should work + (signal-reusable-condition! condition) + (log-msg (build-coordinator-logger build-coordinator) + 'DEBUG type + " background job " id + " finished successfully") + (loop (delete id running-job-ids))))))) + + condition)) + + `((build-success . 
,(start-job-fibers + 'build-success + (lambda (build-id) + (datastore-update-unprocessed-builds-for-build-success + (build-coordinator-datastore build-coordinator) + build-id)) + #:parallelism 24)))) + +(define (build-coordinator-check-and-correct-unprocessed-builds-all-inputs-built + build-coordinator) + (datastore-check-and-correct-unprocessed-builds-all-inputs-built + (build-coordinator-datastore build-coordinator) + #:progress-reporter progress-reporter/bar)) diff --git a/guix-build-coordinator/datastore.scm b/guix-build-coordinator/datastore.scm index dc4fec6..5768630 100644 --- a/guix-build-coordinator/datastore.scm +++ b/guix-build-coordinator/datastore.scm @@ -6,7 +6,8 @@ #:use-module (guix-build-coordinator datastore postgresql) #:duplicates (merge-generics) #:export (database-uri->datastore - datastore-find-build-output)) + datastore-find-build-output + datastore-validate-datetime-string)) (re-export datastore-optimize) (re-export datastore-spawn-fibers) @@ -92,19 +93,24 @@ (re-export datastore-fetch-build-to-allocate) (re-export datastore-check-if-derivation-conflicts?) (re-export datastore-insert-to-allocated-builds) +(re-export datastore-update-allocated-build-submit-outputs) +(re-export datastore-insert-background-job) +(re-export datastore-delete-background-job) +(re-export datastore-select-background-jobs) +(re-export datastore-check-and-correct-unprocessed-builds-all-inputs-built) (define* (database-uri->datastore database #:key metrics-registry - worker-thread-log-exception?) + thread-pool-log-exception?) (cond ((string-prefix? "pg://" database) (postgresql-datastore database)) ((string-prefix? "sqlite://" database) (sqlite-datastore database #:metrics-registry metrics-registry - #:worker-thread-log-exception? - worker-thread-log-exception?)) + #:thread-pool-log-exception? + thread-pool-log-exception?)) (else (error (simple-format #f "Unknown database ~A" database))))) @@ -119,3 +125,6 @@ (assq-ref output 'output) #f)) outputs))) + +(define (datastore-validate-datetime-string s) + (strftime "%F %T" (car (strptime "%F %T" s)))) diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm index ac04362..bb1d8e8 100644 --- a/guix-build-coordinator/datastore/sqlite.scm +++ b/guix-build-coordinator/datastore/sqlite.scm @@ -10,8 +10,11 @@ #:use-module (ice-9 exceptions) #:use-module (sqlite3) #:use-module (fibers) + #:use-module (knots parallelism) + #:use-module (knots thread-pool) #:use-module (prometheus) #:use-module (guix base16) + #:use-module (guix progress) #:use-module (guix derivations) #:use-module (guix-build-coordinator utils) #:use-module (guix-build-coordinator utils fibers) @@ -102,21 +105,31 @@ datastore-update-agent-requested-systems datastore-fetch-build-to-allocate datastore-check-if-derivation-conflicts? 
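;; Editor's sketch, not part of the patch: an illustration of how the new
;; background job queue above appears intended to be used, based on the
;; 'build-success wiring in handle-build-result.  The job is recorded
;; durably through the datastore, then the fiber watching that job type
;; is woken via its reusable condition so it does not sit out the 30
;; second polling timeout.  The helper name queue-build-success-job is
;; hypothetical; the procedures it calls are the ones introduced by this
;; patch.
(define (queue-build-success-job build-coordinator build-id)
  ;; Persist the job so it survives restarts and can be retried.
  (datastore-insert-background-job
   (build-coordinator-datastore build-coordinator)
   'build-success
   (list build-id))
  ;; Signal the reusable condition for this job type so the processing
  ;; fiber polls the queue immediately.
  (build-coordinator-trigger-background-job-processing
   build-coordinator
   'build-success))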
- datastore-insert-to-allocated-builds)) + datastore-insert-to-allocated-builds + datastore-update-allocated-build-submit-outputs + datastore-insert-background-job + datastore-delete-background-job + datastore-select-background-jobs + datastore-check-and-correct-unprocessed-builds-all-inputs-built)) + +(define %transaction-duration-histogram-buckets + (list 0.1 0.25 0.5 1 2.5 5 10 15 30 45 60 120 240 (inf))) (define-class <sqlite-datastore> (<abstract-datastore>) database-file - worker-reader-thread-channel - worker-reader-thread-proc-vector - worker-writer-thread-channel - worker-writer-thread-proc-vector + reader-thread-pool + writer-thread-pool + low-priority-writer-thread-channel + default-priority-writer-thread-channel + high-priority-writer-thread-channel + writer-thread-channel-queue-stats metrics-registry) (define* (sqlite-datastore database-uri #:key update-database? metrics-registry - worker-thread-log-exception?) + thread-pool-log-exception?) (define database-file (string-drop database-uri (string-length "sqlite://"))) @@ -131,7 +144,8 @@ (let ((db (db-open database-file))) (sqlite-exec db "PRAGMA journal_mode=WAL;") (sqlite-exec db "PRAGMA optimize;") - (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);") + (with-time-logging "truncating the WAL" + (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);")) (sqlite-close db)) (let ((datastore (make <sqlite-datastore>))) @@ -139,9 +153,11 @@ (slot-set! datastore 'database-file database-file) (slot-set! datastore 'metrics-registry metrics-registry) - (let ((channel - proc-vector - (make-worker-thread-channel + (let ((writer-thread-pool + (make-thread-pool + ;; SQLite doesn't support parallel writes + 1 + #:thread-initializer (lambda () (let ((db (db-open database-file))) @@ -152,7 +168,7 @@ (list db))) #:name "ds write" - #:destructor + #:thread-destructor (let ((writer-thread-destructor-counter (make-gauge-metric metrics-registry "datastore_writer_thread_close_total"))) @@ -164,16 +180,16 @@ (metric-increment writer-thread-destructor-counter) (sqlite-close db))) - #:lifetime 500 + #:thread-lifetime 500 #:expire-on-exception? #t - ;; SQLite doesn't support parallel writes - #:parallelism 1 #:delay-logger (let ((delay-metric (make-histogram-metric metrics-registry - "datastore_write_delay_seconds"))) - (lambda (seconds-delayed) + "datastore_write_delay_seconds" + #:buckets + %transaction-duration-histogram-buckets))) + (lambda (seconds-delayed proc) (metric-observe delay-metric ;; TODO exact->inexact to work around ;; a bug in guile-prometheus where @@ -182,30 +198,41 @@ (exact->inexact seconds-delayed)) (log-delay "datastore write" seconds-delayed) (when (> seconds-delayed 1) - (format + (format/safe (current-error-port) "warning: database write delayed by ~1,2f seconds~%" seconds-delayed)))) #:duration-logger (lambda (duration proc) (when (> duration 10) - (format + (format/safe (current-error-port) "warning: database write took ~1,2f seconds (~a)~%" duration proc))) - #:log-exception? worker-thread-log-exception?))) + #:log-exception? thread-pool-log-exception?))) (slot-set! datastore - 'worker-writer-thread-channel - channel) + 'writer-thread-pool + writer-thread-pool) + ;; This is changed in datastore-spawn-fibers + (slot-set! datastore + 'low-priority-writer-thread-channel + (thread-pool-channel writer-thread-pool)) + (slot-set! datastore + 'default-priority-writer-thread-channel + (thread-pool-channel writer-thread-pool)) (slot-set! 
datastore - 'worker-writer-thread-proc-vector - proc-vector)) + 'high-priority-writer-thread-channel + (thread-pool-channel writer-thread-pool))) - (let ((channel - proc-vector - (make-worker-thread-channel + (let ((reader-thread-pool + (make-thread-pool + ;; Use a minimum of 8 and a maximum of 12 threads + (min (max (current-processor-count) + 8) + 12) + #:thread-initializer (lambda () (let ((db (db-open database-file #:write? #f))) @@ -214,27 +241,24 @@ (sqlite-exec db "PRAGMA cache_size = -16000;") (list db))) - #:name "ds read" - #:destructor + #:thread-destructor (let ((reader-thread-destructor-counter (make-gauge-metric metrics-registry "datastore_reader_thread_close_total"))) (lambda (db) (metric-increment reader-thread-destructor-counter) (sqlite-close db))) - #:lifetime 50000 + #:name "ds read" + #:thread-lifetime 50000 #:expire-on-exception? #t - ;; Use a minimum of 8 and a maximum of 16 threads - #:parallelism - (min (max (current-processor-count) - 8) - 16) #:delay-logger (let ((delay-metric (make-histogram-metric metrics-registry - "datastore_read_delay_seconds"))) - (lambda (seconds-delayed) + "datastore_read_delay_seconds" + #:buckets + %transaction-duration-histogram-buckets))) + (lambda (seconds-delayed proc) (metric-observe delay-metric ;; TODO exact->inexact to work around ;; a bug in guile-prometheus where @@ -243,28 +267,50 @@ (exact->inexact seconds-delayed)) (log-delay "datastore read" seconds-delayed) (when (> seconds-delayed 1) - (format + (format/safe (current-error-port) "warning: database read delayed by ~1,2f seconds~%" seconds-delayed)))) #:duration-logger (lambda (duration proc) (when (> duration 30) - (format + (format/safe (current-error-port) "warning: database read took ~1,2f seconds (~a)~%" duration proc))) - #:log-exception? worker-thread-log-exception?))) - (slot-set! datastore - 'worker-reader-thread-channel - channel) + #:log-exception? thread-pool-log-exception?))) + + (let ((metric (make-gauge-metric + metrics-registry + "datastore_reader_threads_total"))) + (metric-set metric + (vector-length + (thread-pool-proc-vector reader-thread-pool)))) + + (slot-set! datastore - 'worker-reader-thread-proc-vector - proc-vector)) + 'reader-thread-pool + reader-thread-pool)) datastore)) +(define* (call-with-writer-thread + datastore + proc + #:key priority duration-logger) + (call-with-thread + (slot-ref datastore 'writer-thread-pool) + proc + #:duration-logger duration-logger + #:channel + (slot-ref datastore + (assq-ref + '((high . high-priority-writer-thread-channel) + (default . default-priority-writer-thread-channel) + (low . low-priority-writer-thread-channel)) + (or priority 'default))))) + (define (sqlite-step-and-reset statement) (let ((val (sqlite-step statement))) (sqlite-reset statement) @@ -305,12 +351,12 @@ (#(blocked? modified-page-count pages-moved-to-db) (if (= blocked? 1) (begin - (simple-format + (simple-format/safe (current-error-port) "warning: wal checkpoint blocked\n") #f) (begin - (simple-format + (simple-format/safe (current-error-port) "wal checkpoint completed (~A, ~A)\n" modified-page-count @@ -331,8 +377,8 @@ PRAGMA optimize;") (define-method (datastore-optimize (datastore <sqlite-datastore>)) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread + datastore (lambda (db) (db-optimize db @@ -343,22 +389,37 @@ PRAGMA optimize;") (datastore <sqlite-datastore>)) ;; Queue messages to the writer thread, so that they're handled in a first ;; come first served manor - (slot-set! 
- datastore - 'worker-writer-thread-channel - (make-queueing-channel - (slot-ref datastore 'worker-writer-thread-channel))) + (let ((queues + queue-stats + (make-discrete-priority-queueing-channels + (thread-pool-channel + (slot-ref datastore 'writer-thread-pool)) + 3))) + (match queues + ((high default low) + (slot-set! datastore 'high-priority-writer-thread-channel high) + (slot-set! datastore 'default-priority-writer-thread-channel default) + (slot-set! datastore 'low-priority-writer-thread-channel low))) + (slot-set! datastore + 'writer-thread-channel-queue-stats + queue-stats)) (spawn-fiber (lambda () (while #t (sleep 20) - (vector-for-each - (lambda (i proc) - (simple-format (current-error-port) - "reader thread ~A running: ~A\n" - i proc)) - (slot-ref datastore 'worker-reader-thread-proc-vector))))) + (let ((procs + (vector->list + (thread-pool-proc-vector + (slot-ref datastore 'reader-thread-pool))))) + (when (every procedure? procs) + (for-each + (lambda (i proc) + (simple-format/safe (current-error-port) + "reader thread ~A running: ~A\n" + i proc)) + (iota (length procs)) + procs)))))) (spawn-fiber (lambda () @@ -366,9 +427,9 @@ PRAGMA optimize;") (sleep (* 60 10)) ; 10 minutes (with-exception-handler (lambda (exn) - (simple-format (current-error-port) - "exception when performing WAL checkpoint: ~A\n" - exn)) + (simple-format/safe (current-error-port) + "exception when performing WAL checkpoint: ~A\n" + exn)) (lambda () (with-time-logging "performing regular database maintenance" @@ -393,11 +454,18 @@ PRAGMA optimize;") (let ((setup-failures-total (make-gauge-metric registry "setup_failures_total" - #:labels '(agent_id reason)))) - - (letpar& ((setup-failure-counts - (with-time-logging "counting setup failures" - (datastore-count-setup-failures datastore)))) + #:labels '(agent_id reason))) + (background-jobs-inserted-total + (make-counter-metric registry + "coordinator_background_job_inserted_total" + #:labels '(name)))) + + (fibers-let ((setup-failure-counts + (with-time-logging "counting setup failures" + (datastore-count-setup-failures datastore))) + (background-job-counts + (with-time-logging "counting background jobs" + (datastore-count-background-jobs datastore)))) (for-each (match-lambda (((agent-id reason) . count) @@ -406,7 +474,14 @@ PRAGMA optimize;") #:label-values `((agent_id . ,agent-id) (reason . ,reason))))) - setup-failure-counts))) + setup-failure-counts) + + (for-each (match-lambda + ((type . count) + (metric-increment background-jobs-inserted-total + #:by count + #:label-values `((name . ,type))))) + background-job-counts))) #t) (define-method (datastore-update-metrics! 
@@ -439,12 +514,38 @@ PRAGMA optimize;") "datastore_wal_bytes") (make-gauge-metric registry "datastore_wal_bytes" - #:docstring "Size of the SQLite Write Ahead Log file")))) - - (letpar& ((build-counts - (datastore-count-builds datastore)) - (build-result-counts - (datastore-count-build-results datastore))) + #:docstring "Size of the SQLite Write Ahead Log file"))) + (reader-threads-used-metric + (or (metrics-registry-fetch-metric registry + "datastore_reader_threads_used_total") + (make-gauge-metric + registry "datastore_reader_threads_used_total"))) + (writer-queue-length-metric + (or (metrics-registry-fetch-metric registry + "datastore_writer_queue_total") + (make-gauge-metric + registry "datastore_writer_queue_total" + #:labels '(priority))))) + + (metric-set reader-threads-used-metric + (vector-count + (lambda (_ proc) + (->bool proc)) + (thread-pool-proc-vector + (slot-ref datastore 'reader-thread-pool)))) + + (for-each + (lambda (priority stats) + (metric-set writer-queue-length-metric + (assq-ref stats 'length) + #:label-values `((priority . ,priority)))) + '(high default low) + ((slot-ref datastore 'writer-thread-channel-queue-stats))) + + (fibers-let ((build-counts + (datastore-count-builds datastore)) + (build-result-counts + (datastore-count-build-results datastore))) (for-each (match-lambda ((system . count) (metric-set builds-total @@ -477,9 +578,10 @@ PRAGMA optimize;") internal-time-units-per-second)) (apply values vals))))) -(define (metric-observe-duration datastore - thing - duration-seconds) +(define* (metric-observe-duration datastore + thing + duration-seconds + #:key (buckets %default-histogram-buckets)) (define registry (slot-ref datastore 'metrics-registry)) (define metric-name (string-append "datastore_" thing "_duration_seconds")) @@ -487,15 +589,25 @@ PRAGMA optimize;") (let ((metric (or (metrics-registry-fetch-metric registry metric-name) (make-histogram-metric registry - metric-name)))) + metric-name + #:buckets buckets)))) (metric-observe metric duration-seconds))) -(define (call-with-worker-thread/delay-logging channel proc) - (call-with-worker-thread channel +(define (call-with-thread/delay-logging thread-pool proc) + (call-with-thread thread-pool + proc + #:duration-logger + (lambda (duration) + (log-delay proc duration)))) + +(define* (call-with-writer-thread/delay-logging datastore proc + #:key priority) + (call-with-writer-thread datastore proc #:duration-logger (lambda (duration) - (log-delay proc duration)))) + (log-delay proc duration)) + #:priority priority)) (define-exception-type &transaction-rollback-exception &exception make-transaction-rollback-exception @@ -509,21 +621,24 @@ PRAGMA optimize;") #:key readonly? (immediate? (not readonly?)) - duration-metric-name) + priority + duration-metric-name + (duration-metric-buckets + %transaction-duration-histogram-buckets)) (define (run-proc-within-transaction db) (define (attempt-begin) (with-exception-handler (lambda (exn) (match (exception-args exn) (('sqlite-exec 5 msg) - (simple-format + (simple-format/safe (current-error-port) "warning: issue starting transaction (code: 5, proc: ~A): ~A\n" proc msg) #f) (_ - (simple-format (current-error-port) - "exception starting transaction\n") + (simple-format/safe (current-error-port) + "exception starting transaction\n") (raise-exception exn)))) (lambda () (sqlite-exec db (if immediate? 
@@ -537,14 +652,14 @@ PRAGMA optimize;") (lambda (exn) (match (exception-args exn) (('sqlite-exec 5 msg) - (simple-format + (simple-format/safe (current-error-port) "warning: attempt commit (code: 5, proc: ~A): ~A\n" proc msg) #f) (_ - (simple-format (current-error-port) - "exception committing transaction\n") + (simple-format/safe (current-error-port) + "exception committing transaction\n") (raise-exception exn)))) (lambda () (sqlite-exec db "COMMIT TRANSACTION;") @@ -552,63 +667,88 @@ PRAGMA optimize;") #:unwind? #t)) (if (attempt-begin) - (call-with-values - (lambda () - (with-exception-handler - (lambda (exn) - (if (transaction-rollback-exception? exn) - (begin - (sqlite-exec db "ROLLBACK TRANSACTION;") - (transaction-rollback-exception-return-value exn)) - (begin - (simple-format (current-error-port) - "error: sqlite rolling back transaction (~A)\n" - exn) - (sqlite-exec db "ROLLBACK TRANSACTION;") - (raise-exception exn)))) + (with-exception-handler + (lambda (exn) + (if (transaction-rollback-exception? exn) + (begin + (sqlite-exec db "ROLLBACK TRANSACTION;") + (transaction-rollback-exception-return-value exn)) + (begin + (simple-format/safe + (current-error-port) + "error: sqlite rolling back transaction (~A)\n" + exn) + (sqlite-exec db "ROLLBACK TRANSACTION;") + (raise-exception exn)))) + (lambda () + (call-with-values (lambda () - (parameterize ((%current-transaction-proc proc)) - (call-with-delay-logging proc #:args (list db)))) - #:unwind? #t)) - (lambda vals - (let loop ((success? (attempt-commit))) - (if success? - (apply values vals) - (loop (attempt-commit)))))) + (with-exception-handler + (lambda (exn) + (unless (transaction-rollback-exception? exn) + (backtrace)) + (raise-exception exn)) + (lambda () + (parameterize ((%current-transaction-proc proc) + ;; Set the arguments parameter for the + ;; reader thread pool so that any nested + ;; calls to call-with-thread for the + ;; reader thread pool just use the writer + ;; db connection and thus this + ;; transaction + ((thread-pool-arguments-parameter + (slot-ref datastore 'reader-thread-pool)) + (list db))) + (call-with-delay-logging proc #:args (list db)))))) + (lambda vals + (let loop ((success? (attempt-commit))) + (if success? + (apply values vals) + (loop (attempt-commit))))))) + #:unwind? #t) ;; Database is busy, so retry (run-proc-within-transaction db))) - (call-with-worker-thread + (call-with-thread (slot-ref datastore (if readonly? - 'worker-reader-thread-channel - 'worker-writer-thread-channel)) + 'reader-thread-pool + 'writer-thread-pool)) (lambda (db) (if (%current-transaction-proc) (call-with-delay-logging proc #:args (list db)) ; already in transaction (run-proc-within-transaction db))) + #:channel + (if readonly? + (thread-pool-channel + (slot-ref datastore 'reader-thread-pool)) + (slot-ref datastore + (assq-ref + '((high . high-priority-writer-thread-channel) + (default . default-priority-writer-thread-channel) + (low . low-priority-writer-thread-channel)) + (or priority 'default)))) #:duration-logger (lambda (duration-seconds) (when (and (not readonly?) 
(> duration-seconds 2)) - (display - (format - #f - "warning: ~a:\n took ~4f seconds in transaction\n" - proc - duration-seconds) - (current-error-port)) - - (when duration-metric-name - (metric-observe-duration datastore - duration-metric-name - duration-seconds)))))) + (format/safe + (current-error-port) + "warning: ~a:\n took ~4f seconds in transaction\n" + proc + duration-seconds)) + + (when duration-metric-name + (metric-observe-duration datastore + duration-metric-name + duration-seconds + #:buckets duration-metric-buckets))))) (define-method (datastore-find-agent (datastore <sqlite-datastore>) uuid) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -636,8 +776,8 @@ SELECT description FROM agents WHERE id = :id" (define-method (datastore-find-agent-by-name (datastore <sqlite-datastore>) name) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -664,8 +804,8 @@ SELECT id FROM agents WHERE name = :name" (define-method (datastore-insert-dynamic-auth-token (datastore <sqlite-datastore>) token) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread + datastore (lambda (db) (let ((statement (sqlite-prepare @@ -683,8 +823,8 @@ INSERT INTO dynamic_auth_tokens (token) VALUES (:token)" (define-method (datastore-dynamic-auth-token-exists? (datastore <sqlite-datastore>) token) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -711,8 +851,8 @@ SELECT 1 FROM dynamic_auth_tokens WHERE token = :token" (define-method (datastore-fetch-agent-tags (datastore <sqlite-datastore>) agent-id) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -748,16 +888,16 @@ WHERE agent_tags.agent_id = :agent_id" uuid name description) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread + datastore (lambda (db) (insert-agent db uuid name description))) #t) (define-method (datastore-list-agents (datastore <sqlite-datastore>)) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -785,8 +925,8 @@ SELECT id, name, description, active FROM agents ORDER BY id" (unless (boolean? active?) 
(error "datastore-set-agent-active called with non-boolean")) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread + datastore (lambda (db) (let ((statement (sqlite-prepare @@ -805,8 +945,8 @@ UPDATE agents SET active = :active WHERE id = :uuid" (define-method (datastore-find-agent-status (datastore <sqlite-datastore>) agent-id) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -840,8 +980,8 @@ WHERE agent_id = :agent_id" 1min-load-average system-uptime processor-count) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread + datastore (lambda (db) (let ((statement (sqlite-prepare @@ -877,8 +1017,8 @@ INSERT INTO agent_status (agent_id, status, load_average_1min, system_uptime, pr (datastore <sqlite-datastore>) agent-uuid password) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread + datastore (lambda (db) (insert-agent-password db agent-uuid password))) #t) @@ -887,8 +1027,8 @@ INSERT INTO agent_status (agent_id, status, load_average_1min, system_uptime, pr (datastore <sqlite-datastore>) uuid password) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -910,8 +1050,8 @@ WHERE agent_id = :agent_id AND password = :password" (define-method (datastore-agent-list-passwords (datastore <sqlite-datastore>) uuid) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -1008,6 +1148,7 @@ INSERT INTO agent_tags (agent_id, tag_id) VALUES (:agent_id, :tag_id)" (lambda (db) (insert-derivation-and-return-outputs db derivation) (hash-clear! %derivation-outputs-cache)) + #:priority 'low #:duration-metric-name "store_derivation") #t) @@ -1015,8 +1156,8 @@ INSERT INTO agent_tags (agent_id, tag_id) VALUES (:agent_id, :tag_id)" (define-method (datastore-build-exists-for-derivation-outputs? (datastore <sqlite-datastore>) derivation) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -1048,8 +1189,8 @@ WHERE derivation_outputs.derivation_id = :derivation_id (define-method (datastore-build-required-by-another? (datastore <sqlite-datastore>) uuid) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -1143,8 +1284,8 @@ SELECT name FROM derivations WHERE id = :id" (match (sqlite-step-and-reset statement) (#(name) name)))) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let loop ((derivation-ids (list (db-find-derivation-id db derivation))) (result '())) @@ -1169,8 +1310,8 @@ SELECT name FROM derivations WHERE id = :id" args) (apply (lambda* (system #:key include-cancelled?) 
- (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -1258,8 +1399,8 @@ FROM ( (define-method (datastore-list-builds-for-derivation-recursive-inputs (datastore <sqlite-datastore>) derivation) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -1296,8 +1437,8 @@ INNER JOIN related_derivations (define-method (datastore-find-unprocessed-build-entry (datastore <sqlite-datastore>) uuid) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -1326,8 +1467,8 @@ WHERE build_id = :build_id" (datastore <sqlite-datastore>) build-uuid tags) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread + datastore (lambda (db) (let ((insert-tag-statement (sqlite-prepare @@ -1527,8 +1668,8 @@ WHERE build_id = :build_id" uuid explicit-priority-lower-bound) (define builds-to-consider - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) ;; Recursively find builds for all missing outputs that this build ;; takes as inputs. The order is important here, since we want to @@ -1678,8 +1819,8 @@ WHERE build_id = :build_id" override-derived-priority) (let ((build-id old-priority - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((build-id (db-find-build-id db uuid))) @@ -1738,8 +1879,8 @@ WHERE build_id = :build_id" (define-method (datastore-count-build-results (datastore <sqlite-datastore>)) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -1797,8 +1938,8 @@ VALUES (:agent_id, :result, 1)" #t)))) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread/delay-logging + datastore (lambda (db) (let ((statement (sqlite-prepare @@ -1849,14 +1990,12 @@ LIMIT 1" (#f #t) (#(1) #f)))) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-writer-thread-channel) - (lambda (db) - (let ((builds-statement - (sqlite-prepare - db - " -SELECT DISTINCT unprocessed_builds.id + (define (all-build-ids db) + (let ((statement + (sqlite-prepare + db + " +SELECT input_builds.id FROM builds INNER JOIN derivation_outputs ON builds.derivation_id = derivation_outputs.derivation_id @@ -1864,45 +2003,62 @@ INNER JOIN derivation_outputs AS all_derivation_outputs ON all_derivation_outputs.output_id = derivation_outputs.output_id INNER JOIN derivation_inputs ON derivation_inputs.derivation_output_id = all_derivation_outputs.id -INNER JOIN builds AS unprocessed_builds - ON unprocessed_builds.processed = 0 - AND unprocessed_builds.derivation_id = derivation_inputs.derivation_id -INNER JOIN unprocessed_builds_with_derived_priorities - ON unprocessed_builds_with_derived_priorities.build_id = unprocessed_builds.id - AND unprocessed_builds_with_derived_priorities.all_inputs_built = 0 +INNER JOIN builds AS 
input_builds + ON input_builds.processed = 0 + AND input_builds.canceled = 0 + AND input_builds.derivation_id = derivation_inputs.derivation_id WHERE builds.id = :build_id" - #:cache? #t)) + #:cache? #t))) - (update-statement - (sqlite-prepare - db - " + (sqlite-bind-arguments + statement + #:build_id (db-find-build-id db build-uuid)) + + (let ((result + (sqlite-fold + (lambda (row result) + (match row + (#(build-id) + (if (all-inputs-built? db build-id) + (cons build-id result) + result)))) + '() + statement))) + (sqlite-reset statement) + + result))) + + (let ((build-ids + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) + (lambda (db) + (all-build-ids db))))) + (call-with-writer-thread/delay-logging + datastore + (lambda (db) + (let ((statement + (sqlite-prepare + db + " UPDATE unprocessed_builds_with_derived_priorities SET all_inputs_built = 1 WHERE build_id = :build_id" - #:cache? #t))) + #:cache? #t))) + + (for-each + (lambda (build-id) + (sqlite-bind-arguments statement + #:build_id build-id) + (sqlite-step-and-reset statement)) + build-ids) - (sqlite-bind-arguments builds-statement - #:build_id (db-find-build-id db build-uuid)) - - (sqlite-fold - (lambda (row result) - (match row - (#(build-id) - (when (all-inputs-built? db build-id) - (sqlite-bind-arguments update-statement - #:build_id build-id) - (sqlite-step-and-reset update-statement)))) - #f) - #f - builds-statement) - (sqlite-reset builds-statement))))) + #t))))) (define-method (datastore-remove-build-allocation (datastore <sqlite-datastore>) build-uuid agent-id) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread/delay-logging + datastore (lambda (db) (let ((statement (sqlite-prepare @@ -1924,8 +2080,8 @@ DELETE FROM allocated_builds (define-method (datastore-mark-build-as-processed (datastore <sqlite-datastore>) build-uuid end-time) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread/delay-logging + datastore (lambda (db) (let ((statement (sqlite-prepare @@ -1961,8 +2117,8 @@ DELETE FROM unprocessed_builds_with_derived_priorities (define-method (datastore-delete-relevant-outputs-from-unbuilt-outputs (datastore <sqlite-datastore>) build-uuid) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread/delay-logging + datastore (lambda (db) (let ((statement (sqlite-prepare @@ -1989,8 +2145,8 @@ WHERE output_id IN ( (datastore <sqlite-datastore>) build-uuid output-metadata) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread/delay-logging + datastore (lambda (db) (define (name->output-id name) (let ((statement @@ -2067,8 +2223,8 @@ INSERT INTO build_starts ( (define-method (datastore-find-build-starts (datastore <sqlite-datastore>) build-uuid) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2177,8 +2333,8 @@ INSERT INTO setup_failure_missing_inputs ( (define-method (datastore-list-setup-failure-missing-inputs (datastore <sqlite-datastore>) setup-failure-id) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2206,8 +2362,8 @@ WHERE setup_failure_id = :id" 
build-uuid agent-id failure-reason) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread + datastore (lambda (db) (insert-setup-failure-and-remove-allocation db (db-find-build-id db build-uuid) @@ -2224,8 +2380,8 @@ WHERE setup_failure_id = :id" (define-method (datastore-count-builds (datastore <sqlite-datastore>)) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2248,8 +2404,8 @@ FROM builds_counts" (define-method (datastore-for-each-build (datastore <sqlite-datastore>) proc) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2288,8 +2444,8 @@ SELECT id FROM builds WHERE uuid = :uuid" (define-method (datastore-find-build (datastore <sqlite-datastore>) uuid) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2347,12 +2503,14 @@ WHERE uuid = :uuid" (canceled 'unset) (priority-> 'unset) (priority-< 'unset) + (created-at-> 'unset) + (created-at-< 'unset) (after-id #f) (limit #f) ;; other-builds-dependent or no-dependent-builds (relationship 'unset)) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (define tag->expression (let ((statement @@ -2365,7 +2523,7 @@ SELECT id FROM tags WHERE key = :key AND value = :value" (sqlite-prepare db " -SELECT id FROM tags WHERE key = :key" +SELECT 1 FROM tags WHERE key = :key LIMIT 1" #:cache? #t))) (lambda (tag not?) (match tag @@ -2386,28 +2544,27 @@ SELECT id FROM tags WHERE key = :key" (sqlite-bind-arguments key-statement #:key key) - (let* ((tag-ids (sqlite-map - (match-lambda - (#(id) id)) - key-statement)) - (result - (string-append - "(" - (string-join - (map - (lambda (id) + (let ((tag-with-key-exists? + (->bool (sqlite-step-and-reset key-statement)))) + (if tag-with-key-exists? + (let ((result (string-append + "(" (if not? "NOT " "") - "EXISTS (SELECT 1 FROM build_tags " - "WHERE build_id = builds.id AND tag_id = " - (number->string id) - ")")) - tag-ids) - (if not? " AND " " OR ")) - ")"))) - (sqlite-reset key-statement) - - result)))))) + " +EXISTS ( + SELECT 1 + FROM build_tags + INNER JOIN tags ON build_tags.tag_id = tags.id + WHERE build_id = builds.id + AND tags.key = '" + key "' +)" + ")"))) + result) + (if not? + "TRUE" + "FALSE")))))))) (let ((tag-expressions (map (lambda (tag) @@ -2429,6 +2586,8 @@ SELECT id FROM tags WHERE key = :key" (not (null? not-systems)) (not (eq? priority-> 'unset)) (not (eq? priority-< 'unset)) + (not (eq? created-at-> 'unset)) + (not (eq? created-at-< 'unset)) (not (eq? processed 'unset)) (not (eq? canceled 'unset)) (not (eq? relationship 'unset)) @@ -2479,6 +2638,14 @@ INNER JOIN derivations (list (simple-format #f "priority < ~A" priority-<)) '()) + (if (string? created-at->) + (list + (simple-format #f "created_at > '~A'" created-at->)) + '()) + (if (string? created-at-<) + (list + (simple-format #f "created_at < '~A'" created-at-<)) + '()) (cond ((eq? processed #t) '("processed = 1")) ((eq? 
processed #f) '("processed = 0")) @@ -2582,8 +2749,8 @@ WHERE derivation_outputs.derivation_id = builds.derivation_id)")) (define-method (datastore-fetch-build-tags (datastore <sqlite-datastore>) build-uuid) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2617,8 +2784,8 @@ WHERE build_tags.build_id = :build_id" (define-method (datastore-find-build-result (datastore <sqlite-datastore>) build-uuid) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2643,8 +2810,8 @@ WHERE build_id = :build_id" (define-method (datastore-find-build-derivation-system (datastore <sqlite-datastore>) build-uuid) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2680,8 +2847,8 @@ WHERE builds.id = :build_id" datastore "list_builds_for_output" (lambda () - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2729,8 +2896,8 @@ WHERE derivation_outputs.output_id = :output_id" rest) (apply (lambda* (output system #:key include-canceled?) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2774,8 +2941,8 @@ WHERE derivation_outputs.output_id = :output_id rest) (apply (lambda* (derivation #:key (include-canceled? 
#t)) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2810,8 +2977,8 @@ WHERE derivations.name = :derivation" (define-method (datastore-count-setup-failures (datastore <sqlite-datastore>)) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2835,8 +3002,8 @@ GROUP BY agent_id, failure_reason" (define-method (datastore-list-setup-failures-for-build (datastore <sqlite-datastore>) build-uuid) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2868,8 +3035,8 @@ WHERE build_id = :build_id" args) (apply (lambda* (#:key agent-id) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2920,8 +3087,8 @@ WHERE builds.processed = 0 (define-method (datastore-list-processed-builds (datastore <sqlite-datastore>)) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2947,8 +3114,8 @@ WHERE processed = 1" (define-method (datastore-list-unprocessed-builds (datastore <sqlite-datastore>)) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2985,8 +3152,8 @@ ORDER BY priority DESC" (define-method (datastore-find-deferred-build (datastore <sqlite-datastore>) select?) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3030,9 +3197,11 @@ ORDER BY deferred_until ASC" (sqlite-prepare db " -SELECT builds.uuid +SELECT builds.uuid, systems.system FROM unprocessed_builds_with_derived_priorities INNER JOIN builds ON build_id = builds.id +INNER JOIN derivations ON builds.derivation_id = derivations.id +INNER JOIN systems ON derivations.system_id = systems.id WHERE all_inputs_built = 1 AND NOT EXISTS ( SELECT 1 @@ -3042,26 +3211,21 @@ WHERE all_inputs_built = 1 ) ORDER BY derived_priority ASC" #:cache? 
#t))) - (let ((result (sqlite-fold - (lambda (row result) - (cons (vector-ref row 0) - result)) - '() - statement))) + (let ((result (sqlite-fold cons '() statement))) (sqlite-reset statement) result))) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) fetch-prioritised-unprocessed-builds)) (define-method (datastore-insert-unprocessed-hook-event (datastore <sqlite-datastore>) event arguments) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread/delay-logging + datastore (lambda (db) (insert-unprocessed-hook-event db event @@ -3091,8 +3255,8 @@ VALUES (:event, :arguments)" (define-method (datastore-count-unprocessed-hook-events (datastore <sqlite-datastore>)) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3115,8 +3279,8 @@ SELECT event, COUNT(*) FROM unprocessed_hook_events GROUP BY event" (datastore <sqlite-datastore>) event limit) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3149,8 +3313,8 @@ LIMIT :limit" (define-method (datastore-find-unprocessed-hook-event (datastore <sqlite-datastore>) id) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3176,8 +3340,8 @@ WHERE id = :id" (define-method (datastore-delete-unprocessed-hook-event (datastore <sqlite-datastore>) id) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread + datastore (lambda (db) (let ((statement (sqlite-prepare @@ -3190,13 +3354,14 @@ DELETE FROM unprocessed_hook_events WHERE id = :id" statement #:id id) - (sqlite-step-and-reset statement)))) + (sqlite-step-and-reset statement))) + #:priority 'high) #t) (define-method (datastore-count-allocated-builds (datastore <sqlite-datastore>)) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3218,8 +3383,8 @@ SELECT agent_id, COUNT(*) FROM allocated_builds GROUP BY agent_id" (define-method (datastore-agent-requested-systems (datastore <sqlite-datastore>) agent-id) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3293,8 +3458,8 @@ INSERT INTO build_allocation_agent_requested_systems (agent_id, system_id) VALUE (define-method (datastore-fetch-build-to-allocate (datastore <sqlite-datastore>) build-id) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3322,8 +3487,8 @@ WHERE builds.uuid = :uuid (datastore <sqlite-datastore>) agent-id derivation-id) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3352,98 
+3517,54 @@ WHERE build_derivation_outputs.derivation_id = :derivation_id (define-method (datastore-insert-to-allocated-builds (datastore <sqlite-datastore>) agent-id - build-uuids) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + build-uuid) + (call-with-writer-thread + datastore (lambda (db) - (sqlite-exec - db - (string-append - " -INSERT INTO allocated_builds (build_id, agent_id) VALUES " - (string-join - (map (lambda (build-uuid) - (simple-format - #f - "(~A, '~A')" - (db-find-build-id db build-uuid) - agent-id)) - build-uuids) - ", ") - ";"))))) + (let ((statement + (sqlite-prepare + db + " +INSERT INTO allocated_builds (build_id, agent_id) + VALUES (:build_id, :agent_id)" + #:cache? #t))) -(define-method (datastore-list-allocation-plan-builds - (datastore <sqlite-datastore>) - . - rest) - (apply - (lambda* (agent-id #:key limit) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) - (lambda (db) - (let ((statement - (sqlite-prepare - db - (string-append - " -SELECT builds.uuid, derivations.name, systems.system, - builds.priority, - unprocessed_builds_with_derived_priorities.derived_priority -FROM builds -INNER JOIN derivations - ON builds.derivation_id = derivations.id -INNER JOIN systems - ON derivations.system_id = systems.id -INNER JOIN build_allocation_plan - ON builds.id = build_allocation_plan.build_id -LEFT JOIN unprocessed_builds_with_derived_priorities - ON builds.id = unprocessed_builds_with_derived_priorities.build_id -WHERE build_allocation_plan.agent_id = :agent_id - AND builds.processed = 0 - AND builds.id NOT IN (SELECT build_id FROM allocated_builds) -ORDER BY build_allocation_plan.ordering ASC" - (if limit - " -LIMIT :limit" - "")) - #:cache? #t))) + (sqlite-bind-arguments + statement + #:build_id (db-find-build-id db build-uuid) + #:agent_id agent-id) - (apply sqlite-bind-arguments - statement - #:agent_id agent-id - (if limit - (list #:limit limit) - '())) + (sqlite-step-and-reset statement)))) + #t) - (let ((builds (sqlite-map - (match-lambda - (#(uuid derivation_name system - priority derived_priority) - `((uuid . ,uuid) - (derivation_name . ,derivation_name) - (system . ,system) - (priority . ,priority) - (derived_priority . ,derived_priority) - (tags . ,(vector-map - (lambda (_ tag) - (match tag - ((key . value) - `((key . ,key) - (value . ,value))))) - (datastore-fetch-build-tags - datastore - uuid)))))) - statement))) - (sqlite-reset statement) +(define-method (datastore-update-allocated-build-submit-outputs + (datastore <sqlite-datastore>) + build-uuid + submit-outputs?) + (call-with-writer-thread + datastore + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +UPDATE allocated_builds +SET submit_outputs = :submit_outputs +WHERE build_id = :build_id" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:build_id (db-find-build-id db build-uuid) + #:submit_outputs (if submit-outputs? 
1 0)) - builds))))) - rest)) + (sqlite-step-and-reset statement)))) + #t) (define-method (datastore-list-agent-builds (datastore <sqlite-datastore>) agent-id) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3451,7 +3572,7 @@ LIMIT :limit" " SELECT builds.uuid, derivations.name, unprocessed_builds_with_derived_priorities.derived_priority, - builds.canceled + builds.canceled, allocated_builds.submit_outputs FROM builds INNER JOIN derivations ON builds.derivation_id = derivations.id @@ -3468,11 +3589,17 @@ WHERE allocated_builds.agent_id = :agent_id" (let ((builds (sqlite-map (match-lambda - (#(uuid derivation_name derived_priority canceled) + (#(uuid derivation_name derived_priority canceled + submit_outputs) `((uuid . ,uuid) (derivation_name . ,derivation_name) (derived_priority . ,derived_priority) - (canceled . ,(= 1 canceled))))) + (canceled . ,(= 1 canceled)) + (submit_outputs . ,(cond + ((not submit_outputs) + 'null) + (else + (= 1 submit_outputs))))))) statement))) (sqlite-reset statement) @@ -3481,8 +3608,8 @@ WHERE allocated_builds.agent_id = :agent_id" (define-method (datastore-agent-for-build (datastore <sqlite-datastore>) build-uuid) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3544,11 +3671,11 @@ WHERE build_results.build_id = :build_id" "_sqitch_registry.db") (string-append "db:sqlite:" database-file)))) - (simple-format #t "running command: ~A\n" - (string-join command)) + (simple-format/safe #t "running command: ~A\n" + (string-join command)) (let ((pid (spawn (%config 'sqitch) command))) (unless (zero? 
(cdr (waitpid pid))) - (simple-format + (simple-format/safe (current-error-port) "error: sqitch command failed\n") (exit 1))))) @@ -3638,16 +3765,16 @@ WHERE name = :name" (define-method (datastore-find-derivation (datastore <sqlite-datastore>) name) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (db-find-derivation db name)))) (define-method (datastore-find-derivation-outputs (datastore <sqlite-datastore>) derivation-name) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3679,8 +3806,8 @@ WHERE derivation_id = :derivation_id" (define-method (datastore-find-derivation-output-details (datastore <sqlite-datastore>) derivation-name) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3722,8 +3849,8 @@ WHERE derivation_id = :derivation_id" (define-method (datastore-list-unbuilt-derivation-outputs (datastore <sqlite-datastore>) derivation-name) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3756,8 +3883,8 @@ WHERE derivation_id = :derivation_id" (define-method (datastore-list-build-outputs (datastore <sqlite-datastore>) build-uuid) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3801,8 +3928,8 @@ WHERE builds.id = :build_id" (define-method (datastore-find-derivation-system (datastore <sqlite-datastore>) derivation-name) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3825,8 +3952,8 @@ WHERE name = :name" (define-method (datastore-find-derivation-inputs (datastore <sqlite-datastore>) derivation-name) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3866,8 +3993,8 @@ WHERE derivations.id = :derivation_id" (define-method (datastore-find-recursive-derivation-input-outputs (datastore <sqlite-datastore>) derivation-name) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3908,8 +4035,8 @@ INNER JOIN outputs (datastore <sqlite-datastore>) start-derivation-name output) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3992,20 +4119,6 @@ SELECT system FROM systems WHERE id = :id" (define derivation-name (derivation-file-name derivation)) - (define (maybe-fix-fixed-output-field derivation-details) - (let ((fixed-output? (fixed-output-derivation? derivation))) - (unless (equal? (assq-ref derivation-details 'fixed-output?) - fixed-output?) 
- (let ((statement (sqlite-prepare - db - " -UPDATE derivations SET fixed_output = :fixed_output WHERE name = :name" - #:cache? #t))) - (sqlite-bind-arguments statement - #:name derivation-name - #:fixed_output (if fixed-output? 1 0)) - (sqlite-step-and-reset statement))))) - (define (insert-derivation) (let ((statement (sqlite-prepare @@ -4036,8 +4149,6 @@ INSERT INTO derivations (name, system_id, fixed_output) (db-find-derivation db derivation-name))) (if derivation-details (begin - (maybe-fix-fixed-output-field derivation-details) - (let ((derivation-outputs (select-derivation-outputs db derivation-name))) @@ -4346,15 +4457,15 @@ VALUES (:build_id, :derived_priority, :all_inputs_built)" (lambda* (uuid drv-name priority defer-until #:key skip-updating-other-build-derived-priorities) (define system-id - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (db-system->system-id db (datastore-find-derivation-system datastore drv-name))))) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread/delay-logging + datastore (lambda (db) (let* ((build-id (insert-build db drv-name uuid priority defer-until)) @@ -4380,7 +4491,8 @@ VALUES (:build_id, :derived_priority, :all_inputs_built)" #:args (list db build-id - derived-priority))))))) + derived-priority))))) + #:priority 'low)) rest) #t) @@ -4416,3 +4528,212 @@ VALUES (:agent_id, :password)" #:password password) (sqlite-step-and-reset statement))) + +(define-method (datastore-insert-background-job + (datastore <sqlite-datastore>) + type + args) + (call-with-writer-thread + datastore + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +INSERT INTO background_jobs_queue (type, args) +VALUES (:type, :args) +RETURNING id" + #:cache? #t))) + + (sqlite-bind-arguments + statement + #:type (symbol->string type) + #:args (call-with-output-string + (lambda (port) + (write args port)))) + + (match (sqlite-step-and-reset statement) + (#(id) + + (metric-increment + (metrics-registry-fetch-metric + (slot-ref datastore 'metrics-registry) + "coordinator_background_job_inserted_total") + #:label-values `((name . ,type))) + + id)))))) + +(define-method (datastore-delete-background-job + (datastore <sqlite-datastore>) + id) + (call-with-writer-thread + datastore + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +DELETE FROM background_jobs_queue WHERE id = :id" + #:cache? #t))) + + (sqlite-bind-arguments + statement + #:id id) + + (sqlite-step-and-reset statement)) + #t) + #:priority 'high)) + +(define-method (datastore-select-background-jobs + (datastore <sqlite-datastore>) + . + args) + (apply + (lambda* (type #:key (limit 1)) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT id, args +FROM background_jobs_queue +WHERE type = :type +ORDER BY id ASC +LIMIT :limit" + #:cache? #t))) + + (sqlite-bind-arguments + statement + #:type (symbol->string type) + #:limit limit) + + (let ((result + (sqlite-map + (match-lambda + (#(id args) + `((id . ,id) + (args . 
,(call-with-input-string args read))))) + statement))) + (sqlite-reset statement) + result))))) + args)) + +(define-method (datastore-count-background-jobs + (datastore <sqlite-datastore>)) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT type, COUNT(*) +FROM background_jobs_queue +GROUP BY type" + #:cache? #t))) + + (let ((result + (sqlite-map + (match-lambda + (#(type count) + (cons type count))) + statement))) + (sqlite-reset statement) + + result))))) + +(define-method (datastore-check-and-correct-unprocessed-builds-all-inputs-built + (datastore <sqlite-datastore>) + . + args) + (define entries-to-check + (call-with-thread + (slot-ref datastore 'reader-thread-pool) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT build_id, builds.derivation_id +FROM unprocessed_builds_with_derived_priorities +INNER JOIN builds ON builds.id = build_id +WHERE all_inputs_built = 0 +ORDER BY build_id DESC" + #:cache? #t))) + (let ((result (sqlite-map identity statement))) + (sqlite-reset statement) + result))))) + + (define (all-inputs-built? derivation-id) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT 1 +FROM derivation_inputs +INNER JOIN derivation_outputs + ON derivation_inputs.derivation_output_id = derivation_outputs.id +INNER JOIN unbuilt_outputs + ON unbuilt_outputs.output_id = derivation_outputs.output_id +WHERE derivation_inputs.derivation_id = :derivation_id" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:derivation_id derivation-id) + + (match (sqlite-step-and-reset statement) + (#(1) #f) + (#f #t)))))) + + (define (update build-id) + (call-with-writer-thread + datastore + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +UPDATE unprocessed_builds_with_derived_priorities +SET all_inputs_built = 1 +WHERE build_id = :build_id AND all_inputs_built = 0 +RETURNING 1" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:build_id build-id) + + ;; This is to cope with the check not being transactional, so we + ;; might go to update a record which has just been changed to have + ;; all_inputs_built = 1 + (match (sqlite-step-and-reset statement) + (#(1) #t) + (#f #f)))))) + + (apply + (lambda* (#:key (progress-reporter (const progress-reporter/silent))) + (let ((reporter + (progress-reporter (length entries-to-check)))) + (start-progress-reporter! reporter) + (let loop ((entries entries-to-check) + (updated-count 0)) + (if (null? entries) + (begin + (stop-progress-reporter! reporter) + updated-count) + (match (car entries) + (#(build-id derivation-id) + (progress-reporter-report! reporter) + (if (all-inputs-built? derivation-id) + (loop (cdr entries) + (+ updated-count + (if (update build-id) + 1 + 0))) + (loop (cdr entries) + updated-count)))))))) + args)) diff --git a/guix-build-coordinator/guix-data-service.scm b/guix-build-coordinator/guix-data-service.scm index e2ecbe7..584bbbd 100644 --- a/guix-build-coordinator/guix-data-service.scm +++ b/guix-build-coordinator/guix-data-service.scm @@ -27,6 +27,7 @@ #:use-module (json) #:use-module (web client) #:use-module (web response) + #:use-module (knots timeout) #:use-module (guix-build-coordinator utils) #:use-module (guix-build-coordinator datastore) #:export (send-build-event-to-guix-data-service @@ -51,7 +52,8 @@ #:body body ;; Guile doesn't treat JSON as text, so decode the ;; body manually - #:decode-body? #f)))) + #:decode-body? 
#f)) + #:timeout 10)) (code (response-code response))) (unless (and (>= code 200) diff --git a/guix-build-coordinator/hooks.scm b/guix-build-coordinator/hooks.scm index 030f310..3cd1c59 100644 --- a/guix-build-coordinator/hooks.scm +++ b/guix-build-coordinator/hooks.scm @@ -26,6 +26,7 @@ #:use-module (gcrypt pk-crypto) #:use-module (zlib) #:use-module (lzlib) + #:use-module (knots timeout) #:use-module (guix pki) #:use-module (guix store) #:use-module (guix base32) @@ -156,7 +157,8 @@ (substitute-derivation store drv-name #:substitute-urls - derivation-substitute-urls))) + derivation-substitute-urls)) + #:timeout 120) (add-temp-root store drv-name)) (let* ((drv (read-derivation-from-file* drv-name)) @@ -620,18 +622,15 @@ (unless (eq? source-compression recompress-to) (when (file-exists? tmp-output-log-file) (delete-file tmp-output-log-file)) - (with-port-timeouts - (lambda () - (call-with-compressed-input-file - source-log-file - source-compression - (lambda (input-port) - (call-with-compressed-output-file - tmp-output-log-file - recompress-to - (lambda (output-port) - (dump-port input-port output-port)))))) - #:timeout timeout) + (call-with-compressed-input-file + source-log-file + source-compression + (lambda (input-port) + (call-with-compressed-output-file + tmp-output-log-file + recompress-to + (lambda (output-port) + (dump-port input-port output-port))))) (rename-file tmp-output-log-file output-log-file) (delete-file source-log-file))))) diff --git a/guix-build-coordinator/utils.scm b/guix-build-coordinator/utils.scm index d747962..f4f15dc 100644 --- a/guix-build-coordinator/utils.scm +++ b/guix-build-coordinator/utils.scm @@ -14,10 +14,11 @@ #:use-module (ice-9 textual-ports) #:use-module (ice-9 rdelim) #:use-module (ice-9 binary-ports) - #:use-module (ice-9 suspendable-ports) - #:use-module ((ice-9 ports internal) #:select (port-poll)) #:use-module (ice-9 exceptions) #:use-module (rnrs bytevectors) + #:use-module (oop goops) + #:use-module (logging logger) + #:use-module (logging port-log) #:use-module (web uri) #:use-module (web http) #:use-module (web client) @@ -42,18 +43,9 @@ #:use-module ((guix build syscalls) #:select (set-thread-name)) #:use-module (guix scripts substitute) + #:use-module (guix-build-coordinator utils timeout) #:export (random-v4-uuid - &port-timeout - &port-read-timeout - &port-write-timeout - - port-timeout-error? - port-read-timeout-error? - port-write-timeout-error? - - with-port-timeouts - request-query-parameters call-with-streaming-http-request @@ -94,7 +86,13 @@ open-socket-for-uri* - check-locale!)) + check-locale! + + display/safe + simple-format/safe + format/safe + + <custom-port-log>)) (eval-when (eval load compile) (begin @@ -182,73 +180,6 @@ (parse-query-string query)) '()))) -(define &port-timeout - (make-exception-type '&port-timeout - &external-error - '(port))) - -(define make-port-timeout-error - (record-constructor &port-timeout)) - -(define port-timeout-error? - (record-predicate &port-timeout)) - -(define &port-read-timeout - (make-exception-type '&port-read-timeout - &port-timeout - '())) - -(define make-port-read-timeout-error - (record-constructor &port-read-timeout)) - -(define port-read-timeout-error? - (record-predicate &port-read-timeout)) - -(define &port-write-timeout - (make-exception-type '&port-write-timeout - &port-timeout - '())) - -(define make-port-write-timeout-error - (record-constructor &port-write-timeout)) - -(define port-write-timeout-error? 
- (record-predicate &port-write-timeout)) - -(define* (with-port-timeouts thunk #:key (timeout 120)) - - ;; When the GC runs, it restarts the poll syscall, but the timeout remains - ;; unchanged! When the timeout is longer than the time between the syscall - ;; restarting, I think this renders the timeout useless. Therefore, this - ;; code uses a short timeout, and repeatedly calls poll while watching the - ;; clock to see if it has timed out overall. - (define poll-timeout-ms 200) - - (define (wait port mode) - (let ((timeout-internal - (+ (get-internal-real-time) - (* internal-time-units-per-second timeout)))) - (let loop ((poll-value - (port-poll port mode poll-timeout-ms))) - (if (= poll-value 0) - (if (> (get-internal-real-time) - timeout-internal) - (raise-exception - (if (string=? mode "r") - (make-port-read-timeout-error port) - (make-port-write-timeout-error port))) - (loop (port-poll port mode poll-timeout-ms))) - poll-value)))) - - (parameterize - ((current-read-waiter - (lambda (port) - (wait port "r"))) - (current-write-waiter - (lambda (port) - (wait port "w")))) - (thunk))) - (define* (call-with-streaming-http-request uri content-length callback @@ -275,8 +206,8 @@ (setvbuf port 'block (expt 2 13)) (with-exception-handler (lambda (exp) - (simple-format #t "error: ~A ~A: ~A\n" - method (uri-path uri) exp) + (simple-format/safe #t "error: ~A ~A: ~A\n" + method (uri-path uri) exp) (close-port port) (raise-exception exp)) (lambda () @@ -291,7 +222,8 @@ (let ((body (read-response-body response))) (close-port port) (values response - body))))))))))) + body))))))))) + #:timeout 120)) (define (find-missing-substitutes-for-output store substitute-urls output) (if (valid-path? store output) @@ -349,7 +281,7 @@ (when (file-exists? cache-file) (with-exception-handler (lambda (exn) - (simple-format + (simple-format/safe (current-error-port) "error: when deleting substitute cache file: ~A\n" exn)) @@ -361,7 +293,18 @@ (let ((substitute-urls (append-map (lambda (substitute-url) (let ((log-port (open-output-string))) - (with-throw-handler #t + (with-exception-handler + (lambda (exn) + (simple-format/safe + (current-error-port) + "exception in has-substiutes-no-cache? (~A): ~A\n" + substitute-url exn) + (display/safe (string-append + (get-output-string log-port) + "\n") + (current-error-port)) + (close-output-port log-port) + (raise-exception exn)) (lambda () (if (null? ;; I doubt the caching is thread safe, so @@ -371,17 +314,7 @@ (lookup-narinfos substitute-url (list file))))) '() - (list substitute-url))) - (lambda (key . args) - (simple-format - (current-error-port) - "exception in has-substiutes-no-cache? (~A) ~A: ~A\n" - substitute-url key args) - (display (string-append - (get-output-string log-port) - "\n") - (current-error-port)) - (close-output-port log-port))))) + (list substitute-url)))))) substitute-urls))) substitute-urls)) @@ -415,7 +348,7 @@ (take-right lines 10) lines))) (close-output-port log-port) - (simple-format + (simple-format/safe (current-error-port) "exception when substituting derivation: ~A:\n ~A\n" exn (string-join last-n-lines "\n")) @@ -425,27 +358,23 @@ (ensure-path store derivation-name))) #:unwind? #t))) -(define read-derivation-from-file* - (let ((%derivation-cache - (@@ (guix derivations) %derivation-cache))) - (lambda (file) - (or (and file (hash-ref %derivation-cache file)) - (let ((drv - ;; read-derivation can call read-derivation-from-file, so to - ;; avoid having many open files when reading a derivation with - ;; inputs, read it in to a string first. 
- (call-with-input-string - ;; Avoid calling scm_i_relativize_path in - ;; fport_canonicalize_filename since this leads to lots - ;; of readlink calls - (with-fluids ((%file-port-name-canonicalization 'none)) - (call-with-input-file file - get-string-all)) - (lambda (port) - (set-port-filename! port file) - (read-derivation port read-derivation-from-file*))))) - (hash-set! %derivation-cache file drv) - drv))))) +(define* (read-derivation-from-file* file #:optional (drv-hash (make-hash-table))) + (or (and file (hash-ref drv-hash file)) + (let ((drv + ;; read-derivation can call read-derivation-from-file, so to + ;; avoid having many open files when reading a derivation with + ;; inputs, read it in to a string first. + (call-with-input-string + (call-with-input-file file + get-string-all) + (lambda (port) + (set-port-filename! port file) + (read-derivation port (lambda (file) + (read-derivation-from-file* + file + drv-hash))))))) + (hash-set! drv-hash file drv) + drv))) (define (read-derivation-through-substitutes derivation-name substitute-urls) @@ -638,7 +567,7 @@ References: ~a~%" #:unwind? #t) ((#t . return-values) (when (> attempt 1) - (simple-format + (simple-format/safe (current-error-port) "retry success: ~A\n on attempt ~A of ~A\n" f @@ -649,7 +578,7 @@ References: ~a~%" (if (>= attempt (- times 1)) (begin - (simple-format + (simple-format/safe (current-error-port) "error: ~A:\n ~A,\n attempt ~A of ~A, last retry in ~A\n" f @@ -660,14 +589,14 @@ References: ~a~%" (when error-hook (error-hook attempt exn)) (sleep-impl delay) - (simple-format + (simple-format/safe (current-error-port) "running last retry of ~A after ~A failed attempts\n" f attempt) (f)) (begin - (simple-format + (simple-format/safe (current-error-port) "error: ~A:\n ~A,\n attempt ~A of ~A, retrying in ~A\n" f @@ -895,27 +824,29 @@ References: ~a~%" (define (thread-process-job job-args) (with-exception-handler (lambda (exn) - (simple-format (current-error-port) - "~A work queue, job raised exception ~A: ~A\n" - name job-args exn)) + (simple-format/safe + (current-error-port) + "~A work queue, job raised exception ~A: ~A\n" + name job-args exn)) (lambda () - (with-throw-handler #t + (with-exception-handler + (lambda (exn) + (simple-format/safe + (current-error-port) + "~A work queue, exception when handling job: ~A\n" + name exn) + (let* ((stack (make-stack #t 3)) + (backtrace + (call-with-output-string + (lambda (port) + (display-backtrace stack port) + (newline port))))) + (display/safe + backtrace + (current-error-port))) + (raise-exception exn)) (lambda () - (apply proc job-args)) - (lambda (key . args) - (simple-format - (current-error-port) - "~A work queue, exception when handling job: ~A ~A\n" - name key args) - (let* ((stack (make-stack #t 3)) - (backtrace - (call-with-output-string - (lambda (port) - (display-backtrace stack port) - (newline port))))) - (display - backtrace - (current-error-port)))))) + (apply proc job-args)))) #:unwind? #t)) (define (start-thread thread-index) @@ -1085,36 +1016,29 @@ References: ~a~%" (define (thread-process-job job-args) (with-exception-handler (lambda (exn) - (with-exception-handler - (lambda _ - #f) - (lambda () - ;; Logging may raise an exception, so try and just keep going. - (display - (simple-format - #f - "~A thread pool, job raised exception ~A: ~A\n" - name job-args exn) - (current-error-port))) - #:unwind? 
#t)) + (simple-format/safe + (current-error-port) + "~A thread pool, job raised exception ~A: ~A\n" + name job-args exn)) (lambda () - (with-throw-handler #t - (lambda () - (apply proc job-args)) - (lambda (key . args) - (simple-format + (with-exception-handler + (lambda (exn) + (simple-format/safe (current-error-port) - "~A thread pool, exception when handling job: ~A ~A\n" - name key args) + "~A thread pool, exception when handling job: ~A\n" + name exn) (let* ((stack (make-stack #t 3)) (backtrace (call-with-output-string (lambda (port) (display-backtrace stack port) (newline port))))) - (display + (display/safe backtrace - (current-error-port)))))) + (current-error-port))) + (raise-exception exn)) + (lambda () + (apply proc job-args)))) #:unwind? #t)) (define (start-thread thread-index) @@ -1354,22 +1278,18 @@ References: ~a~%" (define (check-locale!) (with-exception-handler (lambda (exn) - (display - (simple-format - #f - "exception when calling setlocale: ~A + (simple-format/safe + (current-error-port) + "exception when calling setlocale: ~A falling back to en_US.utf8\n" - exn) - (current-error-port)) + exn) (with-exception-handler (lambda (exn) - (display - (simple-format - #f - "exception when calling setlocale with en_US.utf8: ~A\n" - exn) - (current-error-port)) + (simple-format/safe + (current-error-port) + "exception when calling setlocale with en_US.utf8: ~A\n" + exn) (exit 1)) (lambda _ @@ -1378,3 +1298,51 @@ falling back to en_US.utf8\n" (lambda _ (setlocale LC_ALL "")) #:unwind? #t)) + +(define* (display/safe obj #:optional (port (current-output-port))) + ;; Try to avoid the dreaded conversion to port encoding failed error #62590 + (put-bytevector + port + (string->utf8 + (call-with-output-string + (lambda (port) + (display obj port))))) + (force-output port)) + +(define (simple-format/safe port s . args) + (let ((str (apply simple-format #f s args))) + (if (eq? #f port) + str + (display/safe + str + (if (eq? #t port) + (current-output-port) + port))))) + +(define (format/safe port s . args) + (let ((str (apply format #f s args))) + (if (eq? #f port) + str + (display/safe + str + (if (eq? #t port) + (current-output-port) + port))))) + +(define-class <custom-port-log> (<log-handler>) + (port #:init-value #f #:accessor port #:init-keyword #:port)) + +(define-method (emit-log (self <custom-port-log>) str) + (when (port self) + (put-bytevector (port self) + (string->utf8 str)) + ;; Even though the port is line buffered, writing to it with + ;; put-bytevector doesn't cause the buffer to be flushed. + (force-output (port self)))) + +(define-method (flush-log (self <custom-port-log>)) + (and=> (port self) force-output)) + +(define-method (close-log! (self <custom-port-log>)) + (and=> (port self) close-port) + (set! 
(port self) #f)) diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm index 450c36b..d836ceb 100644 --- a/guix-build-coordinator/utils/fibers.scm +++ b/guix-build-coordinator/utils/fibers.scm @@ -1,4 +1,5 @@ (define-module (guix-build-coordinator utils fibers) + #:use-module (srfi srfi-9) #:use-module (ice-9 q) #:use-module (ice-9 match) #:use-module (ice-9 atomic) @@ -12,305 +13,20 @@ #:use-module (fibers scheduler) #:use-module (fibers operations) #:use-module (fibers conditions) + #:use-module (knots timeout) #:use-module ((guix build syscalls) #:select (set-thread-name)) #:use-module (guix-build-coordinator utils) - #:export (make-worker-thread-channel - %worker-thread-default-timeout - call-with-worker-thread - worker-thread-timeout-error? + #:export (spawn-port-monitoring-fiber - call-with-sigint + make-discrete-priority-queueing-channels - run-server/patched - - spawn-port-monitoring-fiber - - letpar& - - port-timeout-error? - port-read-timeout-error? - port-write-timeout-error? - with-fibers-timeout - with-fibers-port-timeouts - - make-queueing-channel - make-discrete-priority-queueing-channels) + make-reusable-condition + reusable-condition? + signal-reusable-condition! + reusable-condition-wait) #:replace (retry-on-error)) -(define %worker-thread-args - (make-parameter #f)) - -(define* (make-worker-thread-channel initializer - #:key (parallelism 1) - (delay-logger (lambda _ #f)) - (duration-logger (const #f)) - destructor - lifetime - (log-exception? (const #t)) - (expire-on-exception? #f) - (name "unnamed")) - "Return a channel used to offload work to a dedicated thread. ARGS are the -arguments of the worker thread procedure." - (define thread-proc-vector - (make-vector parallelism #f)) - - (define (initializer/safe) - (let ((args - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "exception running initializer in worker thread (~A): ~A:\n ~A\n" - name - initializer - exn) - #f) - (lambda () - (with-throw-handler #t - initializer - (lambda args - (backtrace)))) - #:unwind? #t))) - - (if args - args - ;; never give up, just keep retrying - (begin - (sleep 1) - (initializer/safe))))) - - (define (destructor/safe args) - (let ((success? - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "exception running destructor in worker thread (~A): ~A:\n ~A\n" - name - destructor - exn) - #f) - (lambda () - (with-throw-handler #t - (lambda () - (apply destructor args) - #t) - (lambda _ - (backtrace)))) - #:unwind? #t))) - - (or success? - #t - (begin - (sleep 1) - (destructor/safe args))))) - - (define (process thread-index channel args) - (let loop ((current-lifetime lifetime)) - (let ((exception? - (match (get-message channel) - (((? channel? reply) sent-time (? procedure? proc)) - (let ((time-delay - (- (get-internal-real-time) - sent-time))) - (delay-logger (/ time-delay - internal-time-units-per-second)) - - (let* ((start-time (get-internal-real-time)) - (response - (with-exception-handler - (lambda (exn) - (list 'worker-thread-error - (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second) - exn)) - (lambda () - (vector-set! 
thread-proc-vector - thread-index - proc) - (with-throw-handler #t - (lambda () - (call-with-values - (lambda () - (start-stack - 'worker-thread - (apply proc args))) - (lambda vals - (cons (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second) - vals)))) - (lambda args - (when (match args - (('%exception exn) - (log-exception? exn)) - (_ #t)) - (simple-format - (current-error-port) - "worker-thread: exception: ~A\n" args) - (backtrace))))) - #:unwind? #t))) - (put-message reply - response) - - (vector-set! thread-proc-vector - thread-index - #f) - - (match response - (('worker-thread-error duration _) - (when duration-logger - (duration-logger duration proc)) - #t) - ((duration . _) - (when duration-logger - (duration-logger duration proc)) - #f)))))))) - (unless (and expire-on-exception? - exception?) - (if (number? current-lifetime) - (unless (< current-lifetime 0) - (loop (if current-lifetime - (- current-lifetime 1) - #f))) - (loop #f)))))) - - (let ((channel (make-channel))) - (for-each - (lambda (thread-index) - (call-with-new-thread - (lambda () - (catch 'system-error - (lambda () - (set-thread-name - (string-append - name " w t " - (number->string thread-index)))) - (const #t)) - - (let init ((args (initializer/safe))) - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "worker-thread-channel: exception: ~A\n" exn)) - (lambda () - (parameterize ((%worker-thread-args args)) - (process thread-index channel args))) - #:unwind? #t) - - (when destructor - (destructor/safe args)) - - (init (initializer/safe)))))) - (iota parallelism)) - - (values channel - thread-proc-vector))) - -(define &worker-thread-timeout - (make-exception-type '&worker-thread-timeout - &error - '())) - -(define make-worker-thread-timeout-error - (record-constructor &worker-thread-timeout)) - -(define worker-thread-timeout-error? - (record-predicate &worker-thread-timeout)) - -(define %worker-thread-default-timeout - (make-parameter 30)) - -(define* (call-with-worker-thread channel proc #:key duration-logger - (timeout (%worker-thread-default-timeout))) - "Send PROC to the worker thread through CHANNEL. Return the result of PROC. -If already in the worker thread, call PROC immediately." - (let ((args (%worker-thread-args))) - (if args - (call-with-delay-logging proc #:args args) - (let* ((reply (make-channel)) - (operation-success? - (perform-operation - (let ((put - (wrap-operation - (put-operation channel - (list reply - (get-internal-real-time) - proc)) - (const #t)))) - - (if timeout - (choice-operation - put - (wrap-operation (sleep-operation timeout) - (const #f))) - put))))) - - (unless operation-success? - (raise-exception - (make-worker-thread-timeout-error))) - - (match (get-message reply) - (('worker-thread-error duration exn) - (when duration-logger - (duration-logger duration)) - (raise-exception exn)) - ((duration . result) - (when duration-logger - (duration-logger duration)) - (apply values result))))))) - -;; Copied from (fibers web server) -(define (call-with-sigint thunk cvar) - (let ((handler #f)) - (dynamic-wind - (lambda () - (set! handler - (sigaction SIGINT (lambda (sig) (signal-condition! cvar))))) - thunk - (lambda () - (if handler - ;; restore Scheme handler, SIG_IGN or SIG_DFL. - (sigaction SIGINT (car handler) (cdr handler)) - ;; restore original C handler. - (sigaction SIGINT #f)))))) - -;; This variant of run-server from the fibers library supports running -;; multiple servers within one process. 
-(define run-server/patched - (let ((fibers-web-server-module - (resolve-module '(fibers web server)))) - - (define set-nonblocking! - (module-ref fibers-web-server-module 'set-nonblocking!)) - - (define make-default-socket - (module-ref fibers-web-server-module 'make-default-socket)) - - (define socket-loop - (module-ref fibers-web-server-module 'socket-loop)) - - (lambda* (handler - #:key - (host #f) - (family AF_INET) - (addr (if host - (inet-pton family host) - INADDR_LOOPBACK)) - (port 8080) - (socket (make-default-socket family addr port))) - ;; We use a large backlog by default. If the server is suddenly hit - ;; with a number of connections on a small backlog, clients won't - ;; receive confirmation for their SYN, leading them to retry -- - ;; probably successfully, but with a large latency. - (listen socket 1024) - (set-nonblocking! socket) - (sigaction SIGPIPE SIG_IGN) - (spawn-fiber (lambda () (socket-loop socket handler)))))) - (define (spawn-port-monitoring-fiber port error-condition) (spawn-fiber (lambda () @@ -326,7 +42,7 @@ If already in the worker thread, call PROC immediately." "port monitoring fiber error-condition unresponsive") (primitive-exit 1)) (lambda () - (with-fibers-port-timeouts + (with-port-timeouts (lambda () (let ((sock (non-blocking-port @@ -337,218 +53,6 @@ If already in the worker thread, call PROC immediately." #:unwind? #t) (sleep 20))))) -(define (defer-to-fiber thunk) - (let ((reply (make-channel))) - (spawn-fiber - (lambda () - (put-message - reply - (with-exception-handler - (lambda (exn) - (cons 'worker-fiber-error exn)) - (lambda () - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "worker fiber: exception: ~A\n" - exn) - (backtrace) - (raise-exception exn)) - (lambda () - (call-with-values - thunk - (lambda vals - vals))))) - #:unwind? #t))) - #:parallel? #t) - reply)) - -(define (fetch-result-of-defered-thunks . reply-channels) - (let ((responses (map get-message reply-channels))) - (map - (match-lambda - (('worker-thread-error . exn) - (raise-exception exn)) - (result - (apply values result))) - responses))) - -(define-syntax parallel-via-fibers - (lambda (x) - (syntax-case x () - ((_ e0 ...) - (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...))))) - #'(let ((tmp0 (defer-to-fiber - (lambda () - e0))) - ...) - (apply values (fetch-result-of-defered-thunks tmp0 ...)))))))) - -(define-syntax-rule (letpar& ((v e) ...) b0 b1 ...) - (call-with-values - (lambda () (parallel-via-fibers e ...)) - (lambda (v ...) - b0 b1 ...))) - -(define* (with-fibers-timeout thunk #:key timeout on-timeout) - (let ((channel (make-channel))) - (spawn-fiber - (lambda () - (with-exception-handler - (lambda (exn) - (perform-operation - (choice-operation - (put-operation channel (cons 'exception exn)) - (sleep-operation timeout)))) - (lambda () - (call-with-values thunk - (lambda vals - (perform-operation - (choice-operation - (put-operation channel vals) - (sleep-operation timeout)))))) - #:unwind? #t))) - - (match (perform-operation - (choice-operation - (get-operation channel) - (wrap-operation (sleep-operation timeout) - (const 'timeout)))) - ('timeout - (on-timeout)) - (('exception . exn) - (raise-exception exn)) - (vals - (apply values vals))))) - -(define &port-timeout - (make-exception-type '&port-timeout - &external-error - '(thunk port))) - -(define make-port-timeout-error - (record-constructor &port-timeout)) - -(define port-timeout-error? 
- (record-predicate &port-timeout)) - -(define &port-read-timeout - (make-exception-type '&port-read-timeout - &port-timeout - '())) - -(define make-port-read-timeout-error - (record-constructor &port-read-timeout)) - -(define port-read-timeout-error? - (record-predicate &port-read-timeout)) - -(define &port-write-timeout - (make-exception-type '&port-write-timeout - &port-timeout - '())) - -(define make-port-write-timeout-error - (record-constructor &port-write-timeout)) - -(define port-write-timeout-error? - (record-predicate &port-write-timeout)) - -;; These procedure are subject to spurious wakeups. - -(define (readable? port) - "Test if PORT is writable." - (= 1 (port-poll port "r" 0))) - -(define (writable? port) - "Test if PORT is writable." - (= 1 (port-poll port "w" 0))) - -(define (make-wait-operation ready? schedule-when-ready port port-ready-fd this-procedure) - (make-base-operation #f - (lambda _ - (and (ready? port) values)) - (lambda (flag sched resume) - (define (commit) - (match (atomic-box-compare-and-swap! flag 'W 'S) - ('W (resume values)) - ('C (commit)) - ('S #f))) - (schedule-when-ready - sched (port-ready-fd port) commit)))) - -(define (wait-until-port-readable-operation port) - "Make an operation that will succeed when PORT is readable." - (unless (input-port? port) - (error "refusing to wait forever for input on non-input port")) - (make-wait-operation readable? schedule-task-when-fd-readable port - port-read-wait-fd - wait-until-port-readable-operation)) - -(define (wait-until-port-writable-operation port) - "Make an operation that will succeed when PORT is writable." - (unless (output-port? port) - (error "refusing to wait forever for output on non-output port")) - (make-wait-operation writable? schedule-task-when-fd-writable port - port-write-wait-fd - wait-until-port-writable-operation)) - -(define* (with-fibers-port-timeouts thunk - #:key timeout - (read-timeout timeout) - (write-timeout timeout)) - (define (no-fibers-wait thunk port mode timeout) - (define poll-timeout-ms 200) - - ;; When the GC runs, it restarts the poll syscall, but the timeout - ;; remains unchanged! When the timeout is longer than the time - ;; between the syscall restarting, I think this renders the - ;; timeout useless. Therefore, this code uses a short timeout, and - ;; repeatedly calls poll while watching the clock to see if it has - ;; timed out overall. - (let ((timeout-internal - (+ (get-internal-real-time) - (* internal-time-units-per-second timeout)))) - (let loop ((poll-value - (port-poll port mode poll-timeout-ms))) - (if (= poll-value 0) - (if (> (get-internal-real-time) - timeout-internal) - (raise-exception - (if (string=? 
mode "r") - (make-port-read-timeout-error thunk port) - (make-port-write-timeout-error thunk port))) - (loop (port-poll port mode poll-timeout-ms))) - poll-value)))) - - (parameterize - ((current-read-waiter - (lambda (port) - (if (current-scheduler) - (perform-operation - (choice-operation - (wait-until-port-readable-operation port) - (wrap-operation - (sleep-operation read-timeout) - (lambda () - (raise-exception - (make-port-read-timeout-error thunk port)))))) - (no-fibers-wait thunk port "r" read-timeout)))) - (current-write-waiter - (lambda (port) - (if (current-scheduler) - (perform-operation - (choice-operation - (wait-until-port-writable-operation port) - (wrap-operation - (sleep-operation write-timeout) - (lambda () - (raise-exception - (make-port-write-timeout-error thunk port)))))) - (no-fibers-wait thunk port "w" write-timeout))))) - (thunk))) - ;; Use the fibers sleep (define (retry-on-error . args) (apply @@ -557,28 +61,6 @@ If already in the worker thread, call PROC immediately." args (list #:sleep-impl sleep)))) -(define (make-queueing-channel channel) - (define queue (make-q)) - - (let ((queue-channel (make-channel))) - (spawn-fiber - (lambda () - (while #t - (if (q-empty? queue) - (enq! queue - (perform-operation - (get-operation queue-channel))) - (let ((front (q-front queue))) - (perform-operation - (choice-operation - (wrap-operation (get-operation queue-channel) - (lambda (val) - (enq! queue val))) - (wrap-operation (put-operation channel front) - (lambda _ - (q-pop! queue)))))))))) - queue-channel)) - (define (make-discrete-priority-queueing-channels channel num-priorities) (define all-queues (map (lambda _ (make-q)) @@ -588,6 +70,11 @@ If already in the worker thread, call PROC immediately." (map (lambda _ (make-channel)) (iota num-priorities))) + (define (stats) + (map (lambda (queue) + `((length . ,(q-length queue)))) + all-queues)) + (spawn-fiber (lambda () (while #t @@ -620,4 +107,48 @@ If already in the worker thread, call PROC immediately." (enq! queue val)))) all-queues queue-channels))))))))))) - (apply values queue-channels)) + (values (list-copy queue-channels) + stats)) + +(define-record-type <reusable-condition> + (%make-reusable-condition atomic-box channel) + reusable-condition? + (atomic-box reusable-condition-atomic-box) + (channel reusable-condition-channel)) + +(define (make-reusable-condition) + (%make-reusable-condition (make-atomic-box #f) + (make-channel))) + +(define* (signal-reusable-condition! reusable-condition + #:optional (scheduler (current-scheduler))) + (match (atomic-box-compare-and-swap! + (reusable-condition-atomic-box reusable-condition) + #f + #t) + (#f + (spawn-fiber + (lambda () + (put-message (reusable-condition-channel reusable-condition) + #t)) + scheduler) + #t) + (#t #f))) + +(define* (reusable-condition-wait reusable-condition + #:key (timeout #f)) + (let ((val + (if (atomic-box-ref (reusable-condition-atomic-box reusable-condition)) + #t + ;; Not great as this is subject to race conditions, but it should + ;; roughly work + (if timeout + (perform-operation + (choice-operation + (get-operation (reusable-condition-channel reusable-condition)) + (wrap-operation (sleep-operation timeout) + (const #f)))) + (get-message (reusable-condition-channel reusable-condition)))))) + (atomic-box-set! 
(reusable-condition-atomic-box reusable-condition) + #f) + val)) diff --git a/guix-build-coordinator/utils/timeout.scm b/guix-build-coordinator/utils/timeout.scm new file mode 100644 index 0000000..bb133d7 --- /dev/null +++ b/guix-build-coordinator/utils/timeout.scm @@ -0,0 +1,81 @@ +(define-module (guix-build-coordinator utils timeout) + #:use-module (ice-9 exceptions) + #:use-module (ice-9 suspendable-ports) + #:use-module ((ice-9 ports internal) #:select (port-poll)) + #:export (&port-timeout + &port-read-timeout + &port-write-timeout + + port-timeout-error? + port-read-timeout-error? + port-write-timeout-error? + + with-port-timeouts)) + +(define &port-timeout + (make-exception-type '&port-timeout + &external-error + '(port))) + +(define make-port-timeout-error + (record-constructor &port-timeout)) + +(define port-timeout-error? + (record-predicate &port-timeout)) + +(define &port-read-timeout + (make-exception-type '&port-read-timeout + &port-timeout + '())) + +(define make-port-read-timeout-error + (record-constructor &port-read-timeout)) + +(define port-read-timeout-error? + (record-predicate &port-read-timeout)) + +(define &port-write-timeout + (make-exception-type '&port-write-timeout + &port-timeout + '())) + +(define make-port-write-timeout-error + (record-constructor &port-write-timeout)) + +(define port-write-timeout-error? + (record-predicate &port-write-timeout)) + +(define* (with-port-timeouts thunk #:key timeout) + + ;; When the GC runs, it restarts the poll syscall, but the timeout remains + ;; unchanged! When the timeout is longer than the time between the syscall + ;; restarting, I think this renders the timeout useless. Therefore, this + ;; code uses a short timeout, and repeatedly calls poll while watching the + ;; clock to see if it has timed out overall. + (define poll-timeout-ms 200) + + (define (wait port mode) + (let ((timeout-internal + (+ (get-internal-real-time) + (* internal-time-units-per-second timeout)))) + (let loop ((poll-value + (port-poll port mode poll-timeout-ms))) + (if (= poll-value 0) + (if (> (get-internal-real-time) + timeout-internal) + (raise-exception + (if (string=? 
mode "r") + (make-port-read-timeout-error port) + (make-port-write-timeout-error port))) + (loop (port-poll port mode poll-timeout-ms))) + poll-value)))) + + (parameterize + ((current-read-waiter + (lambda (port) + (wait port "r"))) + (current-write-waiter + (lambda (port) + (wait port "w")))) + (thunk))) + diff --git a/guix-dev.scm b/guix-dev.scm index 35869dc..011d9e0 100644 --- a/guix-dev.scm +++ b/guix-dev.scm @@ -44,6 +44,39 @@ (gnu packages texinfo) (srfi srfi-1)) +(define guile-knots + (let ((commit "d572f591a3c136bfc7b23160e16381c92588f8d9") + (revision "1")) + (package + (name "guile-knots") + (version (git-version "0" revision commit)) + (source (origin + (method git-fetch) + (uri (git-reference + (url "https://git.cbaines.net/git/guile/knots") + (commit commit))) + (sha256 + (base32 + "0g85frfniblxb2cl81fg558ic3cxvla7fvml08scjgbbxn8151gv")) + (file-name (string-append name "-" version "-checkout")))) + (build-system gnu-build-system) + (native-inputs + (list pkg-config + autoconf + automake + guile-3.0 + guile-lib + guile-fibers)) + (inputs + (list guile-3.0)) + (propagated-inputs + (list guile-fibers)) + (home-page "https://git.cbaines.net/guile/knots") + (synopsis "Patterns and functionality to use with Guile Fibers") + (description + "") + (license license:gpl3+)))) + (package (name "guix-build-coordinator") (version "0.0.0") @@ -53,6 +86,7 @@ (list guix guile-json-4 guile-fibers-1.3 + guile-knots guile-gcrypt guile-readline guile-lib diff --git a/scripts/guix-build-coordinator.in b/scripts/guix-build-coordinator.in index 72aa8d4..0c06579 100644 --- a/scripts/guix-build-coordinator.in +++ b/scripts/guix-build-coordinator.in @@ -58,15 +58,6 @@ (guix-build-coordinator build-allocator) (guix-build-coordinator client-communication)) -(install-suspendable-ports!) - -;; TODO Work around this causing problems with backtraces -;; https://github.com/wingo/fibers/issues/76 -(set-record-type-printer! 
- (@@ (fibers scheduler) <scheduler>) - (lambda (scheduler port) - (display "#<scheduler>" port))) - (define %base-options ;; Specifications of the command-line options (list (option '("secret-key-base-file") #t #f @@ -83,6 +74,13 @@ (option '("update-database") #f #f (lambda (opt name _ result) (alist-cons 'update-database #t result))) + (option '("listen-repl") #f #t + (lambda (opt name arg result) + (alist-cons 'listen-repl + (if arg + (string->number arg) + #t) + (alist-delete 'listen-repl result)))) (option '("show-error-details") #f #f (lambda (opt name _ result) (alist-cons 'show-error-details #t result))))) @@ -295,6 +293,16 @@ (or (assq-ref result 'not-systems) '())) (alist-delete 'not-systems result)))) + (option '("created-at-gt") #t #f + (lambda (opt name arg result) + (alist-cons 'created-at-> + (datastore-validate-datetime-string arg) + result))) + (option '("created-at-lt") #t #f + (lambda (opt name arg result) + (alist-cons 'created-at-< + (datastore-validate-datetime-string arg) + result))) (option '("skip-updating-derived-priorities") #f #f (lambda (opt name _ result) (alist-cons 'skip-updating-derived-priorities @@ -665,6 +673,8 @@ tags: #:not-tags (assq-ref opts 'not-tags) #:systems (assq-ref opts 'systems) #:not-systems (assq-ref opts 'not-systems) + #:created-at-< (assq-ref opts 'created-at-<) + #:created-at-> (assq-ref opts 'created-at->) #:processed #f #:canceled #f #:relationship (assq-ref opts 'relationship))) @@ -1110,4 +1120,5 @@ tags: #:pid-file (assq-ref opts 'pid-file) #:agent-communication-uri (assq-ref opts 'agent-communication) #:client-communication-uri (assq-ref opts 'client-communication) - #:parallel-hooks (assq-ref opts 'parallel-hooks))))))) + #:parallel-hooks (assq-ref opts 'parallel-hooks) + #:listen-repl (assoc-ref opts 'listen-repl))))))) diff --git a/sqitch/pg/deploy/allocated_builds_submit_outputs.sql b/sqitch/pg/deploy/allocated_builds_submit_outputs.sql new file mode 100644 index 0000000..a2ebe1e --- /dev/null +++ b/sqitch/pg/deploy/allocated_builds_submit_outputs.sql @@ -0,0 +1,7 @@ +-- Deploy guix-build-coordinator:allocated_builds_submit_outputs to pg + +BEGIN; + +-- XXX Add DDLs here. + +COMMIT; diff --git a/sqitch/pg/deploy/background-jobs-queue.sql b/sqitch/pg/deploy/background-jobs-queue.sql new file mode 100644 index 0000000..63dd8d4 --- /dev/null +++ b/sqitch/pg/deploy/background-jobs-queue.sql @@ -0,0 +1,7 @@ +-- Deploy guix-build-coordinator:background-jobs-queue to pg + +BEGIN; + +-- XXX Add DDLs here. + +COMMIT; diff --git a/sqitch/pg/deploy/builds_replace_unprocessed_index.sql b/sqitch/pg/deploy/builds_replace_unprocessed_index.sql new file mode 100644 index 0000000..782cf6b --- /dev/null +++ b/sqitch/pg/deploy/builds_replace_unprocessed_index.sql @@ -0,0 +1,7 @@ +-- Deploy guix-build-coordinator:builds_replace_unprocessed_index to pg + +BEGIN; + +-- XXX Add DDLs here. + +COMMIT; diff --git a/sqitch/pg/revert/allocated_builds_submit_outputs.sql b/sqitch/pg/revert/allocated_builds_submit_outputs.sql new file mode 100644 index 0000000..255efb1 --- /dev/null +++ b/sqitch/pg/revert/allocated_builds_submit_outputs.sql @@ -0,0 +1,7 @@ +-- Revert guix-build-coordinator:allocated_builds_submit_outputs from pg + +BEGIN; + +-- XXX Add DDLs here. 
+ +COMMIT; diff --git a/sqitch/pg/revert/background-jobs-queue.sql b/sqitch/pg/revert/background-jobs-queue.sql new file mode 100644 index 0000000..46b0761 --- /dev/null +++ b/sqitch/pg/revert/background-jobs-queue.sql @@ -0,0 +1,7 @@ +-- Revert guix-build-coordinator:background-jobs-queue from pg + +BEGIN; + +-- XXX Add DDLs here. + +COMMIT; diff --git a/sqitch/pg/revert/builds_replace_unprocessed_index.sql b/sqitch/pg/revert/builds_replace_unprocessed_index.sql new file mode 100644 index 0000000..4395f90 --- /dev/null +++ b/sqitch/pg/revert/builds_replace_unprocessed_index.sql @@ -0,0 +1,7 @@ +-- Revert guix-build-coordinator:builds_replace_unprocessed_index from pg + +BEGIN; + +-- XXX Add DDLs here. + +COMMIT; diff --git a/sqitch/pg/verify/allocated_builds_submit_outputs.sql b/sqitch/pg/verify/allocated_builds_submit_outputs.sql new file mode 100644 index 0000000..e95a717 --- /dev/null +++ b/sqitch/pg/verify/allocated_builds_submit_outputs.sql @@ -0,0 +1,7 @@ +-- Verify guix-build-coordinator:allocated_builds_submit_outputs on pg + +BEGIN; + +-- XXX Add verifications here. + +ROLLBACK; diff --git a/sqitch/pg/verify/background-jobs-queue.sql b/sqitch/pg/verify/background-jobs-queue.sql new file mode 100644 index 0000000..a36097e --- /dev/null +++ b/sqitch/pg/verify/background-jobs-queue.sql @@ -0,0 +1,7 @@ +-- Verify guix-build-coordinator:background-jobs-queue on pg + +BEGIN; + +-- XXX Add verifications here. + +ROLLBACK; diff --git a/sqitch/pg/verify/builds_replace_unprocessed_index.sql b/sqitch/pg/verify/builds_replace_unprocessed_index.sql new file mode 100644 index 0000000..050ef75 --- /dev/null +++ b/sqitch/pg/verify/builds_replace_unprocessed_index.sql @@ -0,0 +1,7 @@ +-- Verify guix-build-coordinator:builds_replace_unprocessed_index on pg + +BEGIN; + +-- XXX Add verifications here. 
+ +ROLLBACK; diff --git a/sqitch/sqitch.plan b/sqitch/sqitch.plan index cfb5b9d..f4f538b 100644 --- a/sqitch/sqitch.plan +++ b/sqitch/sqitch.plan @@ -46,3 +46,6 @@ agent_status_add_processor_count 2023-03-24T09:28:47Z Chris <chris@felis> # Add remove_build_allocation_plan 2023-04-23T19:50:23Z Chris <chris@felis> # Remove build_allocation_plan system_uptime 2023-05-05T18:18:35Z Chris <chris@felis> # Add system uptime build_starts_index 2023-11-24T16:30:13Z Chris <chris@felis> # build_starts index +background-jobs-queue 2025-02-06T10:49:08Z Chris <chris@fang> # Add background_jobs_queue +builds_replace_unprocessed_index 2025-02-19T11:19:42Z Chris <chris@fang> # Replace builds_unprocessed +allocated_builds_submit_outputs 2025-03-02T08:22:48Z Chris <chris@fang> # Add allocated_builds.submit_outputs diff --git a/sqitch/sqlite/deploy/allocated_builds_submit_outputs.sql b/sqitch/sqlite/deploy/allocated_builds_submit_outputs.sql new file mode 100644 index 0000000..66d6b45 --- /dev/null +++ b/sqitch/sqlite/deploy/allocated_builds_submit_outputs.sql @@ -0,0 +1,7 @@ +-- Deploy guix-build-coordinator:allocated_builds_submit_outputs to sqlite + +BEGIN; + +ALTER TABLE allocated_builds ADD COLUMN submit_outputs BOOLEAN DEFAULT NULL; + +COMMIT; diff --git a/sqitch/sqlite/deploy/background-jobs-queue.sql b/sqitch/sqlite/deploy/background-jobs-queue.sql new file mode 100644 index 0000000..1cb35f0 --- /dev/null +++ b/sqitch/sqlite/deploy/background-jobs-queue.sql @@ -0,0 +1,11 @@ +-- Deploy guix-build-coordinator:background-jobs-queue to sqlite + +BEGIN; + +CREATE TABLE background_jobs_queue ( + id INTEGER PRIMARY KEY, + type TEXT NOT NULL, + args TEXT NOT NULL +); + +COMMIT; diff --git a/sqitch/sqlite/deploy/builds_replace_unprocessed_index.sql b/sqitch/sqlite/deploy/builds_replace_unprocessed_index.sql new file mode 100644 index 0000000..a404f31 --- /dev/null +++ b/sqitch/sqlite/deploy/builds_replace_unprocessed_index.sql @@ -0,0 +1,9 @@ +-- Deploy guix-build-coordinator:builds_replace_unprocessed_index to sqlite + +BEGIN; + +DROP INDEX builds_unprocessed; + +CREATE INDEX builds_live ON builds (id) WHERE processed = 0 AND canceled = 0; + +COMMIT; diff --git a/sqitch/sqlite/revert/allocated_builds_submit_outputs.sql b/sqitch/sqlite/revert/allocated_builds_submit_outputs.sql new file mode 100644 index 0000000..240de22 --- /dev/null +++ b/sqitch/sqlite/revert/allocated_builds_submit_outputs.sql @@ -0,0 +1,7 @@ +-- Revert guix-build-coordinator:allocated_builds_submit_outputs from sqlite + +BEGIN; + +-- XXX Add DDLs here. + +COMMIT; diff --git a/sqitch/sqlite/revert/background-jobs-queue.sql b/sqitch/sqlite/revert/background-jobs-queue.sql new file mode 100644 index 0000000..10ef1ff --- /dev/null +++ b/sqitch/sqlite/revert/background-jobs-queue.sql @@ -0,0 +1,7 @@ +-- Revert guix-build-coordinator:background-jobs-queue from sqlite + +BEGIN; + +-- XXX Add DDLs here. + +COMMIT; diff --git a/sqitch/sqlite/revert/builds_replace_unprocessed_index.sql b/sqitch/sqlite/revert/builds_replace_unprocessed_index.sql new file mode 100644 index 0000000..b02dd3c --- /dev/null +++ b/sqitch/sqlite/revert/builds_replace_unprocessed_index.sql @@ -0,0 +1,7 @@ +-- Revert guix-build-coordinator:builds_replace_unprocessed_index from sqlite + +BEGIN; + +-- XXX Add DDLs here. 
+ +COMMIT; diff --git a/sqitch/sqlite/verify/allocated_builds_submit_outputs.sql b/sqitch/sqlite/verify/allocated_builds_submit_outputs.sql new file mode 100644 index 0000000..0b1331e --- /dev/null +++ b/sqitch/sqlite/verify/allocated_builds_submit_outputs.sql @@ -0,0 +1,7 @@ +-- Verify guix-build-coordinator:allocated_builds_submit_outputs on sqlite + +BEGIN; + +-- XXX Add verifications here. + +ROLLBACK; diff --git a/sqitch/sqlite/verify/background-jobs-queue.sql b/sqitch/sqlite/verify/background-jobs-queue.sql new file mode 100644 index 0000000..1cf9965 --- /dev/null +++ b/sqitch/sqlite/verify/background-jobs-queue.sql @@ -0,0 +1,7 @@ +-- Verify guix-build-coordinator:background-jobs-queue on sqlite + +BEGIN; + +-- XXX Add verifications here. + +ROLLBACK; diff --git a/sqitch/sqlite/verify/builds_replace_unprocessed_index.sql b/sqitch/sqlite/verify/builds_replace_unprocessed_index.sql new file mode 100644 index 0000000..d0dd086 --- /dev/null +++ b/sqitch/sqlite/verify/builds_replace_unprocessed_index.sql @@ -0,0 +1,7 @@ +-- Verify guix-build-coordinator:builds_replace_unprocessed_index on sqlite + +BEGIN; + +-- XXX Add verifications here. + +ROLLBACK;
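The new (guix-build-coordinator utils timeout) module added in this patch exports with-port-timeouts, which installs read/write waiters that repeatedly poll the port with a short timeout while watching the clock, and raises a &port-read-timeout or &port-write-timeout exception once the overall #:timeout is exceeded. A minimal caller sketch is shown below; the fetch-with-timeout name, the 30 second value and the use of http-get are illustrative assumptions, not code taken from this patch.

;; Sketch only: wrap a blocking HTTP request so that a socket that stalls
;; for more than 30 seconds raises a port timeout exception instead of
;; hanging forever.  Assumes the I/O goes through Guile's suspendable port
;; operations, as the HTTP requests elsewhere in this patch do.
(use-modules (web client)
             (web uri)
             (guix-build-coordinator utils timeout))

(define (fetch-with-timeout uri-string)
  (with-exception-handler
   (lambda (exn)
     (if (port-timeout-error? exn)
         #f                        ; treat a stalled connection as a miss
         (raise-exception exn)))
   (lambda ()
     (with-port-timeouts
      (lambda ()
        (http-get (string->uri uri-string)))
      #:timeout 30))
   #:unwind? #t))

;; (fetch-with-timeout "http://example.com/") => response and body, or #f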