diff options
-rw-r--r-- | guix-build-coordinator/agent.scm | 23 | ||||
-rw-r--r-- | guix-build-coordinator/client-communication.scm | 6 | ||||
-rw-r--r-- | guix-build-coordinator/coordinator.scm | 29 | ||||
-rw-r--r-- | guix-build-coordinator/datastore/sqlite.scm | 8 | ||||
-rw-r--r-- | guix-build-coordinator/hooks.scm | 135 | ||||
-rw-r--r-- | guix-build-coordinator/utils.scm | 19 | ||||
-rw-r--r-- | guix-build-coordinator/utils/fibers.scm | 39 |
7 files changed, 191 insertions, 68 deletions
diff --git a/guix-build-coordinator/agent.scm b/guix-build-coordinator/agent.scm index 0520da2..a64a61c 100644 --- a/guix-build-coordinator/agent.scm +++ b/guix-build-coordinator/agent.scm @@ -46,7 +46,7 @@ #:use-module (guix base32) #:use-module (guix serialization) #:use-module ((guix build syscalls) - #:select (set-thread-name)) + #:select (set-thread-name free-disk-space)) #:use-module (guix-build-coordinator utils) #:use-module (guix-build-coordinator agent-messaging) #:use-module (guix-build-coordinator agent-messaging abstract) @@ -516,12 +516,21 @@ '()) (lambda () - (fetch-builds-for-agent - coordinator-interface - systems - (+ (max current-threads 1) - (count-post-build-jobs)) - #:log (build-log-procedure lgr))) + (let ((free-space (free-disk-space "/gnu/store"))) + (if (< free-space (* 2 (expt 2 30))) ; 2G + (begin + (log-msg + lgr 'WARN + "low space on /gnu/store, " + "not fetching new builds") + (sleep 30) + '()) + (fetch-builds-for-agent + coordinator-interface + systems + (+ (max current-threads 1) + (count-post-build-jobs)) + #:log (build-log-procedure lgr))))) #:unwind? #t)) (new-builds (remove (lambda (build) diff --git a/guix-build-coordinator/client-communication.scm b/guix-build-coordinator/client-communication.scm index 80d8c96..46535e4 100644 --- a/guix-build-coordinator/client-communication.scm +++ b/guix-build-coordinator/client-communication.scm @@ -151,7 +151,8 @@ (alist-delete 'end-time build-details)) - ,@(if (assq-ref build-details 'processed) + ,@(if (or (assq-ref build-details 'processed) + (assq-ref build-details 'canceled)) '() (datastore-find-unprocessed-build-entry datastore uuid)) (created-at . ,(or (and=> @@ -461,8 +462,7 @@ derivation)))) (define (read-drv/substitute derivation-file) - (with-store store - (ensure-non-blocking-store-connection store) + (with-store/non-blocking store (unless (valid-path? store derivation-file) (substitute-derivation store derivation-file diff --git a/guix-build-coordinator/coordinator.scm b/guix-build-coordinator/coordinator.scm index b75b40f..34b05c9 100644 --- a/guix-build-coordinator/coordinator.scm +++ b/guix-build-coordinator/coordinator.scm @@ -50,6 +50,7 @@ #:use-module (prometheus) #:use-module ((guix build syscalls) #:select (set-thread-name)) + #:use-module (guix store) #:use-module (guix derivations) #:use-module (guix build utils) #:use-module (guix-build-coordinator utils) @@ -423,6 +424,11 @@ ;; The logger assumes this (set-port-encoding! (current-output-port) "UTF-8") + ;; Work around my broken with-store/non-blocking in Guix + (let ((socket-file (%daemon-socket-uri))) + (%daemon-socket-uri + (string-append "file://" socket-file))) + (with-exception-handler (lambda (exn) (simple-format #t "failed enabling core dumps: ~A\n" exn)) @@ -588,8 +594,8 @@ finished?) (wait finished?)) - #:hz 10 - #:parallelism 2)) + #:hz 0 + #:parallelism 1)) finished?))))) (define* (submit-build build-coordinator derivation-file @@ -622,12 +628,19 @@ (derivation (if derivation-exists-in-database? #f ; unnecessary to fetch derivation - (with-fibers-port-timeouts - (lambda () - (call-with-delay-logging read-drv - #:threshold 10 - #:args (list derivation-file))) - #:timeout 30))) + ;; Bit of a hack, but offload reading the derivation to a + ;; thread so that it doesn't block the fibers thread, since + ;; local I/O doesn't cooperate with fibers + (datastore-call-with-transaction + datastore + (lambda _ + (with-fibers-port-timeouts + (lambda () + (call-with-delay-logging read-drv + #:threshold 10 + #:args (list derivation-file))) + #:timeout 240)) + #:readonly? #t))) (system (or system-from-database diff --git a/guix-build-coordinator/datastore/sqlite.scm b/guix-build-coordinator/datastore/sqlite.scm index e67a940..96751a5 100644 --- a/guix-build-coordinator/datastore/sqlite.scm +++ b/guix-build-coordinator/datastore/sqlite.scm @@ -358,6 +358,14 @@ PRAGMA optimize;") (define-method (datastore-spawn-fibers (datastore <sqlite-datastore>)) + ;; Queue messages to the writer thread, so that they're handled in a first + ;; come first served manor + (slot-set! + datastore + 'worker-writer-thread-channel + (make-queueing-channel + (slot-ref datastore 'worker-writer-thread-channel))) + (spawn-fiber (lambda () (while #t diff --git a/guix-build-coordinator/hooks.scm b/guix-build-coordinator/hooks.scm index bc055c2..50c2445 100644 --- a/guix-build-coordinator/hooks.scm +++ b/guix-build-coordinator/hooks.scm @@ -32,7 +32,8 @@ #:use-module (guix config) #:use-module (guix derivations) #:use-module (guix serialization) - #:use-module ((guix utils) #:select (default-keyword-arguments)) + #:use-module ((guix utils) #:select (default-keyword-arguments + call-with-decompressed-port)) #:use-module (guix build utils) #:use-module (guix-build-coordinator config) #:use-module (guix-build-coordinator utils) @@ -81,6 +82,20 @@ build-id agent-id) (current-error-port)))) +(define* (default-nar-compressions #:key output-filename nar-size + source-compression source-size) + (define MiB (* (expt 2 20) 1.)) + + (if (eq? 'none source-compression) + ;; If the agent didn't compress the nar, don't change that here + (list 'none) + (let* ((compression-proportion + (/ source-size nar-size))) + (if (or (> compression-proportion 0.95) + (< nar-size (* 0.5 MiB))) + '(none) + (list source-compression))))) + (define* (build-success-publish-hook publish-directory #:key @@ -94,7 +109,8 @@ post-publish-hook combined-post-publish-hook (publish-referenced-derivation-source-files? #t) - derivation-substitute-urls) + derivation-substitute-urls + (nar-compressions-proc default-nar-compressions)) (mkdir-p (string-append publish-directory "/nar/lzip")) (lambda (build-coordinator build-id) @@ -218,8 +234,7 @@ nar-filename)) #:unwind? #t)) - (cons narinfo-filename - nar-filename))))) + narinfo-filename)))) referenced-source-files)))))) (define (process-output drv-name output) @@ -230,12 +245,6 @@ (nar-location (build-output-file-location datastore build-id output-name)) - (nar-filename - (string-append "nar/lzip/" - (basename output-filename))) - (nar-destination - (string-append publish-directory "/" - nar-filename)) (narinfo-filename (string-append (string-take (basename output-filename) 32) ".narinfo")) @@ -246,8 +255,72 @@ (if (skip-publishing-proc narinfo-filename narinfo-directory) #f - (begin - (copy-file nar-location nar-destination) + (let ((compressions + (nar-compressions-proc + #:output-filename output-filename + #:nar-size (assq-ref output 'size) + ;; TODO Don't hardcode this + #:source-compression 'lzip + #:source-size (stat:size (stat nar-location #f))))) + + (for-each + (lambda (compression) + (if (or (and (pair? compression) + (eq? (car compression) 'lzip)) + (eq? compression 'lzip)) + ;; TODO If the agents start uploading uncompressed files + ;; or files compressed differently, this might not be + ;; right + (let ((nar-destination + (string-append publish-directory "/" + "nar/lzip/" + (basename output-filename)))) + (copy-file nar-location nar-destination)) + (let* ((target-compression + (if (pair? compression) + (car compression) + compression)) + (nar-destination + (string-append + publish-directory "/" + "nar/" (symbol->string target-compression) "/" + (basename output-filename))) + (temporary-destination + (string-append nar-destination ".tmp"))) + (mkdir-p (dirname temporary-destination)) + (call-with-input-file nar-location + (lambda (source-port) + (call-with-decompressed-port + ;; TODO Don't assume the source compression + 'lzip + source-port + (lambda (decompressed-source-port) + (let ((call-with-compressed-output-port* + (match target-compression + ('gzip + (@ (zlib) call-with-gzip-output-port)) + ('lzip + (@ (lzlib) call-with-lzip-output-port)) + ('zstd + (@ (zstd) call-with-zstd-output-port)) + ('none + (lambda (port proc) + (proc port)))))) + (when (file-exists? temporary-destination) + (delete-file temporary-destination)) + (apply + call-with-compressed-output-port* + (open-output-file temporary-destination + #:binary #t) + (lambda (compressed-port) + (dump-port decompressed-source-port + compressed-port)) + (if (pair? compression) + (cdr compression) + '()))))))) + (rename-file temporary-destination + nar-destination)))) + compressions) (call-with-output-file narinfo-location (lambda (port) @@ -257,7 +330,18 @@ (assq-ref output 'size) (vector->list (assq-ref output 'references)) - `((lzip ,(stat:size (stat nar-location #f)))) + (map + (lambda (compression-details) + (let* ((compression + (if (pair? compression-details) + (car compression-details) + compression-details)) + (file (string-append + publish-directory "/" + "nar/" (symbol->string compression) "/" + (basename output-filename)))) + (list compression (stat:size (stat file #f))))) + compressions) #:system (datastore-find-derivation-system datastore drv-name) @@ -276,18 +360,16 @@ (raise-exception exn)) (lambda () (post-publish-hook publish-directory - narinfo-filename - nar-filename)) + narinfo-filename)) #:unwind? #t)) - (cons narinfo-filename - nar-filename))))) + narinfo-filename)))) (let* ((build-details (datastore-find-build datastore build-id)) (drv-name (assq-ref build-details 'derivation-name)) - (narinfos-and-nars + (narinfos (append (if publish-referenced-derivation-source-files? (process-referenced-derivation-source-files drv-name) @@ -297,23 +379,22 @@ (process-output drv-name output)) (datastore-list-build-outputs datastore build-id))))) (when (and combined-post-publish-hook - (not (null? narinfos-and-nars))) + (not (null? narinfos))) (with-exception-handler (lambda (exn) ;; Rollback narinfo creation, to make this more ;; transactional (for-each - (match-lambda - ((narinfo-filename . _) - (delete-file - (string-append - narinfo-directory "/" narinfo-filename)))) - narinfos-and-nars) + (lambda + (narinfo-filename) + (delete-file + (string-append + narinfo-directory "/" narinfo-filename))) + narinfos) (raise-exception exn)) (lambda () - (combined-post-publish-hook publish-directory - narinfos-and-nars)) + (combined-post-publish-hook publish-directory narinfos)) #:unwind? #t))))) (define* (build-success-s3-publish-hook diff --git a/guix-build-coordinator/utils.scm b/guix-build-coordinator/utils.scm index c6c2d59..0acd62a 100644 --- a/guix-build-coordinator/utils.scm +++ b/guix-build-coordinator/utils.scm @@ -64,8 +64,6 @@ read-derivation-from-file* non-blocking-port - ensure-non-blocking-store-connection - with-store/non-blocking substitute-derivation read-derivation-through-substitutes @@ -395,23 +393,6 @@ (fcntl port F_SETFL (logior O_NONBLOCK flags))) port)) -(define (ensure-non-blocking-store-connection store) - "Mark the file descriptor that backs STORE, a <store-connection>, as -O_NONBLOCK." - (match (store-connection-socket store) - ((? file-port? port) - (non-blocking-port port)) - (_ #f))) - -(define-syntax-rule (with-store/non-blocking store exp ...) - "Like 'with-store', bind STORE to a connection to the store, but ensure that -said connection is non-blocking (O_NONBLOCK). Evaluate EXP... in that -context." - (with-store store - (ensure-non-blocking-store-connection store) - (let () - exp ...))) - (define* (substitute-derivation store derivation-name #:key substitute-urls) diff --git a/guix-build-coordinator/utils/fibers.scm b/guix-build-coordinator/utils/fibers.scm index 82697a5..293429c 100644 --- a/guix-build-coordinator/utils/fibers.scm +++ b/guix-build-coordinator/utils/fibers.scm @@ -1,4 +1,5 @@ (define-module (guix-build-coordinator utils fibers) + #:use-module (ice-9 q) #:use-module (ice-9 match) #:use-module (ice-9 atomic) #:use-module (ice-9 threads) @@ -15,6 +16,7 @@ #:select (set-thread-name)) #:use-module (guix-build-coordinator utils) #:export (make-worker-thread-channel + %worker-thread-default-timeout call-with-worker-thread worker-thread-timeout-error? @@ -26,8 +28,13 @@ letpar& + port-timeout-error? + port-read-timeout-error? + port-write-timeout-error? with-fibers-timeout - with-fibers-port-timeouts) + with-fibers-port-timeouts + + make-queueing-channel) #:replace (retry-on-error)) (define %worker-thread-args @@ -202,8 +209,11 @@ arguments of the worker thread procedure." (define worker-thread-timeout-error? (record-predicate &worker-thread-timeout)) +(define %worker-thread-default-timeout + (make-parameter 30)) + (define* (call-with-worker-thread channel proc #:key duration-logger - (timeout 30)) + (timeout (%worker-thread-default-timeout))) "Send PROC to the worker thread through CHANNEL. Return the result of PROC. If already in the worker thread, call PROC immediately." (let ((args (%worker-thread-args))) @@ -491,8 +501,7 @@ If already in the worker thread, call PROC immediately." ;; timed out overall. (let ((timeout-internal (+ (get-internal-real-time) - (* internal-time-units-per-second - (/ timeout 1000))))) + (* internal-time-units-per-second timeout)))) (let loop ((poll-value (port-poll port mode poll-timeout-ms))) (if (= poll-value 0) @@ -539,3 +548,25 @@ If already in the worker thread, call PROC immediately." (append args (list #:sleep-impl sleep)))) + +(define (make-queueing-channel channel) + (define queue (make-q)) + + (let ((queue-channel (make-channel))) + (spawn-fiber + (lambda () + (while #t + (if (q-empty? queue) + (enq! queue + (perform-operation + (get-operation queue-channel))) + (let ((front (q-front queue))) + (perform-operation + (choice-operation + (wrap-operation (get-operation queue-channel) + (lambda (val) + (enq! queue val))) + (wrap-operation (put-operation channel front) + (lambda _ + (q-pop! queue)))))))))) + queue-channel)) |