aboutsummaryrefslogtreecommitdiff
path: root/guix-build-coordinator
diff options
context:
space:
mode:
Diffstat (limited to 'guix-build-coordinator')
-rw-r--r-- guix-build-coordinator/agent.scm 23
-rw-r--r-- guix-build-coordinator/client-communication.scm 9
-rw-r--r-- guix-build-coordinator/coordinator.scm 29
-rw-r--r-- guix-build-coordinator/datastore/sqlite.scm 8
-rw-r--r-- guix-build-coordinator/hooks.scm 24
-rw-r--r-- guix-build-coordinator/utils.scm 43
-rw-r--r-- guix-build-coordinator/utils/fibers.scm 55
7 files changed, 109 insertions, 82 deletions
diff --git a/guix-build-coordinator/agent.scm b/guix-build-coordinator/agent.scm
index 0520da2..a64a61c 100644
--- a/guix-build-coordinator/agent.scm
+++ b/guix-build-coordinator/agent.scm
@@ -46,7 +46,7 @@
#:use-module (guix base32)
#:use-module (guix serialization)
#:use-module ((guix build syscalls)
- #:select (set-thread-name))
+ #:select (set-thread-name free-disk-space))
#:use-module (guix-build-coordinator utils)
#:use-module (guix-build-coordinator agent-messaging)
#:use-module (guix-build-coordinator agent-messaging abstract)
@@ -516,12 +516,21 @@
'())
(lambda ()
- (fetch-builds-for-agent
- coordinator-interface
- systems
- (+ (max current-threads 1)
- (count-post-build-jobs))
- #:log (build-log-procedure lgr)))
+ (let ((free-space (free-disk-space "/gnu/store")))
+ (if (< free-space (* 2 (expt 2 30))) ; 2G
+ (begin
+ (log-msg
+ lgr 'WARN
+ "low space on /gnu/store, "
+ "not fetching new builds")
+ (sleep 30)
+ '())
+ (fetch-builds-for-agent
+ coordinator-interface
+ systems
+ (+ (max current-threads 1)
+ (count-post-build-jobs))
+ #:log (build-log-procedure lgr)))))
#:unwind? #t))
(new-builds
(remove (lambda (build)
diff --git a/guix-build-coordinator/client-communication.scm b/guix-build-coordinator/client-communication.scm
index c0118aa..46535e4 100644
--- a/guix-build-coordinator/client-communication.scm
+++ b/guix-build-coordinator/client-communication.scm
@@ -151,7 +151,8 @@
(alist-delete
'end-time
build-details))
- ,@(if (assq-ref build-details 'processed)
+ ,@(if (or (assq-ref build-details 'processed)
+ (assq-ref build-details 'canceled))
'()
(datastore-find-unprocessed-build-entry datastore uuid))
(created-at . ,(or (and=>
@@ -491,7 +492,11 @@
derivation-file
substitute-urls)))
(lambda ()
- (read-drv/substitute derivation-file))
+ (with-throw-handler #t
+ (lambda ()
+ (read-drv/substitute derivation-file))
+ (lambda args
+ (backtrace))))
#:unwind? #t))
,@(let ((priority (assoc-ref body "priority")))
(if priority
diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm
index b75b40f..34b05c9 100644
--- a/guix-build-coordinator/coordinator.scm
+++ b/guix-build-coordinator/coordinator.scm
@@ -50,6 +50,7 @@
#:use-module (prometheus)
#:use-module ((guix build syscalls)
#:select (set-thread-name))
+ #:use-module (guix store)
#:use-module (guix derivations)
#:use-module (guix build utils)
#:use-module (guix-build-coordinator utils)
@@ -423,6 +424,11 @@
;; The logger assumes this
(set-port-encoding! (current-output-port) "UTF-8")
+ ;; Work around my broken with-store/non-blocking in Guix
+ (let ((socket-file (%daemon-socket-uri)))
+ (%daemon-socket-uri
+ (string-append "file://" socket-file)))
+
(with-exception-handler
(lambda (exn)
(simple-format #t "failed enabling core dumps: ~A\n" exn))
@@ -588,8 +594,8 @@
finished?)
(wait finished?))
- #:hz 10
- #:parallelism 2))
+ #:hz 0
+ #:parallelism 1))
finished?)))))
(define* (submit-build build-coordinator derivation-file
@@ -622,12 +628,19 @@
(derivation
(if derivation-exists-in-database?
#f ; unnecessary to fetch derivation
- (with-fibers-port-timeouts
- (lambda ()
- (call-with-delay-logging read-drv
- #:threshold 10
- #:args (list derivation-file)))
- #:timeout 30)))
+ ;; Bit of a hack, but offload reading the derivation to a
+ ;; thread so that it doesn't block the fibers thread, since
+ ;; local I/O doesn't cooperate with fibers
+ (datastore-call-with-transaction
+ datastore
+ (lambda _
+ (with-fibers-port-timeouts
+ (lambda ()
+ (call-with-delay-logging read-drv
+ #:threshold 10
+ #:args (list derivation-file)))
+ #:timeout 240))
+ #:readonly? #t)))
(system
(or system-from-database
diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm
index e67a940..96751a5 100644
--- a/guix-build-coordinator/datastore/sqlite.scm
+++ b/guix-build-coordinator/datastore/sqlite.scm
@@ -358,6 +358,14 @@ PRAGMA optimize;")
(define-method (datastore-spawn-fibers
(datastore <sqlite-datastore>))
+ ;; Queue messages to the writer thread, so that they're handled in a first
+ ;; come first served manner
+ (slot-set!
+ datastore
+ 'worker-writer-thread-channel
+ (make-queueing-channel
+ (slot-ref datastore 'worker-writer-thread-channel)))
+
(spawn-fiber
(lambda ()
(while #t
diff --git a/guix-build-coordinator/hooks.scm b/guix-build-coordinator/hooks.scm
index bf4b576..bc055c2 100644
--- a/guix-build-coordinator/hooks.scm
+++ b/guix-build-coordinator/hooks.scm
@@ -528,18 +528,18 @@
(unless (eq? source-compression recompress-to)
(when (file-exists? tmp-output-log-file)
(delete-file tmp-output-log-file))
- (with-timeout timeout
- (raise-exception
- (make-exception-with-message "timeout recompressing log file"))
- (call-with-compressed-input-file
- source-log-file
- source-compression
- (lambda (input-port)
- (call-with-compressed-output-file
- tmp-output-log-file
- recompress-to
- (lambda (output-port)
- (dump-port input-port output-port))))))
+ (with-port-timeouts
+ (lambda ()
+ (call-with-compressed-input-file
+ source-log-file
+ source-compression
+ (lambda (input-port)
+ (call-with-compressed-output-file
+ tmp-output-log-file
+ recompress-to
+ (lambda (output-port)
+ (dump-port input-port output-port))))))
+ #:timeout timeout)
(rename-file tmp-output-log-file
output-log-file)
(delete-file source-log-file)))))
diff --git a/guix-build-coordinator/utils.scm b/guix-build-coordinator/utils.scm
index 1bb1942..0acd62a 100644
--- a/guix-build-coordinator/utils.scm
+++ b/guix-build-coordinator/utils.scm
@@ -63,7 +63,7 @@
read-derivation-from-file*
- with-store/non-blocking
+ non-blocking-port
substitute-derivation
read-derivation-through-substitutes
@@ -83,9 +83,6 @@
create-work-queue
create-thread-pool
- with-timeout
- reset-timeout
-
throttle
get-load-average
@@ -396,23 +393,6 @@
(fcntl port F_SETFL (logior O_NONBLOCK flags)))
port))
-(define (ensure-non-blocking-store-connection store)
- "Mark the file descriptor that backs STORE, a <store-connection>, as
-O_NONBLOCK."
- (match (store-connection-socket store)
- ((? file-port? port)
- (non-blocking-port port))
- (_ #f)))
-
-(define-syntax-rule (with-store/non-blocking store exp ...)
- "Like 'with-store', bind STORE to a connection to the store, but ensure that
-said connection is non-blocking (O_NONBLOCK). Evaluate EXP... in that
-context."
- (with-store store
- (ensure-non-blocking-store-connection store)
- (let ()
- exp ...)))
-
(define* (substitute-derivation store
derivation-name
#:key substitute-urls)
@@ -1264,27 +1244,6 @@ References: ~a~%"
(values pool-mutex job-available count-threads list-jobs)))
-;; copied from (guix scripts substitute)
-(define-syntax-rule (with-timeout duration handler body ...)
- "Run BODY; when DURATION seconds have expired, call HANDLER, and run BODY
-again."
- (begin
- (sigaction SIGALRM
- (lambda (signum)
- (sigaction SIGALRM SIG_DFL)
- handler))
- (alarm duration)
- (call-with-values
- (lambda ()
- body ...)
- (lambda result
- (alarm 0)
- (sigaction SIGALRM SIG_DFL)
- (apply values result)))))
-
-(define (reset-timeout duration)
- (alarm duration))
-
(define (throttle min-duration thunk)
(let ((next-min-runtime 0))
(lambda ()
diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm
index 5362b18..293429c 100644
--- a/guix-build-coordinator/utils/fibers.scm
+++ b/guix-build-coordinator/utils/fibers.scm
@@ -1,4 +1,5 @@
(define-module (guix-build-coordinator utils fibers)
+ #:use-module (ice-9 q)
#:use-module (ice-9 match)
#:use-module (ice-9 atomic)
#:use-module (ice-9 threads)
@@ -15,6 +16,7 @@
#:select (set-thread-name))
#:use-module (guix-build-coordinator utils)
#:export (make-worker-thread-channel
+ %worker-thread-default-timeout
call-with-worker-thread
worker-thread-timeout-error?
@@ -26,8 +28,13 @@
letpar&
+ port-timeout-error?
+ port-read-timeout-error?
+ port-write-timeout-error?
with-fibers-timeout
- with-fibers-port-timeouts)
+ with-fibers-port-timeouts
+
+ make-queueing-channel)
#:replace (retry-on-error))
(define %worker-thread-args
@@ -202,8 +209,11 @@ arguments of the worker thread procedure."
(define worker-thread-timeout-error?
(record-predicate &worker-thread-timeout))
+(define %worker-thread-default-timeout
+ (make-parameter 30))
+
(define* (call-with-worker-thread channel proc #:key duration-logger
- (timeout 30))
+ (timeout (%worker-thread-default-timeout)))
"Send PROC to the worker thread through CHANNEL. Return the result of PROC.
If already in the worker thread, call PROC immediately."
(let ((args (%worker-thread-args)))
@@ -306,7 +316,9 @@ If already in the worker thread, call PROC immediately."
(lambda ()
(with-fibers-port-timeouts
(lambda ()
- (let ((sock (socket PF_INET SOCK_STREAM 0)))
+ (let ((sock
+ (non-blocking-port
+ (socket PF_INET SOCK_STREAM 0))))
(connect sock AF_INET INADDR_LOOPBACK port)
(close-port sock)))
#:timeout 20))
@@ -401,7 +413,7 @@ If already in the worker thread, call PROC immediately."
(define &port-timeout
(make-exception-type '&port-timeout
&external-error
- '(port)))
+ '(thunk port)))
(define make-port-timeout-error
(record-constructor &port-timeout))
@@ -478,7 +490,7 @@ If already in the worker thread, call PROC immediately."
#:key timeout
(read-timeout timeout)
(write-timeout timeout))
- (define (no-fibers-wait port mode timeout)
+ (define (no-fibers-wait thunk port mode timeout)
(define poll-timeout-ms 200)
;; When the GC runs, it restarts the poll syscall, but the timeout
@@ -489,8 +501,7 @@ If already in the worker thread, call PROC immediately."
;; timed out overall.
(let ((timeout-internal
(+ (get-internal-real-time)
- (* internal-time-units-per-second
- (/ timeout 1000)))))
+ (* internal-time-units-per-second timeout))))
(let loop ((poll-value
(port-poll port mode poll-timeout-ms)))
(if (= poll-value 0)
@@ -498,8 +509,8 @@ If already in the worker thread, call PROC immediately."
timeout-internal)
(raise-exception
(if (string=? mode "r")
- (make-port-read-timeout-error port)
- (make-port-write-timeout-error port)))
+ (make-port-read-timeout-error thunk port)
+ (make-port-write-timeout-error thunk port)))
(loop (port-poll port mode poll-timeout-ms)))
poll-value))))
@@ -515,7 +526,7 @@ If already in the worker thread, call PROC immediately."
(lambda ()
(raise-exception
(make-port-read-timeout-error thunk port))))))
- (no-fibers-wait port "r" read-timeout))))
+ (no-fibers-wait thunk port "r" read-timeout))))
(current-write-waiter
(lambda (port)
(if (current-scheduler)
@@ -527,7 +538,7 @@ If already in the worker thread, call PROC immediately."
(lambda ()
(raise-exception
(make-port-write-timeout-error thunk port))))))
- (no-fibers-wait port "w" write-timeout)))))
+ (no-fibers-wait thunk port "w" write-timeout)))))
(thunk)))
;; Use the fibers sleep
@@ -537,3 +548,25 @@ If already in the worker thread, call PROC immediately."
(append
args
(list #:sleep-impl sleep))))
+
+(define (make-queueing-channel channel)
+ (define queue (make-q))
+
+ (let ((queue-channel (make-channel)))
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (if (q-empty? queue)
+ (enq! queue
+ (perform-operation
+ (get-operation queue-channel)))
+ (let ((front (q-front queue)))
+ (perform-operation
+ (choice-operation
+ (wrap-operation (get-operation queue-channel)
+ (lambda (val)
+ (enq! queue val)))
+ (wrap-operation (put-operation channel front)
+ (lambda _
+ (q-pop! queue))))))))))
+ queue-channel))