diff options
Diffstat (limited to 'guix-build-coordinator')
-rw-r--r-- | guix-build-coordinator/agent-messaging/http/server.scm | 21 | ||||
-rw-r--r-- | guix-build-coordinator/agent.scm | 34 | ||||
-rw-r--r-- | guix-build-coordinator/client-communication.scm | 9 | ||||
-rw-r--r-- | guix-build-coordinator/coordinator.scm | 52 | ||||
-rw-r--r-- | guix-build-coordinator/datastore/sqlite.scm | 25 | ||||
-rw-r--r-- | guix-build-coordinator/hooks.scm | 24 | ||||
-rw-r--r-- | guix-build-coordinator/utils.scm | 104 | ||||
-rw-r--r-- | guix-build-coordinator/utils/fibers.scm | 63 |
8 files changed, 165 insertions, 167 deletions
diff --git a/guix-build-coordinator/agent-messaging/http/server.scm b/guix-build-coordinator/agent-messaging/http/server.scm index 7bfc0b5..85ba0d4 100644 --- a/guix-build-coordinator/agent-messaging/http/server.scm +++ b/guix-build-coordinator/agent-messaging/http/server.scm @@ -137,13 +137,14 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." build-coordinator chunked-request-channel output-hash-channel) + (define plain-metrics-registry + (make-metrics-registry)) + (define gc-metrics-updater - (get-gc-metrics-updater - (build-coordinator-metrics-registry build-coordinator))) + (get-gc-metrics-updater plain-metrics-registry)) - (define port-metrics-updater - (get-port-metrics-updater - (build-coordinator-metrics-registry build-coordinator))) + (define process-metrics-updater + (get-process-metrics-updater plain-metrics-registry)) (define thread-metric (make-gauge-metric @@ -157,7 +158,7 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." (call-with-delay-logging gc-metrics-updater) (metric-set thread-metric (length (all-threads))) - (call-with-delay-logging port-metrics-updater) + (call-with-delay-logging process-metrics-updater) (call-with-delay-logging datastore-metrics-updater)) (with-exception-handler @@ -184,7 +185,8 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." build-coordinator chunked-request-channel output-hash-channel - update-managed-metrics!))) + update-managed-metrics! + plain-metrics-registry))) #:host host #:port port)) #:unwind? #t)) @@ -614,7 +616,8 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." build-coordinator chunked-request-channel output-hash-channel - update-managed-metrics!) + update-managed-metrics! + plain-metrics-registry) (define (authenticated? uuid request) (let* ((authorization-base64 (match (assq-ref (request-headers request) @@ -1015,6 +1018,8 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f." (lambda (port) (write-metrics (build-coordinator-metrics-registry build-coordinator) + port) + (write-metrics plain-metrics-registry port)))))) (_ (render-json diff --git a/guix-build-coordinator/agent.scm b/guix-build-coordinator/agent.scm index 8144947..a64a61c 100644 --- a/guix-build-coordinator/agent.scm +++ b/guix-build-coordinator/agent.scm @@ -46,7 +46,7 @@ #:use-module (guix base32) #:use-module (guix serialization) #:use-module ((guix build syscalls) - #:select (set-thread-name)) + #:select (set-thread-name free-disk-space)) #:use-module (guix-build-coordinator utils) #:use-module (guix-build-coordinator agent-messaging) #:use-module (guix-build-coordinator agent-messaging abstract) @@ -82,11 +82,14 @@ (define port-log (make <port-log> #:port (current-output-port) #:formatter - (lambda (lvl time str) + ;; In guile-lib v0.2.8 onwards, the formatter is + ;; called with more arguments + (lambda args ; lvl, time, str (format #f "~a (~5a): ~a~%" - (strftime "%F %H:%M:%S" (localtime time)) - lvl - str)))) + (strftime "%F %H:%M:%S" (localtime + (second args))) + (first args) + (third args))))) (define metrics-enabled? (and (not (string-null? metrics-file)) @@ -513,12 +516,21 @@ '()) (lambda () - (fetch-builds-for-agent - coordinator-interface - systems - (+ (max current-threads 1) - (count-post-build-jobs)) - #:log (build-log-procedure lgr))) + (let ((free-space (free-disk-space "/gnu/store"))) + (if (< free-space (* 2 (expt 2 30))) ; 2G + (begin + (log-msg + lgr 'WARN + "low space on /gnu/store, " + "not fetching new builds") + (sleep 30) + '()) + (fetch-builds-for-agent + coordinator-interface + systems + (+ (max current-threads 1) + (count-post-build-jobs)) + #:log (build-log-procedure lgr))))) #:unwind? #t)) (new-builds (remove (lambda (build) diff --git a/guix-build-coordinator/client-communication.scm b/guix-build-coordinator/client-communication.scm index c0118aa..46535e4 100644 --- a/guix-build-coordinator/client-communication.scm +++ b/guix-build-coordinator/client-communication.scm @@ -151,7 +151,8 @@ (alist-delete 'end-time build-details)) - ,@(if (assq-ref build-details 'processed) + ,@(if (or (assq-ref build-details 'processed) + (assq-ref build-details 'canceled)) '() (datastore-find-unprocessed-build-entry datastore uuid)) (created-at . ,(or (and=> @@ -491,7 +492,11 @@ derivation-file substitute-urls))) (lambda () - (read-drv/substitute derivation-file)) + (with-throw-handler #t + (lambda () + (read-drv/substitute derivation-file)) + (lambda args + (backtrace)))) #:unwind? #t)) ,@(let ((priority (assoc-ref body "priority"))) (if priority diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm index 3830d88..34b05c9 100644 --- a/guix-build-coordinator/coordinator.scm +++ b/guix-build-coordinator/coordinator.scm @@ -50,6 +50,7 @@ #:use-module (prometheus) #:use-module ((guix build syscalls) #:select (set-thread-name)) + #:use-module (guix store) #:use-module (guix derivations) #:use-module (guix build utils) #:use-module (guix-build-coordinator utils) @@ -394,11 +395,14 @@ (port-log (make <custom-port-log> #:port (current-output-port) #:formatter - (lambda (lvl time str) + ;; In guile-lib v0.2.8 onwards, the formatter is + ;; called with more arguments + (lambda args ; lvl, time, str (format #f "~a (~5a): ~a~%" - (strftime "%F %H:%M:%S" (localtime time)) - lvl - str)))) + (strftime "%F %H:%M:%S" (localtime + (second args))) + (first args) + (third args))))) (build-coordinator (make-build-coordinator-record datastore hooks @@ -420,6 +424,11 @@ ;; The logger assumes this (set-port-encoding! (current-output-port) "UTF-8") + ;; Work around my broken with-store/non-blocking in Guix + (let ((socket-file (%daemon-socket-uri))) + (%daemon-socket-uri + (string-append "file://" socket-file))) + (with-exception-handler (lambda (exn) (simple-format #t "failed enabling core dumps: ~A\n" exn)) @@ -441,14 +450,14 @@ (lambda (scheduler port) (display "#<scheduler>" port))) - (when update-datastore? - (datastore-update (build-coordinator-datastore build-coordinator))) - (when pid-file (call-with-output-file pid-file (lambda (port) (simple-format port "~A\n" (getpid))))) + (when update-datastore? + (datastore-update (build-coordinator-datastore build-coordinator))) + (set-build-coordinator-allocator-thread! build-coordinator (make-build-allocator-thread build-coordinator)) @@ -585,8 +594,8 @@ finished?) (wait finished?)) - #:hz 10 - #:parallelism 2)) + #:hz 0 + #:parallelism 1)) finished?))))) (define* (submit-build build-coordinator derivation-file @@ -619,9 +628,19 @@ (derivation (if derivation-exists-in-database? #f ; unnecessary to fetch derivation - (call-with-delay-logging read-drv - #:threshold 10 - #:args (list derivation-file)))) + ;; Bit of a hack, but offload reading the derivation to a + ;; thread so that it doesn't block the fibers thread, since + ;; local I/O doesn't cooperate with fibers + (datastore-call-with-transaction + datastore + (lambda _ + (with-fibers-port-timeouts + (lambda () + (call-with-delay-logging read-drv + #:threshold 10 + #:args (list derivation-file))) + #:timeout 240)) + #:readonly? #t))) (system (or system-from-database @@ -740,9 +759,12 @@ (unless (datastore-find-derivation datastore derivation-file) (datastore-store-derivation datastore - (call-with-delay-logging read-drv - #:threshold 10 - #:args (list derivation-file)))) + (with-fibers-port-timeouts + (lambda () + (call-with-delay-logging read-drv + #:threshold 10 + #:args (list derivation-file))) + #:timeout 30))) (let ((related-derivations-lacking-builds (if ensure-all-related-derivation-outputs-have-builds? diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm index 5e39849..96751a5 100644 --- a/guix-build-coordinator/datastore/sqlite.scm +++ b/guix-build-coordinator/datastore/sqlite.scm @@ -123,7 +123,11 @@ (string-length "sqlite://"))) (when update-database? - (run-sqitch database-file)) + (retry-on-error + (lambda () + (run-sqitch database-file)) + #:times 2 + #:delay 5)) (let ((db (db-open database-file))) (sqlite-exec db "PRAGMA journal_mode=WAL;") @@ -354,6 +358,14 @@ PRAGMA optimize;") (define-method (datastore-spawn-fibers (datastore <sqlite-datastore>)) + ;; Queue messages to the writer thread, so that they're handled in a first + ;; come first served manor + (slot-set! + datastore + 'worker-writer-thread-channel + (make-queueing-channel + (slot-ref datastore 'worker-writer-thread-channel))) + (spawn-fiber (lambda () (while #t @@ -3704,11 +3716,12 @@ WHERE build_results.build_id = :build_id" (simple-format #t "running command: ~A\n" (string-join command)) - (unless (zero? (apply system* command)) - (simple-format - (current-error-port) - "error: sqitch command failed\n") - (exit 1)))) + (let ((pid (spawn (%config 'sqitch) command))) + (unless (zero? (cdr (waitpid pid))) + (simple-format + (current-error-port) + "error: sqitch command failed\n") + (exit 1))))) (define (changes-count db) (let ((statement diff --git a/guix-build-coordinator/hooks.scm b/guix-build-coordinator/hooks.scm index bf4b576..bc055c2 100644 --- a/guix-build-coordinator/hooks.scm +++ b/guix-build-coordinator/hooks.scm @@ -528,18 +528,18 @@ (unless (eq? source-compression recompress-to) (when (file-exists? tmp-output-log-file) (delete-file tmp-output-log-file)) - (with-timeout timeout - (raise-exception - (make-exception-with-message "timeout recompressing log file")) - (call-with-compressed-input-file - source-log-file - source-compression - (lambda (input-port) - (call-with-compressed-output-file - tmp-output-log-file - recompress-to - (lambda (output-port) - (dump-port input-port output-port)))))) + (with-port-timeouts + (lambda () + (call-with-compressed-input-file + source-log-file + source-compression + (lambda (input-port) + (call-with-compressed-output-file + tmp-output-log-file + recompress-to + (lambda (output-port) + (dump-port input-port output-port)))))) + #:timeout timeout) (rename-file tmp-output-log-file output-log-file) (delete-file source-log-file))))) diff --git a/guix-build-coordinator/utils.scm b/guix-build-coordinator/utils.scm index 74b4539..0acd62a 100644 --- a/guix-build-coordinator/utils.scm +++ b/guix-build-coordinator/utils.scm @@ -39,9 +39,6 @@ #:use-module ((guix http-client) #:select (http-fetch)) #:use-module (guix serialization) - #:use-module ((guix build download) - #:select ((open-connection-for-uri - . guix:open-connection-for-uri))) #:use-module ((guix build syscalls) #:select (set-thread-name)) #:use-module (guix scripts substitute) @@ -66,7 +63,7 @@ read-derivation-from-file* - with-store/non-blocking + non-blocking-port substitute-derivation read-derivation-through-substitutes @@ -86,9 +83,6 @@ create-work-queue create-thread-pool - with-timeout - reset-timeout - throttle get-load-average @@ -96,8 +90,6 @@ running-on-the-hurd? - get-gc-metrics-updater - get-port-metrics-updater get-guix-memory-metrics-updater open-socket-for-uri* @@ -401,23 +393,6 @@ (fcntl port F_SETFL (logior O_NONBLOCK flags))) port)) -(define (ensure-non-blocking-store-connection store) - "Mark the file descriptor that backs STORE, a <store-connection>, as -O_NONBLOCK." - (match (store-connection-socket store) - ((? file-port? port) - (non-blocking-port port)) - (_ #f))) - -(define-syntax-rule (with-store/non-blocking store exp ...) - "Like 'with-store', bind STORE to a connection to the store, but ensure that -said connection is non-blocking (O_NONBLOCK). Evaluate EXP... in that -context." - (with-store store - (ensure-non-blocking-store-connection store) - (let () - exp ...))) - (define* (substitute-derivation store derivation-name #:key substitute-urls) @@ -489,10 +464,9 @@ context." (match (assoc-ref cache key) (#f (let ((socket - (guix:open-connection-for-uri + (open-socket-for-uri* uri - #:verify-certificate? verify-certificate? - #:timeout timeout))) + #:verify-certificate? verify-certificate?))) (set! cache (alist-cons key socket cache)) socket)) (socket @@ -633,7 +607,7 @@ References: ~a~%" compressed-files)))) (define* (retry-on-error f #:key times delay ignore no-retry error-hook - sleep-impl) + (sleep-impl sleep)) (let loop ((attempt 1)) (match (with-exception-handler (lambda (exn) @@ -1270,27 +1244,6 @@ References: ~a~%" (values pool-mutex job-available count-threads list-jobs))) -;; copied from (guix scripts substitute) -(define-syntax-rule (with-timeout duration handler body ...) - "Run BODY; when DURATION seconds have expired, call HANDLER, and run BODY -again." - (begin - (sigaction SIGALRM - (lambda (signum) - (sigaction SIGALRM SIG_DFL) - handler)) - (alarm duration) - (call-with-values - (lambda () - body ...) - (lambda result - (alarm 0) - (sigaction SIGALRM SIG_DFL) - (apply values result))))) - -(define (reset-timeout duration) - (alarm duration)) - (define (throttle min-duration thunk) (let ((next-min-runtime 0)) (lambda () @@ -1329,55 +1282,6 @@ again." (set! cached-system (utsname:sysname (uname)))) (string=? cached-system "GNU"))) -(define (get-gc-metrics-updater registry) - (define metrics - `((gc-time-taken - . ,(make-gauge-metric registry "guile_gc_time_taken")) - (heap-size - . ,(make-gauge-metric registry "guile_heap_size")) - (heap-free-size - . ,(make-gauge-metric registry "guile_heap_free_size")) - (heap-total-allocated - . ,(make-gauge-metric registry "guile_heap_total_allocated")) - (heap-allocated-since-gc - . ,(make-gauge-metric registry "guile_allocated_since_gc")) - (protected-objects - . ,(make-gauge-metric registry "guile_gc_protected_objects")) - (gc-times - . ,(make-gauge-metric registry "guile_gc_times")))) - - (lambda () - (let ((stats (gc-stats))) - (for-each - (match-lambda - ((name . metric) - (let ((value (assq-ref stats name))) - (metric-set metric value)))) - metrics)))) - -(define (get-port-metrics-updater registry) - (let ((ports-metric - (make-gauge-metric registry "guile_ports_total")) - (fds-metric - (make-gauge-metric registry "file_descriptors_total"))) - (lambda () - (let ((count 0)) - (port-for-each - (lambda _ - (set! count (+ 1 count)))) - - (metric-set ports-metric count)) - - (metric-set - fds-metric - (length - ;; In theory 'scandir' cannot return #f, but in practice, - ;; we've seen it before. - (or (scandir "/proc/self/fd" - (lambda (file) - (not (member file '("." ".."))))) - '())))))) - (define (get-guix-memory-metrics-updater registry) (define %memoization-tables (@@ (guix memoization) %memoization-tables)) diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm index 965948a..293429c 100644 --- a/guix-build-coordinator/utils/fibers.scm +++ b/guix-build-coordinator/utils/fibers.scm @@ -1,4 +1,5 @@ (define-module (guix-build-coordinator utils fibers) + #:use-module (ice-9 q) #:use-module (ice-9 match) #:use-module (ice-9 atomic) #:use-module (ice-9 threads) @@ -15,6 +16,7 @@ #:select (set-thread-name)) #:use-module (guix-build-coordinator utils) #:export (make-worker-thread-channel + %worker-thread-default-timeout call-with-worker-thread worker-thread-timeout-error? @@ -26,8 +28,13 @@ letpar& + port-timeout-error? + port-read-timeout-error? + port-write-timeout-error? with-fibers-timeout - with-fibers-port-timeouts) + with-fibers-port-timeouts + + make-queueing-channel) #:replace (retry-on-error)) (define %worker-thread-args @@ -202,8 +209,11 @@ arguments of the worker thread procedure." (define worker-thread-timeout-error? (record-predicate &worker-thread-timeout)) +(define %worker-thread-default-timeout + (make-parameter 30)) + (define* (call-with-worker-thread channel proc #:key duration-logger - (timeout 30)) + (timeout (%worker-thread-default-timeout))) "Send PROC to the worker thread through CHANNEL. Return the result of PROC. If already in the worker thread, call PROC immediately." (let ((args (%worker-thread-args))) @@ -298,11 +308,17 @@ If already in the worker thread, call PROC immediately." (simple-format (current-error-port) "port monitoring fiber failed to connect to ~A: ~A\n" port exn) - (signal-condition! error-condition)) + (signal-condition! error-condition) + (sleep 10) + (simple-format (current-error-port) + "port monitoring fiber error-condition unresponsive") + (primitive-exit 1)) (lambda () (with-fibers-port-timeouts (lambda () - (let ((sock (socket PF_INET SOCK_STREAM 0))) + (let ((sock + (non-blocking-port + (socket PF_INET SOCK_STREAM 0)))) (connect sock AF_INET INADDR_LOOPBACK port) (close-port sock))) #:timeout 20)) @@ -397,7 +413,7 @@ If already in the worker thread, call PROC immediately." (define &port-timeout (make-exception-type '&port-timeout &external-error - '(port))) + '(thunk port))) (define make-port-timeout-error (record-constructor &port-timeout)) @@ -474,7 +490,7 @@ If already in the worker thread, call PROC immediately." #:key timeout (read-timeout timeout) (write-timeout timeout)) - (define (no-fibers-wait port mode timeout) + (define (no-fibers-wait thunk port mode timeout) (define poll-timeout-ms 200) ;; When the GC runs, it restarts the poll syscall, but the timeout @@ -485,8 +501,7 @@ If already in the worker thread, call PROC immediately." ;; timed out overall. (let ((timeout-internal (+ (get-internal-real-time) - (* internal-time-units-per-second - (/ timeout 1000))))) + (* internal-time-units-per-second timeout)))) (let loop ((poll-value (port-poll port mode poll-timeout-ms))) (if (= poll-value 0) @@ -494,8 +509,8 @@ If already in the worker thread, call PROC immediately." timeout-internal) (raise-exception (if (string=? mode "r") - (make-port-read-timeout-error port) - (make-port-write-timeout-error port))) + (make-port-read-timeout-error thunk port) + (make-port-write-timeout-error thunk port))) (loop (port-poll port mode poll-timeout-ms))) poll-value)))) @@ -511,7 +526,7 @@ If already in the worker thread, call PROC immediately." (lambda () (raise-exception (make-port-read-timeout-error thunk port)))))) - (no-fibers-wait port "r" read-timeout)))) + (no-fibers-wait thunk port "r" read-timeout)))) (current-write-waiter (lambda (port) (if (current-scheduler) @@ -523,7 +538,7 @@ If already in the worker thread, call PROC immediately." (lambda () (raise-exception (make-port-write-timeout-error thunk port)))))) - (no-fibers-wait port "w" write-timeout))))) + (no-fibers-wait thunk port "w" write-timeout))))) (thunk))) ;; Use the fibers sleep @@ -532,4 +547,26 @@ If already in the worker thread, call PROC immediately." (@ (guix-build-coordinator utils) retry-on-error) (append args - (list #:sleep sleep)))) + (list #:sleep-impl sleep)))) + +(define (make-queueing-channel channel) + (define queue (make-q)) + + (let ((queue-channel (make-channel))) + (spawn-fiber + (lambda () + (while #t + (if (q-empty? queue) + (enq! queue + (perform-operation + (get-operation queue-channel))) + (let ((front (q-front queue))) + (perform-operation + (choice-operation + (wrap-operation (get-operation queue-channel) + (lambda (val) + (enq! queue val))) + (wrap-operation (put-operation channel front) + (lambda _ + (q-pop! queue)))))))))) + queue-channel)) |