aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--guile.am2
-rw-r--r--guix-build-coordinator/agent-messaging/http/server.scm21
-rw-r--r--guix-build-coordinator/agent.scm34
-rw-r--r--guix-build-coordinator/client-communication.scm9
-rw-r--r--guix-build-coordinator/coordinator.scm39
-rw-r--r--guix-build-coordinator/datastore/sqlite.scm25
-rw-r--r--guix-build-coordinator/hooks.scm24
-rw-r--r--guix-build-coordinator/utils.scm104
-rw-r--r--guix-build-coordinator/utils/fibers.scm63
-rw-r--r--scripts/guix-build-coordinator.in1
10 files changed, 154 insertions, 168 deletions
diff --git a/guile.am b/guile.am
index 7f07ca2..bb3ce42 100644
--- a/guile.am
+++ b/guile.am
@@ -19,4 +19,4 @@ EXTRA_DIST = $(SOURCES) $(NOCOMP_SOURCES)
GUILE_WARNINGS = -Wunbound-variable -Warity-mismatch -Wformat
SUFFIXES = .scm .go
.scm.go:
- $(AM_V_GEN)$(top_builddir)/pre-inst-env $(GUILE_TOOLS) compile $(GUILE_TARGET) $(GUILE_WARNINGS) -o "$@" "$<"
+ $(AM_V_GEN)$(top_builddir)/pre-inst-env $(GUILE_TOOLS) compile -O1 $(GUILE_TARGET) $(GUILE_WARNINGS) -o "$@" "$<"
diff --git a/guix-build-coordinator/agent-messaging/http/server.scm b/guix-build-coordinator/agent-messaging/http/server.scm
index 7bfc0b5..85ba0d4 100644
--- a/guix-build-coordinator/agent-messaging/http/server.scm
+++ b/guix-build-coordinator/agent-messaging/http/server.scm
@@ -137,13 +137,14 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
build-coordinator
chunked-request-channel
output-hash-channel)
+ (define plain-metrics-registry
+ (make-metrics-registry))
+
(define gc-metrics-updater
- (get-gc-metrics-updater
- (build-coordinator-metrics-registry build-coordinator)))
+ (get-gc-metrics-updater plain-metrics-registry))
- (define port-metrics-updater
- (get-port-metrics-updater
- (build-coordinator-metrics-registry build-coordinator)))
+ (define process-metrics-updater
+ (get-process-metrics-updater plain-metrics-registry))
(define thread-metric
(make-gauge-metric
@@ -157,7 +158,7 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
(call-with-delay-logging gc-metrics-updater)
(metric-set thread-metric
(length (all-threads)))
- (call-with-delay-logging port-metrics-updater)
+ (call-with-delay-logging process-metrics-updater)
(call-with-delay-logging datastore-metrics-updater))
(with-exception-handler
@@ -184,7 +185,8 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
build-coordinator
chunked-request-channel
output-hash-channel
- update-managed-metrics!)))
+ update-managed-metrics!
+ plain-metrics-registry)))
#:host host
#:port port))
#:unwind? #t))
@@ -614,7 +616,8 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
build-coordinator
chunked-request-channel
output-hash-channel
- update-managed-metrics!)
+ update-managed-metrics!
+ plain-metrics-registry)
(define (authenticated? uuid request)
(let* ((authorization-base64
(match (assq-ref (request-headers request)
@@ -1015,6 +1018,8 @@ INTERVAL (a time-duration object), otherwise does nothing and returns #f."
(lambda (port)
(write-metrics (build-coordinator-metrics-registry
build-coordinator)
+ port)
+ (write-metrics plain-metrics-registry
port))))))
(_
(render-json
diff --git a/guix-build-coordinator/agent.scm b/guix-build-coordinator/agent.scm
index 8144947..a64a61c 100644
--- a/guix-build-coordinator/agent.scm
+++ b/guix-build-coordinator/agent.scm
@@ -46,7 +46,7 @@
#:use-module (guix base32)
#:use-module (guix serialization)
#:use-module ((guix build syscalls)
- #:select (set-thread-name))
+ #:select (set-thread-name free-disk-space))
#:use-module (guix-build-coordinator utils)
#:use-module (guix-build-coordinator agent-messaging)
#:use-module (guix-build-coordinator agent-messaging abstract)
@@ -82,11 +82,14 @@
(define port-log (make <port-log>
#:port (current-output-port)
#:formatter
- (lambda (lvl time str)
+ ;; In guile-lib v0.2.8 onwards, the formatter is
+ ;; called with more arguments
+ (lambda args ; lvl, time, str
(format #f "~a (~5a): ~a~%"
- (strftime "%F %H:%M:%S" (localtime time))
- lvl
- str))))
+ (strftime "%F %H:%M:%S" (localtime
+ (second args)))
+ (first args)
+ (third args)))))
(define metrics-enabled?
(and (not (string-null? metrics-file))
@@ -513,12 +516,21 @@
'())
(lambda ()
- (fetch-builds-for-agent
- coordinator-interface
- systems
- (+ (max current-threads 1)
- (count-post-build-jobs))
- #:log (build-log-procedure lgr)))
+ (let ((free-space (free-disk-space "/gnu/store")))
+ (if (< free-space (* 2 (expt 2 30))) ; 2G
+ (begin
+ (log-msg
+ lgr 'WARN
+ "low space on /gnu/store, "
+ "not fetching new builds")
+ (sleep 30)
+ '())
+ (fetch-builds-for-agent
+ coordinator-interface
+ systems
+ (+ (max current-threads 1)
+ (count-post-build-jobs))
+ #:log (build-log-procedure lgr)))))
#:unwind? #t))
(new-builds
(remove (lambda (build)
diff --git a/guix-build-coordinator/client-communication.scm b/guix-build-coordinator/client-communication.scm
index c0118aa..46535e4 100644
--- a/guix-build-coordinator/client-communication.scm
+++ b/guix-build-coordinator/client-communication.scm
@@ -151,7 +151,8 @@
(alist-delete
'end-time
build-details))
- ,@(if (assq-ref build-details 'processed)
+ ,@(if (or (assq-ref build-details 'processed)
+ (assq-ref build-details 'canceled))
'()
(datastore-find-unprocessed-build-entry datastore uuid))
(created-at . ,(or (and=>
@@ -491,7 +492,11 @@
derivation-file
substitute-urls)))
(lambda ()
- (read-drv/substitute derivation-file))
+ (with-throw-handler #t
+ (lambda ()
+ (read-drv/substitute derivation-file))
+ (lambda args
+ (backtrace))))
#:unwind? #t))
,@(let ((priority (assoc-ref body "priority")))
(if priority
diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm
index 3830d88..e859f8a 100644
--- a/guix-build-coordinator/coordinator.scm
+++ b/guix-build-coordinator/coordinator.scm
@@ -394,11 +394,14 @@
(port-log (make <custom-port-log>
#:port (current-output-port)
#:formatter
- (lambda (lvl time str)
+ ;; In guile-lib v0.2.8 onwards, the formatter is
+ ;; called with more arguments
+ (lambda args ; lvl, time, str
(format #f "~a (~5a): ~a~%"
- (strftime "%F %H:%M:%S" (localtime time))
- lvl
- str))))
+ (strftime "%F %H:%M:%S" (localtime
+ (second args)))
+ (first args)
+ (third args)))))
(build-coordinator
(make-build-coordinator-record datastore
hooks
@@ -441,14 +444,14 @@
(lambda (scheduler port)
(display "#<scheduler>" port)))
- (when update-datastore?
- (datastore-update (build-coordinator-datastore build-coordinator)))
-
(when pid-file
(call-with-output-file pid-file
(lambda (port)
(simple-format port "~A\n" (getpid)))))
+ (when update-datastore?
+ (datastore-update (build-coordinator-datastore build-coordinator)))
+
(set-build-coordinator-allocator-thread!
build-coordinator
(make-build-allocator-thread build-coordinator))
@@ -585,8 +588,8 @@
finished?)
(wait finished?))
- #:hz 10
- #:parallelism 2))
+ #:hz 0
+ #:parallelism 1))
finished?)))))
(define* (submit-build build-coordinator derivation-file
@@ -619,9 +622,12 @@
(derivation
(if derivation-exists-in-database?
#f ; unnecessary to fetch derivation
- (call-with-delay-logging read-drv
- #:threshold 10
- #:args (list derivation-file))))
+ (with-fibers-port-timeouts
+ (lambda ()
+ (call-with-delay-logging read-drv
+ #:threshold 10
+ #:args (list derivation-file)))
+ #:timeout 240)))
(system
(or system-from-database
@@ -740,9 +746,12 @@
(unless (datastore-find-derivation datastore derivation-file)
(datastore-store-derivation
datastore
- (call-with-delay-logging read-drv
- #:threshold 10
- #:args (list derivation-file))))
+ (with-fibers-port-timeouts
+ (lambda ()
+ (call-with-delay-logging read-drv
+ #:threshold 10
+ #:args (list derivation-file)))
+ #:timeout 30)))
(let ((related-derivations-lacking-builds
(if ensure-all-related-derivation-outputs-have-builds?
diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm
index 5e39849..96751a5 100644
--- a/guix-build-coordinator/datastore/sqlite.scm
+++ b/guix-build-coordinator/datastore/sqlite.scm
@@ -123,7 +123,11 @@
(string-length "sqlite://")))
(when update-database?
- (run-sqitch database-file))
+ (retry-on-error
+ (lambda ()
+ (run-sqitch database-file))
+ #:times 2
+ #:delay 5))
(let ((db (db-open database-file)))
(sqlite-exec db "PRAGMA journal_mode=WAL;")
@@ -354,6 +358,14 @@ PRAGMA optimize;")
(define-method (datastore-spawn-fibers
(datastore <sqlite-datastore>))
+ ;; Queue messages to the writer thread, so that they're handled in a first
+ ;; come first served manor
+ (slot-set!
+ datastore
+ 'worker-writer-thread-channel
+ (make-queueing-channel
+ (slot-ref datastore 'worker-writer-thread-channel)))
+
(spawn-fiber
(lambda ()
(while #t
@@ -3704,11 +3716,12 @@ WHERE build_results.build_id = :build_id"
(simple-format #t "running command: ~A\n"
(string-join command))
- (unless (zero? (apply system* command))
- (simple-format
- (current-error-port)
- "error: sqitch command failed\n")
- (exit 1))))
+ (let ((pid (spawn (%config 'sqitch) command)))
+ (unless (zero? (cdr (waitpid pid)))
+ (simple-format
+ (current-error-port)
+ "error: sqitch command failed\n")
+ (exit 1)))))
(define (changes-count db)
(let ((statement
diff --git a/guix-build-coordinator/hooks.scm b/guix-build-coordinator/hooks.scm
index bf4b576..bc055c2 100644
--- a/guix-build-coordinator/hooks.scm
+++ b/guix-build-coordinator/hooks.scm
@@ -528,18 +528,18 @@
(unless (eq? source-compression recompress-to)
(when (file-exists? tmp-output-log-file)
(delete-file tmp-output-log-file))
- (with-timeout timeout
- (raise-exception
- (make-exception-with-message "timeout recompressing log file"))
- (call-with-compressed-input-file
- source-log-file
- source-compression
- (lambda (input-port)
- (call-with-compressed-output-file
- tmp-output-log-file
- recompress-to
- (lambda (output-port)
- (dump-port input-port output-port))))))
+ (with-port-timeouts
+ (lambda ()
+ (call-with-compressed-input-file
+ source-log-file
+ source-compression
+ (lambda (input-port)
+ (call-with-compressed-output-file
+ tmp-output-log-file
+ recompress-to
+ (lambda (output-port)
+ (dump-port input-port output-port))))))
+ #:timeout timeout)
(rename-file tmp-output-log-file
output-log-file)
(delete-file source-log-file)))))
diff --git a/guix-build-coordinator/utils.scm b/guix-build-coordinator/utils.scm
index 74b4539..0acd62a 100644
--- a/guix-build-coordinator/utils.scm
+++ b/guix-build-coordinator/utils.scm
@@ -39,9 +39,6 @@
#:use-module ((guix http-client)
#:select (http-fetch))
#:use-module (guix serialization)
- #:use-module ((guix build download)
- #:select ((open-connection-for-uri
- . guix:open-connection-for-uri)))
#:use-module ((guix build syscalls)
#:select (set-thread-name))
#:use-module (guix scripts substitute)
@@ -66,7 +63,7 @@
read-derivation-from-file*
- with-store/non-blocking
+ non-blocking-port
substitute-derivation
read-derivation-through-substitutes
@@ -86,9 +83,6 @@
create-work-queue
create-thread-pool
- with-timeout
- reset-timeout
-
throttle
get-load-average
@@ -96,8 +90,6 @@
running-on-the-hurd?
- get-gc-metrics-updater
- get-port-metrics-updater
get-guix-memory-metrics-updater
open-socket-for-uri*
@@ -401,23 +393,6 @@
(fcntl port F_SETFL (logior O_NONBLOCK flags)))
port))
-(define (ensure-non-blocking-store-connection store)
- "Mark the file descriptor that backs STORE, a <store-connection>, as
-O_NONBLOCK."
- (match (store-connection-socket store)
- ((? file-port? port)
- (non-blocking-port port))
- (_ #f)))
-
-(define-syntax-rule (with-store/non-blocking store exp ...)
- "Like 'with-store', bind STORE to a connection to the store, but ensure that
-said connection is non-blocking (O_NONBLOCK). Evaluate EXP... in that
-context."
- (with-store store
- (ensure-non-blocking-store-connection store)
- (let ()
- exp ...)))
-
(define* (substitute-derivation store
derivation-name
#:key substitute-urls)
@@ -489,10 +464,9 @@ context."
(match (assoc-ref cache key)
(#f
(let ((socket
- (guix:open-connection-for-uri
+ (open-socket-for-uri*
uri
- #:verify-certificate? verify-certificate?
- #:timeout timeout)))
+ #:verify-certificate? verify-certificate?)))
(set! cache (alist-cons key socket cache))
socket))
(socket
@@ -633,7 +607,7 @@ References: ~a~%"
compressed-files))))
(define* (retry-on-error f #:key times delay ignore no-retry error-hook
- sleep-impl)
+ (sleep-impl sleep))
(let loop ((attempt 1))
(match (with-exception-handler
(lambda (exn)
@@ -1270,27 +1244,6 @@ References: ~a~%"
(values pool-mutex job-available count-threads list-jobs)))
-;; copied from (guix scripts substitute)
-(define-syntax-rule (with-timeout duration handler body ...)
- "Run BODY; when DURATION seconds have expired, call HANDLER, and run BODY
-again."
- (begin
- (sigaction SIGALRM
- (lambda (signum)
- (sigaction SIGALRM SIG_DFL)
- handler))
- (alarm duration)
- (call-with-values
- (lambda ()
- body ...)
- (lambda result
- (alarm 0)
- (sigaction SIGALRM SIG_DFL)
- (apply values result)))))
-
-(define (reset-timeout duration)
- (alarm duration))
-
(define (throttle min-duration thunk)
(let ((next-min-runtime 0))
(lambda ()
@@ -1329,55 +1282,6 @@ again."
(set! cached-system (utsname:sysname (uname))))
(string=? cached-system "GNU")))
-(define (get-gc-metrics-updater registry)
- (define metrics
- `((gc-time-taken
- . ,(make-gauge-metric registry "guile_gc_time_taken"))
- (heap-size
- . ,(make-gauge-metric registry "guile_heap_size"))
- (heap-free-size
- . ,(make-gauge-metric registry "guile_heap_free_size"))
- (heap-total-allocated
- . ,(make-gauge-metric registry "guile_heap_total_allocated"))
- (heap-allocated-since-gc
- . ,(make-gauge-metric registry "guile_allocated_since_gc"))
- (protected-objects
- . ,(make-gauge-metric registry "guile_gc_protected_objects"))
- (gc-times
- . ,(make-gauge-metric registry "guile_gc_times"))))
-
- (lambda ()
- (let ((stats (gc-stats)))
- (for-each
- (match-lambda
- ((name . metric)
- (let ((value (assq-ref stats name)))
- (metric-set metric value))))
- metrics))))
-
-(define (get-port-metrics-updater registry)
- (let ((ports-metric
- (make-gauge-metric registry "guile_ports_total"))
- (fds-metric
- (make-gauge-metric registry "file_descriptors_total")))
- (lambda ()
- (let ((count 0))
- (port-for-each
- (lambda _
- (set! count (+ 1 count))))
-
- (metric-set ports-metric count))
-
- (metric-set
- fds-metric
- (length
- ;; In theory 'scandir' cannot return #f, but in practice,
- ;; we've seen it before.
- (or (scandir "/proc/self/fd"
- (lambda (file)
- (not (member file '("." "..")))))
- '()))))))
-
(define (get-guix-memory-metrics-updater registry)
(define %memoization-tables
(@@ (guix memoization) %memoization-tables))
diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm
index 965948a..293429c 100644
--- a/guix-build-coordinator/utils/fibers.scm
+++ b/guix-build-coordinator/utils/fibers.scm
@@ -1,4 +1,5 @@
(define-module (guix-build-coordinator utils fibers)
+ #:use-module (ice-9 q)
#:use-module (ice-9 match)
#:use-module (ice-9 atomic)
#:use-module (ice-9 threads)
@@ -15,6 +16,7 @@
#:select (set-thread-name))
#:use-module (guix-build-coordinator utils)
#:export (make-worker-thread-channel
+ %worker-thread-default-timeout
call-with-worker-thread
worker-thread-timeout-error?
@@ -26,8 +28,13 @@
letpar&
+ port-timeout-error?
+ port-read-timeout-error?
+ port-write-timeout-error?
with-fibers-timeout
- with-fibers-port-timeouts)
+ with-fibers-port-timeouts
+
+ make-queueing-channel)
#:replace (retry-on-error))
(define %worker-thread-args
@@ -202,8 +209,11 @@ arguments of the worker thread procedure."
(define worker-thread-timeout-error?
(record-predicate &worker-thread-timeout))
+(define %worker-thread-default-timeout
+ (make-parameter 30))
+
(define* (call-with-worker-thread channel proc #:key duration-logger
- (timeout 30))
+ (timeout (%worker-thread-default-timeout)))
"Send PROC to the worker thread through CHANNEL. Return the result of PROC.
If already in the worker thread, call PROC immediately."
(let ((args (%worker-thread-args)))
@@ -298,11 +308,17 @@ If already in the worker thread, call PROC immediately."
(simple-format (current-error-port)
"port monitoring fiber failed to connect to ~A: ~A\n"
port exn)
- (signal-condition! error-condition))
+ (signal-condition! error-condition)
+ (sleep 10)
+ (simple-format (current-error-port)
+ "port monitoring fiber error-condition unresponsive")
+ (primitive-exit 1))
(lambda ()
(with-fibers-port-timeouts
(lambda ()
- (let ((sock (socket PF_INET SOCK_STREAM 0)))
+ (let ((sock
+ (non-blocking-port
+ (socket PF_INET SOCK_STREAM 0))))
(connect sock AF_INET INADDR_LOOPBACK port)
(close-port sock)))
#:timeout 20))
@@ -397,7 +413,7 @@ If already in the worker thread, call PROC immediately."
(define &port-timeout
(make-exception-type '&port-timeout
&external-error
- '(port)))
+ '(thunk port)))
(define make-port-timeout-error
(record-constructor &port-timeout))
@@ -474,7 +490,7 @@ If already in the worker thread, call PROC immediately."
#:key timeout
(read-timeout timeout)
(write-timeout timeout))
- (define (no-fibers-wait port mode timeout)
+ (define (no-fibers-wait thunk port mode timeout)
(define poll-timeout-ms 200)
;; When the GC runs, it restarts the poll syscall, but the timeout
@@ -485,8 +501,7 @@ If already in the worker thread, call PROC immediately."
;; timed out overall.
(let ((timeout-internal
(+ (get-internal-real-time)
- (* internal-time-units-per-second
- (/ timeout 1000)))))
+ (* internal-time-units-per-second timeout))))
(let loop ((poll-value
(port-poll port mode poll-timeout-ms)))
(if (= poll-value 0)
@@ -494,8 +509,8 @@ If already in the worker thread, call PROC immediately."
timeout-internal)
(raise-exception
(if (string=? mode "r")
- (make-port-read-timeout-error port)
- (make-port-write-timeout-error port)))
+ (make-port-read-timeout-error thunk port)
+ (make-port-write-timeout-error thunk port)))
(loop (port-poll port mode poll-timeout-ms)))
poll-value))))
@@ -511,7 +526,7 @@ If already in the worker thread, call PROC immediately."
(lambda ()
(raise-exception
(make-port-read-timeout-error thunk port))))))
- (no-fibers-wait port "r" read-timeout))))
+ (no-fibers-wait thunk port "r" read-timeout))))
(current-write-waiter
(lambda (port)
(if (current-scheduler)
@@ -523,7 +538,7 @@ If already in the worker thread, call PROC immediately."
(lambda ()
(raise-exception
(make-port-write-timeout-error thunk port))))))
- (no-fibers-wait port "w" write-timeout)))))
+ (no-fibers-wait thunk port "w" write-timeout)))))
(thunk)))
;; Use the fibers sleep
@@ -532,4 +547,26 @@ If already in the worker thread, call PROC immediately."
(@ (guix-build-coordinator utils) retry-on-error)
(append
args
- (list #:sleep sleep))))
+ (list #:sleep-impl sleep))))
+
+(define (make-queueing-channel channel)
+ (define queue (make-q))
+
+ (let ((queue-channel (make-channel)))
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (if (q-empty? queue)
+ (enq! queue
+ (perform-operation
+ (get-operation queue-channel)))
+ (let ((front (q-front queue)))
+ (perform-operation
+ (choice-operation
+ (wrap-operation (get-operation queue-channel)
+ (lambda (val)
+ (enq! queue val)))
+ (wrap-operation (put-operation channel front)
+ (lambda _
+ (q-pop! queue))))))))))
+ queue-channel))
diff --git a/scripts/guix-build-coordinator.in b/scripts/guix-build-coordinator.in
index 4756bea..86f604f 100644
--- a/scripts/guix-build-coordinator.in
+++ b/scripts/guix-build-coordinator.in
@@ -502,6 +502,7 @@ canceled?: ~A
(vector->list
(assoc-ref build-details "tags")))
"\n"))
+ (newline)
(let ((derivation-inputs
(vector->list