diff options
38 files changed, 3001 insertions, 2310 deletions
diff --git a/.dir-locals.el b/.dir-locals.el index 60601df..1c73c7a 100644 --- a/.dir-locals.el +++ b/.dir-locals.el @@ -10,6 +10,6 @@ (eval put 'with-db-worker-thread 'scheme-indent-function 1) (eval put 'with-time-logging 'scheme-indent-function 1) (eval put 'with-timeout 'scheme-indent-function 1) - (eval put 'letpar& 'scheme-indent-function 1) + (eval put 'fibers-let 'scheme-indent-function 1) (eval . (put 'call-with-lzip-output-port 'scheme-indent-function 1)) (eval . (put 'with-store 'scheme-indent-function 1)))) diff --git a/Makefile.am b/Makefile.am index 5081695..8a9af41 100644 --- a/Makefile.am +++ b/Makefile.am @@ -10,7 +10,8 @@ MINIMALSOURCES = \ guix-build-coordinator/agent-messaging/http.scm \ guix-build-coordinator/agent.scm \ guix-build-coordinator/config.scm \ - guix-build-coordinator/utils.scm + guix-build-coordinator/utils.scm \ + guix-build-coordinator/utils/timeout.scm ALLSOURCES = \ $(MINIMALSOURCES) \ diff --git a/guix-build-coordinator/agent-messaging/http.scm b/guix-build-coordinator/agent-messaging/http.scm index 0baa75b..33fb717 100644 --- a/guix-build-coordinator/agent-messaging/http.scm +++ b/guix-build-coordinator/agent-messaging/http.scm @@ -50,6 +50,7 @@ #:use-module (guix base64) #:use-module (guix build utils) #:use-module (guix-build-coordinator utils) + #:use-module (guix-build-coordinator utils timeout) #:use-module (guix-build-coordinator agent-messaging abstract) #:export (make-http-agent-interface @@ -73,10 +74,10 @@ password) (let* ((gnutls-ver (gnutls-version)) (guix-ver %guix-version)) - (simple-format (current-error-port) - "(gnutls version: ~A, guix version: ~A)\n" - gnutls-ver - guix-ver)) + (simple-format/safe (current-error-port) + "(gnutls version: ~A, guix version: ~A)\n" + gnutls-ver + guix-ver)) (make <http-agent-interface> #:coordinator-uri coordinator-uri @@ -204,7 +205,8 @@ response))))) (retry-on-error (lambda () - (with-port-timeouts make-request)) + (with-port-timeouts make-request + #:timeout 120)) #:times 
retry-times #:delay 10 #:no-retry agent-error-from-coordinator?)) @@ -420,10 +422,11 @@ (lambda (uploaded-bytes) (= uploaded-bytes file-size))) (retry-on-error (lambda () - (with-throw-handler #t - perform-upload - (lambda _ - (backtrace)))) + (with-exception-handler + (lambda (exn) + (backtrace) + (raise-exception exn)) + perform-upload)) #:times 100 #:delay 60 #:error-hook diff --git a/guix-build-coordinator/agent-messaging/http/server.scm b/guix-build-coordinator/agent-messaging/http/server.scm index 85ba0d4..fa227da 100644 --- a/guix-build-coordinator/agent-messaging/http/server.scm +++ b/guix-build-coordinator/agent-messaging/http/server.scm @@ -46,13 +46,15 @@ #:use-module (fibers timers) #:use-module (fibers channels) #:use-module (fibers operations) + #:use-module (knots) + #:use-module (knots timeout) + #:use-module (knots web-server) + #:use-module (knots thread-pool) #:use-module (prometheus) #:use-module (guix base32) #:use-module (guix base64) #:use-module (guix progress) #:use-module (guix build utils) - #:use-module ((guix build syscalls) - #:select (set-thread-name)) #:use-module (guix-build-coordinator utils) #:use-module (guix-build-coordinator utils fibers) #:use-module (guix-build-coordinator datastore) @@ -135,7 +137,6 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." (define (http-agent-messaging-start-server port host secret-key-base build-coordinator - chunked-request-channel output-hash-channel) (define plain-metrics-registry (make-metrics-registry)) @@ -146,18 +147,15 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." 
(define process-metrics-updater (get-process-metrics-updater plain-metrics-registry)) - (define thread-metric - (make-gauge-metric - (build-coordinator-metrics-registry build-coordinator) - "guile_threads_total")) - (define datastore-metrics-updater (base-datastore-metrics-updater build-coordinator)) + (define (build-coordinator-metrics-updater) + (build-coordinator-update-metrics build-coordinator)) + (define (update-managed-metrics!) + (call-with-delay-logging build-coordinator-metrics-updater) (call-with-delay-logging gc-metrics-updater) - (metric-set thread-metric - (length (all-threads))) (call-with-delay-logging process-metrics-updater) (call-with-delay-logging datastore-metrics-updater)) @@ -168,8 +166,8 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." "exception when starting: ~A\n" exn) (primitive-exit 1)) (lambda () - (run-server/patched - (lambda (request body) + (run-knots-web-server + (lambda (request) (log-msg (build-coordinator-logger build-coordinator) 'INFO (format #f "~4a ~a\n" @@ -180,10 +178,8 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." (cons (request-method request) (split-and-decode-uri-path (uri-path (request-uri request)))) - body secret-key-base build-coordinator - chunked-request-channel output-hash-channel update-managed-metrics! plain-metrics-registry))) @@ -611,10 +607,8 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." (define (controller request method-and-path-components - body secret-key-base build-coordinator - chunked-request-channel output-hash-channel update-managed-metrics! plain-metrics-registry) @@ -642,6 +636,10 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." 
(define logger (build-coordinator-logger build-coordinator)) + ;; TODO Handle this in the controller + (define body + (read-request-body request)) + (define (controller-thunk) (match method-and-path-components (('GET "agent" uuid) @@ -710,14 +708,11 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." (('POST "agent" uuid "fetch-builds") (if (authenticated? uuid request) (let* ((json-body (json-string->scm (utf8->string body))) - ;; count is deprecated, use target_count instead - (count (assoc-ref json-body "count")) (target-count (assoc-ref json-body "target_count")) (systems (assoc-ref json-body "systems")) (builds (fetch-builds build-coordinator uuid (vector->list systems) - target-count - count))) + target-count))) (render-json `((builds . ,(list->vector builds))))) (render-json @@ -794,18 +789,9 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." body))) (call-with-output-file tmp-output-file-name (lambda (output-port) - ;; Older agents may still attempt to use chunked encoding - ;; for this request - (if (member '(chunked) (request-transfer-encoding request)) - (call-with-worker-thread - chunked-request-channel - (lambda () - (dump-port body-port - output-port - (request-content-length request)))) - (dump-port body-port - output-port - (request-content-length request)))))) + (dump-port body-port + output-port + (request-content-length request))))) (rename-file tmp-output-file-name output-file-name) (no-content)) @@ -852,20 +838,12 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." 
"deleting " tmp-output-file-name) (delete-file tmp-output-file-name)) - (if (member '(chunked) (request-transfer-encoding request)) - ;; Older agents may use chunked encoding for this request - (call-with-worker-thread - chunked-request-channel - (lambda () - (receive-file body - #f - tmp-output-file-name))) - (let ((content-length - (request-content-length request))) - (when (> content-length 0) - (receive-file body - content-length - tmp-output-file-name)))) + (let ((content-length + (request-content-length request))) + (when (> content-length 0) + (receive-file body + content-length + tmp-output-file-name))) (if (file-exists? output-file-name) (render-json @@ -945,19 +923,10 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." "deleting " output-file-name) (delete-file output-file-name)) - (if (member '(chunked) (request-transfer-encoding request)) - ;; Older agents may use chunked encoding for this request - (call-with-worker-thread - chunked-request-channel - (lambda () - (receive-file body - #f - tmp-output-file-name - #:append? #t))) - (receive-file body - (request-content-length request) - tmp-output-file-name - #:append? #t))) + (receive-file body + content-length + tmp-output-file-name + #:append? #t)) (if (file-exists? output-file-name) (render-json @@ -1015,12 +984,13 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." #:code 200 #:headers '((content-type . (text/plain)) (vary . (accept)))) - (lambda (port) - (write-metrics (build-coordinator-metrics-registry - build-coordinator) - port) - (write-metrics plain-metrics-registry - port)))))) + (call-with-output-string + (lambda (port) + (write-metrics (build-coordinator-metrics-registry + build-coordinator) + port) + (write-metrics plain-metrics-registry + port))))))) (_ (render-json "not-found" @@ -1037,7 +1007,7 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." (render-json `((error . 
chunked-input-ended-prematurely)) #:code 400)) - ((worker-thread-timeout-error? exn) + ((thread-pool-timeout-error? exn) (render-json `((error . ,(simple-format #f "~A" exn))) #:code 503)) @@ -1046,30 +1016,20 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." `((error . ,(simple-format #f "~A" exn))) #:code 500)))) (lambda () - (with-throw-handler #t - controller-thunk - (lambda (key . args) - (unless (and (eq? '%exception key) - (or (agent-error? (car args)) - (worker-thread-timeout-error? (car args)) - (chunked-input-ended-prematurely-error? (car args)))) + (with-exception-handler + (lambda (exn) + (unless (or (agent-error? exn) + (thread-pool-timeout-error? exn) + (chunked-input-ended-prematurely-error? exn)) (match method-and-path-components ((method path-components ...) (simple-format (current-error-port) - "error: when processing: /~A ~A\n ~A ~A\n" - method (string-join path-components "/") - key args))) - - (let* ((stack (make-stack #t 4)) - (backtrace - (call-with-output-string - (lambda (port) - (display "\nBacktrace:\n" port) - (display-backtrace stack port) - (newline port) - (newline port))))) - (display - backtrace - (current-error-port))))))) + "error: when processing: /~A ~A\n" + method (string-join path-components "/")))) + + (print-backtrace-and-exception/knots exn)) + + (raise-exception exn)) + controller-thunk)) #:unwind? 
#t)) diff --git a/guix-build-coordinator/agent.scm b/guix-build-coordinator/agent.scm index 8144947..47a7c5f 100644 --- a/guix-build-coordinator/agent.scm +++ b/guix-build-coordinator/agent.scm @@ -46,8 +46,9 @@ #:use-module (guix base32) #:use-module (guix serialization) #:use-module ((guix build syscalls) - #:select (set-thread-name)) + #:select (set-thread-name free-disk-space)) #:use-module (guix-build-coordinator utils) + #:use-module (guix-build-coordinator utils timeout) #:use-module (guix-build-coordinator agent-messaging) #:use-module (guix-build-coordinator agent-messaging abstract) #:export (run-agent)) @@ -77,29 +78,35 @@ derivation-substitute-urls non-derivation-substitute-urls metrics-file - max-1min-load-average) + max-1min-load-average + timestamp-log-output?) (define lgr (make <logger>)) - (define port-log (make <port-log> + (define port-log (make <custom-port-log> #:port (current-output-port) #:formatter - (lambda (lvl time str) - (format #f "~a (~5a): ~a~%" - (strftime "%F %H:%M:%S" (localtime time)) - lvl - str)))) + ;; In guile-lib v0.2.8 onwards, the formatter is + ;; called with more arguments + (lambda args ; lvl, time, str + (format #f "~a(~5a): ~a~%" + (if timestamp-log-output? + (strftime "%F %H:%M:%S " (localtime + (second args))) + "") + (first args) + (third args))))) (define metrics-enabled? (and (not (string-null? metrics-file)) (let ((directory (dirname metrics-file))) (or (file-exists? 
directory) (begin - (simple-format (current-error-port) - "skipping writing metrics as ~A does not exist\n" - directory) + (simple-format/safe (current-error-port) + "skipping writing metrics as ~A does not exist\n" + directory) #f))) (with-exception-handler (lambda (exn) - (simple-format + (simple-format/safe (current-error-port) "skipping writing metrics, encountered exception ~A\n" exn) @@ -375,9 +382,16 @@ list-jobs (create-work-queue current-max-builds (lambda (build) - (process-job build - perform-post-build-actions - uploads-updater)) + ;; This poorly handles cases where the + ;; connection to the daemon fails, or other + ;; errors occur + (retry-on-error + (lambda () + (process-job build + perform-post-build-actions + uploads-updater)) + #:times 3 + #:delay 10)) #:thread-start-delay (make-time time-duration @@ -393,48 +407,47 @@ 20) #:name "job"))) (define (display-info) - (display - (simple-format - #f "current threads: ~A current jobs: ~A\n~A\n" - (count-threads) - (+ (count-jobs) (count-post-build-jobs)) - (string-append - (string-join - (map (match-lambda - ((build-details) - (simple-format - #f " - ~A (derived priority: ~A) + (simple-format/safe + (current-error-port) + "current threads: ~A current jobs: ~A\n~A\n" + (count-threads) + (+ (count-jobs) (count-post-build-jobs)) + (string-append + (string-join + (map (match-lambda + ((build-details) + (simple-format + #f " - ~A (derived priority: ~A) ~A" - (assoc-ref build-details "uuid") - (assoc-ref build-details "derived_priority") - (or - (assoc-ref build-details "derivation_name") - (assoc-ref build-details "derivation-name"))))) - (list-jobs)) - "\n") - "\n" - (string-join - (map (match-lambda - ((build-details upload-progress _) - (simple-format - #f " - ~A (derived priority: ~A) + (assoc-ref build-details "uuid") + (assoc-ref build-details "derived_priority") + (or + (assoc-ref build-details "derivation_name") + (assoc-ref build-details "derivation-name"))))) + (list-jobs)) + "\n") + "\n" + 
(string-join + (map (match-lambda + ((build-details upload-progress _) + (simple-format + #f " - ~A (derived priority: ~A) ~A~A" - (assoc-ref build-details "uuid") - (assoc-ref build-details "derived_priority") - (or - (assoc-ref build-details "derivation_name") - (assoc-ref build-details "derivation-name")) - (if (upload-progress-file upload-progress) - (simple-format - #f " + (assoc-ref build-details "uuid") + (assoc-ref build-details "derived_priority") + (or + (assoc-ref build-details "derivation_name") + (assoc-ref build-details "derivation-name")) + (if (upload-progress-file upload-progress) + (simple-format/safe + #f " uploading ~A (~A/~A)" - (upload-progress-file upload-progress) - (upload-progress-bytes-sent upload-progress) - (upload-progress-total-bytes upload-progress)) - "")))) - (list-post-build-jobs)) - "\n"))) - (current-error-port))) + (upload-progress-file upload-progress) + (upload-progress-bytes-sent upload-progress) + (upload-progress-total-bytes upload-progress)) + "")))) + (list-post-build-jobs)) + "\n")))) (let ((details (submit-status coordinator-interface 'idle @@ -513,12 +526,21 @@ '()) (lambda () - (fetch-builds-for-agent - coordinator-interface - systems - (+ (max current-threads 1) - (count-post-build-jobs)) - #:log (build-log-procedure lgr))) + (let ((free-space (free-disk-space "/gnu/store"))) + (if (< free-space (* 2 (expt 2 30))) ; 2G + (begin + (log-msg + lgr 'WARN + "low space on /gnu/store, " + "not fetching new builds") + (sleep 30) + '()) + (fetch-builds-for-agent + coordinator-interface + systems + (+ (max current-threads 1) + (count-post-build-jobs)) + #:log (build-log-procedure lgr))))) #:unwind? #t)) (new-builds (remove (lambda (build) @@ -630,7 +652,7 @@ unable to query substitute servers without caching")) (has-substiutes-no-cache? 
non-derivation-substitute-urls file)) - #:timeout (* 60 1000))) + #:timeout 60)) #:times 20 #:delay (random 15)) #f))) @@ -684,7 +706,15 @@ but the guix-daemon claims it's unavailable" #:timeout ,(* 60 60))) (let ((log-port (open-output-string))) - (with-throw-handler #t + (with-exception-handler + (lambda (exn) + (log-msg lgr 'ERROR + "exception when fetching missing paths: " + exn) + (display/safe (get-output-string log-port)) + (display/safe "\n") + (close-output-port log-port) + (raise-exception exn)) (lambda () (with-port-timeouts (lambda () @@ -692,14 +722,7 @@ but the guix-daemon claims it's unavailable" ((current-build-output-port log-port)) (build-things fetch-substitute-store missing-paths))) - #:timeout (* 60 10 1000))) - (lambda (key . args) - (log-msg lgr 'ERROR - "exception when fetching missing paths " - key ": " args) - (display (get-output-string log-port)) - (display (newline)) - (close-output-port log-port))))) + #:timeout (* 60 10)))))) (for-each (lambda (missing-path) (add-temp-root store missing-path)) missing-paths)) @@ -761,7 +784,7 @@ but the guix-daemon claims it's unavailable" (delete-paths store output-file-names))) #t) (lambda (key args) - (display (get-output-string log-port)) + (display/safe (get-output-string log-port)) (log-msg lgr 'ERROR "delete-outputs: " key args) @@ -850,25 +873,30 @@ but the guix-daemon claims it's unavailable" (raise-exception exn))) #f) (lambda () - (with-throw-handler #t + (with-exception-handler + (lambda (exn) + (unless (and (store-protocol-error? 
exn) + (let ((status (store-protocol-error-status exn))) + (or (= status 1) + (= status 100) + (= status 101)))) + (simple-format/safe (current-error-port) + "exception when performing build: ~A\n" + exn) + (let* ((stack (make-stack #t)) + (backtrace + (call-with-output-string + (lambda (port) + (display-backtrace stack port) + (newline port))))) + (display/safe backtrace))) + (raise-exception exn)) (lambda () (build-things store (list (derivation-file-name derivation))) (for-each (lambda (output) (add-temp-root store output)) (map derivation-output-path - (map cdr (derivation-outputs derivation))))) - (lambda (key . args) - (unless (and (eq? key '%exception) - (store-protocol-error? (car args)) - (let ((status (store-protocol-error-status - (car args)))) - (or (= status 1) - (= status 100) - (= status 101)))) - (simple-format (current-error-port) - "exception when performing build: ~A ~A\n" - key args) - (backtrace)))) + (map cdr (derivation-outputs derivation)))))) #t) #:unwind? #t))) diff --git a/guix-build-coordinator/build-allocator.scm b/guix-build-coordinator/build-allocator.scm index f73c457..8c08144 100644 --- a/guix-build-coordinator/build-allocator.scm +++ b/guix-build-coordinator/build-allocator.scm @@ -352,20 +352,13 @@ #:label-values `((system . ,system))))) counts))) - ;; Go through the setup failures and look specifically at the - ;; missing_inputs ones. Eliminate any missing_inputs failures if all the - ;; missing inputs appear to have been built successfully, and update the + ;; Go through the setup failures and update the ;; derived-build-priorities-hash to reflect the priorities of builds based ;; on the builds that would be "unblocked" if they were completed. (for-each (lambda (setup-failure-build-id) (let ((setup-failures - (filter - (lambda (setup-failure) - (string=? 
"missing_inputs" - (assq-ref setup-failure 'failure-reason))) - (hash-ref setup-failures-hash - setup-failure-build-id))) + (hash-ref setup-failures-hash setup-failure-build-id)) (setup-failure-build-derived-priority (hash-ref derived-build-priorities-hash setup-failure-build-id))) @@ -425,31 +418,28 @@ setup-failures-hash)) (let ((result - (append-map + (map (lambda (agent-id) (log "considering builds for" agent-id) (let ((builds-sorted-by-derived-priority (sort-list (filter (filter-builds-for-agent agent-id) builds) (build-sorting-function-for-agent agent-id)))) - (if (null? builds-sorted-by-derived-priority) - '() - (let ((final-ordered-builds - (concatenate - (map sort-priority-sublist - (limit-processed-sublists - (break-builds-in-to-priority-sublists - builds-sorted-by-derived-priority)))))) - (let ((builds-for-agent - (limit-planned-builds final-ordered-builds))) - (map (lambda (build-id ordering) - (list build-id - agent-id - ordering)) - (map (lambda (build) - (assq-ref build 'uuid)) - builds-for-agent) - (iota (length builds-for-agent)))))))) + (cons + agent-id + (if (null? builds-sorted-by-derived-priority) + '() + (let ((final-ordered-builds + (concatenate + (map sort-priority-sublist + (limit-processed-sublists + (break-builds-in-to-priority-sublists + builds-sorted-by-derived-priority)))))) + (let ((builds-for-agent + (limit-planned-builds final-ordered-builds))) + (map (lambda (build) + (assq-ref build 'uuid)) + builds-for-agent))))))) (map (lambda (agent) (assq-ref agent 'uuid)) agents)))) @@ -502,18 +492,6 @@ (let ((prioritised-builds (datastore-fetch-prioritised-unprocessed-builds datastore))) - (define systems-for-builds - ;; TODO Should be one query - (let ((table (make-hash-table))) - (for-each (lambda (build-id) - (hash-set! 
table - build-id - (datastore-find-build-derivation-system - datastore - build-id))) - prioritised-builds) - table)) - (define tags-for-build (let ((build-tags (make-hash-table))) (lambda (build-id) @@ -538,7 +516,10 @@ (assq-ref setup-failure 'failure-reason))) (cond ((string=? failure-reason "missing_inputs") - #f) + ;; This problem might go away, but just don't try the same agent + ;; again for now. + (string=? (assq-ref setup-failure 'agent-id) + agent-id)) ((string=? failure-reason "could_not_delete_outputs") ;; This problem might go away, but just don't try the same agent ;; again for now. @@ -552,43 +533,44 @@ (else (error "Unknown setup failure " failure-reason))))) - (lambda (build-id) - (log "build:" build-id) - (and - (or (null? requested-systems) - (let ((build-system (hash-ref systems-for-builds build-id))) - (member build-system requested-systems))) - (agent-tags-match-build-tags agent-tags tags-for-build - agent-id build-id) - (let* ((setup-failures-for-build - (or (hash-ref setup-failures-hash build-id) - '())) - (relevant-setup-failures - (filter relevant-setup-failure? - setup-failures-for-build))) - (log "relevant setup failures:" relevant-setup-failures) - (if (null? relevant-setup-failures) - #t - #f))))) - - (when metrics-registry - (let ((counts - (hash-fold - (lambda (_ system result) - `(,@(alist-delete system result) - (,system . ,(+ 1 (or (assoc-ref result system) 0))))) - '() - systems-for-builds))) - (for-each - (match-lambda - ((system . count) - (metric-set allocator-considered-builds-metric - count - #:label-values `((system . ,system))))) - counts))) + (match-lambda + (#(build-id build-system) + (log "build:" build-id) + (and + (or (null? requested-systems) + (member build-system requested-systems)) + (agent-tags-match-build-tags agent-tags tags-for-build + agent-id build-id) + (let* ((setup-failures-for-build + (or (hash-ref setup-failures-hash build-id) + '())) + (relevant-setup-failures + (filter relevant-setup-failure? 
+ setup-failures-for-build))) + (log "relevant setup failures:" relevant-setup-failures) + (if (null? relevant-setup-failures) + #t + #f)))))) + + ;; TODO Restore this in a more performant way + ;; (when metrics-registry + ;; (let ((counts + ;; (hash-fold + ;; (lambda (_ system result) + ;; `(,@(alist-delete system result) + ;; (,system . ,(+ 1 (or (assoc-ref result system) 0))))) + ;; '() + ;; systems-for-builds))) + ;; (for-each + ;; (match-lambda + ;; ((system . count) + ;; (metric-set allocator-considered-builds-metric + ;; count + ;; #:label-values `((system . ,system))))) + ;; counts))) (let ((result - (append-map + (map (lambda (agent-id) (log "considering builds for" agent-id) (let* ((filter-proc @@ -596,32 +578,24 @@ (build-ids (let loop ((count 0) (build-ids '()) - (potential-build-ids prioritised-builds)) + (potential-builds prioritised-builds)) (if (or (and planned-builds-for-agent-limit (>= count planned-builds-for-agent-limit)) - (null? potential-build-ids)) - build-ids ;; highest priority last - (let ((potential-build (first potential-build-ids))) - (if (filter-proc potential-build) + (null? potential-builds)) + (reverse build-ids) ;; highest priority last, so + ;; reverse + (let ((potential-build-details (first potential-builds))) + (if (filter-proc potential-build-details) (loop (+ 1 count) - (cons potential-build + (cons (vector-ref + potential-build-details + 0) build-ids) - (cdr potential-build-ids)) + (cdr potential-builds)) (loop count build-ids - (cdr potential-build-ids)))))))) - (if (null? 
build-ids) - '() - (let ((build-ids-count - (length build-ids))) - (map (lambda (build-id ordering) - (list build-id - agent-id - ordering)) - build-ids - (iota build-ids-count - build-ids-count - -1)))))) + (cdr potential-builds)))))))) + (cons agent-id build-ids))) (map (lambda (agent) (assq-ref agent 'uuid)) agents)))) diff --git a/guix-build-coordinator/client-communication.scm b/guix-build-coordinator/client-communication.scm index c0118aa..6ec6578 100644 --- a/guix-build-coordinator/client-communication.scm +++ b/guix-build-coordinator/client-communication.scm @@ -32,6 +32,11 @@ #:use-module (json) #:use-module (logging logger) #:use-module (gcrypt random) + #:use-module (knots) + #:use-module (knots timeout) + #:use-module (knots web-server) + #:use-module (knots thread-pool) + #:use-module (prometheus) #:use-module (web uri) #:use-module (web client) #:use-module (web request) @@ -65,9 +70,10 @@ (define (start-client-request-server secret-key-base host port - build-coordinator) - (run-server/patched - (lambda (request body) + build-coordinator + utility-thread-pool) + (run-knots-web-server + (lambda (request) (log-msg (build-coordinator-logger build-coordinator) 'INFO (format #f "~4a ~a\n" @@ -78,9 +84,10 @@ (cons (request-method request) (split-and-decode-uri-path (uri-path (request-uri request)))) - body + (read-request-body request) secret-key-base - build-coordinator))) + build-coordinator + utility-thread-pool))) #:host host #:port port)) @@ -88,7 +95,8 @@ method-and-path-components raw-body secret-key-base - build-coordinator) + build-coordinator + utility-thread-pool) (define datastore (build-coordinator-datastore build-coordinator)) @@ -97,6 +105,14 @@ (json-string->scm (utf8->string raw-body)) '())) + (define read-drv-error-count-metric + (or (metrics-registry-fetch-metric + (build-coordinator-metrics-registry build-coordinator) + "read_derivation_error_total") + (make-counter-metric + (build-coordinator-metrics-registry build-coordinator) + 
"read_derivation_error_total"))) + (define (controller-thunk) (match method-and-path-components (('GET "build" uuid) @@ -151,7 +167,8 @@ (alist-delete 'end-time build-details)) - ,@(if (assq-ref build-details 'processed) + ,@(if (or (assq-ref build-details 'processed) + (assq-ref build-details 'canceled)) '() (datastore-find-unprocessed-build-entry datastore uuid)) (created-at . ,(or (and=> @@ -291,8 +308,8 @@ (render-json `((build_allocation_plan . ,(list->vector - (datastore-list-allocation-plan-builds - datastore + (build-coordinator-list-allocation-plan-builds + build-coordinator agent-id))))))) (('POST "agent" agent-id "passwords") (let ((password (new-agent-password @@ -401,6 +418,14 @@ (or (and=> (assq-ref query-parameters 'priority_lt) string->number) 'unset) + #:created-at-> + (or (and=> (assq-ref query-parameters 'created_at_gt) + datastore-validate-datetime-string) + 'unset) + #:created-at-< + (or (and=> (assq-ref query-parameters 'created_at_lt) + datastore-validate-datetime-string) + 'unset) #:relationship (or (and=> (assq-ref query-parameters 'relationship) string->symbol) @@ -460,13 +485,42 @@ (simple-format #f "derivation must be a string: ~A\n" derivation)))) + (unless (derivation-path? derivation-file) + (raise-exception + (make-exception-with-message + "invalid derivation path"))) + + (string-for-each + (lambda (c) + (unless (or (char-alphabetic? c) + (char-numeric? c) + (member c '(#\+ #\- #\. #\_ #\? #\=))) + (raise-exception + (make-exception-with-message + (simple-format #f "invalid character in derivation name: ~A" + c))))) + (store-path-base derivation-file)) + (define (read-drv/substitute derivation-file) - (with-store/non-blocking store - (unless (valid-path? store derivation-file) - (substitute-derivation store - derivation-file - #:substitute-urls substitute-urls))) - (read-derivation-from-file* derivation-file)) + (call-with-delay-logging + (lambda () + (with-store/non-blocking store + (unless (valid-path? 
store derivation-file) + (with-port-timeouts + (lambda () + (substitute-derivation store + derivation-file + #:substitute-urls substitute-urls)) + #:timeout 60))))) + ;; Read the derivation in a thread to avoid blocking fibers + (call-with-thread + utility-thread-pool + (lambda () + (read-derivation-from-file* derivation-file)) + #:duration-logger + (lambda (duration) + (log-delay read-derivation-from-file* + duration)))) (let ((submit-build-result (call-with-delay-logging @@ -475,24 +529,36 @@ `(,build-coordinator ,derivation-file #:read-drv - ,(lambda (derivation-file) - (with-exception-handler - (lambda (exn) - (log-msg - (build-coordinator-logger build-coordinator) - 'WARN - "exception substituting derivation " derivation-file - ": " exn) - - (if (null? (or substitute-urls '())) - ;; Try again - (read-drv/substitute derivation-file) - (read-derivation-through-substitutes - derivation-file - substitute-urls))) - (lambda () - (read-drv/substitute derivation-file)) - #:unwind? #t)) + ,(let ((memoized-drv #f)) + (lambda (derivation-file) + (or + memoized-drv + (with-exception-handler + (lambda (exn) + (log-msg + (build-coordinator-logger build-coordinator) + 'WARN + "exception substituting derivation " derivation-file + ": " exn) + (raise-exception exn)) + (lambda () + (with-exception-handler + (lambda (exn) + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) + (lambda () + (let ((result + (retry-on-error + (lambda () + (read-drv/substitute derivation-file)) + #:times 2 + #:delay 3 + #:error-hook + (lambda _ + (metric-increment read-drv-error-count-metric))))) + (set! memoized-drv result) + result)))) + #:unwind? #t)))) ,@(let ((priority (assoc-ref body "priority"))) (if priority `(#:priority ,priority) @@ -507,6 +573,10 @@ body "ensure-all-related-derivation-outputs-have-builds") '(#:ensure-all-related-derivation-outputs-have-builds? #t) '()) + ,@(if (assoc-ref + body "skip-updating-derived-priorities") + '(#:skip-updating-derived-priorities? 
#t) + '()) ,@(if (assoc-ref body "tags") `(#:tags ,(map @@ -530,8 +600,7 @@ datastore (lambda (_) (let ((allocation-plan-counts - (datastore-count-build-allocation-plan-entries - datastore))) + (build-coordinator-allocation-plan-stats build-coordinator))) `((state_id . ,(build-coordinator-get-state-id build-coordinator)) (agents . ,(list->vector (map @@ -581,7 +650,9 @@ (datastore-list-agent-builds datastore (assq-ref agent-details 'uuid)))))))) - (datastore-list-agents datastore)))))))))) + (datastore-list-agents datastore))))))) + #:priority 'high + #:duration-metric-name "get_state"))) (('GET "events") (let ((headers (request-headers request))) (list (build-response @@ -626,7 +697,7 @@ (render-json `((error . ,(client-error-details exn))) #:code 400)) - ((worker-thread-timeout-error? exn) + ((thread-pool-timeout-error? exn) (render-json `((error . ,(simple-format #f "~A" exn))) #:code 503)) @@ -635,20 +706,18 @@ `((error . 500)) #:code 500)))) (lambda () - (with-throw-handler #t - controller-thunk - (lambda (key . args) - (unless (and (eq? '%exception key) - (or - (worker-thread-timeout-error? (car args)) - (client-error? (car args)))) + (with-exception-handler + (lambda (exn) + (unless (or + (thread-pool-timeout-error? exn) + (client-error? exn)) (match method-and-path-components ((method path-components ...) (simple-format (current-error-port) - "error: when processing client request: /~A ~A\n ~A ~A\n" + "error: when processing client request: /~A ~A\n ~A\n" method (string-join path-components "/") - key args))) + exn))) (let* ((stack (make-stack #t 4)) (backtrace @@ -660,7 +729,9 @@ (newline port))))) (display backtrace - (current-error-port))))))) + (current-error-port)))) + (raise-exception exn)) + controller-thunk)) #:unwind? #t)) (define* (render-json json #:key (extra-headers '()) @@ -748,7 +819,8 @@ ensure-all-related-derivation-outputs-have-builds? tags #:key - defer-until) + defer-until + skip-updating-derived-priorities?) 
(send-request coordinator-uri 'POST "/builds" @@ -766,6 +838,9 @@ ,@(if ensure-all-related-derivation-outputs-have-builds? '((ensure-all-related-derivation-outputs-have-builds . #t)) '()) + ,@(if skip-updating-derived-priorities? + '((skip-updating-derived-priorities . #t)) + '()) ,@(if (null? tags) '() `((tags . ,(list->vector tags)))) @@ -826,6 +901,8 @@ (canceled 'unset) (priority-> 'unset) (priority-< 'unset) + (created-at-> 'unset) + (created-at-< 'unset) (relationship 'unset) (after-id #f) (limit #f)) @@ -874,6 +951,12 @@ ,@(if (number? priority-<) (list (simple-format #f "priority_lt=~A" priority-<)) '()) + ,@(if (string? created-at->) + (list (simple-format #f "created_at_gt=~A" created-at->)) + '()) + ,@(if (string? created-at-<) + (list (simple-format #f "created_at_lt=~A" created-at-<)) + '()) ,@(if (and relationship (not (eq? 'unset relationship))) (list (simple-format #f "relationship=~A" relationship)) '()) diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm index 3830d88..adb7575 100644 --- a/guix-build-coordinator/coordinator.scm +++ b/guix-build-coordinator/coordinator.scm @@ -25,18 +25,23 @@ #:use-module (srfi srfi-11) #:use-module (srfi srfi-19) #:use-module (srfi srfi-26) + #:use-module (srfi srfi-43) #:use-module (srfi srfi-71) #:use-module (ice-9 ftw) #:use-module (ice-9 vlist) #:use-module (ice-9 match) #:use-module (ice-9 rdelim) #:use-module (ice-9 binary-ports) + #:use-module (ice-9 suspendable-ports) #:use-module (ice-9 format) #:use-module (ice-9 atomic) #:use-module (ice-9 control) #:use-module (ice-9 threads) #:use-module (ice-9 exceptions) #:use-module (rnrs bytevectors) + #:use-module (system repl server) + #:use-module (system repl command) + #:use-module (system repl debug) #:use-module (web uri) #:use-module (web http) #:use-module (oop goops) @@ -44,12 +49,18 @@ #:use-module (logging port-log) #:use-module (gcrypt random) #:use-module (fibers) + #:use-module (fibers timers) #:use-module 
(fibers channels) #:use-module (fibers scheduler) #:use-module (fibers conditions) + #:use-module (fibers operations) + #:use-module (knots) + #:use-module (knots timeout) + #:use-module (knots parallelism) + #:use-module (knots thread-pool) #:use-module (prometheus) - #:use-module ((guix build syscalls) - #:select (set-thread-name)) + #:use-module (guix store) + #:use-module (guix progress) #:use-module (guix derivations) #:use-module (guix build utils) #:use-module (guix-build-coordinator utils) @@ -66,6 +77,8 @@ client-error? client-error-details + %build-coordinator + make-build-coordinator build-coordinator-datastore build-coordinator-hooks @@ -97,12 +110,20 @@ build-coordinator-prompt-hook-processing-for-event start-hook-processing-threads + build-coordinator-update-metrics + + build-coordinator-allocation-plan-stats + build-coordinator-trigger-build-allocation + build-coordinator-list-allocation-plan-builds + build-output-file-location build-log-file-destination build-log-file-location handle-build-start-report handle-build-result - handle-setup-failure-report)) + handle-setup-failure-report + + build-coordinator-check-and-correct-unprocessed-builds-all-inputs-built)) (define-exception-type &agent-error &error make-agent-error @@ -114,48 +135,44 @@ client-error? (details client-error-details)) +(define %build-coordinator + (make-parameter #f)) + (define-record-type <build-coordinator> (make-build-coordinator-record datastore hooks metrics-registry - allocation-strategy logger) + allocation-strategy allocator-channel + logger) build-coordinator? (datastore build-coordinator-datastore) (hooks build-coordinator-hooks) (hook-condvars build-coordinator-hook-condvars set-build-coordinator-hook-condvars!) + (background-job-conditions + build-coordinator-background-job-conditions + set-build-coordinator-background-job-conditions!) 
(metrics-registry build-coordinator-metrics-registry) (allocation-strategy build-coordinator-allocation-strategy) - (allocator-thread build-coordinator-allocator-thread - set-build-coordinator-allocator-thread!) + (trigger-build-allocation + build-coordinator-trigger-build-allocation + set-build-coordinator-trigger-build-allocation!) + (allocator-channel build-coordinator-allocator-channel) (logger build-coordinator-logger) (events-channel build-coordinator-events-channel set-build-coordinator-events-channel!) (get-state-id build-coordinator-get-state-id-proc set-build-coordinator-get-state-id-proc!) (scheduler build-coordinator-scheduler - set-build-coordinator-scheduler!)) + set-build-coordinator-scheduler!) + (utility-thread-pool build-coordinator-utility-thread-pool + set-build-coordinator-utility-thread-pool!)) (set-record-type-printer! <build-coordinator> (lambda (build-coordinator port) (display "#<build-coordinator>" port))) -(define-class <custom-port-log> (<log-handler>) - (port #:init-value #f #:accessor port #:init-keyword #:port)) - -(define-method (emit-log (self <custom-port-log>) str) - (when (port self) - (put-bytevector (port self) - (string->utf8 str)) - ;; Even though the port is line buffered, writing to it with - ;; put-bytevector doesn't cause the buffer to be flushed. - (force-output (port self)))) - -(define-method (flush-log (self <custom-port-log>)) - (and=> (port self) force-output)) - -(define-method (close-log! (self <custom-port-log>)) - (and=> (port self) close-port) - (set! (port self) #f)) +(define %command-duration-histogram-buckets + (list 0.1 0.25 0.5 1 2.5 5 10 15 30 45 60 120 240 (inf))) (define %known-hooks '(build-submitted @@ -201,7 +218,15 @@ #f) (lambda () - (with-throw-handler #t + (with-exception-handler + (lambda (exn) + (if (and + (exception-with-origin? exn) + (string=? 
(exception-origin exn) + "fport_write")) + #f + (print-backtrace-and-exception/knots exn)) + (raise-exception exn)) (lambda () (match (atomic-box-ref current-state-id-and-event-buffer-index-box) @@ -225,16 +250,7 @@ (iota event-count-to-send (+ 1 last-sent-state-id)))) - current-state-id))) - (lambda (key . args) - (if (and - (eq? key 'system-error) - (match args - (("fport_write" "~A" ("Broken pipe") rest ...) - #t) - (_ #f))) - #f - (backtrace))))) + current-state-id))))) #:unwind? #t))) (unless (eq? #f new-state-id) @@ -276,7 +292,8 @@ (if (> requested-after-state-id current-state-id) current-state-id - requested-after-state-id) + (max 0 + requested-after-state-id)) current-state-id))))) (atomic-box-set! listener-channels-box @@ -346,6 +363,27 @@ (define (build-coordinator-get-state-id build-coordinator) ((build-coordinator-get-state-id-proc build-coordinator))) +(define (build-coordinator-update-metrics build-coordinator) + (define metrics-registry + (build-coordinator-metrics-registry build-coordinator)) + + (let ((utility-thread-pool-used-thread-metric + (or (metrics-registry-fetch-metric + metrics-registry + "utility_thread_pool_used_thread_total") + (make-gauge-metric + metrics-registry + "utility_thread_pool_used_thread_total")))) + + (and=> (build-coordinator-utility-thread-pool build-coordinator) + (lambda (utility-thread-pool) + (metric-set + utility-thread-pool-used-thread-metric + (vector-count + (lambda (_ proc) + (->bool proc)) + (thread-pool-proc-vector utility-thread-pool))))))) + (define* (make-build-coordinator #:key database-uri-string @@ -357,13 +395,14 @@ (database-uri->datastore database-uri-string #:metrics-registry metrics-registry - #:worker-thread-log-exception? + #:thread-pool-log-exception? (lambda (exn) (and (not (agent-error? exn)) (not (client-error? exn)))))) hooks (allocation-strategy - basic-build-allocation-strategy)) + basic-build-allocation-strategy) + (timestamp-log-output? #t)) (and (or (list? 
hooks) (begin (simple-format @@ -394,16 +433,22 @@ (port-log (make <custom-port-log> #:port (current-output-port) #:formatter - (lambda (lvl time str) - (format #f "~a (~5a): ~a~%" - (strftime "%F %H:%M:%S" (localtime time)) - lvl - str)))) + ;; In guile-lib v0.2.8 onwards, the formatter is + ;; called with more arguments + (lambda args ; lvl, time, str + (format #f "~a(~5a): ~a~%" + (if timestamp-log-output? + (strftime "%F %H:%M:%S " (localtime + (second args))) + "") + (first args) + (third args))))) (build-coordinator (make-build-coordinator-record datastore hooks metrics-registry allocation-strategy + (make-channel) lgr))) (add-handler! lgr port-log) @@ -427,6 +472,16 @@ (setrlimit 'core #f #f)) #:unwind? #t) + (let ((core-file + (string-append (getcwd) "/core")) + (metric + (make-gauge-metric (build-coordinator-metrics-registry + build-coordinator) + "core_dump_file_last_modified_seconds"))) + (when (file-exists? core-file) + (metric-set metric + (stat:mtime (stat core-file))))) + (with-exception-handler (lambda (exn) (simple-format #t "failed increasing open file limit: ~A\n" exn)) @@ -441,29 +496,49 @@ (lambda (scheduler port) (display "#<scheduler>" port))) + (call-with-new-thread + (lambda () + (set-thread-name + (string-append "gc watcher")) + + (add-hook! + after-gc-hook + (let ((last-gc-time-taken + (assq-ref (gc-stats) 'gc-time-taken))) + (lambda () + (let* ((gc-time-taken + (assq-ref (gc-stats) 'gc-time-taken)) + (time-since-last + (/ (- gc-time-taken + last-gc-time-taken) + internal-time-units-per-second))) + (when (> time-since-last 0.1) + (format (current-error-port) + "after gc (additional time taken: ~f)\n" + time-since-last)) + (set! last-gc-time-taken + (assq-ref (gc-stats) 'gc-time-taken)))))) + (while #t + (sleep 0.1)))) + (when update-datastore? 
(datastore-update (build-coordinator-datastore build-coordinator))) - (when pid-file - (call-with-output-file pid-file - (lambda (port) - (simple-format port "~A\n" (getpid))))) - - (set-build-coordinator-allocator-thread! + (set-build-coordinator-trigger-build-allocation! build-coordinator (make-build-allocator-thread build-coordinator)) - (set-build-coordinator-hook-condvars! - build-coordinator - (start-hook-processing-threads build-coordinator - parallel-hooks)) - (when trigger-build-allocation? (trigger-build-allocation build-coordinator))) (define %default-agent-uri (string->uri "http://0.0.0.0:8745")) (define %default-client-uri (string->uri "http://127.0.0.1:8746")) +(define %default-repl-server-port + ;; Default port to run REPL server on, if --listen-repl is provided + ;; but no port is mentioned + 37146) + (define* (run-coordinator-service build-coordinator #:key (update-datastore? #t) @@ -471,7 +546,10 @@ (agent-communication-uri %default-agent-uri) (client-communication-uri %default-client-uri) secret-key-base - (parallel-hooks '())) + (parallel-hooks '()) + listen-repl) + (install-suspendable-ports!) + (with-fluids ((%file-port-name-canonicalization 'none)) (perform-coordinator-service-startup build-coordinator @@ -479,33 +557,65 @@ #:pid-file pid-file #:parallel-hooks parallel-hooks) + (when listen-repl + (parameterize ((%build-coordinator build-coordinator)) + (cond + ((or (eq? #t listen-repl) + (number? listen-repl)) + (let ((listen-repl + (if (eq? #t listen-repl) + %default-repl-server-port + listen-repl))) + (format (current-error-port) + "REPL server listening on port ~a~%" + listen-repl) + (spawn-server (make-tcp-server-socket + #:port + (if (eq? 
#t listen-repl) + %default-repl-server-port + listen-repl))))) + (else + (format (current-error-port) + "REPL server listening on ~a~%" + listen-repl) + (spawn-server (make-unix-domain-server-socket #:path listen-repl)))))) + ;; Create some worker thread channels, which need to be created prior ;; to run-fibers being called. - (let ((chunked-request-channel - ;; There are fibers issues when trying to read the chunked - ;; requests, so do this in dedicated threads. - (make-worker-thread-channel - (const '()) - #:name "chunked request" - #:parallelism 16 - #:log-exception? - (lambda (exn) - (not - (chunked-input-ended-prematurely-error? - exn))) - #:delay-logger - (lambda (seconds-delayed) - (log-delay "chunked request channel" - seconds-delayed) - (when (> seconds-delayed 0.1) - (format - (current-error-port) - "warning: chunked request channel delayed by ~1,2f seconds~%" - seconds-delayed))))) - - (output-hash-channel + (let ((output-hash-channel (make-output-hash-channel - build-coordinator))) + build-coordinator)) + + (utility-thread-pool + (make-thread-pool + 18 + #:name "utility" + #:delay-logger + (let ((delay-metric + (make-histogram-metric + (build-coordinator-metrics-registry build-coordinator) + "utility_thread_pool_delay_seconds"))) + (lambda (seconds-delayed proc) + (log-delay "utility thread channel" + seconds-delayed) + (metric-observe delay-metric seconds-delayed) + (when (> seconds-delayed 0.1) + (format + (current-error-port) + "warning: utility thread channel delayed by ~1,2f seconds~%" + seconds-delayed))))))) + + (let ((metric + (make-gauge-metric + (build-coordinator-metrics-registry build-coordinator) + "utility_thread_pool_thread_total"))) + (metric-set metric + (vector-length + (thread-pool-proc-vector utility-thread-pool)))) + + (set-build-coordinator-utility-thread-pool! + build-coordinator + utility-thread-pool) (let ((finished? 
(make-condition))) (call-with-sigint @@ -528,6 +638,9 @@ (iota (length schedulers)) schedulers)) + (set-build-coordinator-scheduler! build-coordinator + (current-scheduler)) + (log-msg (build-coordinator-logger build-coordinator) 'INFO "initialising metrics") @@ -539,10 +652,18 @@ (datastore-spawn-fibers (build-coordinator-datastore build-coordinator)) + (set-build-coordinator-hook-condvars! + build-coordinator + (start-hook-processing-threads build-coordinator + parallel-hooks)) + + (set-build-coordinator-background-job-conditions! + build-coordinator + (start-background-job-processing-fibers build-coordinator)) + (spawn-fiber-to-watch-for-deferred-builds build-coordinator) - (set-build-coordinator-scheduler! build-coordinator - (current-scheduler)) + (spawn-build-allocation-plan-management-fiber build-coordinator) (let ((events-channel get-state-id @@ -566,7 +687,6 @@ host secret-key-base build-coordinator - chunked-request-channel output-hash-channel) (log-msg 'INFO "listening on " host ":" port)))) @@ -575,7 +695,13 @@ secret-key-base (uri-host client-communication-uri) (uri-port client-communication-uri) - build-coordinator) + build-coordinator + utility-thread-pool) + + (when pid-file + (call-with-output-file pid-file + (lambda (port) + (simple-format port "~A\n" (getpid))))) ;; Guile seems to just stop listening on ports, so try to ;; monitor that internally and just quit if it happens @@ -585,8 +711,8 @@ finished?) (wait finished?)) - #:hz 10 - #:parallelism 2)) + #:hz 0 + #:parallelism 1)) finished?))))) (define* (submit-build build-coordinator derivation-file @@ -596,6 +722,7 @@ (ignore-if-build-for-derivation-exists? #f) (ignore-if-build-for-outputs-exists? #f) (ensure-all-related-derivation-outputs-have-builds? #f) + skip-updating-derived-priorities? (tags '()) defer-until (read-drv read-derivation-from-file*)) @@ -608,35 +735,15 @@ #:include-canceled? #f) 0)) - (define (build-for-output-already-exists?) 
- ;; Handle the derivation not existing in the database here, so that adding - ;; it to the database isn't required for this code to work - (let* ((system-from-database (datastore-find-derivation-system datastore - derivation-file)) - - (derivation-exists-in-database? (not (eq? #f system-from-database))) - - (derivation - (if derivation-exists-in-database? - #f ; unnecessary to fetch derivation - (call-with-delay-logging read-drv - #:threshold 10 - #:args (list derivation-file)))) - - (system - (or system-from-database - (derivation-system derivation))) - - (outputs - (if derivation-exists-in-database? - (datastore-find-derivation-outputs datastore - derivation-file) - (map - (match-lambda - ((name . output) - `((name . ,name) - (output . ,(derivation-output-path output))))) - (derivation-outputs derivation))))) + (define (build-for-output-already-exists/with-derivation? derivation) + (let ((system (derivation-system derivation)) + (outputs + (map + (match-lambda + ((name . output) + `((name . ,name) + (output . ,(derivation-output-path output))))) + (derivation-outputs derivation)))) (any (lambda (output-details) (let ((builds-for-output @@ -648,13 +755,37 @@ (not (null? builds-for-output)))) outputs))) - (define (check-whether-to-store-build) + (define (build-for-output-already-exists?) + (let ((system (datastore-find-derivation-system datastore + derivation-file))) + (if (eq? #f system) ; derivation does not exist in database? + (build-for-output-already-exists/with-derivation? + (call-with-delay-logging read-drv + #:threshold 10 + #:args (list derivation-file))) + (any + (lambda (output-details) + (let ((builds-for-output + (datastore-list-builds-for-output-and-system + datastore + (assq-ref output-details 'output) + system + #:include-canceled? #f))) + (not (null? 
builds-for-output)))) + (datastore-find-derivation-outputs datastore + derivation-file))))) + + (define* (check-whether-to-store-build #:optional derivation) (cond ((and ignore-if-build-for-derivation-exists? (build-for-derivation-exists?)) '((no-build-submitted . build-already-exists-for-this-derivation))) ((and ignore-if-build-for-outputs-exists? - (call-with-delay-logging build-for-output-already-exists?)) + (if derivation + (call-with-delay-logging + build-for-output-already-exists/with-derivation? + #:args (list derivation)) + (call-with-delay-logging build-for-output-already-exists?))) '((no-build-submitted . build-already-exists-for-a-output))) (else 'continue))) @@ -684,18 +815,20 @@ (or requested-uuid (random-v4-uuid))) - (define (build-perform-datastore-changes derivations-lacking-builds) + (define (build-perform-datastore-changes derivation derivations-lacking-builds) (lambda (_) ;; Check again now, since new builds could have been added since the ;; checks were made before the start of the transaction. - (match (check-whether-to-store-build) + (match (check-whether-to-store-build derivation) ('continue ;; Actually create a build, do this first so the derived priorities ;; for the builds inserted below are informed by this build. (store-build derivation-file build-id priority - tags) + tags + #:skip-updating-other-build-derived-priorities + skip-updating-derived-priorities?) (for-each (match-lambda @@ -733,65 +866,84 @@ (build-coordinator-metrics-registry build-coordinator) "coordinator_submit_build_duration_seconds" (lambda () + (unless (derivation-path? derivation-file) + (raise-exception + (make-client-error 'invalid-derivation-file))) + + (string-for-each + (lambda (c) + (unless (or (char-alphabetic? c) + (char-numeric? c) + (member c '(#\+ #\- #\. #\_ #\? 
#\=))) + (raise-exception + (make-client-error 'invalid-character-in-derivation-file)))) + (store-path-base derivation-file)) + (match (check-whether-to-store-build) ('continue - ;; Store the derivation first, so that listing related derivations - ;; with no builds works - (unless (datastore-find-derivation datastore derivation-file) - (datastore-store-derivation - datastore - (call-with-delay-logging read-drv - #:threshold 10 - #:args (list derivation-file)))) - - (let ((related-derivations-lacking-builds - (if ensure-all-related-derivation-outputs-have-builds? - (datastore-list-related-derivations-with-no-build-for-outputs + (let ((drv + ;; If the dervation is missing from the database, read it and + ;; enter it in to the database, so that listing related + ;; derivations with no builds works + (if (datastore-find-derivation datastore derivation-file) + #f + (call-with-delay-logging read-drv + #:threshold 10 + #:args (list derivation-file))))) + (when drv + (datastore-store-derivation datastore drv)) + + (let ((related-derivations-lacking-builds + (if ensure-all-related-derivation-outputs-have-builds? + (datastore-list-related-derivations-with-no-build-for-outputs + datastore + derivation-file) + '()))) + (match (datastore-call-with-transaction datastore - derivation-file) - '()))) - (match (datastore-call-with-transaction - datastore - (build-perform-datastore-changes - ;; Do this here so it doesn't take time in the writer thread - (map - (lambda (drv) - ;; Generate the UUID's outside the transaction to save - ;; time too. - (cons drv (random-v4-uuid))) - related-derivations-lacking-builds)) - #:duration-metric-name - "store_build") - (#t ; build submitted - (build-coordinator-prompt-hook-processing-for-event - build-coordinator - 'build-submitted) - - (build-coordinator-send-event - build-coordinator - 'build-submitted - `((id . ,build-id) - (derivation . ,derivation-file) - (priority . ,priority) - (tags - . 
,(list->vector + (build-perform-datastore-changes + drv + ;; Do this here so it doesn't take time in the writer thread (map - (match-lambda - ((key . value) - `((key . ,key) - (value . ,value)))) - (if (vector? tags) - (vector->list tags) - tags)))) - (defer_until . ,defer-until))) - - (trigger-build-allocation build-coordinator) - - `((build-submitted . ,build-id))) - (stop-condition - stop-condition)))) + (lambda (related-drv) + ;; Generate the UUID's outside the transaction to save + ;; time too. + (cons related-drv (random-v4-uuid))) + related-derivations-lacking-builds)) + #:priority 'low + #:duration-metric-name "store_build" + #:duration-metric-buckets %command-duration-histogram-buckets) + (#t ; build submitted + (build-coordinator-prompt-hook-processing-for-event + build-coordinator + 'build-submitted) + + (build-coordinator-send-event + build-coordinator + 'build-submitted + `((id . ,build-id) + (derivation . ,derivation-file) + (priority . ,priority) + (tags + . ,(list->vector + (map + (match-lambda + ((key . value) + `((key . ,key) + (value . ,value)))) + (if (vector? tags) + (vector->list tags) + tags)))) + (defer_until . ,defer-until))) + + (trigger-build-allocation build-coordinator) + + `((build-submitted . ,build-id))) + (stop-condition + stop-condition))))) (stop-condition - stop-condition))))) + stop-condition))) + #:buckets %command-duration-histogram-buckets)) (define* (cancel-build build-coordinator uuid #:key (ignore-if-build-required-by-another? 
#t) @@ -804,6 +956,10 @@ datastore (lambda (db) (let ((build-details (datastore-find-build datastore uuid))) + (unless build-details + (raise-exception + (make-client-error 'build-unknown))) + (when (assq-ref build-details 'canceled) (raise-exception (make-client-error 'build-already-canceled))) @@ -819,12 +975,17 @@ (make-transaction-rollback-exception 'skipped-as-build-required-by-another))) - (datastore-remove-build-from-allocation-plan datastore uuid) + (build-coordinator-remove-build-from-allocation-plan + build-coordinator + uuid) (datastore-cancel-build datastore uuid) (datastore-insert-unprocessed-hook-event datastore "build-canceled" (list uuid)) - 'build-canceled)))) + 'build-canceled) + #:priority 'low + #:duration-metric-name "cancel_build" + #:duration-metric-buckets %command-duration-histogram-buckets))) (when (eq? val 'build-canceled) (unless skip-updating-derived-priorities? @@ -845,6 +1006,10 @@ val)) + (unless (datastore-find-build datastore uuid) + (raise-exception + (make-client-error 'build-unknown))) + (if ignore-if-build-required-by-another? (let ((build-required ;; Do this check here outside the transaction to avoid having to @@ -867,6 +1032,10 @@ datastore (lambda (db) (let ((build-details (datastore-find-build datastore uuid))) + (unless build-details + (raise-exception + (make-client-error 'build-unknown))) + (when (assq-ref build-details 'canceled) (raise-exception (make-client-error 'build-already-canceled))) @@ -881,7 +1050,10 @@ new-priority #:skip-updating-derived-priorities? skip-updating-derived-priorities? - #:override-derived-priority override-derived-priority))) + #:override-derived-priority override-derived-priority)) + #:priority 'low + #:duration-metric-name "update_build_priority" + #:duration-metric-buckets %command-duration-histogram-buckets) (trigger-build-allocation build-coordinator) @@ -957,14 +1129,24 @@ (processor_count . 
,processor-count)))) (define (trigger-build-allocation build-coordinator) - ((build-coordinator-allocator-thread build-coordinator))) + ((build-coordinator-trigger-build-allocation build-coordinator))) (define (build-coordinator-prompt-hook-processing-for-event build-coordinator event-name) (and=> (assoc-ref (build-coordinator-hook-condvars build-coordinator) event-name) (lambda (condvar) - (signal-condition-variable condvar) + (cond + ((condition-variable? condvar) + (signal-condition-variable condvar)) + ((reusable-condition? condvar) + (signal-reusable-condition! + condvar + (build-coordinator-scheduler build-coordinator))) + (else + (error + (simple-format #f "unrecognised condvar ~A" + condvar)))) #t))) (define (update-build-allocation-plan build-coordinator) @@ -978,16 +1160,20 @@ (allocator-proc datastore #:metrics-registry (build-coordinator-metrics-registry build-coordinator))))) - (datastore-replace-build-allocation-plan datastore new-plan) + (build-coordinator-replace-build-allocation-plan + build-coordinator + new-plan) - (let ((build-count-per-agent - (datastore-count-build-allocation-plan-entries - datastore))) - (build-coordinator-send-event - build-coordinator - "allocation-plan-update" - `((allocation_plan_counts . ,build-count-per-agent))))) - #t) + (build-coordinator-send-event + build-coordinator + "allocation-plan-update" + `((allocation_plan_counts + . ,(map + (match-lambda + ((agent-id . builds) + (cons agent-id (length builds)))) + new-plan)))) + #t)) (define (make-build-allocator-thread build-coordinator) (define mtx (make-mutex)) @@ -1022,24 +1208,19 @@ (lambda () (with-exception-handler (lambda (exn) - (simple-format - (current-error-port) - "build-allocator-thread: exception: ~A\n" - exn) (metric-increment failure-counter-metric) (atomic-box-set! 
allocation-needed #t)) (lambda () - (with-throw-handler #t + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "error in build allocator thread\n") + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) (lambda () (update-build-allocation-plan build-coordinator) - (metric-increment success-counter-metric)) - (lambda (key . args) - (simple-format - (current-error-port) - "error in build allocator thread: ~A ~A\n" - key - args) - (backtrace)))) + (metric-increment success-counter-metric)))) #:unwind? #t)) #:buckets ((@@ (prometheus) exponential-histogram-buckets) ; TODO #:start 1 @@ -1059,31 +1240,239 @@ exn) (exit 1)) (lambda () - (let ((build-allocation-plan-total - (make-gauge-metric - (build-coordinator-metrics-registry build-coordinator) - "build_allocation_plan_total" - #:labels '(agent_id)))) - (with-time-logging - "allocator initialise metrics" - (for-each (match-lambda - ((agent-id . count) - (metric-set build-allocation-plan-total - count - #:label-values - `((agent_id . ,agent-id))))) - (datastore-count-build-allocation-plan-entries - (build-coordinator-datastore build-coordinator))))) - (update-build-allocation-plan-loop))))) trigger-build-allocation) +(define (spawn-build-allocation-plan-management-fiber coordinator) + (define allocation-plan '()) + + (define allocation-plan-metric + (make-gauge-metric + (build-coordinator-metrics-registry coordinator) + "build_allocation_plan_total" + #:labels '(agent_id))) + + (define (update-build-allocation-plan-metrics!) + (for-each + (match-lambda + ((agent-id . builds) + (metric-set allocation-plan-metric + (length builds) + #:label-values + `((agent_id . 
,agent-id))))) + allocation-plan) + #t) + + (spawn-fiber + (lambda () + (while #t + (with-exception-handler + (lambda _ #f) + (lambda () + (with-exception-handler + (lambda (exn) + (simple-format (current-error-port) + "exception in allocation plan fiber\n") + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) + (lambda () + (match (get-message (build-coordinator-allocator-channel coordinator)) + (('stats reply) + (put-message + reply + (map + (match-lambda + ((agent-id . builds) + (cons agent-id (length builds)))) + allocation-plan))) + (('fetch-agent-plan agent-id reply) + (put-message + reply + (or (list-copy (assoc-ref allocation-plan agent-id)) + '()))) + (('fetch-plan reply) + (put-message reply + (map (match-lambda + ((agent-id . builds) + (cons agent-id + (list-copy builds)))) + allocation-plan))) + (('remove-build uuid reply) + (set! + allocation-plan + (map + (match-lambda + ((agent-id . builds) + (cons agent-id + (remove + (lambda (build-uuid) + (string=? build-uuid uuid)) + builds)))) + allocation-plan)) + + (put-message reply #t)) + (('replace new-plan reply) + + (set! allocation-plan new-plan) + + (update-build-allocation-plan-metrics!) + + (put-message reply #t)))))) + #:unwind? 
#t))))) + +(define (build-coordinator-allocation-plan-stats coordinator) + (let ((reply (make-channel))) + (put-message (build-coordinator-allocator-channel coordinator) + (list 'stats reply)) + (get-message reply))) + +(define (build-coordinator-count-allocation-plan-builds coordinator agent-id) + (or (assoc-ref (build-coordinator-allocation-plan-stats coordinator) + agent-id) + 0)) + +(define (build-coordinator-fetch-agent-allocation-plan coordinator agent-id) + (let ((reply (make-channel))) + (put-message (build-coordinator-allocator-channel coordinator) + (list 'fetch-agent-plan agent-id reply)) + (get-message reply))) + +(define (build-coordinator-allocation-plan coordinator) + (let ((reply (make-channel))) + (put-message (build-coordinator-allocator-channel coordinator) + (list 'fetch-plan reply)) + (get-message reply))) + +(define (build-coordinator-build-in-allocation-plan? coordinator uuid) + (any + (match-lambda + ((agent-id . build-uuids) + (->bool (member uuid build-uuids string=?)))) + (build-coordinator-allocation-plan coordinator))) + +(define (build-coordinator-remove-build-from-allocation-plan coordinator uuid) + (let ((reply (make-channel))) + (put-message (build-coordinator-allocator-channel coordinator) + (list 'remove-build uuid reply)) + (get-message reply))) + +(define (build-coordinator-replace-build-allocation-plan coordinator plan) + (let ((reply (make-channel))) + (put-message (build-coordinator-allocator-channel coordinator) + (list 'replace plan reply)) + (get-message reply))) + +(define (build-coordinator-fetch-build-to-allocate coordinator + agent-id) + (define datastore + (build-coordinator-datastore coordinator)) + + (let ((all-planned-builds + (build-coordinator-fetch-agent-allocation-plan coordinator + agent-id))) + + (log-msg (build-coordinator-logger coordinator) + 'DEBUG + "fetching build to allocate to " agent-id ", " + (length all-planned-builds) " to consider") + + (let loop ((planned-builds all-planned-builds)) + (if (null? 
planned-builds) + #f + (match (datastore-fetch-build-to-allocate datastore + (first planned-builds)) + (#f + (log-msg (build-coordinator-logger coordinator) + 'DEBUG + agent-id ": ignoring " (first planned-builds)) + (loop (cdr planned-builds))) + (#(uuid derivation-id derivation-name derived_priority) + + (if (datastore-check-if-derivation-conflicts? + datastore + agent-id + derivation-id) + (begin + (log-msg (build-coordinator-logger coordinator) + 'DEBUG + agent-id ": " uuid " conflicts with allocated build") + (loop (cdr planned-builds))) + (begin + (log-msg (build-coordinator-logger coordinator) + 'DEBUG + agent-id ": " uuid " selected") + + `((uuid . ,uuid) + (derivation_name . ,derivation-name) + (derived_priority . ,derived_priority)))))))))) + +(define* (build-coordinator-list-allocation-plan-builds coordinator + agent-id + #:key limit + filter?) + (define (take* lst i) + (if (< (length lst) i) + lst + (take lst i))) + + (define datastore + (build-coordinator-datastore coordinator)) + + (define (build-data uuid + derivation-name + derived-priority + build-details) + `((uuid . ,uuid) + (derivation_name . ,derivation-name) + (system . ,(datastore-find-build-derivation-system + datastore + uuid)) + (priority . ,(assq-ref build-details 'priority)) + (derived_priority . ,derived-priority) + (tags . ,(vector-map + (lambda (_ tag) + (match tag + ((key . value) + `((key . ,key) + (value . ,value))))) + (datastore-fetch-build-tags + datastore + uuid))))) + + (let ((build-ids + (build-coordinator-fetch-agent-allocation-plan coordinator + agent-id))) + (filter-map + (lambda (build-id) + (if filter? 
+ (match (datastore-fetch-build-to-allocate datastore build-id) + (#(uuid derivation_id derivation_name derived_priority) + (let ((build-details (datastore-find-build datastore uuid))) + (build-data uuid + derivation_name + derived_priority + build-details))) + (#f #f)) + (let ((build-details (datastore-find-build datastore build-id)) + (unprocessed-builds-entry + (datastore-find-unprocessed-build-entry + datastore + build-id))) + (build-data build-id + (assq-ref build-details 'derivation-name) + (assq-ref unprocessed-builds-entry + 'derived-priority) + build-details)))) + (if limit + (take* build-ids limit) + build-ids)))) + (define (spawn-fiber-to-watch-for-deferred-builds coordinator) (spawn-fiber (lambda () + (sleep 10) (while #t - (sleep 60) ; 1 minute (with-exception-handler (lambda (exn) (simple-format (current-error-port) @@ -1091,14 +1480,21 @@ exn)) (lambda () (let ((first-deferred-build - (datastore-find-first-unallocated-deferred-build - (build-coordinator-datastore coordinator)))) - (when (and first-deferred-build - (time<=? (date->time-utc - (assq-ref first-deferred-build 'deferred-until)) - (current-time))) - (simple-format #t "guix-build-coordinator: triggering build allocation for deferred build: ~A\n" (assq-ref first-deferred-build 'uuid)) - (trigger-build-allocation coordinator)))) + (datastore-find-deferred-build + (build-coordinator-datastore coordinator) + (lambda (build-details) + (build-coordinator-build-in-allocation-plan? + coordinator + (assq-ref build-details 'uuid)))))) + (if (and first-deferred-build + (time<=? (date->time-utc + (assq-ref first-deferred-build 'deferred-until)) + (current-time))) + (begin + (simple-format #t "guix-build-coordinator: triggering build allocation for deferred build: ~A\n" (assq-ref first-deferred-build 'uuid)) + (trigger-build-allocation coordinator) + (sleep 10)) + (sleep 60)))) #:unwind? #t))) #:parallel? 
#t)) @@ -1118,6 +1514,18 @@ "hook_failure_total" #:labels '(event))) + (define process-events-thread-pool-thread-total-metric + (make-gauge-metric + (build-coordinator-metrics-registry build-coordinator) + "hook_thread_pool_thread_total" + #:labels '(event))) + + (define process-events-thread-pool-used-thread-total-metric + (make-gauge-metric + (build-coordinator-metrics-registry build-coordinator) + "hook_thread_pool_used_thread_total" + #:labels '(event))) + (define (process-event id event arguments handler) (log-msg (build-coordinator-logger build-coordinator) 'DEBUG @@ -1126,10 +1534,6 @@ (and (with-exception-handler (lambda (exn) - (log-msg (build-coordinator-logger build-coordinator) - 'ERROR - "error running " event " (" id ") hook: " - exn) (metric-increment failure-counter-metric #:label-values `((event . ,event))) @@ -1140,25 +1544,34 @@ (build-coordinator-metrics-registry build-coordinator) "hook_duration_seconds" (lambda () - (with-throw-handler #t + (with-exception-handler + (lambda (exn) + (let* ((stack + (match (fluid-ref %stacks) + ((stack-tag . prompt-tag) + (make-stack #t + 0 prompt-tag + 0 (and prompt-tag 1))))) + (backtrace + (call-with-output-string + (lambda (port) + (print-frames (stack->vector stack) + port + #:count (stack-length stack)) + (print-exception + port + (stack-ref stack 4) + '%exception + (list exn)))))) + (log-msg (build-coordinator-logger build-coordinator) + 'ERROR + "error running " event " (" id ") hook\n" + backtrace)) + (raise-exception exn)) (lambda () (start-stack - 'hook - (apply handler build-coordinator arguments))) - (lambda (key . 
args) - (log-msg (build-coordinator-logger build-coordinator) - 'ERROR - "error running " event " (" id ") hook: " - key " " args) - (let* ((stack (make-stack #t 3)) - (backtrace - (call-with-output-string - (lambda (port) - (display-backtrace stack port) - (newline port))))) - (display - backtrace - (current-output-port)))))) + #t + (apply handler build-coordinator arguments))))) #:labels '(event) #:label-values `((event . ,event))) #t) @@ -1184,78 +1597,116 @@ (define (single-thread-process-events event-name handler) (let ((mtx (make-mutex)) (condvar (make-condition-variable))) - (call-with-new-thread + (call-with-default-io-waiters (lambda () - (catch 'system-error - (lambda () - (set-thread-name - (symbol->string event-name))) - (const #t)) + (call-with-new-thread + (lambda () + (catch 'system-error + (lambda () + (set-thread-name + (symbol->string event-name))) + (const #t)) + + (lock-mutex mtx) + (while #t + (with-exception-handler + (lambda (exn) + (sleep 10)) + (lambda () + (with-exception-handler + (lambda (exn) + (log-msg + (build-coordinator-logger build-coordinator) + 'CRITICAL + "error in " event-name " hook processing thread") + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) + (lambda () + (while #t + (match (datastore-list-unprocessed-hook-events datastore event-name 1) + (() + (wait-condition-variable condvar mtx)) + (((id event arguments)) + (process-event id event arguments handler))))))) + #:unwind? #t)))))) + condvar)) - (lock-mutex mtx) - (while #t - (with-exception-handler - (lambda (exn) - (with-exception-handler - (lambda _ - ;; Things are really going wrong if logging about - ;; the hook processing thread crashing, also raises - ;; an exception, so just try and sleep and hope - ;; things go better next time - (sleep 10)) - (lambda () - (log-msg (build-coordinator-logger build-coordinator) - 'CRITICAL - "hook processing thread " event-name - " exception: " exn)) - #:unwind? 
#t) - (sleep 10)) - (lambda () - (with-throw-handler #t + (define (thread-pool-process-events event-name handler thread-count) + (let ((thread-pool + (make-thread-pool + thread-count + #:name (simple-format #f "~A" event-name))) + (reusable-condition + (make-reusable-condition)) + (coordination-channel + (make-channel))) + + (metric-set process-events-thread-pool-thread-total-metric + thread-count + #:label-values `((event . ,event-name))) + + (spawn-fiber + (lambda () + (let loop ((running-ids '())) + (metric-set process-events-thread-pool-used-thread-total-metric + (vector-count + (lambda (_ proc) + (->bool proc)) + (thread-pool-proc-vector thread-pool)) + #:label-values `((event . ,event-name))) + (match (get-message coordination-channel) + (('process id event arguments) + (if (member id running-ids) + ;; Ignore already running jobs + (loop running-ids) + (begin + (spawn-fiber + (lambda () + (call-with-thread + thread-pool + (lambda () + (process-event id event arguments handler)) + ;; TODO Make this the default through knots + #:timeout #f) + (put-message coordination-channel + (list 'finished id)))) + (loop (cons id running-ids))))) + (('finished id) + (when (< (length running-ids) + (* 2 thread-count)) + (signal-reusable-condition! reusable-condition)) + (loop (delete id running-ids))) + (('count-running reply) + (let ((count (length running-ids))) + (spawn-fiber (lambda () - (while #t - (match (datastore-list-unprocessed-hook-events datastore event-name 1) - (() - (wait-condition-variable condvar mtx)) - (((id event arguments)) - (process-event id event arguments handler))))) - (lambda (key . args) - (log-msg - (build-coordinator-logger build-coordinator) - 'CRITICAL - "error in " event-name " hook processing thread: " key " " args) - (backtrace)))) - #:unwind? 
#t)))) - condvar)) + (put-message reply count)))) + (loop running-ids)))))) - (define (work-queue-process-events event-name handler thread-count) - (let-values (((pool-mutex job-available count-threads list-jobs) - (create-thread-pool - (lambda () - (max - 1 - (length - (datastore-list-unprocessed-hook-events - datastore - event-name - thread-count)))) - (lambda (running-jobs) - (let* ((in-progress-ids - (map car running-jobs)) - (potential-jobs - (datastore-list-unprocessed-hook-events - datastore - event-name - (+ 1 (length in-progress-ids))))) - (find - (match-lambda - ((id rest ...) - (not (member id in-progress-ids)))) - potential-jobs))) - (lambda (id event arguments) - (process-event id event arguments handler)) - #:name (symbol->string event-name)))) - job-available)) + (spawn-fiber + (lambda () + (while #t + (let ((count + (let ((channel (make-channel))) + (put-message coordination-channel + (list 'count-running channel)) + (get-message channel)))) + (when (< count (* 2 thread-count)) + (log-msg (build-coordinator-logger build-coordinator) + 'DEBUG "submitting batch of " event-name " hook events") + (for-each + (match-lambda + ((id event arguments) + (put-message coordination-channel + (list 'process id event arguments)))) + (datastore-list-unprocessed-hook-events + datastore + event-name + (* 20 thread-count))))) + (reusable-condition-wait reusable-condition + #:timeout 60)))) + + reusable-condition)) (map (match-lambda @@ -1264,25 +1715,30 @@ (or (and=> (assq-ref parallel-hooks event-name) (lambda (thread-count) - (work-queue-process-events event-name - handler - thread-count))) + (thread-pool-process-events event-name + handler + thread-count))) (single-thread-process-events event-name handler))))) (build-coordinator-hooks build-coordinator))) -(define (fetch-builds build-coordinator agent systems - max-builds deprecated-requested-count) +(define (fetch-builds build-coordinator agent systems max-builds) (define datastore (build-coordinator-datastore 
build-coordinator)) (define (allocate-one-build agent-id) - (let ((build-details (datastore-fetch-build-to-allocate datastore agent-id))) + (let ((build-details (build-coordinator-fetch-build-to-allocate + build-coordinator agent-id))) (if build-details (let ((build-id (assq-ref build-details 'uuid))) - (datastore-insert-to-allocated-builds datastore agent-id (list build-id)) - (datastore-remove-builds-from-plan datastore (list build-id)) - build-details) + (datastore-insert-to-allocated-builds + datastore + agent-id + build-id) + (build-coordinator-remove-build-from-allocation-plan + build-coordinator build-id) + `(,@build-details + (submit_outputs . null))) #f))) (define (allocate-several-builds agent-id count) @@ -1306,42 +1762,68 @@ (datastore-list-agent-builds datastore agent)) (start-count (length initially-allocated-builds)) - (target-count (or max-builds - (+ start-count - deprecated-requested-count)))) + (target-count max-builds)) (if (< start-count target-count) (let ((new-builds (allocate-several-builds agent (- target-count start-count)))) - (unless (null? new-builds) - (let ((allocation-plan-metric - (metrics-registry-fetch-metric - (slot-ref datastore 'metrics-registry) - "build_allocation_plan_total"))) - (for-each - (match-lambda - ((agent-id . count) - (metric-set allocation-plan-metric - count - #:label-values - `((agent_id . ,agent-id))))) - (datastore-count-build-allocation-plan-entries datastore)))) - - ;; Previously allocate builds just returned newly allocated - ;; builds, but if max-builds is provided, return all the - ;; builds. This means the agent can handle this in a idempotent - ;; manor. - (if max-builds - (append initially-allocated-builds - new-builds) - new-builds)) - ;; Previously allocate builds just returned newly allocated builds, - ;; but if max-builds is provided, return all the builds. This means - ;; the agent can handle this in a idempotent manor. 
- (if max-builds - initially-allocated-builds - '())))) - #:duration-metric-name "allocate_builds_to_agent")) + (if (null? new-builds) + (values initially-allocated-builds + #f) + (values (append initially-allocated-builds + new-builds) + #t))) + (values initially-allocated-builds + #f)))) + #:duration-metric-name "allocate_builds_to_agent" + #:duration-metric-buckets %command-duration-histogram-buckets)) + + (define (send-agent-builds-allocated-event builds) + (build-coordinator-send-event + build-coordinator + "agent-builds-allocated" + `((agent_id . ,agent) + (builds . ,(list->vector + (map + (lambda (build) + `(,@build + (tags + . ,(list->vector + (map + (match-lambda + ((key . value) + `((key . ,key) + (value . ,value)))) + (vector->list + (datastore-fetch-build-tags + datastore + (assq-ref build 'uuid)))))))) + builds)))))) + + (define (submit-outputs? build) + (with-exception-handler + (lambda (exn) + (log-msg (build-coordinator-logger build-coordinator) + 'CRITICAL + "build-submit-outputs hook raised exception") + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) + (lambda () + (let ((hook-result + (call-with-delay-logging + (lambda () + (build-submit-outputs-hook + build-coordinator + (assq-ref build 'uuid)))))) + (if (boolean? hook-result) + hook-result + (begin + (log-msg + (build-coordinator-logger build-coordinator) + 'CRITICAL + "build-submit-outputs hook returned non boolean: " + hook-result) + #t)))))) (call-with-duration-metric (build-coordinator-metrics-registry build-coordinator) @@ -1357,111 +1839,42 @@ (trigger-build-allocation build-coordinator))) (let ((builds - (get-builds))) - - (build-coordinator-send-event - build-coordinator - "agent-builds-allocated" - `((agent_id . ,agent) - (builds . ,(list->vector - (map - (lambda (build) - `(,@build - (tags - . ,(list->vector - (map - (match-lambda - ((key . value) - `((key . ,key) - (value . 
,value)))) - (vector->list - (datastore-fetch-build-tags - datastore - (assq-ref build 'uuid)))))))) - builds))))) - - (map (lambda (build) - (define submit-outputs? - (with-exception-handler - (lambda (exn) - (log-msg (build-coordinator-logger build-coordinator) - 'CRITICAL - "build-submit-outputs hook raised exception: " - exn)) - (lambda () - (with-throw-handler #t - (lambda () - (let ((hook-result - (call-with-delay-logging - (lambda () - (build-submit-outputs-hook - build-coordinator - (assq-ref build 'uuid)))))) - (if (boolean? hook-result) - hook-result - (begin - (log-msg - (build-coordinator-logger build-coordinator) - 'CRITICAL - "build-submit-outputs hook returned non boolean: " - hook-result) - #t)))) - (lambda (key . args) - (backtrace)))) - #:unwind? #t)) - - `(,@build - ;; TODO This needs reconsidering when things having been built in - ;; the past doesn't necessarily mean they're still available. - (submit_outputs . ,submit-outputs?))) - builds))))))) + new-builds-allocated? + (if (= 0 + (build-coordinator-count-allocation-plan-builds + build-coordinator + agent)) + (values + (datastore-list-agent-builds datastore agent) + #f) + (get-builds)))) + + (when new-builds-allocated? + (send-agent-builds-allocated-event builds)) + + (map + (lambda (build) + (if (eq? 'null (assq-ref build 'submit_outputs)) + (let ((submit-outputs? (submit-outputs? build))) + (datastore-update-allocated-build-submit-outputs + (build-coordinator-datastore build-coordinator) + (assq-ref build 'uuid) + submit-outputs?) + + `(,@(alist-delete 'submit_outputs build) + (submit_outputs . ,submit-outputs?))) + build)) + builds))))))) (define (agent-details build-coordinator agent-id) (define datastore (build-coordinator-datastore build-coordinator)) - (define build-submit-outputs-hook - (assq-ref (build-coordinator-hooks build-coordinator) - 'build-submit-outputs)) - - (define (submit-outputs? 
build) - (with-exception-handler - (lambda (exn) - (log-msg (build-coordinator-logger build-coordinator) - 'CRITICAL - "build-submit-outputs hook raised exception: " - exn)) - (lambda () - (with-throw-handler #t - (lambda () - (let ((hook-result - (call-with-delay-logging - (lambda () - (build-submit-outputs-hook - build-coordinator - (assq-ref build 'uuid)))))) - (if (boolean? hook-result) - hook-result - (begin - (log-msg - (build-coordinator-logger build-coordinator) - 'CRITICAL - "build-submit-outputs hook returned non boolean: " - hook-result) - #t)))) - (lambda (key . args) - (backtrace)))) - #:unwind? #t)) - (let ((agent (datastore-find-agent datastore agent-id)) (allocated-builds (datastore-list-agent-builds datastore agent-id))) `(,@agent ; description - (builds . ,(list->vector - (map (lambda (build) - `(,@build - (submit_outputs . ,(submit-outputs? build)))) - allocated-builds)))))) + (builds . ,(list->vector allocated-builds))))) (define (build-data-location build-id ) (string-append (%config 'builds-dir) "/" @@ -1610,10 +2023,11 @@ (list build-id)) (when success? - (datastore-delete-relevant-outputs-from-unbuilt-outputs + (datastore-insert-background-job datastore - build-id) - (datastore-update-unprocessed-builds-for-build-success + 'build-success + (list build-id)) + (datastore-delete-relevant-outputs-from-unbuilt-outputs datastore build-id) (datastore-store-output-metadata @@ -1623,7 +2037,8 @@ (assoc-ref result-json "outputs")))) #f)))) - #:duration-metric-name "store_build_result"))) + #:duration-metric-name "store_build_result" + #:duration-metric-buckets %command-duration-histogram-buckets))) (when exception ;; Raise the exception here to avoid aborting the transaction (raise-exception exception))) @@ -1637,6 +2052,11 @@ 'build-success 'build-failure)) + (when success? + (build-coordinator-trigger-background-job-processing + build-coordinator + 'build-success)) + (build-coordinator-send-event build-coordinator (if success? 
@@ -1649,7 +2069,8 @@ ;; could change the allocation (trigger-build-allocation build-coordinator) - #t)))) + #t)) + #:buckets %command-duration-histogram-buckets)) (define (handle-build-start-report build-coordinator agent-id @@ -1668,7 +2089,8 @@ build-coordinator 'build-started `((build_id . ,build-id) - (agent_id . ,agent-id)))))) + (agent_id . ,agent-id)))) + #:buckets %command-duration-histogram-buckets)) (define (handle-setup-failure-report build-coordinator agent-id build-id report-json) @@ -1704,3 +2126,142 @@ ;; Trigger build allocation, so that the allocator can handle this setup ;; failure (trigger-build-allocation build-coordinator)) + +(define (build-coordinator-trigger-background-job-processing + build-coordinator + type) + (let ((condition + (assq-ref (build-coordinator-background-job-conditions + build-coordinator) + type))) + (unless condition + (error + (simple-format #f "unknown condition ~A" type))) + (signal-reusable-condition! condition))) + +(define (start-background-job-processing-fibers build-coordinator) + (define %background-job-duration-histogram-buckets + (list 0.1 0.25 0.5 1 2.5 5 10 15 30 45 60 120 240 (inf))) + + (define* (start-job-fibers type proc #:key (parallelism 1)) + (let ((coordination-channel + (make-channel)) + (condition + (make-reusable-condition)) + (process-in-fiber + (fiberize + (lambda args + (call-with-duration-metric + (build-coordinator-metrics-registry build-coordinator) + "coordinator_background_job_duration_seconds" + (lambda () + (call-with-delay-logging proc #:args args)) + #:labels '(name) + #:label-values `((name . ,type)) + #:buckets %background-job-duration-histogram-buckets)) + #:parallelism parallelism)) + (job-exception-counter-metric + (make-counter-metric + (build-coordinator-metrics-registry build-coordinator) + "coordinator_background_job_failures_total" + #:labels '(name)))) + + (define (process id . 
args) + (spawn-fiber + (lambda () + (let loop ((retry 0)) + (log-msg (build-coordinator-logger build-coordinator) + 'DEBUG + "processing " type " background job (id: " + id ", args: " args ", retry: " retry ")") + (let ((success? + (with-exception-handler + (lambda (exn) + (log-msg (build-coordinator-logger build-coordinator) + 'WARN + type " background job error (id: " + id "): " exn) + #f) + (lambda () + (apply process-in-fiber args)) + #:unwind? #t))) + (if success? + (begin + (datastore-delete-background-job + (build-coordinator-datastore build-coordinator) + id) + (put-message coordination-channel + (list 'job-finished id))) + (begin + (metric-increment job-exception-counter-metric + #:label-values `((name . ,type))) + (sleep 30) + (loop (+ 1 retry))))))))) + + (spawn-fiber + (lambda () + (while #t + (let ((job-details + (datastore-select-background-jobs + (build-coordinator-datastore build-coordinator) + type + #:limit (* 2 parallelism)))) + + (unless (null? job-details) + (put-message coordination-channel + (list 'process job-details))) + + (reusable-condition-wait condition + #:timeout 30))))) + + (spawn-fiber + (lambda () + (let loop ((running-job-ids '())) + (match (get-message coordination-channel) + (('process jobs) + (let* ((job-ids (map (lambda (job) + (assq-ref job 'id)) + jobs)) + (new-ids + (lset-difference = job-ids running-job-ids)) + (jobs-to-start + (take new-ids + (min + (- parallelism + (length running-job-ids)) + (length new-ids))))) + (for-each (lambda (job) + (apply process + (assq-ref job 'id) + (assq-ref job 'args))) + (filter + (lambda (job-details) + (member (assq-ref job-details 'id) + jobs-to-start)) + jobs)) + (loop (append running-job-ids + jobs-to-start)))) + (('job-finished id) + ;; Maybe not very efficient, but should work + (signal-reusable-condition! 
condition) + (log-msg (build-coordinator-logger build-coordinator) + 'DEBUG type + " background job " id + " finished successfully") + (loop (delete id running-job-ids))))))) + + condition)) + + `((build-success . ,(start-job-fibers + 'build-success + (lambda (build-id) + (datastore-update-unprocessed-builds-for-build-success + (build-coordinator-datastore build-coordinator) + build-id)) + #:parallelism 24)))) + +(define (build-coordinator-check-and-correct-unprocessed-builds-all-inputs-built + build-coordinator) + (datastore-check-and-correct-unprocessed-builds-all-inputs-built + (build-coordinator-datastore build-coordinator) + #:progress-reporter progress-reporter/bar)) diff --git a/guix-build-coordinator/datastore.scm b/guix-build-coordinator/datastore.scm index a29a993..5768630 100644 --- a/guix-build-coordinator/datastore.scm +++ b/guix-build-coordinator/datastore.scm @@ -6,7 +6,8 @@ #:use-module (guix-build-coordinator datastore postgresql) #:duplicates (merge-generics) #:export (database-uri->datastore - datastore-find-build-output)) + datastore-find-build-output + datastore-validate-datetime-string)) (re-export datastore-optimize) (re-export datastore-spawn-fibers) @@ -68,7 +69,7 @@ (re-export datastore-count-builds-for-derivation) (re-export datastore-list-processed-builds) (re-export datastore-list-unprocessed-builds) -(re-export datastore-find-first-unallocated-deferred-build) +(re-export datastore-find-deferred-build) (re-export datastore-fetch-prioritised-unprocessed-builds) (re-export datastore-insert-unprocessed-hook-event) (re-export datastore-count-unprocessed-hook-events) @@ -86,29 +87,30 @@ (re-export datastore-list-builds-for-output) (re-export datastore-list-builds-for-output-and-system) (re-export datastore-agent-for-build) -(re-export datastore-count-build-allocation-plan-entries) -(re-export datastore-replace-build-allocation-plan) -(re-export datastore-remove-build-from-allocation-plan) (re-export datastore-count-allocated-builds) 
(re-export datastore-agent-requested-systems) (re-export datastore-update-agent-requested-systems) (re-export datastore-fetch-build-to-allocate) +(re-export datastore-check-if-derivation-conflicts?) (re-export datastore-insert-to-allocated-builds) -(re-export datastore-remove-builds-from-plan) -(re-export datastore-list-allocation-plan-builds) +(re-export datastore-update-allocated-build-submit-outputs) +(re-export datastore-insert-background-job) +(re-export datastore-delete-background-job) +(re-export datastore-select-background-jobs) +(re-export datastore-check-and-correct-unprocessed-builds-all-inputs-built) (define* (database-uri->datastore database #:key metrics-registry - worker-thread-log-exception?) + thread-pool-log-exception?) (cond ((string-prefix? "pg://" database) (postgresql-datastore database)) ((string-prefix? "sqlite://" database) (sqlite-datastore database #:metrics-registry metrics-registry - #:worker-thread-log-exception? - worker-thread-log-exception?)) + #:thread-pool-log-exception? + thread-pool-log-exception?)) (else (error (simple-format #f "Unknown database ~A" database))))) @@ -123,3 +125,6 @@ (assq-ref output 'output) #f)) outputs))) + +(define (datastore-validate-datetime-string s) + (strftime "%F %T" (car (strptime "%F %T" s)))) diff --git a/guix-build-coordinator/datastore/abstract.scm b/guix-build-coordinator/datastore/abstract.scm index 68fc654..799485e 100644 --- a/guix-build-coordinator/datastore/abstract.scm +++ b/guix-build-coordinator/datastore/abstract.scm @@ -9,8 +9,7 @@ datastore-new-agent datastore-new-agent-password datastore-list-agent-builds - datastore-agent-password-exists? - datastore-list-allocation-plan-builds)) + datastore-agent-password-exists?)) (define-class <abstract-datastore> ()) @@ -24,5 +23,3 @@ (define-generic datastore-agent-password-exists?) 
(define-generic datastore-agent-list-unprocessed-builds) (define-generic datastore-list-agent-builds) -(define-generic datastore-agent-replace-build-allocation-plan) -(define-generic datastore-list-allocation-plan-builds) diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm index e67a940..bb1d8e8 100644 --- a/guix-build-coordinator/datastore/sqlite.scm +++ b/guix-build-coordinator/datastore/sqlite.scm @@ -10,8 +10,11 @@ #:use-module (ice-9 exceptions) #:use-module (sqlite3) #:use-module (fibers) + #:use-module (knots parallelism) + #:use-module (knots thread-pool) #:use-module (prometheus) #:use-module (guix base16) + #:use-module (guix progress) #:use-module (guix derivations) #:use-module (guix-build-coordinator utils) #:use-module (guix-build-coordinator utils fibers) @@ -87,7 +90,7 @@ datastore-replace-agent-tags datastore-list-processed-builds datastore-list-unprocessed-builds - datastore-find-first-unallocated-deferred-build + datastore-find-deferred-build datastore-fetch-prioritised-unprocessed-builds datastore-insert-unprocessed-hook-event datastore-count-unprocessed-hook-events @@ -96,28 +99,37 @@ datastore-delete-unprocessed-hook-event datastore-list-agent-builds datastore-agent-for-build - datastore-count-build-allocation-plan-entries datastore-replace-build-allocation-plan - datastore-remove-build-from-allocation-plan datastore-count-allocated-builds datastore-agent-requested-systems datastore-update-agent-requested-systems datastore-fetch-build-to-allocate + datastore-check-if-derivation-conflicts? 
datastore-insert-to-allocated-builds - datastore-remove-builds-from-plan - datastore-list-allocation-plan-builds)) + datastore-update-allocated-build-submit-outputs + datastore-insert-background-job + datastore-delete-background-job + datastore-select-background-jobs + datastore-check-and-correct-unprocessed-builds-all-inputs-built)) + +(define %transaction-duration-histogram-buckets + (list 0.1 0.25 0.5 1 2.5 5 10 15 30 45 60 120 240 (inf))) (define-class <sqlite-datastore> (<abstract-datastore>) database-file - worker-reader-thread-channel - worker-writer-thread-channel + reader-thread-pool + writer-thread-pool + low-priority-writer-thread-channel + default-priority-writer-thread-channel + high-priority-writer-thread-channel + writer-thread-channel-queue-stats metrics-registry) (define* (sqlite-datastore database-uri #:key update-database? metrics-registry - worker-thread-log-exception?) + thread-pool-log-exception?) (define database-file (string-drop database-uri (string-length "sqlite://"))) @@ -132,7 +144,8 @@ (let ((db (db-open database-file))) (sqlite-exec db "PRAGMA journal_mode=WAL;") (sqlite-exec db "PRAGMA optimize;") - (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);") + (with-time-logging "truncating the WAL" + (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);")) (sqlite-close db)) (let ((datastore (make <sqlite-datastore>))) @@ -140,141 +153,164 @@ (slot-set! datastore 'database-file database-file) (slot-set! datastore 'metrics-registry metrics-registry) - (slot-set! 
- datastore - 'worker-writer-thread-channel - (make-worker-thread-channel - (lambda () - (let ((db - (db-open database-file))) - (sqlite-exec db "PRAGMA busy_timeout = 5000;") - (sqlite-exec db "PRAGMA synchronous = NORMAL;") - (sqlite-exec db "PRAGMA temp_store = MEMORY;") - (sqlite-exec db "PRAGMA foreign_keys = ON;") - - (sqlite-exec db "ATTACH DATABASE 'file:/mem?vfs=memdb' AS mem;") - - (sqlite-exec - db - " -CREATE TABLE IF NOT EXISTS mem.build_allocation_plan ( - build_id INTEGER NOT NULL, - agent_id TEXT NOT NULL, - ordering INTEGER NOT NULL, - PRIMARY KEY (agent_id, build_id) -);") - - (list db))) - #:name "ds write" - #:destructor - (let ((writer-thread-destructor-counter - (make-gauge-metric metrics-registry - "datastore_writer_thread_close_total"))) - (lambda (db) - (db-optimize db - database-file - metrics-registry - #:maybe-truncate-wal? #f) - - (metric-increment writer-thread-destructor-counter) - (sqlite-close db))) - #:lifetime 500 - #:expire-on-exception? #t - - ;; SQLite doesn't support parallel writes - #:parallelism 1 - #:delay-logger (let ((delay-metric - (make-histogram-metric - metrics-registry - "datastore_write_delay_seconds"))) - (lambda (seconds-delayed) - (metric-observe delay-metric - ;; TODO exact->inexact to work around - ;; a bug in guile-prometheus where - ;; the metric sum will output in the - ;; exact form including the /q - (exact->inexact seconds-delayed)) - (log-delay "datastore write" seconds-delayed) - (when (> seconds-delayed 1) - (format - (current-error-port) - "warning: database write delayed by ~1,2f seconds~%" - seconds-delayed)))) - #:duration-logger - (lambda (duration proc) - (when (> duration 10) - (format - (current-error-port) - "warning: database write took ~1,2f seconds (~a)~%" - duration - proc))) - #:log-exception? 
worker-thread-log-exception?)) - - ;; Make sure the worker thread has initialised, and created the in memory - ;; tables - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) - (const #t)) - - (slot-set! - datastore - 'worker-reader-thread-channel - (make-worker-thread-channel - (lambda () - (let ((db - (db-open database-file #:write? #f))) - (sqlite-exec db "PRAGMA temp_store = MEMORY;") - (sqlite-exec db "PRAGMA busy_timeout = 4000;") - (sqlite-exec db "PRAGMA cache_size = -16000;") - - (sqlite-exec db "ATTACH DATABASE 'file:/mem?vfs=memdb' AS mem;") - - (list db))) - #:name "ds read" - #:destructor - (let ((reader-thread-destructor-counter - (make-gauge-metric metrics-registry - "datastore_reader_thread_close_total"))) - (lambda (db) - (metric-increment reader-thread-destructor-counter) - (sqlite-close db))) - #:lifetime 50000 - #:expire-on-exception? #t - - ;; Use a minimum of 8 and a maximum of 16 threads - #:parallelism - (min (max (current-processor-count) - 8) - 16) - #:delay-logger (let ((delay-metric - (make-histogram-metric + (let ((writer-thread-pool + (make-thread-pool + ;; SQLite doesn't support parallel writes + 1 + #:thread-initializer + (lambda () + (let ((db + (db-open database-file))) + (sqlite-exec db "PRAGMA busy_timeout = 0;") + (sqlite-exec db "PRAGMA synchronous = NORMAL;") + (sqlite-exec db "PRAGMA temp_store = MEMORY;") + (sqlite-exec db "PRAGMA foreign_keys = ON;") + + (list db))) + #:name "ds write" + #:thread-destructor + (let ((writer-thread-destructor-counter + (make-gauge-metric metrics-registry + "datastore_writer_thread_close_total"))) + (lambda (db) + (db-optimize db + database-file metrics-registry - "datastore_read_delay_seconds"))) - (lambda (seconds-delayed) - (metric-observe delay-metric - ;; TODO exact->inexact to work around - ;; a bug in guile-prometheus where - ;; the metric sum will output in the - ;; exact form including the /q - (exact->inexact seconds-delayed)) - (log-delay "datastore read" 
seconds-delayed) - (when (> seconds-delayed 1) - (format - (current-error-port) - "warning: database read delayed by ~1,2f seconds~%" - seconds-delayed)))) - #:duration-logger - (lambda (duration proc) - (when (> duration 30) - (format - (current-error-port) - "warning: database read took ~1,2f seconds (~a)~%" - duration - proc))) - #:log-exception? worker-thread-log-exception?)) + #:maybe-truncate-wal? #f) + + (metric-increment writer-thread-destructor-counter) + (sqlite-close db))) + #:thread-lifetime 500 + #:expire-on-exception? #t + + #:delay-logger (let ((delay-metric + (make-histogram-metric + metrics-registry + "datastore_write_delay_seconds" + #:buckets + %transaction-duration-histogram-buckets))) + (lambda (seconds-delayed proc) + (metric-observe delay-metric + ;; TODO exact->inexact to work around + ;; a bug in guile-prometheus where + ;; the metric sum will output in the + ;; exact form including the /q + (exact->inexact seconds-delayed)) + (log-delay "datastore write" seconds-delayed) + (when (> seconds-delayed 1) + (format/safe + (current-error-port) + "warning: database write delayed by ~1,2f seconds~%" + seconds-delayed)))) + #:duration-logger + (lambda (duration proc) + (when (> duration 10) + (format/safe + (current-error-port) + "warning: database write took ~1,2f seconds (~a)~%" + duration + proc))) + #:log-exception? thread-pool-log-exception?))) + + (slot-set! datastore + 'writer-thread-pool + writer-thread-pool) + ;; This is changed in datastore-spawn-fibers + (slot-set! datastore + 'low-priority-writer-thread-channel + (thread-pool-channel writer-thread-pool)) + (slot-set! datastore + 'default-priority-writer-thread-channel + (thread-pool-channel writer-thread-pool)) + (slot-set! 
datastore + 'high-priority-writer-thread-channel + (thread-pool-channel writer-thread-pool))) + + (let ((reader-thread-pool + (make-thread-pool + ;; Use a minimum of 8 and a maximum of 12 threads + (min (max (current-processor-count) + 8) + 12) + #:thread-initializer + (lambda () + (let ((db + (db-open database-file #:write? #f))) + (sqlite-exec db "PRAGMA temp_store = MEMORY;") + (sqlite-exec db "PRAGMA busy_timeout = 0;") + (sqlite-exec db "PRAGMA cache_size = -16000;") + + (list db))) + #:thread-destructor + (let ((reader-thread-destructor-counter + (make-gauge-metric metrics-registry + "datastore_reader_thread_close_total"))) + (lambda (db) + (metric-increment reader-thread-destructor-counter) + (sqlite-close db))) + #:name "ds read" + #:thread-lifetime 50000 + #:expire-on-exception? #t + + #:delay-logger (let ((delay-metric + (make-histogram-metric + metrics-registry + "datastore_read_delay_seconds" + #:buckets + %transaction-duration-histogram-buckets))) + (lambda (seconds-delayed proc) + (metric-observe delay-metric + ;; TODO exact->inexact to work around + ;; a bug in guile-prometheus where + ;; the metric sum will output in the + ;; exact form including the /q + (exact->inexact seconds-delayed)) + (log-delay "datastore read" seconds-delayed) + (when (> seconds-delayed 1) + (format/safe + (current-error-port) + "warning: database read delayed by ~1,2f seconds~%" + seconds-delayed)))) + #:duration-logger + (lambda (duration proc) + (when (> duration 30) + (format/safe + (current-error-port) + "warning: database read took ~1,2f seconds (~a)~%" + duration + proc))) + #:log-exception? thread-pool-log-exception?))) + + (let ((metric (make-gauge-metric + metrics-registry + "datastore_reader_threads_total"))) + (metric-set metric + (vector-length + (thread-pool-proc-vector reader-thread-pool)))) + + + (slot-set! 
datastore + 'reader-thread-pool + reader-thread-pool)) datastore)) +(define* (call-with-writer-thread + datastore + proc + #:key priority duration-logger) + (call-with-thread + (slot-ref datastore 'writer-thread-pool) + proc + #:duration-logger duration-logger + #:channel + (slot-ref datastore + (assq-ref + '((high . high-priority-writer-thread-channel) + (default . default-priority-writer-thread-channel) + (low . low-priority-writer-thread-channel)) + (or priority 'default))))) + (define (sqlite-step-and-reset statement) (let ((val (sqlite-step statement))) (sqlite-reset statement) @@ -305,11 +341,6 @@ CREATE TABLE IF NOT EXISTS mem.build_allocation_plan ( metrics-registry checkpoint-duration-metric-name (lambda () - (if (> (wal-size) extreme-wal-size-threshold) - ;; Since the WAL is really getting too big, wait for much longer - (sqlite-exec db "PRAGMA busy_timeout = 300000;") - (sqlite-exec db "PRAGMA busy_timeout = 20;")) - (let* ((statement (sqlite-prepare db @@ -320,19 +351,17 @@ CREATE TABLE IF NOT EXISTS mem.build_allocation_plan ( (#(blocked? modified-page-count pages-moved-to-db) (if (= blocked? 
1) (begin - (simple-format + (simple-format/safe (current-error-port) "warning: wal checkpoint blocked\n") #f) (begin - (simple-format + (simple-format/safe (current-error-port) "wal checkpoint completed (~A, ~A)\n" modified-page-count pages-moved-to-db) #t)))))) - (sqlite-exec db "PRAGMA busy_timeout = 5000;") - result))) #t)) @@ -348,8 +377,8 @@ PRAGMA optimize;") (define-method (datastore-optimize (datastore <sqlite-datastore>)) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread + datastore (lambda (db) (db-optimize db @@ -358,15 +387,49 @@ PRAGMA optimize;") (define-method (datastore-spawn-fibers (datastore <sqlite-datastore>)) + ;; Queue messages to the writer thread, so that they're handled in a first + ;; come first served manner + (let ((queues + queue-stats + (make-discrete-priority-queueing-channels + (thread-pool-channel + (slot-ref datastore 'writer-thread-pool)) + 3))) + (match queues + ((high default low) + (slot-set! datastore 'high-priority-writer-thread-channel high) + (slot-set! datastore 'default-priority-writer-thread-channel default) + (slot-set! datastore 'low-priority-writer-thread-channel low))) + (slot-set! datastore + 'writer-thread-channel-queue-stats + queue-stats)) + + (spawn-fiber + (lambda () + (while #t + (sleep 20) + (let ((procs + (vector->list + (thread-pool-proc-vector + (slot-ref datastore 'reader-thread-pool))))) + (when (every procedure? 
procs) + (for-each + (lambda (i proc) + (simple-format/safe (current-error-port) + "reader thread ~A running: ~A\n" + i proc)) + (iota (length procs)) + procs)))))) + (spawn-fiber (lambda () (while #t (sleep (* 60 10)) ; 10 minutes (with-exception-handler (lambda (exn) - (simple-format (current-error-port) - "exception when performing WAL checkpoint: ~A\n" - exn)) + (simple-format/safe (current-error-port) + "exception when performing WAL checkpoint: ~A\n" + exn)) (lambda () (with-time-logging "performing regular database maintenance" @@ -391,11 +454,18 @@ PRAGMA optimize;") (let ((setup-failures-total (make-gauge-metric registry "setup_failures_total" - #:labels '(agent_id reason)))) - - (letpar& ((setup-failure-counts - (with-time-logging "counting setup failures" - (datastore-count-setup-failures datastore)))) + #:labels '(agent_id reason))) + (background-jobs-inserted-total + (make-counter-metric registry + "coordinator_background_job_inserted_total" + #:labels '(name)))) + + (fibers-let ((setup-failure-counts + (with-time-logging "counting setup failures" + (datastore-count-setup-failures datastore))) + (background-job-counts + (with-time-logging "counting background jobs" + (datastore-count-background-jobs datastore)))) (for-each (match-lambda (((agent-id reason) . count) @@ -404,7 +474,14 @@ PRAGMA optimize;") #:label-values `((agent_id . ,agent-id) (reason . ,reason))))) - setup-failure-counts))) + setup-failure-counts) + + (for-each (match-lambda + ((type . count) + (metric-increment background-jobs-inserted-total + #:by count + #:label-values `((name . ,type))))) + background-job-counts))) #t) (define-method (datastore-update-metrics! 
@@ -437,12 +514,38 @@ PRAGMA optimize;") "datastore_wal_bytes") (make-gauge-metric registry "datastore_wal_bytes" - #:docstring "Size of the SQLite Write Ahead Log file")))) + #:docstring "Size of the SQLite Write Ahead Log file"))) + (reader-threads-used-metric + (or (metrics-registry-fetch-metric registry + "datastore_reader_threads_used_total") + (make-gauge-metric + registry "datastore_reader_threads_used_total"))) + (writer-queue-length-metric + (or (metrics-registry-fetch-metric registry + "datastore_writer_queue_total") + (make-gauge-metric + registry "datastore_writer_queue_total" + #:labels '(priority))))) - (letpar& ((build-counts - (datastore-count-builds datastore)) - (build-result-counts - (datastore-count-build-results datastore))) + (metric-set reader-threads-used-metric + (vector-count + (lambda (_ proc) + (->bool proc)) + (thread-pool-proc-vector + (slot-ref datastore 'reader-thread-pool)))) + + (for-each + (lambda (priority stats) + (metric-set writer-queue-length-metric + (assq-ref stats 'length) + #:label-values `((priority . ,priority)))) + '(high default low) + ((slot-ref datastore 'writer-thread-channel-queue-stats))) + + (fibers-let ((build-counts + (datastore-count-builds datastore)) + (build-result-counts + (datastore-count-build-results datastore))) (for-each (match-lambda ((system . 
count) (metric-set builds-total @@ -475,9 +578,10 @@ PRAGMA optimize;") internal-time-units-per-second)) (apply values vals))))) -(define (metric-observe-duration datastore - thing - duration-seconds) +(define* (metric-observe-duration datastore + thing + duration-seconds + #:key (buckets %default-histogram-buckets)) (define registry (slot-ref datastore 'metrics-registry)) (define metric-name (string-append "datastore_" thing "_duration_seconds")) @@ -485,15 +589,25 @@ PRAGMA optimize;") (let ((metric (or (metrics-registry-fetch-metric registry metric-name) (make-histogram-metric registry - metric-name)))) + metric-name + #:buckets buckets)))) (metric-observe metric duration-seconds))) -(define (call-with-worker-thread/delay-logging channel proc) - (call-with-worker-thread channel +(define (call-with-thread/delay-logging thread-pool proc) + (call-with-thread thread-pool + proc + #:duration-logger + (lambda (duration) + (log-delay proc duration)))) + +(define* (call-with-writer-thread/delay-logging datastore proc + #:key priority) + (call-with-writer-thread datastore proc #:duration-logger (lambda (duration) - (log-delay proc duration)))) + (log-delay proc duration)) + #:priority priority)) (define-exception-type &transaction-rollback-exception &exception make-transaction-rollback-exception @@ -507,21 +621,24 @@ PRAGMA optimize;") #:key readonly? (immediate? 
(not readonly?)) - duration-metric-name) + priority + duration-metric-name + (duration-metric-buckets + %transaction-duration-histogram-buckets)) (define (run-proc-within-transaction db) (define (attempt-begin) (with-exception-handler (lambda (exn) (match (exception-args exn) (('sqlite-exec 5 msg) - (simple-format + (simple-format/safe (current-error-port) "warning: issue starting transaction (code: 5, proc: ~A): ~A\n" proc msg) #f) (_ - (simple-format (current-error-port) - "exception starting transaction\n") + (simple-format/safe (current-error-port) + "exception starting transaction\n") (raise-exception exn)))) (lambda () (sqlite-exec db (if immediate? @@ -535,14 +652,14 @@ PRAGMA optimize;") (lambda (exn) (match (exception-args exn) (('sqlite-exec 5 msg) - (simple-format + (simple-format/safe (current-error-port) "warning: attempt commit (code: 5, proc: ~A): ~A\n" proc msg) #f) (_ - (simple-format (current-error-port) - "exception committing transaction\n") + (simple-format/safe (current-error-port) + "exception committing transaction\n") (raise-exception exn)))) (lambda () (sqlite-exec db "COMMIT TRANSACTION;") @@ -550,63 +667,88 @@ PRAGMA optimize;") #:unwind? #t)) (if (attempt-begin) - (call-with-values - (lambda () - (with-exception-handler - (lambda (exn) - (if (transaction-rollback-exception? exn) - (begin - (sqlite-exec db "ROLLBACK TRANSACTION;") - (transaction-rollback-exception-return-value exn)) - (begin - (simple-format (current-error-port) - "error: sqlite rolling back transaction (~A)\n" - exn) - (sqlite-exec db "ROLLBACK TRANSACTION;") - (raise-exception exn)))) + (with-exception-handler + (lambda (exn) + (if (transaction-rollback-exception? 
exn) + (begin + (sqlite-exec db "ROLLBACK TRANSACTION;") + (transaction-rollback-exception-return-value exn)) + (begin + (simple-format/safe + (current-error-port) + "error: sqlite rolling back transaction (~A)\n" + exn) + (sqlite-exec db "ROLLBACK TRANSACTION;") + (raise-exception exn)))) + (lambda () + (call-with-values (lambda () - (parameterize ((%current-transaction-proc proc)) - (call-with-delay-logging proc #:args (list db)))) - #:unwind? #t)) - (lambda vals - (let loop ((success? (attempt-commit))) - (if success? - (apply values vals) - (loop (attempt-commit)))))) + (with-exception-handler + (lambda (exn) + (unless (transaction-rollback-exception? exn) + (backtrace)) + (raise-exception exn)) + (lambda () + (parameterize ((%current-transaction-proc proc) + ;; Set the arguments parameter for the + ;; reader thread pool so that any nested + ;; calls to call-with-thread for the + ;; reader thread pool just use the writer + ;; db connection and thus this + ;; transaction + ((thread-pool-arguments-parameter + (slot-ref datastore 'reader-thread-pool)) + (list db))) + (call-with-delay-logging proc #:args (list db)))))) + (lambda vals + (let loop ((success? (attempt-commit))) + (if success? + (apply values vals) + (loop (attempt-commit))))))) + #:unwind? #t) ;; Database is busy, so retry (run-proc-within-transaction db))) - (call-with-worker-thread + (call-with-thread (slot-ref datastore (if readonly? - 'worker-reader-thread-channel - 'worker-writer-thread-channel)) + 'reader-thread-pool + 'writer-thread-pool)) (lambda (db) (if (%current-transaction-proc) (call-with-delay-logging proc #:args (list db)) ; already in transaction (run-proc-within-transaction db))) + #:channel + (if readonly? + (thread-pool-channel + (slot-ref datastore 'reader-thread-pool)) + (slot-ref datastore + (assq-ref + '((high . high-priority-writer-thread-channel) + (default . default-priority-writer-thread-channel) + (low . 
low-priority-writer-thread-channel)) + (or priority 'default)))) #:duration-logger (lambda (duration-seconds) (when (and (not readonly?) (> duration-seconds 2)) - (display - (format - #f - "warning: ~a:\n took ~4f seconds in transaction\n" - proc - duration-seconds) - (current-error-port)) - - (when duration-metric-name - (metric-observe-duration datastore - duration-metric-name - duration-seconds)))))) + (format/safe + (current-error-port) + "warning: ~a:\n took ~4f seconds in transaction\n" + proc + duration-seconds)) + + (when duration-metric-name + (metric-observe-duration datastore + duration-metric-name + duration-seconds + #:buckets duration-metric-buckets))))) (define-method (datastore-find-agent (datastore <sqlite-datastore>) uuid) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -634,8 +776,8 @@ SELECT description FROM agents WHERE id = :id" (define-method (datastore-find-agent-by-name (datastore <sqlite-datastore>) name) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -662,8 +804,8 @@ SELECT id FROM agents WHERE name = :name" (define-method (datastore-insert-dynamic-auth-token (datastore <sqlite-datastore>) token) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread + datastore (lambda (db) (let ((statement (sqlite-prepare @@ -681,8 +823,8 @@ INSERT INTO dynamic_auth_tokens (token) VALUES (:token)" (define-method (datastore-dynamic-auth-token-exists? 
(datastore <sqlite-datastore>) token) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -709,8 +851,8 @@ SELECT 1 FROM dynamic_auth_tokens WHERE token = :token" (define-method (datastore-fetch-agent-tags (datastore <sqlite-datastore>) agent-id) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -746,16 +888,16 @@ WHERE agent_tags.agent_id = :agent_id" uuid name description) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread + datastore (lambda (db) (insert-agent db uuid name description))) #t) (define-method (datastore-list-agents (datastore <sqlite-datastore>)) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -783,8 +925,8 @@ SELECT id, name, description, active FROM agents ORDER BY id" (unless (boolean? active?) 
(error "datastore-set-agent-active called with non-boolean")) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread + datastore (lambda (db) (let ((statement (sqlite-prepare @@ -803,8 +945,8 @@ UPDATE agents SET active = :active WHERE id = :uuid" (define-method (datastore-find-agent-status (datastore <sqlite-datastore>) agent-id) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -838,8 +980,8 @@ WHERE agent_id = :agent_id" 1min-load-average system-uptime processor-count) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread + datastore (lambda (db) (let ((statement (sqlite-prepare @@ -875,8 +1017,8 @@ INSERT INTO agent_status (agent_id, status, load_average_1min, system_uptime, pr (datastore <sqlite-datastore>) agent-uuid password) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread + datastore (lambda (db) (insert-agent-password db agent-uuid password))) #t) @@ -885,8 +1027,8 @@ INSERT INTO agent_status (agent_id, status, load_average_1min, system_uptime, pr (datastore <sqlite-datastore>) uuid password) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -908,8 +1050,8 @@ WHERE agent_id = :agent_id AND password = :password" (define-method (datastore-agent-list-passwords (datastore <sqlite-datastore>) uuid) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -1006,6 +1148,7 @@ INSERT INTO agent_tags (agent_id, tag_id) VALUES (:agent_id, :tag_id)" (lambda (db) 
(insert-derivation-and-return-outputs db derivation) (hash-clear! %derivation-outputs-cache)) + #:priority 'low #:duration-metric-name "store_derivation") #t) @@ -1013,8 +1156,8 @@ INSERT INTO agent_tags (agent_id, tag_id) VALUES (:agent_id, :tag_id)" (define-method (datastore-build-exists-for-derivation-outputs? (datastore <sqlite-datastore>) derivation) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -1046,8 +1189,8 @@ WHERE derivation_outputs.derivation_id = :derivation_id (define-method (datastore-build-required-by-another? (datastore <sqlite-datastore>) uuid) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -1141,8 +1284,8 @@ SELECT name FROM derivations WHERE id = :id" (match (sqlite-step-and-reset statement) (#(name) name)))) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let loop ((derivation-ids (list (db-find-derivation-id db derivation))) (result '())) @@ -1167,8 +1310,8 @@ SELECT name FROM derivations WHERE id = :id" args) (apply (lambda* (system #:key include-cancelled?) 
- (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -1256,8 +1399,8 @@ FROM ( (define-method (datastore-list-builds-for-derivation-recursive-inputs (datastore <sqlite-datastore>) derivation) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -1294,8 +1437,8 @@ INNER JOIN related_derivations (define-method (datastore-find-unprocessed-build-entry (datastore <sqlite-datastore>) uuid) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -1324,8 +1467,8 @@ WHERE build_id = :build_id" (datastore <sqlite-datastore>) build-uuid tags) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread + datastore (lambda (db) (let ((insert-tag-statement (sqlite-prepare @@ -1525,8 +1668,8 @@ WHERE build_id = :build_id" uuid explicit-priority-lower-bound) (define builds-to-consider - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) ;; Recursively find builds for all missing outputs that this build ;; takes as inputs. 
The order is important here, since we want to @@ -1676,8 +1819,8 @@ WHERE build_id = :build_id" override-derived-priority) (let ((build-id old-priority - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((build-id (db-find-build-id db uuid))) @@ -1734,46 +1877,10 @@ WHERE build_id = :build_id" #t) rest)) -(define-method (datastore-remove-build-from-allocation-plan - (datastore <sqlite-datastore>) - uuid) - (define (update-build-allocation-plan-metrics) - (let ((allocation-plan-metric - (metrics-registry-fetch-metric - (slot-ref datastore 'metrics-registry) - "build_allocation_plan_total"))) - (for-each - (match-lambda - ((agent-id . count) - (metric-set allocation-plan-metric - count - #:label-values - `((agent_id . ,agent-id))))) - (datastore-count-build-allocation-plan-entries datastore)))) - - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-writer-thread-channel) - (lambda (db) - (let ((statement (sqlite-prepare - db - " -DELETE FROM build_allocation_plan WHERE build_id = :build_id" - #:cache? 
#t))) - - (sqlite-bind-arguments - statement - #:build_id (db-find-build-id db uuid)) - - (sqlite-step-and-reset statement) - - (unless (= 0 (changes-count db)) - (update-build-allocation-plan-metrics))))) - #t) - (define-method (datastore-count-build-results (datastore <sqlite-datastore>)) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -1831,8 +1938,8 @@ VALUES (:agent_id, :result, 1)" #t)))) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread/delay-logging + datastore (lambda (db) (let ((statement (sqlite-prepare @@ -1883,14 +1990,12 @@ LIMIT 1" (#f #t) (#(1) #f)))) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-writer-thread-channel) - (lambda (db) - (let ((builds-statement - (sqlite-prepare - db - " -SELECT DISTINCT unprocessed_builds.id + (define (all-build-ids db) + (let ((statement + (sqlite-prepare + db + " +SELECT input_builds.id FROM builds INNER JOIN derivation_outputs ON builds.derivation_id = derivation_outputs.derivation_id @@ -1898,45 +2003,62 @@ INNER JOIN derivation_outputs AS all_derivation_outputs ON all_derivation_outputs.output_id = derivation_outputs.output_id INNER JOIN derivation_inputs ON derivation_inputs.derivation_output_id = all_derivation_outputs.id -INNER JOIN builds AS unprocessed_builds - ON unprocessed_builds.processed = 0 - AND unprocessed_builds.derivation_id = derivation_inputs.derivation_id -INNER JOIN unprocessed_builds_with_derived_priorities - ON unprocessed_builds_with_derived_priorities.build_id = unprocessed_builds.id - AND unprocessed_builds_with_derived_priorities.all_inputs_built = 0 +INNER JOIN builds AS input_builds + ON input_builds.processed = 0 + AND input_builds.canceled = 0 + AND input_builds.derivation_id = derivation_inputs.derivation_id WHERE builds.id = :build_id" - #:cache? 
#t)) + #:cache? #t))) - (update-statement - (sqlite-prepare - db - " + (sqlite-bind-arguments + statement + #:build_id (db-find-build-id db build-uuid)) + + (let ((result + (sqlite-fold + (lambda (row result) + (match row + (#(build-id) + (if (all-inputs-built? db build-id) + (cons build-id result) + result)))) + '() + statement))) + (sqlite-reset statement) + + result))) + + (let ((build-ids + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) + (lambda (db) + (all-build-ids db))))) + (call-with-writer-thread/delay-logging + datastore + (lambda (db) + (let ((statement + (sqlite-prepare + db + " UPDATE unprocessed_builds_with_derived_priorities SET all_inputs_built = 1 WHERE build_id = :build_id" - #:cache? #t))) + #:cache? #t))) - (sqlite-bind-arguments builds-statement - #:build_id (db-find-build-id db build-uuid)) - - (sqlite-fold - (lambda (row result) - (match row - (#(build-id) - (when (all-inputs-built? db build-id) - (sqlite-bind-arguments update-statement - #:build_id build-id) - (sqlite-step-and-reset update-statement)))) - #f) - #f - builds-statement) - (sqlite-reset builds-statement))))) + (for-each + (lambda (build-id) + (sqlite-bind-arguments statement + #:build_id build-id) + (sqlite-step-and-reset statement)) + build-ids) + + #t))))) (define-method (datastore-remove-build-allocation (datastore <sqlite-datastore>) build-uuid agent-id) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread/delay-logging + datastore (lambda (db) (let ((statement (sqlite-prepare @@ -1958,8 +2080,8 @@ DELETE FROM allocated_builds (define-method (datastore-mark-build-as-processed (datastore <sqlite-datastore>) build-uuid end-time) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread/delay-logging + datastore (lambda (db) (let ((statement (sqlite-prepare @@ -1995,8 +2117,8 @@ DELETE FROM 
unprocessed_builds_with_derived_priorities (define-method (datastore-delete-relevant-outputs-from-unbuilt-outputs (datastore <sqlite-datastore>) build-uuid) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread/delay-logging + datastore (lambda (db) (let ((statement (sqlite-prepare @@ -2023,8 +2145,8 @@ WHERE output_id IN ( (datastore <sqlite-datastore>) build-uuid output-metadata) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread/delay-logging + datastore (lambda (db) (define (name->output-id name) (let ((statement @@ -2101,8 +2223,8 @@ INSERT INTO build_starts ( (define-method (datastore-find-build-starts (datastore <sqlite-datastore>) build-uuid) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2211,8 +2333,8 @@ INSERT INTO setup_failure_missing_inputs ( (define-method (datastore-list-setup-failure-missing-inputs (datastore <sqlite-datastore>) setup-failure-id) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2240,8 +2362,8 @@ WHERE setup_failure_id = :id" build-uuid agent-id failure-reason) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread + datastore (lambda (db) (insert-setup-failure-and-remove-allocation db (db-find-build-id db build-uuid) @@ -2258,8 +2380,8 @@ WHERE setup_failure_id = :id" (define-method (datastore-count-builds (datastore <sqlite-datastore>)) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2282,8 +2404,8 @@ FROM 
builds_counts" (define-method (datastore-for-each-build (datastore <sqlite-datastore>) proc) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2322,8 +2444,8 @@ SELECT id FROM builds WHERE uuid = :uuid" (define-method (datastore-find-build (datastore <sqlite-datastore>) uuid) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2381,12 +2503,14 @@ WHERE uuid = :uuid" (canceled 'unset) (priority-> 'unset) (priority-< 'unset) + (created-at-> 'unset) + (created-at-< 'unset) (after-id #f) (limit #f) ;; other-builds-dependent or no-dependent-builds (relationship 'unset)) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (define tag->expression (let ((statement @@ -2399,7 +2523,7 @@ SELECT id FROM tags WHERE key = :key AND value = :value" (sqlite-prepare db " -SELECT id FROM tags WHERE key = :key" +SELECT 1 FROM tags WHERE key = :key LIMIT 1" #:cache? #t))) (lambda (tag not?) (match tag @@ -2420,28 +2544,27 @@ SELECT id FROM tags WHERE key = :key" (sqlite-bind-arguments key-statement #:key key) - (let* ((tag-ids (sqlite-map - (match-lambda - (#(id) id)) - key-statement)) - (result - (string-append - "(" - (string-join - (map - (lambda (id) + (let ((tag-with-key-exists? + (->bool (sqlite-step-and-reset key-statement)))) + (if tag-with-key-exists? + (let ((result (string-append + "(" (if not? "NOT " "") - "EXISTS (SELECT 1 FROM build_tags " - "WHERE build_id = builds.id AND tag_id = " - (number->string id) - ")")) - tag-ids) - (if not? 
" AND " " OR ")) - ")"))) - (sqlite-reset key-statement) - - result)))))) + " +EXISTS ( + SELECT 1 + FROM build_tags + INNER JOIN tags ON build_tags.tag_id = tags.id + WHERE build_id = builds.id + AND tags.key = '" + key "' +)" + ")"))) + result) + (if not? + "TRUE" + "FALSE")))))))) (let ((tag-expressions (map (lambda (tag) @@ -2463,6 +2586,8 @@ SELECT id FROM tags WHERE key = :key" (not (null? not-systems)) (not (eq? priority-> 'unset)) (not (eq? priority-< 'unset)) + (not (eq? created-at-> 'unset)) + (not (eq? created-at-< 'unset)) (not (eq? processed 'unset)) (not (eq? canceled 'unset)) (not (eq? relationship 'unset)) @@ -2513,6 +2638,14 @@ INNER JOIN derivations (list (simple-format #f "priority < ~A" priority-<)) '()) + (if (string? created-at->) + (list + (simple-format #f "created_at > '~A'" created-at->)) + '()) + (if (string? created-at-<) + (list + (simple-format #f "created_at < '~A'" created-at-<)) + '()) (cond ((eq? processed #t) '("processed = 1")) ((eq? processed #f) '("processed = 0")) @@ -2616,8 +2749,8 @@ WHERE derivation_outputs.derivation_id = builds.derivation_id)")) (define-method (datastore-fetch-build-tags (datastore <sqlite-datastore>) build-uuid) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2651,8 +2784,8 @@ WHERE build_tags.build_id = :build_id" (define-method (datastore-find-build-result (datastore <sqlite-datastore>) build-uuid) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2677,8 +2810,8 @@ WHERE build_id = :build_id" (define-method (datastore-find-build-derivation-system (datastore <sqlite-datastore>) build-uuid) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 
'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2714,8 +2847,8 @@ WHERE builds.id = :build_id" datastore "list_builds_for_output" (lambda () - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2763,8 +2896,8 @@ WHERE derivation_outputs.output_id = :output_id" rest) (apply (lambda* (output system #:key include-canceled?) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2808,8 +2941,8 @@ WHERE derivation_outputs.output_id = :output_id rest) (apply (lambda* (derivation #:key (include-canceled? #t)) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2844,8 +2977,8 @@ WHERE derivations.name = :derivation" (define-method (datastore-count-setup-failures (datastore <sqlite-datastore>)) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2869,8 +3002,8 @@ GROUP BY agent_id, failure_reason" (define-method (datastore-list-setup-failures-for-build (datastore <sqlite-datastore>) build-uuid) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2902,8 +3035,8 @@ WHERE build_id = :build_id" args) (apply (lambda* (#:key agent-id) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement 
(sqlite-prepare @@ -2954,8 +3087,8 @@ WHERE builds.processed = 0 (define-method (datastore-list-processed-builds (datastore <sqlite-datastore>)) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -2981,8 +3114,8 @@ WHERE processed = 1" (define-method (datastore-list-unprocessed-builds (datastore <sqlite-datastore>)) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3016,10 +3149,11 @@ ORDER BY priority DESC" builds))))) -(define-method (datastore-find-first-unallocated-deferred-build - (datastore <sqlite-datastore>)) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) +(define-method (datastore-find-deferred-build + (datastore <sqlite-datastore>) + select?) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3032,25 +3166,29 @@ INNER JOIN derivations WHERE processed = 0 AND canceled = 0 AND deferred_until IS NOT NULL - AND builds.id NOT IN (SELECT build_id FROM build_allocation_plan) -ORDER BY deferred_until ASC -LIMIT 1" +ORDER BY deferred_until ASC" #:cache? #t))) - (match (sqlite-step-and-reset statement) - (#(uuid derivation_name priority created_at deferred_until) - `((uuid . ,uuid) - (derivation-name . ,derivation_name) - (priority . ,priority) - (created-at . ,(if (string? created_at) - (string->date created_at - "~Y-~m-~d ~H:~M:~S") - #f)) - (deferred-until . ,(if (string? deferred_until) - (string->date deferred_until - "~Y-~m-~d ~H:~M:~S") - #f)))) - (#f #f)))))) + (let loop ((row (sqlite-step statement))) + (match row + (#(uuid derivation_name priority created_at deferred_until) + (let ((res + (select? + `((uuid . ,uuid) + (derivation-name . ,derivation_name) + (priority . 
,priority) + (created-at . ,(if (string? created_at) + (string->date created_at + "~Y-~m-~d ~H:~M:~S") + #f)) + (deferred-until . ,(if (string? deferred_until) + (string->date deferred_until + "~Y-~m-~d ~H:~M:~S") + #f)))))) + (if res + res + (loop (sqlite-step statement))))) + (#f #f))))))) (define-method (datastore-fetch-prioritised-unprocessed-builds (datastore <sqlite-datastore>)) @@ -3059,9 +3197,11 @@ LIMIT 1" (sqlite-prepare db " -SELECT builds.uuid +SELECT builds.uuid, systems.system FROM unprocessed_builds_with_derived_priorities INNER JOIN builds ON build_id = builds.id +INNER JOIN derivations ON builds.derivation_id = derivations.id +INNER JOIN systems ON derivations.system_id = systems.id WHERE all_inputs_built = 1 AND NOT EXISTS ( SELECT 1 @@ -3071,26 +3211,21 @@ WHERE all_inputs_built = 1 ) ORDER BY derived_priority ASC" #:cache? #t))) - (let ((result (sqlite-fold - (lambda (row result) - (cons (vector-ref row 0) - result)) - '() - statement))) + (let ((result (sqlite-fold cons '() statement))) (sqlite-reset statement) result))) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) fetch-prioritised-unprocessed-builds)) (define-method (datastore-insert-unprocessed-hook-event (datastore <sqlite-datastore>) event arguments) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread/delay-logging + datastore (lambda (db) (insert-unprocessed-hook-event db event @@ -3120,8 +3255,8 @@ VALUES (:event, :arguments)" (define-method (datastore-count-unprocessed-hook-events (datastore <sqlite-datastore>)) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3144,8 +3279,8 @@ SELECT event, COUNT(*) FROM unprocessed_hook_events GROUP BY event" 
(datastore <sqlite-datastore>) event limit) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3178,8 +3313,8 @@ LIMIT :limit" (define-method (datastore-find-unprocessed-hook-event (datastore <sqlite-datastore>) id) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3205,8 +3340,8 @@ WHERE id = :id" (define-method (datastore-delete-unprocessed-hook-event (datastore <sqlite-datastore>) id) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread + datastore (lambda (db) (let ((statement (sqlite-prepare @@ -3219,110 +3354,14 @@ DELETE FROM unprocessed_hook_events WHERE id = :id" statement #:id id) - (sqlite-step-and-reset statement)))) - #t) - -(define-method (datastore-count-build-allocation-plan-entries - (datastore <sqlite-datastore>)) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) - (lambda (db) - (let ((statement - (sqlite-prepare - db - " -SELECT agent_id, COUNT(*) -FROM build_allocation_plan -GROUP BY agent_id" - #:cache? 
#t))) - - (let ((result - (sqlite-map - (match-lambda - (#(agent_id count) - (cons agent_id count))) - statement))) - (sqlite-reset statement) - - result))))) - -(define-method (datastore-replace-build-allocation-plan - (datastore <sqlite-datastore>) - planned-builds) - (define (clear-current-plan db) - (sqlite-exec - db - "DELETE FROM build_allocation_plan")) - - (define insert-sql - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) - (lambda (db) - (string-append - " -INSERT INTO build_allocation_plan (build_id, agent_id, ordering) VALUES " - (string-join - (map (match-lambda - ((build-uuid agent-id ordering) - (simple-format - #f - "('~A', '~A', ~A)" - (db-find-build-id db build-uuid) - agent-id - ordering))) - planned-builds) - ", ") - ";")))) - - (define (insert-new-plan db planned-builds) - (sqlite-exec - db - insert-sql)) - - (datastore-call-with-transaction - datastore - (lambda (db) - (clear-current-plan db) - (unless (null? planned-builds) - (insert-new-plan db planned-builds))) - #:duration-metric-name "replace_build_allocation_plan") - - (let* ((agent-ids - (map (lambda (agent-details) - (assq-ref agent-details 'uuid)) - (datastore-list-agents datastore))) - (counts-by-agent - (make-vector (length agent-ids) 0))) - (for-each - (match-lambda - ((_ agent-id _) - (let ((index (list-index (lambda (list-agent-id) - (string=? agent-id list-agent-id)) - agent-ids))) - (vector-set! counts-by-agent - index - (+ (vector-ref counts-by-agent - index) - 1))))) - planned-builds) - - (let ((metric - (metrics-registry-fetch-metric (slot-ref datastore 'metrics-registry) - "build_allocation_plan_total"))) - (for-each - (lambda (index agent-id) - (metric-set metric - (vector-ref counts-by-agent index) - #:label-values - `((agent_id . 
,agent-id)))) - (iota (length agent-ids)) - agent-ids))) + (sqlite-step-and-reset statement))) + #:priority 'high) #t) (define-method (datastore-count-allocated-builds (datastore <sqlite-datastore>)) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3344,8 +3383,8 @@ SELECT agent_id, COUNT(*) FROM allocated_builds GROUP BY agent_id" (define-method (datastore-agent-requested-systems (datastore <sqlite-datastore>) agent-id) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3418,34 +3457,40 @@ INSERT INTO build_allocation_agent_requested_systems (agent_id, system_id) VALUE (define-method (datastore-fetch-build-to-allocate (datastore <sqlite-datastore>) - agent-id) - (datastore-call-with-transaction - datastore + build-id) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare db - ;; This needs to guard against the plan being out of date " SELECT builds.uuid, derivations.id, derivations.name, unprocessed_builds_with_derived_priorities.derived_priority FROM builds -INNER JOIN build_allocation_plan - ON builds.id = build_allocation_plan.build_id INNER JOIN derivations ON builds.derivation_id = derivations.id -INNER JOIN build_allocation_agent_requested_systems - ON build_allocation_agent_requested_systems.agent_id = :agent_id - AND build_allocation_agent_requested_systems.system_id = derivations.system_id LEFT JOIN unprocessed_builds_with_derived_priorities ON unprocessed_builds_with_derived_priorities.build_id = builds.id -WHERE build_allocation_plan.agent_id = :agent_id +WHERE builds.uuid = :uuid AND builds.processed = 0 AND builds.canceled = 0 - AND builds.id NOT IN (SELECT 
build_id FROM allocated_builds) -ORDER BY build_allocation_plan.ordering ASC" - #:cache? #t)) - (output-conflicts-statement + AND builds.id NOT IN (SELECT build_id FROM allocated_builds)" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:uuid build-id) + + (sqlite-step-and-reset statement))))) + +(define-method (datastore-check-if-derivation-conflicts? + (datastore <sqlite-datastore>) + agent-id + derivation-id) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) + (lambda (db) + (let ((statement (sqlite-prepare db " @@ -3463,149 +3508,63 @@ WHERE build_derivation_outputs.derivation_id = :derivation_id allocated_builds_derivation_outputs.output_id" #:cache? #t))) - (define (get-build-to-allocate) - (let loop ((build-details (sqlite-step statement))) - (match build-details - (#f #f) - (#(uuid derivation-id derivation-name derived_priority) + (sqlite-bind-arguments statement + #:agent_id agent-id + #:derivation_id derivation-id) - (sqlite-bind-arguments output-conflicts-statement - #:agent_id agent-id - #:derivation_id derivation-id) + (->bool (sqlite-step-and-reset statement)))))) - (if (eq? (sqlite-step-and-reset output-conflicts-statement) - #f) - `((uuid . ,uuid) - (derivation_name . ,derivation-name) - (derived_priority . ,derived_priority)) - (loop (sqlite-step statement))))))) +(define-method (datastore-insert-to-allocated-builds + (datastore <sqlite-datastore>) + agent-id + build-uuid) + (call-with-writer-thread + datastore + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +INSERT INTO allocated_builds (build_id, agent_id) + VALUES (:build_id, :agent_id)" + #:cache? #t))) (sqlite-bind-arguments statement + #:build_id (db-find-build-id db build-uuid) #:agent_id agent-id) - (let ((result (get-build-to-allocate))) - (sqlite-reset statement) - - result))) - - #:readonly? 
#t - #:duration-metric-name "fetch_builds_to_allocate")) - -(define-method (datastore-insert-to-allocated-builds - (datastore <sqlite-datastore>) - agent-id - build-uuids) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) - (lambda (db) - (sqlite-exec - db - (string-append - " -INSERT INTO allocated_builds (build_id, agent_id) VALUES " - (string-join - (map (lambda (build-uuid) - (simple-format - #f - "(~A, '~A')" - (db-find-build-id db build-uuid) - agent-id)) - build-uuids) - ", ") - ";"))))) + (sqlite-step-and-reset statement)))) + #t) -(define-method (datastore-remove-builds-from-plan +(define-method (datastore-update-allocated-build-submit-outputs (datastore <sqlite-datastore>) - build-uuids) - (call-with-worker-thread - (slot-ref datastore 'worker-writer-thread-channel) + build-uuid + submit-outputs?) + (call-with-writer-thread + datastore (lambda (db) - (sqlite-exec - db - (string-append - " -DELETE FROM build_allocation_plan -WHERE build_id IN (" - (string-join - (map (lambda (build-uuid) - (number->string (db-find-build-id db build-uuid))) - build-uuids) - ", ") - ")"))))) - -(define-method (datastore-list-allocation-plan-builds - (datastore <sqlite-datastore>) - . 
- rest) - (apply - (lambda* (agent-id #:key limit) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) - (lambda (db) - (let ((statement - (sqlite-prepare - db - (string-append - " -SELECT builds.uuid, derivations.name, systems.system, - builds.priority, - unprocessed_builds_with_derived_priorities.derived_priority -FROM builds -INNER JOIN derivations - ON builds.derivation_id = derivations.id -INNER JOIN systems - ON derivations.system_id = systems.id -INNER JOIN build_allocation_plan - ON builds.id = build_allocation_plan.build_id -LEFT JOIN unprocessed_builds_with_derived_priorities - ON builds.id = unprocessed_builds_with_derived_priorities.build_id -WHERE build_allocation_plan.agent_id = :agent_id - AND builds.processed = 0 - AND builds.id NOT IN (SELECT build_id FROM allocated_builds) -ORDER BY build_allocation_plan.ordering ASC" - (if limit - " -LIMIT :limit" - "")) - #:cache? #t))) - - (apply sqlite-bind-arguments - statement - #:agent_id agent-id - (if limit - (list #:limit limit) - '())) - - (let ((builds (sqlite-map - (match-lambda - (#(uuid derivation_name system - priority derived_priority) - `((uuid . ,uuid) - (derivation_name . ,derivation_name) - (system . ,system) - (priority . ,priority) - (derived_priority . ,derived_priority) - (tags . ,(vector-map - (lambda (_ tag) - (match tag - ((key . value) - `((key . ,key) - (value . ,value))))) - (datastore-fetch-build-tags - datastore - uuid)))))) - statement))) - (sqlite-reset statement) + (let ((statement + (sqlite-prepare + db + " +UPDATE allocated_builds +SET submit_outputs = :submit_outputs +WHERE build_id = :build_id" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:build_id (db-find-build-id db build-uuid) + #:submit_outputs (if submit-outputs? 
1 0)) - builds))))) - rest)) + (sqlite-step-and-reset statement)))) + #t) (define-method (datastore-list-agent-builds (datastore <sqlite-datastore>) agent-id) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3613,7 +3572,7 @@ LIMIT :limit" " SELECT builds.uuid, derivations.name, unprocessed_builds_with_derived_priorities.derived_priority, - builds.canceled + builds.canceled, allocated_builds.submit_outputs FROM builds INNER JOIN derivations ON builds.derivation_id = derivations.id @@ -3630,11 +3589,17 @@ WHERE allocated_builds.agent_id = :agent_id" (let ((builds (sqlite-map (match-lambda - (#(uuid derivation_name derived_priority canceled) + (#(uuid derivation_name derived_priority canceled + submit_outputs) `((uuid . ,uuid) (derivation_name . ,derivation_name) (derived_priority . ,derived_priority) - (canceled . ,(= 1 canceled))))) + (canceled . ,(= 1 canceled)) + (submit_outputs . ,(cond + ((not submit_outputs) + 'null) + (else + (= 1 submit_outputs))))))) statement))) (sqlite-reset statement) @@ -3643,8 +3608,8 @@ WHERE allocated_builds.agent_id = :agent_id" (define-method (datastore-agent-for-build (datastore <sqlite-datastore>) build-uuid) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3706,11 +3671,11 @@ WHERE build_results.build_id = :build_id" "_sqitch_registry.db") (string-append "db:sqlite:" database-file)))) - (simple-format #t "running command: ~A\n" - (string-join command)) + (simple-format/safe #t "running command: ~A\n" + (string-join command)) (let ((pid (spawn (%config 'sqitch) command))) (unless (zero? 
(cdr (waitpid pid))) - (simple-format + (simple-format/safe (current-error-port) "error: sqitch command failed\n") (exit 1))))) @@ -3800,16 +3765,16 @@ WHERE name = :name" (define-method (datastore-find-derivation (datastore <sqlite-datastore>) name) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (db-find-derivation db name)))) (define-method (datastore-find-derivation-outputs (datastore <sqlite-datastore>) derivation-name) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3841,8 +3806,8 @@ WHERE derivation_id = :derivation_id" (define-method (datastore-find-derivation-output-details (datastore <sqlite-datastore>) derivation-name) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3884,8 +3849,8 @@ WHERE derivation_id = :derivation_id" (define-method (datastore-list-unbuilt-derivation-outputs (datastore <sqlite-datastore>) derivation-name) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3918,8 +3883,8 @@ WHERE derivation_id = :derivation_id" (define-method (datastore-list-build-outputs (datastore <sqlite-datastore>) build-uuid) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3963,8 +3928,8 @@ WHERE builds.id = :build_id" (define-method (datastore-find-derivation-system (datastore <sqlite-datastore>) derivation-name) - 
(call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread/delay-logging + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -3987,8 +3952,8 @@ WHERE name = :name" (define-method (datastore-find-derivation-inputs (datastore <sqlite-datastore>) derivation-name) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -4028,8 +3993,8 @@ WHERE derivations.id = :derivation_id" (define-method (datastore-find-recursive-derivation-input-outputs (datastore <sqlite-datastore>) derivation-name) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -4070,8 +4035,8 @@ INNER JOIN outputs (datastore <sqlite-datastore>) start-derivation-name output) - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (let ((statement (sqlite-prepare @@ -4154,20 +4119,6 @@ SELECT system FROM systems WHERE id = :id" (define derivation-name (derivation-file-name derivation)) - (define (maybe-fix-fixed-output-field derivation-details) - (let ((fixed-output? (fixed-output-derivation? derivation))) - (unless (equal? (assq-ref derivation-details 'fixed-output?) - fixed-output?) - (let ((statement (sqlite-prepare - db - " -UPDATE derivations SET fixed_output = :fixed_output WHERE name = :name" - #:cache? #t))) - (sqlite-bind-arguments statement - #:name derivation-name - #:fixed_output (if fixed-output? 
1 0)) - (sqlite-step-and-reset statement))))) - (define (insert-derivation) (let ((statement (sqlite-prepare @@ -4198,8 +4149,6 @@ INSERT INTO derivations (name, system_id, fixed_output) (db-find-derivation db derivation-name))) (if derivation-details (begin - (maybe-fix-fixed-output-field derivation-details) - (let ((derivation-outputs (select-derivation-outputs db derivation-name))) @@ -4508,15 +4457,15 @@ VALUES (:build_id, :derived_priority, :all_inputs_built)" (lambda* (uuid drv-name priority defer-until #:key skip-updating-other-build-derived-priorities) (define system-id - (call-with-worker-thread - (slot-ref datastore 'worker-reader-thread-channel) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) (lambda (db) (db-system->system-id db (datastore-find-derivation-system datastore drv-name))))) - (call-with-worker-thread/delay-logging - (slot-ref datastore 'worker-writer-thread-channel) + (call-with-writer-thread/delay-logging + datastore (lambda (db) (let* ((build-id (insert-build db drv-name uuid priority defer-until)) @@ -4542,7 +4491,8 @@ VALUES (:build_id, :derived_priority, :all_inputs_built)" #:args (list db build-id - derived-priority))))))) + derived-priority))))) + #:priority 'low)) rest) #t) @@ -4578,3 +4528,212 @@ VALUES (:agent_id, :password)" #:password password) (sqlite-step-and-reset statement))) + +(define-method (datastore-insert-background-job + (datastore <sqlite-datastore>) + type + args) + (call-with-writer-thread + datastore + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +INSERT INTO background_jobs_queue (type, args) +VALUES (:type, :args) +RETURNING id" + #:cache? 
#t))) + + (sqlite-bind-arguments + statement + #:type (symbol->string type) + #:args (call-with-output-string + (lambda (port) + (write args port)))) + + (match (sqlite-step-and-reset statement) + (#(id) + + (metric-increment + (metrics-registry-fetch-metric + (slot-ref datastore 'metrics-registry) + "coordinator_background_job_inserted_total") + #:label-values `((name . ,type))) + + id)))))) + +(define-method (datastore-delete-background-job + (datastore <sqlite-datastore>) + id) + (call-with-writer-thread + datastore + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +DELETE FROM background_jobs_queue WHERE id = :id" + #:cache? #t))) + + (sqlite-bind-arguments + statement + #:id id) + + (sqlite-step-and-reset statement)) + #t) + #:priority 'high)) + +(define-method (datastore-select-background-jobs + (datastore <sqlite-datastore>) + . + args) + (apply + (lambda* (type #:key (limit 1)) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT id, args +FROM background_jobs_queue +WHERE type = :type +ORDER BY id ASC +LIMIT :limit" + #:cache? #t))) + + (sqlite-bind-arguments + statement + #:type (symbol->string type) + #:limit limit) + + (let ((result + (sqlite-map + (match-lambda + (#(id args) + `((id . ,id) + (args . ,(call-with-input-string args read))))) + statement))) + (sqlite-reset statement) + result))))) + args)) + +(define-method (datastore-count-background-jobs + (datastore <sqlite-datastore>)) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT type, COUNT(*) +FROM background_jobs_queue +GROUP BY type" + #:cache? #t))) + + (let ((result + (sqlite-map + (match-lambda + (#(type count) + (cons type count))) + statement))) + (sqlite-reset statement) + + result))))) + +(define-method (datastore-check-and-correct-unprocessed-builds-all-inputs-built + (datastore <sqlite-datastore>) + . 
+ args) + (define entries-to-check + (call-with-thread + (slot-ref datastore 'reader-thread-pool) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT build_id, builds.derivation_id +FROM unprocessed_builds_with_derived_priorities +INNER JOIN builds ON builds.id = build_id +WHERE all_inputs_built = 0 +ORDER BY build_id DESC" + #:cache? #t))) + (let ((result (sqlite-map identity statement))) + (sqlite-reset statement) + result))))) + + (define (all-inputs-built? derivation-id) + (call-with-thread + (slot-ref datastore 'reader-thread-pool) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT 1 +FROM derivation_inputs +INNER JOIN derivation_outputs + ON derivation_inputs.derivation_output_id = derivation_outputs.id +INNER JOIN unbuilt_outputs + ON unbuilt_outputs.output_id = derivation_outputs.output_id +WHERE derivation_inputs.derivation_id = :derivation_id" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:derivation_id derivation-id) + + (match (sqlite-step-and-reset statement) + (#(1) #f) + (#f #t)))))) + + (define (update build-id) + (call-with-writer-thread + datastore + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +UPDATE unprocessed_builds_with_derived_priorities +SET all_inputs_built = 1 +WHERE build_id = :build_id AND all_inputs_built = 0 +RETURNING 1" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:build_id build-id) + + ;; This is to cope with the check not being transactional, so we + ;; might go to update a record which has just been changed to have + ;; all_inputs_built = 1 + (match (sqlite-step-and-reset statement) + (#(1) #t) + (#f #f)))))) + + (apply + (lambda* (#:key (progress-reporter (const progress-reporter/silent))) + (let ((reporter + (progress-reporter (length entries-to-check)))) + (start-progress-reporter! reporter) + (let loop ((entries entries-to-check) + (updated-count 0)) + (if (null? entries) + (begin + (stop-progress-reporter! 
reporter) + updated-count) + (match (car entries) + (#(build-id derivation-id) + (progress-reporter-report! reporter) + (if (all-inputs-built? derivation-id) + (loop (cdr entries) + (+ updated-count + (if (update build-id) + 1 + 0))) + (loop (cdr entries) + updated-count)))))))) + args)) diff --git a/guix-build-coordinator/guix-data-service.scm b/guix-build-coordinator/guix-data-service.scm index e2ecbe7..584bbbd 100644 --- a/guix-build-coordinator/guix-data-service.scm +++ b/guix-build-coordinator/guix-data-service.scm @@ -27,6 +27,7 @@ #:use-module (json) #:use-module (web client) #:use-module (web response) + #:use-module (knots timeout) #:use-module (guix-build-coordinator utils) #:use-module (guix-build-coordinator datastore) #:export (send-build-event-to-guix-data-service @@ -51,7 +52,8 @@ #:body body ;; Guile doesn't treat JSON as text, so decode the ;; body manually - #:decode-body? #f)))) + #:decode-body? #f)) + #:timeout 10)) (code (response-code response))) (unless (and (>= code 200) diff --git a/guix-build-coordinator/hooks.scm b/guix-build-coordinator/hooks.scm index bf4b576..3cd1c59 100644 --- a/guix-build-coordinator/hooks.scm +++ b/guix-build-coordinator/hooks.scm @@ -26,14 +26,16 @@ #:use-module (gcrypt pk-crypto) #:use-module (zlib) #:use-module (lzlib) + #:use-module (knots timeout) #:use-module (guix pki) #:use-module (guix store) #:use-module (guix base32) #:use-module (guix config) #:use-module (guix derivations) #:use-module (guix serialization) - #:use-module ((guix utils) #:select (default-keyword-arguments)) - #:use-module (guix build utils) + #:use-module ((guix utils) #:select (default-keyword-arguments + call-with-decompressed-port)) + #:use-module ((guix build utils) #:select (dump-port mkdir-p)) #:use-module (guix-build-coordinator config) #:use-module (guix-build-coordinator utils) #:use-module (guix-build-coordinator datastore) @@ -81,6 +83,20 @@ build-id agent-id) (current-error-port)))) +(define* (default-nar-compressions #:key 
output-filename nar-size + source-compression source-size) + (define MiB (* (expt 2 20) 1.)) + + (if (eq? 'none source-compression) + ;; If the agent didn't compress the nar, don't change that here + (list 'none) + (let* ((compression-proportion + (/ source-size nar-size))) + (if (or (> compression-proportion 0.95) + (< nar-size (* 0.05 MiB))) + '(none) + (list source-compression))))) + (define* (build-success-publish-hook publish-directory #:key @@ -94,7 +110,8 @@ post-publish-hook combined-post-publish-hook (publish-referenced-derivation-source-files? #t) - derivation-substitute-urls) + derivation-substitute-urls + (nar-compressions-proc default-nar-compressions)) (mkdir-p (string-append publish-directory "/nar/lzip")) (lambda (build-coordinator build-id) @@ -140,7 +157,8 @@ (substitute-derivation store drv-name #:substitute-urls - derivation-substitute-urls))) + derivation-substitute-urls)) + #:timeout 120) (add-temp-root store drv-name)) (let* ((drv (read-derivation-from-file* drv-name)) @@ -218,8 +236,7 @@ nar-filename)) #:unwind? #t)) - (cons narinfo-filename - nar-filename))))) + narinfo-filename)))) referenced-source-files)))))) (define (process-output drv-name output) @@ -230,12 +247,6 @@ (nar-location (build-output-file-location datastore build-id output-name)) - (nar-filename - (string-append "nar/lzip/" - (basename output-filename))) - (nar-destination - (string-append publish-directory "/" - nar-filename)) (narinfo-filename (string-append (string-take (basename output-filename) 32) ".narinfo")) @@ -246,8 +257,78 @@ (if (skip-publishing-proc narinfo-filename narinfo-directory) #f - (begin - (copy-file nar-location nar-destination) + (let ((compressions + (nar-compressions-proc + #:output-filename output-filename + #:nar-size (assq-ref output 'size) + ;; TODO Don't hardcode this + #:source-compression 'lzip + #:source-size (stat:size (stat nar-location #f))))) + + (for-each + (lambda (compression) + (if (or (and (pair? compression) + (eq? 
(car compression) 'lzip)) + (eq? compression 'lzip)) + ;; TODO If the agents start uploading uncompressed files + ;; or files compressed differently, this might not be + ;; right + (let ((nar-destination + (string-append publish-directory "/" + "nar/lzip/" + (basename output-filename)))) + (copy-file nar-location nar-destination)) + (let* ((target-compression + (if (pair? compression) + (car compression) + compression)) + ;; TODO This logic should sit elsewhere + (nar-destination + (string-append + publish-directory "/" + "nar/" + (if (eq? compression 'none) + "" + (string-append + (symbol->string target-compression) "/")) + (basename output-filename))) + (temporary-destination + (string-append nar-destination ".tmp"))) + (mkdir-p (dirname temporary-destination)) + (call-with-input-file nar-location + (lambda (source-port) + (call-with-decompressed-port + ;; TODO Don't assume the source compression + 'lzip + source-port + (lambda (decompressed-source-port) + (let ((call-with-compressed-output-port* + (match target-compression + ('gzip + (@ (zlib) call-with-gzip-output-port)) + ('lzip + (@ (lzlib) call-with-lzip-output-port)) + ('zstd + (@ (zstd) call-with-zstd-output-port)) + ('none + (lambda (port proc) + (proc port) + (close-port port)))))) + (when (file-exists? temporary-destination) + (delete-file temporary-destination)) + (apply + call-with-compressed-output-port* + (open-output-file temporary-destination + #:binary #t) + (lambda (compressed-port) + (dump-port decompressed-source-port + compressed-port)) + (if (pair? compression) + (cdr compression) + '()))))))) + (rename-file temporary-destination + nar-destination)))) + compressions) (call-with-output-file narinfo-location (lambda (port) @@ -257,7 +338,23 @@ (assq-ref output 'size) (vector->list (assq-ref output 'references)) - `((lzip ,(stat:size (stat nar-location #f)))) + (map + (lambda (compression-details) + (let* ((compression + (if (pair? 
compression-details) + (car compression-details) + compression-details)) + ;; TODO This logic should sit elsewhere + (file (string-append + publish-directory "/" + "nar/" + (if (eq? compression 'none) + "" + (string-append + (symbol->string compression) "/")) + (basename output-filename)))) + (list compression (stat:size (stat file #f))))) + compressions) #:system (datastore-find-derivation-system datastore drv-name) @@ -276,18 +373,16 @@ (raise-exception exn)) (lambda () (post-publish-hook publish-directory - narinfo-filename - nar-filename)) + narinfo-filename)) #:unwind? #t)) - (cons narinfo-filename - nar-filename))))) + narinfo-filename)))) (let* ((build-details (datastore-find-build datastore build-id)) (drv-name (assq-ref build-details 'derivation-name)) - (narinfos-and-nars + (narinfos (append (if publish-referenced-derivation-source-files? (process-referenced-derivation-source-files drv-name) @@ -297,23 +392,22 @@ (process-output drv-name output)) (datastore-list-build-outputs datastore build-id))))) (when (and combined-post-publish-hook - (not (null? narinfos-and-nars))) + (not (null? narinfos))) (with-exception-handler (lambda (exn) ;; Rollback narinfo creation, to make this more ;; transactional (for-each - (match-lambda - ((narinfo-filename . _) - (delete-file - (string-append - narinfo-directory "/" narinfo-filename)))) - narinfos-and-nars) + (lambda + (narinfo-filename) + (delete-file + (string-append + narinfo-directory "/" narinfo-filename))) + narinfos) (raise-exception exn)) (lambda () - (combined-post-publish-hook publish-directory - narinfos-and-nars)) + (combined-post-publish-hook publish-directory narinfos)) #:unwind? #t))))) (define* (build-success-s3-publish-hook @@ -528,24 +622,27 @@ (unless (eq? source-compression recompress-to) (when (file-exists? 
tmp-output-log-file) (delete-file tmp-output-log-file)) - (with-timeout timeout - (raise-exception - (make-exception-with-message "timeout recompressing log file")) - (call-with-compressed-input-file - source-log-file - source-compression - (lambda (input-port) - (call-with-compressed-output-file - tmp-output-log-file - recompress-to - (lambda (output-port) - (dump-port input-port output-port)))))) + (call-with-compressed-input-file + source-log-file + source-compression + (lambda (input-port) + (call-with-compressed-output-file + tmp-output-log-file + recompress-to + (lambda (output-port) + (dump-port input-port output-port))))) (rename-file tmp-output-log-file output-log-file) (delete-file source-log-file))))) -(define (default-build-missing-inputs-hook build-coordinator - build-id missing-inputs) +(define* (default-build-missing-inputs-hook + build-coordinator + build-id missing-inputs + #:key (submit-build? (lambda* (missing-input + #:key successful-builds + pending-builds) + (and (null? successful-builds) + (null? pending-builds))))) (define datastore (build-coordinator-datastore build-coordinator)) @@ -579,8 +676,9 @@ (not (assq-ref build-details 'processed)) (not (assq-ref build-details 'canceled)))) builds-for-output))) - (if (and (null? successful-builds) - (null? pending-builds)) + (if (submit-build? 
missing-input + #:successful-builds successful-builds + #:pending-builds pending-builds) (begin (simple-format #t "submitting build for ~A\n" @@ -590,8 +688,7 @@ #:tags (datastore-fetch-build-tags datastore build-id))) - (simple-format #t "~A builds exist for ~A, skipping\n" - (length builds-for-output) + (simple-format #t "skipping submitting build for ~A\n" missing-input))) (begin (simple-format (current-error-port) diff --git a/guix-build-coordinator/utils.scm b/guix-build-coordinator/utils.scm index a549f20..f4f15dc 100644 --- a/guix-build-coordinator/utils.scm +++ b/guix-build-coordinator/utils.scm @@ -14,10 +14,11 @@ #:use-module (ice-9 textual-ports) #:use-module (ice-9 rdelim) #:use-module (ice-9 binary-ports) - #:use-module (ice-9 suspendable-ports) - #:use-module ((ice-9 ports internal) #:select (port-poll)) #:use-module (ice-9 exceptions) #:use-module (rnrs bytevectors) + #:use-module (oop goops) + #:use-module (logging logger) + #:use-module (logging port-log) #:use-module (web uri) #:use-module (web http) #:use-module (web client) @@ -39,24 +40,12 @@ #:use-module ((guix http-client) #:select (http-fetch)) #:use-module (guix serialization) - #:use-module ((guix build download) - #:select ((open-connection-for-uri - . guix:open-connection-for-uri))) #:use-module ((guix build syscalls) #:select (set-thread-name)) #:use-module (guix scripts substitute) + #:use-module (guix-build-coordinator utils timeout) #:export (random-v4-uuid - &port-timeout - &port-read-timeout - &port-write-timeout - - port-timeout-error? - port-read-timeout-error? - port-write-timeout-error? 
- - with-port-timeouts - request-query-parameters call-with-streaming-http-request @@ -66,7 +55,7 @@ read-derivation-from-file* - with-store/non-blocking + non-blocking-port substitute-derivation read-derivation-through-substitutes @@ -86,9 +75,6 @@ create-work-queue create-thread-pool - with-timeout - reset-timeout - throttle get-load-average @@ -100,7 +86,13 @@ open-socket-for-uri* - check-locale!)) + check-locale! + + display/safe + simple-format/safe + format/safe + + <custom-port-log>)) (eval-when (eval load compile) (begin @@ -188,74 +180,6 @@ (parse-query-string query)) '()))) -(define &port-timeout - (make-exception-type '&port-timeout - &external-error - '(port))) - -(define make-port-timeout-error - (record-constructor &port-timeout)) - -(define port-timeout-error? - (record-predicate &port-timeout)) - -(define &port-read-timeout - (make-exception-type '&port-read-timeout - &port-timeout - '())) - -(define make-port-read-timeout-error - (record-constructor &port-read-timeout)) - -(define port-read-timeout-error? - (record-predicate &port-read-timeout)) - -(define &port-write-timeout - (make-exception-type '&port-write-timeout - &port-timeout - '())) - -(define make-port-write-timeout-error - (record-constructor &port-write-timeout)) - -(define port-write-timeout-error? - (record-predicate &port-write-timeout)) - -(define* (with-port-timeouts thunk #:key (timeout (* 120 1000))) - - ;; When the GC runs, it restarts the poll syscall, but the timeout remains - ;; unchanged! When the timeout is longer than the time between the syscall - ;; restarting, I think this renders the timeout useless. Therefore, this - ;; code uses a short timeout, and repeatedly calls poll while watching the - ;; clock to see if it has timed out overall. 
- (define poll-timeout-ms 200) - - (define (wait port mode) - (let ((timeout-internal - (+ (get-internal-real-time) - (* internal-time-units-per-second - (/ timeout 1000))))) - (let loop ((poll-value - (port-poll port mode poll-timeout-ms))) - (if (= poll-value 0) - (if (> (get-internal-real-time) - timeout-internal) - (raise-exception - (if (string=? mode "r") - (make-port-read-timeout-error port) - (make-port-write-timeout-error port))) - (loop (port-poll port mode poll-timeout-ms))) - poll-value)))) - - (parameterize - ((current-read-waiter - (lambda (port) - (wait port "r"))) - (current-write-waiter - (lambda (port) - (wait port "w")))) - (thunk))) - (define* (call-with-streaming-http-request uri content-length callback @@ -282,8 +206,8 @@ (setvbuf port 'block (expt 2 13)) (with-exception-handler (lambda (exp) - (simple-format #t "error: ~A ~A: ~A\n" - method (uri-path uri) exp) + (simple-format/safe #t "error: ~A ~A: ~A\n" + method (uri-path uri) exp) (close-port port) (raise-exception exp)) (lambda () @@ -298,7 +222,8 @@ (let ((body (read-response-body response))) (close-port port) (values response - body))))))))))) + body))))))))) + #:timeout 120)) (define (find-missing-substitutes-for-output store substitute-urls output) (if (valid-path? store output) @@ -356,7 +281,7 @@ (when (file-exists? cache-file) (with-exception-handler (lambda (exn) - (simple-format + (simple-format/safe (current-error-port) "error: when deleting substitute cache file: ~A\n" exn)) @@ -368,7 +293,18 @@ (let ((substitute-urls (append-map (lambda (substitute-url) (let ((log-port (open-output-string))) - (with-throw-handler #t + (with-exception-handler + (lambda (exn) + (simple-format/safe + (current-error-port) + "exception in has-substiutes-no-cache? (~A): ~A\n" + substitute-url exn) + (display/safe (string-append + (get-output-string log-port) + "\n") + (current-error-port)) + (close-output-port log-port) + (raise-exception exn)) (lambda () (if (null? 
;; I doubt the caching is thread safe, so @@ -378,17 +314,7 @@ (lookup-narinfos substitute-url (list file))))) '() - (list substitute-url))) - (lambda (key . args) - (simple-format - (current-error-port) - "exception in has-substiutes-no-cache? (~A) ~A: ~A\n" - substitute-url key args) - (display (string-append - (get-output-string log-port) - "\n") - (current-error-port)) - (close-output-port log-port))))) + (list substitute-url)))))) substitute-urls))) substitute-urls)) @@ -399,23 +325,6 @@ (fcntl port F_SETFL (logior O_NONBLOCK flags))) port)) -(define (ensure-non-blocking-store-connection store) - "Mark the file descriptor that backs STORE, a <store-connection>, as -O_NONBLOCK." - (match (store-connection-socket store) - ((? file-port? port) - (non-blocking-port port)) - (_ #f))) - -(define-syntax-rule (with-store/non-blocking store exp ...) - "Like 'with-store', bind STORE to a connection to the store, but ensure that -said connection is non-blocking (O_NONBLOCK). Evaluate EXP... in that -context." - (with-store store - (ensure-non-blocking-store-connection store) - (let () - exp ...))) - (define* (substitute-derivation store derivation-name #:key substitute-urls) @@ -439,7 +348,7 @@ context." (take-right lines 10) lines))) (close-output-port log-port) - (simple-format + (simple-format/safe (current-error-port) "exception when substituting derivation: ~A:\n ~A\n" exn (string-join last-n-lines "\n")) @@ -449,27 +358,23 @@ context." (ensure-path store derivation-name))) #:unwind? #t))) -(define read-derivation-from-file* - (let ((%derivation-cache - (@@ (guix derivations) %derivation-cache))) - (lambda (file) - (or (and file (hash-ref %derivation-cache file)) - (let ((drv - ;; read-derivation can call read-derivation-from-file, so to - ;; avoid having many open files when reading a derivation with - ;; inputs, read it in to a string first. 
- (call-with-input-string - ;; Avoid calling scm_i_relativize_path in - ;; fport_canonicalize_filename since this leads to lots - ;; of readlink calls - (with-fluids ((%file-port-name-canonicalization 'none)) - (call-with-input-file file - get-string-all)) - (lambda (port) - (set-port-filename! port file) - (read-derivation port read-derivation-from-file*))))) - (hash-set! %derivation-cache file drv) - drv))))) +(define* (read-derivation-from-file* file #:optional (drv-hash (make-hash-table))) + (or (and file (hash-ref drv-hash file)) + (let ((drv + ;; read-derivation can call read-derivation-from-file, so to + ;; avoid having many open files when reading a derivation with + ;; inputs, read it in to a string first. + (call-with-input-string + (call-with-input-file file + get-string-all) + (lambda (port) + (set-port-filename! port file) + (read-derivation port (lambda (file) + (read-derivation-from-file* + file + drv-hash))))))) + (hash-set! drv-hash file drv) + drv))) (define (read-derivation-through-substitutes derivation-name substitute-urls) @@ -487,10 +392,9 @@ context." (match (assoc-ref cache key) (#f (let ((socket - (guix:open-connection-for-uri + (open-socket-for-uri* uri - #:verify-certificate? verify-certificate? - #:timeout timeout))) + #:verify-certificate? verify-certificate?))) (set! cache (alist-cons key socket cache)) socket)) (socket @@ -591,9 +495,9 @@ context." (define* (store-item->recutils compression file-size) (let ((url (encode-and-join-uri-path `(,@(split-and-decode-uri-path nar-path) - ,@(if compression - (list (symbol->string compression)) - '()) + ,@(if (eq? compression 'none) + '() + (list (symbol->string compression))) ,(basename store-path))))) (format #f "URL: ~a~%Compression: ~a~%~@[FileSize: ~a~%~]" url @@ -663,7 +567,7 @@ References: ~a~%" #:unwind? #t) ((#t . 
return-values) (when (> attempt 1) - (simple-format + (simple-format/safe (current-error-port) "retry success: ~A\n on attempt ~A of ~A\n" f @@ -674,7 +578,7 @@ References: ~a~%" (if (>= attempt (- times 1)) (begin - (simple-format + (simple-format/safe (current-error-port) "error: ~A:\n ~A,\n attempt ~A of ~A, last retry in ~A\n" f @@ -685,14 +589,14 @@ References: ~a~%" (when error-hook (error-hook attempt exn)) (sleep-impl delay) - (simple-format + (simple-format/safe (current-error-port) "running last retry of ~A after ~A failed attempts\n" f attempt) (f)) (begin - (simple-format + (simple-format/safe (current-error-port) "error: ~A:\n ~A,\n attempt ~A of ~A, retrying in ~A\n" f @@ -920,27 +824,29 @@ References: ~a~%" (define (thread-process-job job-args) (with-exception-handler (lambda (exn) - (simple-format (current-error-port) - "~A work queue, job raised exception ~A: ~A\n" - name job-args exn)) + (simple-format/safe + (current-error-port) + "~A work queue, job raised exception ~A: ~A\n" + name job-args exn)) (lambda () - (with-throw-handler #t + (with-exception-handler + (lambda (exn) + (simple-format/safe + (current-error-port) + "~A work queue, exception when handling job: ~A\n" + name exn) + (let* ((stack (make-stack #t 3)) + (backtrace + (call-with-output-string + (lambda (port) + (display-backtrace stack port) + (newline port))))) + (display/safe + backtrace + (current-error-port))) + (raise-exception exn)) (lambda () - (apply proc job-args)) - (lambda (key . args) - (simple-format - (current-error-port) - "~A work queue, exception when handling job: ~A ~A\n" - name key args) - (let* ((stack (make-stack #t 3)) - (backtrace - (call-with-output-string - (lambda (port) - (display-backtrace stack port) - (newline port))))) - (display - backtrace - (current-error-port)))))) + (apply proc job-args)))) #:unwind? 
#t)) (define (start-thread thread-index) @@ -1110,36 +1016,29 @@ References: ~a~%" (define (thread-process-job job-args) (with-exception-handler (lambda (exn) - (with-exception-handler - (lambda _ - #f) - (lambda () - ;; Logging may raise an exception, so try and just keep going. - (display - (simple-format - #f - "~A thread pool, job raised exception ~A: ~A\n" - name job-args exn) - (current-error-port))) - #:unwind? #t)) + (simple-format/safe + (current-error-port) + "~A thread pool, job raised exception ~A: ~A\n" + name job-args exn)) (lambda () - (with-throw-handler #t - (lambda () - (apply proc job-args)) - (lambda (key . args) - (simple-format + (with-exception-handler + (lambda (exn) + (simple-format/safe (current-error-port) - "~A thread pool, exception when handling job: ~A ~A\n" - name key args) + "~A thread pool, exception when handling job: ~A\n" + name exn) (let* ((stack (make-stack #t 3)) (backtrace (call-with-output-string (lambda (port) (display-backtrace stack port) (newline port))))) - (display + (display/safe backtrace - (current-error-port)))))) + (current-error-port))) + (raise-exception exn)) + (lambda () + (apply proc job-args)))) #:unwind? #t)) (define (start-thread thread-index) @@ -1268,27 +1167,6 @@ References: ~a~%" (values pool-mutex job-available count-threads list-jobs))) -;; copied from (guix scripts substitute) -(define-syntax-rule (with-timeout duration handler body ...) - "Run BODY; when DURATION seconds have expired, call HANDLER, and run BODY -again." - (begin - (sigaction SIGALRM - (lambda (signum) - (sigaction SIGALRM SIG_DFL) - handler)) - (alarm duration) - (call-with-values - (lambda () - body ...) - (lambda result - (alarm 0) - (sigaction SIGALRM SIG_DFL) - (apply values result))))) - -(define (reset-timeout duration) - (alarm duration)) - (define (throttle min-duration thunk) (let ((next-min-runtime 0)) (lambda () @@ -1400,22 +1278,18 @@ again." (define (check-locale!) 
(with-exception-handler (lambda (exn) - (display - (simple-format - #f - "exception when calling setlocale: ~A + (simple-format/safe + (current-error-port) + "exception when calling setlocale: ~A falling back to en_US.utf8\n" - exn) - (current-error-port)) + exn) (with-exception-handler (lambda (exn) - (display - (simple-format - #f - "exception when calling setlocale with en_US.utf8: ~A\n" - exn) - (current-error-port)) + (simple-format/safe + (current-error-port) + "exception when calling setlocale with en_US.utf8: ~A\n" + exn) (exit 1)) (lambda _ @@ -1424,3 +1298,51 @@ falling back to en_US.utf8\n" (lambda _ (setlocale LC_ALL "")) #:unwind? #t)) + +(define* (display/safe obj #:optional (port (current-output-port))) + ;; Try to avoid the dreaded conversion to port encoding failed error #62590 + (put-bytevector + port + (string->utf8 + (call-with-output-string + (lambda (port) + (display obj port))))) + (force-output port)) + +(define (simple-format/safe port s . args) + (let ((str (apply simple-format #f s args))) + (if (eq? #f port) + str + (display/safe + str + (if (eq? #t port) + (current-output-port) + port))))) + +(define (format/safe port s . args) + (let ((str (apply format #f s args))) + (if (eq? #f port) + str + (display/safe + str + (if (eq? #t port) + (current-output-port) + port))))) + +(define-class <custom-port-log> (<log-handler>) + (port #:init-value #f #:accessor port #:init-keyword #:port)) + +(define-method (emit-log (self <custom-port-log>) str) + (when (port self) + (put-bytevector (port self) + (string->utf8 str)) + ;; Even though the port is line buffered, writing to it with + ;; put-bytevector doesn't cause the buffer to be flushed. + (force-output (port self)))) + +(define-method (flush-log (self <custom-port-log>)) + (and=> (port self) force-output)) + +(define-method (close-log! (self <custom-port-log>)) + (and=> (port self) close-port) + (set! 
(port self) #f)) diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm index 5362b18..d836ceb 100644 --- a/guix-build-coordinator/utils/fibers.scm +++ b/guix-build-coordinator/utils/fibers.scm @@ -1,4 +1,6 @@ (define-module (guix-build-coordinator utils fibers) + #:use-module (srfi srfi-9) + #:use-module (ice-9 q) #:use-module (ice-9 match) #:use-module (ice-9 atomic) #:use-module (ice-9 threads) @@ -11,284 +13,20 @@ #:use-module (fibers scheduler) #:use-module (fibers operations) #:use-module (fibers conditions) + #:use-module (knots timeout) #:use-module ((guix build syscalls) #:select (set-thread-name)) #:use-module (guix-build-coordinator utils) - #:export (make-worker-thread-channel - call-with-worker-thread - worker-thread-timeout-error? + #:export (spawn-port-monitoring-fiber - call-with-sigint + make-discrete-priority-queueing-channels - run-server/patched - - spawn-port-monitoring-fiber - - letpar& - - with-fibers-timeout - with-fibers-port-timeouts) + make-reusable-condition + reusable-condition? + signal-reusable-condition! + reusable-condition-wait) #:replace (retry-on-error)) -(define %worker-thread-args - (make-parameter #f)) - -(define* (make-worker-thread-channel initializer - #:key (parallelism 1) - (delay-logger (lambda _ #f)) - (duration-logger (const #f)) - destructor - lifetime - (log-exception? (const #t)) - (expire-on-exception? #f) - (name "unnamed")) - "Return a channel used to offload work to a dedicated thread. ARGS are the -arguments of the worker thread procedure." - - (define (initializer/safe) - (let ((args - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "exception running initializer in worker thread (~A): ~A:\n ~A\n" - name - initializer - exn) - #f) - (lambda () - (with-throw-handler #t - initializer - (lambda args - (backtrace)))) - #:unwind? 
#t))) - - (if args - args - ;; never give up, just keep retrying - (begin - (sleep 5) - (initializer/safe))))) - - (define (destructor/safe args) - (let ((success? - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "exception running destructor in worker thread (~A): ~A:\n ~A\n" - name - destructor - exn) - #f) - (lambda () - (with-throw-handler #t - (lambda () - (apply destructor args) - #t) - (lambda _ - (backtrace)))) - #:unwind? #t))) - - (or success? - #t - (begin - (sleep 5) - (destructor/safe args))))) - - (define (process channel args) - (let loop ((current-lifetime lifetime)) - (let ((exception? - (match (get-message channel) - (((? channel? reply) sent-time (? procedure? proc)) - (let ((time-delay - (- (get-internal-real-time) - sent-time))) - (delay-logger (/ time-delay - internal-time-units-per-second)) - - (let* ((start-time (get-internal-real-time)) - (response - (with-exception-handler - (lambda (exn) - (list 'worker-thread-error - (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second) - exn)) - (lambda () - (with-throw-handler #t - (lambda () - (call-with-values - (lambda () - (start-stack - 'worker-thread - (apply proc args))) - (lambda vals - (cons (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second) - vals)))) - (lambda args - (when (match args - (('%exception exn) - (log-exception? exn)) - (_ #t)) - (simple-format - (current-error-port) - "worker-thread: exception: ~A\n" args) - (backtrace))))) - #:unwind? #t))) - (put-message reply - response) - - (match response - (('worker-thread-error duration _) - (when duration-logger - (duration-logger duration proc)) - #t) - ((duration . _) - (when duration-logger - (duration-logger duration proc)) - #f)))))))) - (unless (and expire-on-exception? - exception?) - (if (number? 
current-lifetime) - (unless (< current-lifetime 0) - (loop (if current-lifetime - (- current-lifetime 1) - #f))) - (loop #f)))))) - - (let ((channel (make-channel))) - (for-each - (lambda (thread-index) - (call-with-new-thread - (lambda () - (catch 'system-error - (lambda () - (set-thread-name - (string-append - name " w t " - (number->string thread-index)))) - (const #t)) - - (let init ((args (initializer/safe))) - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "worker-thread-channel: exception: ~A\n" exn)) - (lambda () - (parameterize ((%worker-thread-args args)) - (process channel args))) - #:unwind? #t) - - (when destructor - (destructor/safe args)) - - (init (initializer/safe)))))) - (iota parallelism)) - channel)) - -(define &worker-thread-timeout - (make-exception-type '&worker-thread-timeout - &error - '())) - -(define make-worker-thread-timeout-error - (record-constructor &worker-thread-timeout)) - -(define worker-thread-timeout-error? - (record-predicate &worker-thread-timeout)) - -(define* (call-with-worker-thread channel proc #:key duration-logger - (timeout 30)) - "Send PROC to the worker thread through CHANNEL. Return the result of PROC. -If already in the worker thread, call PROC immediately." - (let ((args (%worker-thread-args))) - (if args - (call-with-delay-logging proc #:args args) - (let* ((reply (make-channel)) - (operation-success? - (perform-operation - (let ((put - (wrap-operation - (put-operation channel - (list reply - (get-internal-real-time) - proc)) - (const #t)))) - - (if timeout - (choice-operation - put - (wrap-operation (sleep-operation timeout) - (const #f))) - put))))) - - (unless operation-success? - (raise-exception - (make-worker-thread-timeout-error))) - - (match (get-message reply) - (('worker-thread-error duration exn) - (when duration-logger - (duration-logger duration)) - (raise-exception exn)) - ((duration . 
result) - (when duration-logger - (duration-logger duration)) - (apply values result))))))) - -;; Copied from (fibers web server) -(define (call-with-sigint thunk cvar) - (let ((handler #f)) - (dynamic-wind - (lambda () - (set! handler - (sigaction SIGINT (lambda (sig) (signal-condition! cvar))))) - thunk - (lambda () - (if handler - ;; restore Scheme handler, SIG_IGN or SIG_DFL. - (sigaction SIGINT (car handler) (cdr handler)) - ;; restore original C handler. - (sigaction SIGINT #f)))))) - -;; This variant of run-server from the fibers library supports running -;; multiple servers within one process. -(define run-server/patched - (let ((fibers-web-server-module - (resolve-module '(fibers web server)))) - - (define set-nonblocking! - (module-ref fibers-web-server-module 'set-nonblocking!)) - - (define make-default-socket - (module-ref fibers-web-server-module 'make-default-socket)) - - (define socket-loop - (module-ref fibers-web-server-module 'socket-loop)) - - (lambda* (handler - #:key - (host #f) - (family AF_INET) - (addr (if host - (inet-pton family host) - INADDR_LOOPBACK)) - (port 8080) - (socket (make-default-socket family addr port))) - ;; We use a large backlog by default. If the server is suddenly hit - ;; with a number of connections on a small backlog, clients won't - ;; receive confirmation for their SYN, leading them to retry -- - ;; probably successfully, but with a large latency. - (listen socket 1024) - (set-nonblocking! socket) - (sigaction SIGPIPE SIG_IGN) - (spawn-fiber (lambda () (socket-loop socket handler)))))) - (define (spawn-port-monitoring-fiber port error-condition) (spawn-fiber (lambda () @@ -304,232 +42,17 @@ If already in the worker thread, call PROC immediately." 
"port monitoring fiber error-condition unresponsive") (primitive-exit 1)) (lambda () - (with-fibers-port-timeouts + (with-port-timeouts (lambda () - (let ((sock (socket PF_INET SOCK_STREAM 0))) + (let ((sock + (non-blocking-port + (socket PF_INET SOCK_STREAM 0)))) (connect sock AF_INET INADDR_LOOPBACK port) (close-port sock))) #:timeout 20)) #:unwind? #t) (sleep 20))))) -(define (defer-to-fiber thunk) - (let ((reply (make-channel))) - (spawn-fiber - (lambda () - (put-message - reply - (with-exception-handler - (lambda (exn) - (cons 'worker-fiber-error exn)) - (lambda () - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "worker fiber: exception: ~A\n" - exn) - (backtrace) - (raise-exception exn)) - (lambda () - (call-with-values - thunk - (lambda vals - vals))))) - #:unwind? #t))) - #:parallel? #t) - reply)) - -(define (fetch-result-of-defered-thunks . reply-channels) - (let ((responses (map get-message reply-channels))) - (map - (match-lambda - (('worker-thread-error . exn) - (raise-exception exn)) - (result - (apply values result))) - responses))) - -(define-syntax parallel-via-fibers - (lambda (x) - (syntax-case x () - ((_ e0 ...) - (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...))))) - #'(let ((tmp0 (defer-to-fiber - (lambda () - e0))) - ...) - (apply values (fetch-result-of-defered-thunks tmp0 ...)))))))) - -(define-syntax-rule (letpar& ((v e) ...) b0 b1 ...) - (call-with-values - (lambda () (parallel-via-fibers e ...)) - (lambda (v ...) - b0 b1 ...))) - -(define* (with-fibers-timeout thunk #:key timeout on-timeout) - (let ((channel (make-channel))) - (spawn-fiber - (lambda () - (with-exception-handler - (lambda (exn) - (perform-operation - (choice-operation - (put-operation channel (cons 'exception exn)) - (sleep-operation timeout)))) - (lambda () - (call-with-values thunk - (lambda vals - (perform-operation - (choice-operation - (put-operation channel vals) - (sleep-operation timeout)))))) - #:unwind? 
#t))) - - (match (perform-operation - (choice-operation - (get-operation channel) - (wrap-operation (sleep-operation timeout) - (const 'timeout)))) - ('timeout - (on-timeout)) - (('exception . exn) - (raise-exception exn)) - (vals - (apply values vals))))) - -(define &port-timeout - (make-exception-type '&port-timeout - &external-error - '(port))) - -(define make-port-timeout-error - (record-constructor &port-timeout)) - -(define port-timeout-error? - (record-predicate &port-timeout)) - -(define &port-read-timeout - (make-exception-type '&port-read-timeout - &port-timeout - '())) - -(define make-port-read-timeout-error - (record-constructor &port-read-timeout)) - -(define port-read-timeout-error? - (record-predicate &port-read-timeout)) - -(define &port-write-timeout - (make-exception-type '&port-write-timeout - &port-timeout - '())) - -(define make-port-write-timeout-error - (record-constructor &port-write-timeout)) - -(define port-write-timeout-error? - (record-predicate &port-write-timeout)) - -;; These procedure are subject to spurious wakeups. - -(define (readable? port) - "Test if PORT is writable." - (match (select (vector port) #() #() 0) - ((#() #() #()) #f) - ((#(_) #() #()) #t))) - -(define (writable? port) - "Test if PORT is writable." - (match (select #() (vector port) #() 0) - ((#() #() #()) #f) - ((#() #(_) #()) #t))) - -(define (make-wait-operation ready? schedule-when-ready port port-ready-fd this-procedure) - (make-base-operation #f - (lambda _ - (and (ready? (port-ready-fd port)) values)) - (lambda (flag sched resume) - (define (commit) - (match (atomic-box-compare-and-swap! flag 'W 'S) - ('W (resume values)) - ('C (commit)) - ('S #f))) - (schedule-when-ready - sched (port-ready-fd port) commit)))) - -(define (wait-until-port-readable-operation port) - "Make an operation that will succeed when PORT is readable." - (unless (input-port? port) - (error "refusing to wait forever for input on non-input port")) - (make-wait-operation readable? 
schedule-task-when-fd-readable port - port-read-wait-fd - wait-until-port-readable-operation)) - -(define (wait-until-port-writable-operation port) - "Make an operation that will succeed when PORT is writable." - (unless (output-port? port) - (error "refusing to wait forever for output on non-output port")) - (make-wait-operation writable? schedule-task-when-fd-writable port - port-write-wait-fd - wait-until-port-writable-operation)) - -(define* (with-fibers-port-timeouts thunk - #:key timeout - (read-timeout timeout) - (write-timeout timeout)) - (define (no-fibers-wait port mode timeout) - (define poll-timeout-ms 200) - - ;; When the GC runs, it restarts the poll syscall, but the timeout - ;; remains unchanged! When the timeout is longer than the time - ;; between the syscall restarting, I think this renders the - ;; timeout useless. Therefore, this code uses a short timeout, and - ;; repeatedly calls poll while watching the clock to see if it has - ;; timed out overall. - (let ((timeout-internal - (+ (get-internal-real-time) - (* internal-time-units-per-second - (/ timeout 1000))))) - (let loop ((poll-value - (port-poll port mode poll-timeout-ms))) - (if (= poll-value 0) - (if (> (get-internal-real-time) - timeout-internal) - (raise-exception - (if (string=? 
mode "r") - (make-port-read-timeout-error port) - (make-port-write-timeout-error port))) - (loop (port-poll port mode poll-timeout-ms))) - poll-value)))) - - (parameterize - ((current-read-waiter - (lambda (port) - (if (current-scheduler) - (perform-operation - (choice-operation - (wait-until-port-readable-operation port) - (wrap-operation - (sleep-operation read-timeout) - (lambda () - (raise-exception - (make-port-read-timeout-error thunk port)))))) - (no-fibers-wait port "r" read-timeout)))) - (current-write-waiter - (lambda (port) - (if (current-scheduler) - (perform-operation - (choice-operation - (wait-until-port-writable-operation port) - (wrap-operation - (sleep-operation write-timeout) - (lambda () - (raise-exception - (make-port-write-timeout-error thunk port)))))) - (no-fibers-wait port "w" write-timeout))))) - (thunk))) - ;; Use the fibers sleep (define (retry-on-error . args) (apply @@ -537,3 +60,95 @@ If already in the worker thread, call PROC immediately." (append args (list #:sleep-impl sleep)))) + +(define (make-discrete-priority-queueing-channels channel num-priorities) + (define all-queues + (map (lambda _ (make-q)) + (iota num-priorities))) + + (define queue-channels + (map (lambda _ (make-channel)) + (iota num-priorities))) + + (define (stats) + (map (lambda (queue) + `((length . ,(q-length queue)))) + all-queues)) + + (spawn-fiber + (lambda () + (while #t + (let loop ((queues all-queues)) + (let ((queue (car queues))) + (if (q-empty? queue) + (let ((next (cdr queues))) + (if (null? next) + (perform-operation + (apply + choice-operation + (map (lambda (queue queue-channel) + (wrap-operation (get-operation queue-channel) + (lambda (val) + (enq! queue val)))) + all-queues + queue-channels))) + (loop next))) + (let ((front (q-front queue))) + (perform-operation + (apply + choice-operation + (cons + (wrap-operation (put-operation channel front) + (lambda _ + (q-pop! 
queue))) + (map (lambda (queue queue-channel) + (wrap-operation (get-operation queue-channel) + (lambda (val) + (enq! queue val)))) + all-queues + queue-channels))))))))))) + (values (list-copy queue-channels) + stats)) + +(define-record-type <reusable-condition> + (%make-reusable-condition atomic-box channel) + reusable-condition? + (atomic-box reusable-condition-atomic-box) + (channel reusable-condition-channel)) + +(define (make-reusable-condition) + (%make-reusable-condition (make-atomic-box #f) + (make-channel))) + +(define* (signal-reusable-condition! reusable-condition + #:optional (scheduler (current-scheduler))) + (match (atomic-box-compare-and-swap! + (reusable-condition-atomic-box reusable-condition) + #f + #t) + (#f + (spawn-fiber + (lambda () + (put-message (reusable-condition-channel reusable-condition) + #t)) + scheduler) + #t) + (#t #f))) + +(define* (reusable-condition-wait reusable-condition + #:key (timeout #f)) + (let ((val + (if (atomic-box-ref (reusable-condition-atomic-box reusable-condition)) + #t + ;; Not great as this is subject to race conditions, but it should + ;; roughly work + (if timeout + (perform-operation + (choice-operation + (get-operation (reusable-condition-channel reusable-condition)) + (wrap-operation (sleep-operation timeout) + (const #f)))) + (get-message (reusable-condition-channel reusable-condition)))))) + (atomic-box-set! (reusable-condition-atomic-box reusable-condition) + #f) + val)) diff --git a/guix-build-coordinator/utils/timeout.scm b/guix-build-coordinator/utils/timeout.scm new file mode 100644 index 0000000..bb133d7 --- /dev/null +++ b/guix-build-coordinator/utils/timeout.scm @@ -0,0 +1,81 @@ +(define-module (guix-build-coordinator utils timeout) + #:use-module (ice-9 exceptions) + #:use-module (ice-9 suspendable-ports) + #:use-module ((ice-9 ports internal) #:select (port-poll)) + #:export (&port-timeout + &port-read-timeout + &port-write-timeout + + port-timeout-error? + port-read-timeout-error? 
+ port-write-timeout-error? + + with-port-timeouts)) + +(define &port-timeout + (make-exception-type '&port-timeout + &external-error + '(port))) + +(define make-port-timeout-error + (record-constructor &port-timeout)) + +(define port-timeout-error? + (record-predicate &port-timeout)) + +(define &port-read-timeout + (make-exception-type '&port-read-timeout + &port-timeout + '())) + +(define make-port-read-timeout-error + (record-constructor &port-read-timeout)) + +(define port-read-timeout-error? + (record-predicate &port-read-timeout)) + +(define &port-write-timeout + (make-exception-type '&port-write-timeout + &port-timeout + '())) + +(define make-port-write-timeout-error + (record-constructor &port-write-timeout)) + +(define port-write-timeout-error? + (record-predicate &port-write-timeout)) + +(define* (with-port-timeouts thunk #:key timeout) + + ;; When the GC runs, it restarts the poll syscall, but the timeout remains + ;; unchanged! When the timeout is longer than the time between the syscall + ;; restarting, I think this renders the timeout useless. Therefore, this + ;; code uses a short timeout, and repeatedly calls poll while watching the + ;; clock to see if it has timed out overall. + (define poll-timeout-ms 200) + + (define (wait port mode) + (let ((timeout-internal + (+ (get-internal-real-time) + (* internal-time-units-per-second timeout)))) + (let loop ((poll-value + (port-poll port mode poll-timeout-ms))) + (if (= poll-value 0) + (if (> (get-internal-real-time) + timeout-internal) + (raise-exception + (if (string=? 
mode "r") + (make-port-read-timeout-error port) + (make-port-write-timeout-error port))) + (loop (port-poll port mode poll-timeout-ms))) + poll-value)))) + + (parameterize + ((current-read-waiter + (lambda (port) + (wait port "r"))) + (current-write-waiter + (lambda (port) + (wait port "w")))) + (thunk))) + diff --git a/guix-dev.scm b/guix-dev.scm index 35869dc..011d9e0 100644 --- a/guix-dev.scm +++ b/guix-dev.scm @@ -44,6 +44,39 @@ (gnu packages texinfo) (srfi srfi-1)) +(define guile-knots + (let ((commit "d572f591a3c136bfc7b23160e16381c92588f8d9") + (revision "1")) + (package + (name "guile-knots") + (version (git-version "0" revision commit)) + (source (origin + (method git-fetch) + (uri (git-reference + (url "https://git.cbaines.net/git/guile/knots") + (commit commit))) + (sha256 + (base32 + "0g85frfniblxb2cl81fg558ic3cxvla7fvml08scjgbbxn8151gv")) + (file-name (string-append name "-" version "-checkout")))) + (build-system gnu-build-system) + (native-inputs + (list pkg-config + autoconf + automake + guile-3.0 + guile-lib + guile-fibers)) + (inputs + (list guile-3.0)) + (propagated-inputs + (list guile-fibers)) + (home-page "https://git.cbaines.net/guile/knots") + (synopsis "Patterns and functionality to use with Guile Fibers") + (description + "") + (license license:gpl3+)))) + (package (name "guix-build-coordinator") (version "0.0.0") @@ -53,6 +86,7 @@ (list guix guile-json-4 guile-fibers-1.3 + guile-knots guile-gcrypt guile-readline guile-lib diff --git a/scripts/guix-build-coordinator-agent.in b/scripts/guix-build-coordinator-agent.in index 2f9e774..69ecf8a 100644 --- a/scripts/guix-build-coordinator-agent.in +++ b/scripts/guix-build-coordinator-agent.in @@ -126,6 +126,11 @@ (lambda (opt name arg result) (alist-cons 'metrics-file arg + result))) + (option '("timestamp-log-output") #t #f + (lambda (opt name arg result) + (alist-cons 'timestamp-log-output? + (string=? 
arg "true") result))))) (define %option-defaults @@ -140,7 +145,8 @@ (* 3 (/ (total-processor-count) 4))) 1)) (dynamic-auth-token - . ,(getenv "GUIX_BUILD_COORDINATOR_DYNAMIC_AUTH_TOKEN")))) + . ,(getenv "GUIX_BUILD_COORDINATOR_DYNAMIC_AUTH_TOKEN")) + (timestamp-log-output? . #t))) (define (parse-options options defaults args) (args-fold @@ -207,4 +213,5 @@ (or (assq-ref opts 'non-derivation-substitute-urls) (assq-ref opts 'substitute-urls)) (assq-ref opts 'metrics-file) - (assq-ref opts 'max-1min-load-average))))) + (assq-ref opts 'max-1min-load-average) + (assq-ref opts 'timestamp-log-output?))))) diff --git a/scripts/guix-build-coordinator.in b/scripts/guix-build-coordinator.in index 4756bea..0c06579 100644 --- a/scripts/guix-build-coordinator.in +++ b/scripts/guix-build-coordinator.in @@ -58,15 +58,6 @@ (guix-build-coordinator build-allocator) (guix-build-coordinator client-communication)) -(install-suspendable-ports!) - -;; TODO Work around this causing problems with backtraces -;; https://github.com/wingo/fibers/issues/76 -(set-record-type-printer! 
- (@@ (fibers scheduler) <scheduler>) - (lambda (scheduler port) - (display "#<scheduler>" port))) - (define %base-options ;; Specifications of the command-line options (list (option '("secret-key-base-file") #t #f @@ -83,6 +74,13 @@ (option '("update-database") #f #f (lambda (opt name _ result) (alist-cons 'update-database #t result))) + (option '("listen-repl") #f #t + (lambda (opt name arg result) + (alist-cons 'listen-repl + (if arg + (string->number arg) + #t) + (alist-delete 'listen-repl result)))) (option '("show-error-details") #f #f (lambda (opt name _ result) (alist-cons 'show-error-details #t result))))) @@ -295,6 +293,16 @@ (or (assq-ref result 'not-systems) '())) (alist-delete 'not-systems result)))) + (option '("created-at-gt") #t #f + (lambda (opt name arg result) + (alist-cons 'created-at-> + (datastore-validate-datetime-string arg) + result))) + (option '("created-at-lt") #t #f + (lambda (opt name arg result) + (alist-cons 'created-at-< + (datastore-validate-datetime-string arg) + result))) (option '("skip-updating-derived-priorities") #f #f (lambda (opt name _ result) (alist-cons 'skip-updating-derived-priorities @@ -414,7 +422,12 @@ "error: ~A is not a known allocation strategy\n" arg) (exit 1))) - result)))) + result))) + (option '("timestamp-log-output") #t #f + (lambda (opt name arg result) + (alist-cons 'timestamp-log-output? + (string=? 
arg "true") + result)))) (append-map (lambda (hook) (list @@ -502,6 +515,7 @@ canceled?: ~A (vector->list (assoc-ref build-details "tags"))) "\n")) + (newline) (let ((derivation-inputs (vector->list @@ -659,6 +673,8 @@ tags: #:not-tags (assq-ref opts 'not-tags) #:systems (assq-ref opts 'systems) #:not-systems (assq-ref opts 'not-systems) + #:created-at-< (assq-ref opts 'created-at-<) + #:created-at-> (assq-ref opts 'created-at->) #:processed #f #:canceled #f #:relationship (assq-ref opts 'relationship))) @@ -695,7 +711,14 @@ tags: (assq-ref opts 'ignore-if-build-required-by-another))) #:times 6 - #:delay 5))) + #:delay 5 + #:ignore + (lambda (exn) + (and (exception-with-message? exn) + (string=? + "build-already-canceled" + (assoc-ref (exception-message exn) + "error"))))))) (unless (string=? (assoc-ref result "result") "build-canceled") (simple-format #t "~A\n" @@ -794,7 +817,8 @@ tags: (let ((response (send-submit-build-request (assq-ref opts 'coordinator) - derivation-file + ;; Whitespace like \r can creep in here, so strip it + (string-trim-both derivation-file) (assq-ref opts 'derivation-substitute-urls) #f ; TODO requested uuid (assq-ref opts 'priority) @@ -1083,7 +1107,9 @@ tags: #:database-uri-string (assq-ref opts 'database) #:hooks hooks-with-defaults #:allocation-strategy - (assq-ref opts 'allocation-strategy)))) + (assq-ref opts 'allocation-strategy) + #:timestamp-log-output? 
+ (assq-ref opts 'timestamp-log-output?)))) (parameterize ((%show-error-details (assoc-ref opts 'show-error-details))) @@ -1094,4 +1120,5 @@ tags: #:pid-file (assq-ref opts 'pid-file) #:agent-communication-uri (assq-ref opts 'agent-communication) #:client-communication-uri (assq-ref opts 'client-communication) - #:parallel-hooks (assq-ref opts 'parallel-hooks))))))) + #:parallel-hooks (assq-ref opts 'parallel-hooks) + #:listen-repl (assoc-ref opts 'listen-repl))))))) diff --git a/sqitch/pg/deploy/allocated_builds_submit_outputs.sql b/sqitch/pg/deploy/allocated_builds_submit_outputs.sql new file mode 100644 index 0000000..a2ebe1e --- /dev/null +++ b/sqitch/pg/deploy/allocated_builds_submit_outputs.sql @@ -0,0 +1,7 @@ +-- Deploy guix-build-coordinator:allocated_builds_submit_outputs to pg + +BEGIN; + +-- XXX Add DDLs here. + +COMMIT; diff --git a/sqitch/pg/deploy/background-jobs-queue.sql b/sqitch/pg/deploy/background-jobs-queue.sql new file mode 100644 index 0000000..63dd8d4 --- /dev/null +++ b/sqitch/pg/deploy/background-jobs-queue.sql @@ -0,0 +1,7 @@ +-- Deploy guix-build-coordinator:background-jobs-queue to pg + +BEGIN; + +-- XXX Add DDLs here. + +COMMIT; diff --git a/sqitch/pg/deploy/builds_replace_unprocessed_index.sql b/sqitch/pg/deploy/builds_replace_unprocessed_index.sql new file mode 100644 index 0000000..782cf6b --- /dev/null +++ b/sqitch/pg/deploy/builds_replace_unprocessed_index.sql @@ -0,0 +1,7 @@ +-- Deploy guix-build-coordinator:builds_replace_unprocessed_index to pg + +BEGIN; + +-- XXX Add DDLs here. + +COMMIT; diff --git a/sqitch/pg/revert/allocated_builds_submit_outputs.sql b/sqitch/pg/revert/allocated_builds_submit_outputs.sql new file mode 100644 index 0000000..255efb1 --- /dev/null +++ b/sqitch/pg/revert/allocated_builds_submit_outputs.sql @@ -0,0 +1,7 @@ +-- Revert guix-build-coordinator:allocated_builds_submit_outputs from pg + +BEGIN; + +-- XXX Add DDLs here. 
+ +COMMIT; diff --git a/sqitch/pg/revert/background-jobs-queue.sql b/sqitch/pg/revert/background-jobs-queue.sql new file mode 100644 index 0000000..46b0761 --- /dev/null +++ b/sqitch/pg/revert/background-jobs-queue.sql @@ -0,0 +1,7 @@ +-- Revert guix-build-coordinator:background-jobs-queue from pg + +BEGIN; + +-- XXX Add DDLs here. + +COMMIT; diff --git a/sqitch/pg/revert/builds_replace_unprocessed_index.sql b/sqitch/pg/revert/builds_replace_unprocessed_index.sql new file mode 100644 index 0000000..4395f90 --- /dev/null +++ b/sqitch/pg/revert/builds_replace_unprocessed_index.sql @@ -0,0 +1,7 @@ +-- Revert guix-build-coordinator:builds_replace_unprocessed_index from pg + +BEGIN; + +-- XXX Add DDLs here. + +COMMIT; diff --git a/sqitch/pg/verify/allocated_builds_submit_outputs.sql b/sqitch/pg/verify/allocated_builds_submit_outputs.sql new file mode 100644 index 0000000..e95a717 --- /dev/null +++ b/sqitch/pg/verify/allocated_builds_submit_outputs.sql @@ -0,0 +1,7 @@ +-- Verify guix-build-coordinator:allocated_builds_submit_outputs on pg + +BEGIN; + +-- XXX Add verifications here. + +ROLLBACK; diff --git a/sqitch/pg/verify/background-jobs-queue.sql b/sqitch/pg/verify/background-jobs-queue.sql new file mode 100644 index 0000000..a36097e --- /dev/null +++ b/sqitch/pg/verify/background-jobs-queue.sql @@ -0,0 +1,7 @@ +-- Verify guix-build-coordinator:background-jobs-queue on pg + +BEGIN; + +-- XXX Add verifications here. + +ROLLBACK; diff --git a/sqitch/pg/verify/builds_replace_unprocessed_index.sql b/sqitch/pg/verify/builds_replace_unprocessed_index.sql new file mode 100644 index 0000000..050ef75 --- /dev/null +++ b/sqitch/pg/verify/builds_replace_unprocessed_index.sql @@ -0,0 +1,7 @@ +-- Verify guix-build-coordinator:builds_replace_unprocessed_index on pg + +BEGIN; + +-- XXX Add verifications here. 
+ +ROLLBACK; diff --git a/sqitch/sqitch.plan b/sqitch/sqitch.plan index cfb5b9d..f4f538b 100644 --- a/sqitch/sqitch.plan +++ b/sqitch/sqitch.plan @@ -46,3 +46,6 @@ agent_status_add_processor_count 2023-03-24T09:28:47Z Chris <chris@felis> # Add remove_build_allocation_plan 2023-04-23T19:50:23Z Chris <chris@felis> # Remove build_allocation_plan system_uptime 2023-05-05T18:18:35Z Chris <chris@felis> # Add system uptime build_starts_index 2023-11-24T16:30:13Z Chris <chris@felis> # build_starts index +background-jobs-queue 2025-02-06T10:49:08Z Chris <chris@fang> # Add background_jobs_queue +builds_replace_unprocessed_index 2025-02-19T11:19:42Z Chris <chris@fang> # Replace builds_unprocessed +allocated_builds_submit_outputs 2025-03-02T08:22:48Z Chris <chris@fang> # Add allocated_builds.submit_outputs diff --git a/sqitch/sqlite/deploy/allocated_builds_submit_outputs.sql b/sqitch/sqlite/deploy/allocated_builds_submit_outputs.sql new file mode 100644 index 0000000..66d6b45 --- /dev/null +++ b/sqitch/sqlite/deploy/allocated_builds_submit_outputs.sql @@ -0,0 +1,7 @@ +-- Deploy guix-build-coordinator:allocated_builds_submit_outputs to sqlite + +BEGIN; + +ALTER TABLE allocated_builds ADD COLUMN submit_outputs BOOLEAN DEFAULT NULL; + +COMMIT; diff --git a/sqitch/sqlite/deploy/background-jobs-queue.sql b/sqitch/sqlite/deploy/background-jobs-queue.sql new file mode 100644 index 0000000..1cb35f0 --- /dev/null +++ b/sqitch/sqlite/deploy/background-jobs-queue.sql @@ -0,0 +1,11 @@ +-- Deploy guix-build-coordinator:background-jobs-queue to sqlite + +BEGIN; + +CREATE TABLE background_jobs_queue ( + id INTEGER PRIMARY KEY, + type TEXT NOT NULL, + args TEXT NOT NULL +); + +COMMIT; diff --git a/sqitch/sqlite/deploy/builds_replace_unprocessed_index.sql b/sqitch/sqlite/deploy/builds_replace_unprocessed_index.sql new file mode 100644 index 0000000..a404f31 --- /dev/null +++ b/sqitch/sqlite/deploy/builds_replace_unprocessed_index.sql @@ -0,0 +1,9 @@ +-- Deploy 
guix-build-coordinator:builds_replace_unprocessed_index to sqlite + +BEGIN; + +DROP INDEX builds_unprocessed; + +CREATE INDEX builds_live ON builds (id) WHERE processed = 0 AND canceled = 0; + +COMMIT; diff --git a/sqitch/sqlite/revert/allocated_builds_submit_outputs.sql b/sqitch/sqlite/revert/allocated_builds_submit_outputs.sql new file mode 100644 index 0000000..240de22 --- /dev/null +++ b/sqitch/sqlite/revert/allocated_builds_submit_outputs.sql @@ -0,0 +1,7 @@ +-- Revert guix-build-coordinator:allocated_builds_submit_outputs from sqlite + +BEGIN; + +-- XXX Add DDLs here. + +COMMIT; diff --git a/sqitch/sqlite/revert/background-jobs-queue.sql b/sqitch/sqlite/revert/background-jobs-queue.sql new file mode 100644 index 0000000..10ef1ff --- /dev/null +++ b/sqitch/sqlite/revert/background-jobs-queue.sql @@ -0,0 +1,7 @@ +-- Revert guix-build-coordinator:background-jobs-queue from sqlite + +BEGIN; + +-- XXX Add DDLs here. + +COMMIT; diff --git a/sqitch/sqlite/revert/builds_replace_unprocessed_index.sql b/sqitch/sqlite/revert/builds_replace_unprocessed_index.sql new file mode 100644 index 0000000..b02dd3c --- /dev/null +++ b/sqitch/sqlite/revert/builds_replace_unprocessed_index.sql @@ -0,0 +1,7 @@ +-- Revert guix-build-coordinator:builds_replace_unprocessed_index from sqlite + +BEGIN; + +-- XXX Add DDLs here. + +COMMIT; diff --git a/sqitch/sqlite/verify/allocated_builds_submit_outputs.sql b/sqitch/sqlite/verify/allocated_builds_submit_outputs.sql new file mode 100644 index 0000000..0b1331e --- /dev/null +++ b/sqitch/sqlite/verify/allocated_builds_submit_outputs.sql @@ -0,0 +1,7 @@ +-- Verify guix-build-coordinator:allocated_builds_submit_outputs on sqlite + +BEGIN; + +-- XXX Add verifications here. 
+ +ROLLBACK; diff --git a/sqitch/sqlite/verify/background-jobs-queue.sql b/sqitch/sqlite/verify/background-jobs-queue.sql new file mode 100644 index 0000000..1cf9965 --- /dev/null +++ b/sqitch/sqlite/verify/background-jobs-queue.sql @@ -0,0 +1,7 @@ +-- Verify guix-build-coordinator:background-jobs-queue on sqlite + +BEGIN; + +-- XXX Add verifications here. + +ROLLBACK; diff --git a/sqitch/sqlite/verify/builds_replace_unprocessed_index.sql b/sqitch/sqlite/verify/builds_replace_unprocessed_index.sql new file mode 100644 index 0000000..d0dd086 --- /dev/null +++ b/sqitch/sqlite/verify/builds_replace_unprocessed_index.sql @@ -0,0 +1,7 @@ +-- Verify guix-build-coordinator:builds_replace_unprocessed_index on sqlite + +BEGIN; + +-- XXX Add verifications here. + +ROLLBACK; |