diff options
Diffstat (limited to 'guix-build-coordinator/utils.scm')
-rw-r--r-- | guix-build-coordinator/utils.scm | 427 |
1 files changed, 149 insertions, 278 deletions
diff --git a/guix-build-coordinator/utils.scm b/guix-build-coordinator/utils.scm index 74b4539..f4f15dc 100644 --- a/guix-build-coordinator/utils.scm +++ b/guix-build-coordinator/utils.scm @@ -14,10 +14,11 @@ #:use-module (ice-9 textual-ports) #:use-module (ice-9 rdelim) #:use-module (ice-9 binary-ports) - #:use-module (ice-9 suspendable-ports) - #:use-module ((ice-9 ports internal) #:select (port-poll)) #:use-module (ice-9 exceptions) #:use-module (rnrs bytevectors) + #:use-module (oop goops) + #:use-module (logging logger) + #:use-module (logging port-log) #:use-module (web uri) #:use-module (web http) #:use-module (web client) @@ -39,24 +40,12 @@ #:use-module ((guix http-client) #:select (http-fetch)) #:use-module (guix serialization) - #:use-module ((guix build download) - #:select ((open-connection-for-uri - . guix:open-connection-for-uri))) #:use-module ((guix build syscalls) #:select (set-thread-name)) #:use-module (guix scripts substitute) + #:use-module (guix-build-coordinator utils timeout) #:export (random-v4-uuid - &port-timeout - &port-read-timeout - &port-write-timeout - - port-timeout-error? - port-read-timeout-error? - port-write-timeout-error? - - with-port-timeouts - request-query-parameters call-with-streaming-http-request @@ -66,7 +55,7 @@ read-derivation-from-file* - with-store/non-blocking + non-blocking-port substitute-derivation read-derivation-through-substitutes @@ -86,9 +75,6 @@ create-work-queue create-thread-pool - with-timeout - reset-timeout - throttle get-load-average @@ -96,13 +82,17 @@ running-on-the-hurd? - get-gc-metrics-updater - get-port-metrics-updater get-guix-memory-metrics-updater open-socket-for-uri* - check-locale!)) + check-locale! + + display/safe + simple-format/safe + format/safe + + <custom-port-log>)) (eval-when (eval load compile) (begin @@ -190,74 +180,6 @@ (parse-query-string query)) '()))) -(define &port-timeout - (make-exception-type '&port-timeout - &external-error - '(port))) - -(define make-port-timeout-error - (record-constructor &port-timeout)) - -(define port-timeout-error? - (record-predicate &port-timeout)) - -(define &port-read-timeout - (make-exception-type '&port-read-timeout - &port-timeout - '())) - -(define make-port-read-timeout-error - (record-constructor &port-read-timeout)) - -(define port-read-timeout-error? - (record-predicate &port-read-timeout)) - -(define &port-write-timeout - (make-exception-type '&port-write-timeout - &port-timeout - '())) - -(define make-port-write-timeout-error - (record-constructor &port-write-timeout)) - -(define port-write-timeout-error? - (record-predicate &port-write-timeout)) - -(define* (with-port-timeouts thunk #:key (timeout (* 120 1000))) - - ;; When the GC runs, it restarts the poll syscall, but the timeout remains - ;; unchanged! When the timeout is longer than the time between the syscall - ;; restarting, I think this renders the timeout useless. Therefore, this - ;; code uses a short timeout, and repeatedly calls poll while watching the - ;; clock to see if it has timed out overall. - (define poll-timeout-ms 200) - - (define (wait port mode) - (let ((timeout-internal - (+ (get-internal-real-time) - (* internal-time-units-per-second - (/ timeout 1000))))) - (let loop ((poll-value - (port-poll port mode poll-timeout-ms))) - (if (= poll-value 0) - (if (> (get-internal-real-time) - timeout-internal) - (raise-exception - (if (string=? mode "r") - (make-port-read-timeout-error port) - (make-port-write-timeout-error port))) - (loop (port-poll port mode poll-timeout-ms))) - poll-value)))) - - (parameterize - ((current-read-waiter - (lambda (port) - (wait port "r"))) - (current-write-waiter - (lambda (port) - (wait port "w")))) - (thunk))) - (define* (call-with-streaming-http-request uri content-length callback @@ -284,8 +206,8 @@ (setvbuf port 'block (expt 2 13)) (with-exception-handler (lambda (exp) - (simple-format #t "error: ~A ~A: ~A\n" - method (uri-path uri) exp) + (simple-format/safe #t "error: ~A ~A: ~A\n" + method (uri-path uri) exp) (close-port port) (raise-exception exp)) (lambda () @@ -300,7 +222,8 @@ (let ((body (read-response-body response))) (close-port port) (values response - body))))))))))) + body))))))))) + #:timeout 120)) (define (find-missing-substitutes-for-output store substitute-urls output) (if (valid-path? store output) @@ -358,7 +281,7 @@ (when (file-exists? cache-file) (with-exception-handler (lambda (exn) - (simple-format + (simple-format/safe (current-error-port) "error: when deleting substitute cache file: ~A\n" exn)) @@ -370,7 +293,18 @@ (let ((substitute-urls (append-map (lambda (substitute-url) (let ((log-port (open-output-string))) - (with-throw-handler #t + (with-exception-handler + (lambda (exn) + (simple-format/safe + (current-error-port) + "exception in has-substiutes-no-cache? (~A): ~A\n" + substitute-url exn) + (display/safe (string-append + (get-output-string log-port) + "\n") + (current-error-port)) + (close-output-port log-port) + (raise-exception exn)) (lambda () (if (null? ;; I doubt the caching is thread safe, so @@ -380,17 +314,7 @@ (lookup-narinfos substitute-url (list file))))) '() - (list substitute-url))) - (lambda (key . args) - (simple-format - (current-error-port) - "exception in has-substiutes-no-cache? (~A) ~A: ~A\n" - substitute-url key args) - (display (string-append - (get-output-string log-port) - "\n") - (current-error-port)) - (close-output-port log-port))))) + (list substitute-url)))))) substitute-urls))) substitute-urls)) @@ -401,23 +325,6 @@ (fcntl port F_SETFL (logior O_NONBLOCK flags))) port)) -(define (ensure-non-blocking-store-connection store) - "Mark the file descriptor that backs STORE, a <store-connection>, as -O_NONBLOCK." - (match (store-connection-socket store) - ((? file-port? port) - (non-blocking-port port)) - (_ #f))) - -(define-syntax-rule (with-store/non-blocking store exp ...) - "Like 'with-store', bind STORE to a connection to the store, but ensure that -said connection is non-blocking (O_NONBLOCK). Evaluate EXP... in that -context." - (with-store store - (ensure-non-blocking-store-connection store) - (let () - exp ...))) - (define* (substitute-derivation store derivation-name #:key substitute-urls) @@ -441,7 +348,7 @@ context." (take-right lines 10) lines))) (close-output-port log-port) - (simple-format + (simple-format/safe (current-error-port) "exception when substituting derivation: ~A:\n ~A\n" exn (string-join last-n-lines "\n")) @@ -451,27 +358,23 @@ context." (ensure-path store derivation-name))) #:unwind? #t))) -(define read-derivation-from-file* - (let ((%derivation-cache - (@@ (guix derivations) %derivation-cache))) - (lambda (file) - (or (and file (hash-ref %derivation-cache file)) - (let ((drv - ;; read-derivation can call read-derivation-from-file, so to - ;; avoid having many open files when reading a derivation with - ;; inputs, read it in to a string first. - (call-with-input-string - ;; Avoid calling scm_i_relativize_path in - ;; fport_canonicalize_filename since this leads to lots - ;; of readlink calls - (with-fluids ((%file-port-name-canonicalization 'none)) - (call-with-input-file file - get-string-all)) - (lambda (port) - (set-port-filename! port file) - (read-derivation port read-derivation-from-file*))))) - (hash-set! %derivation-cache file drv) - drv))))) +(define* (read-derivation-from-file* file #:optional (drv-hash (make-hash-table))) + (or (and file (hash-ref drv-hash file)) + (let ((drv + ;; read-derivation can call read-derivation-from-file, so to + ;; avoid having many open files when reading a derivation with + ;; inputs, read it in to a string first. + (call-with-input-string + (call-with-input-file file + get-string-all) + (lambda (port) + (set-port-filename! port file) + (read-derivation port (lambda (file) + (read-derivation-from-file* + file + drv-hash))))))) + (hash-set! drv-hash file drv) + drv))) (define (read-derivation-through-substitutes derivation-name substitute-urls) @@ -489,10 +392,9 @@ context." (match (assoc-ref cache key) (#f (let ((socket - (guix:open-connection-for-uri + (open-socket-for-uri* uri - #:verify-certificate? verify-certificate? - #:timeout timeout))) + #:verify-certificate? verify-certificate?))) (set! cache (alist-cons key socket cache)) socket)) (socket @@ -593,9 +495,9 @@ context." (define* (store-item->recutils compression file-size) (let ((url (encode-and-join-uri-path `(,@(split-and-decode-uri-path nar-path) - ,@(if compression - (list (symbol->string compression)) - '()) + ,@(if (eq? compression 'none) + '() + (list (symbol->string compression))) ,(basename store-path))))) (format #f "URL: ~a~%Compression: ~a~%~@[FileSize: ~a~%~]" url @@ -633,7 +535,7 @@ References: ~a~%" compressed-files)))) (define* (retry-on-error f #:key times delay ignore no-retry error-hook - sleep-impl) + (sleep-impl sleep)) (let loop ((attempt 1)) (match (with-exception-handler (lambda (exn) @@ -665,7 +567,7 @@ References: ~a~%" #:unwind? #t) ((#t . return-values) (when (> attempt 1) - (simple-format + (simple-format/safe (current-error-port) "retry success: ~A\n on attempt ~A of ~A\n" f @@ -676,7 +578,7 @@ References: ~a~%" (if (>= attempt (- times 1)) (begin - (simple-format + (simple-format/safe (current-error-port) "error: ~A:\n ~A,\n attempt ~A of ~A, last retry in ~A\n" f @@ -687,14 +589,14 @@ References: ~a~%" (when error-hook (error-hook attempt exn)) (sleep-impl delay) - (simple-format + (simple-format/safe (current-error-port) "running last retry of ~A after ~A failed attempts\n" f attempt) (f)) (begin - (simple-format + (simple-format/safe (current-error-port) "error: ~A:\n ~A,\n attempt ~A of ~A, retrying in ~A\n" f @@ -922,27 +824,29 @@ References: ~a~%" (define (thread-process-job job-args) (with-exception-handler (lambda (exn) - (simple-format (current-error-port) - "~A work queue, job raised exception ~A: ~A\n" - name job-args exn)) + (simple-format/safe + (current-error-port) + "~A work queue, job raised exception ~A: ~A\n" + name job-args exn)) (lambda () - (with-throw-handler #t + (with-exception-handler + (lambda (exn) + (simple-format/safe + (current-error-port) + "~A work queue, exception when handling job: ~A\n" + name exn) + (let* ((stack (make-stack #t 3)) + (backtrace + (call-with-output-string + (lambda (port) + (display-backtrace stack port) + (newline port))))) + (display/safe + backtrace + (current-error-port))) + (raise-exception exn)) (lambda () - (apply proc job-args)) - (lambda (key . args) - (simple-format - (current-error-port) - "~A work queue, exception when handling job: ~A ~A\n" - name key args) - (let* ((stack (make-stack #t 3)) - (backtrace - (call-with-output-string - (lambda (port) - (display-backtrace stack port) - (newline port))))) - (display - backtrace - (current-error-port)))))) + (apply proc job-args)))) #:unwind? #t)) (define (start-thread thread-index) @@ -1112,36 +1016,29 @@ References: ~a~%" (define (thread-process-job job-args) (with-exception-handler (lambda (exn) - (with-exception-handler - (lambda _ - #f) - (lambda () - ;; Logging may raise an exception, so try and just keep going. - (display - (simple-format - #f - "~A thread pool, job raised exception ~A: ~A\n" - name job-args exn) - (current-error-port))) - #:unwind? #t)) + (simple-format/safe + (current-error-port) + "~A thread pool, job raised exception ~A: ~A\n" + name job-args exn)) (lambda () - (with-throw-handler #t - (lambda () - (apply proc job-args)) - (lambda (key . args) - (simple-format + (with-exception-handler + (lambda (exn) + (simple-format/safe (current-error-port) - "~A thread pool, exception when handling job: ~A ~A\n" - name key args) + "~A thread pool, exception when handling job: ~A\n" + name exn) (let* ((stack (make-stack #t 3)) (backtrace (call-with-output-string (lambda (port) (display-backtrace stack port) (newline port))))) - (display + (display/safe backtrace - (current-error-port)))))) + (current-error-port))) + (raise-exception exn)) + (lambda () + (apply proc job-args)))) #:unwind? #t)) (define (start-thread thread-index) @@ -1270,27 +1167,6 @@ References: ~a~%" (values pool-mutex job-available count-threads list-jobs))) -;; copied from (guix scripts substitute) -(define-syntax-rule (with-timeout duration handler body ...) - "Run BODY; when DURATION seconds have expired, call HANDLER, and run BODY -again." - (begin - (sigaction SIGALRM - (lambda (signum) - (sigaction SIGALRM SIG_DFL) - handler)) - (alarm duration) - (call-with-values - (lambda () - body ...) - (lambda result - (alarm 0) - (sigaction SIGALRM SIG_DFL) - (apply values result))))) - -(define (reset-timeout duration) - (alarm duration)) - (define (throttle min-duration thunk) (let ((next-min-runtime 0)) (lambda () @@ -1329,55 +1205,6 @@ again." (set! cached-system (utsname:sysname (uname)))) (string=? cached-system "GNU"))) -(define (get-gc-metrics-updater registry) - (define metrics - `((gc-time-taken - . ,(make-gauge-metric registry "guile_gc_time_taken")) - (heap-size - . ,(make-gauge-metric registry "guile_heap_size")) - (heap-free-size - . ,(make-gauge-metric registry "guile_heap_free_size")) - (heap-total-allocated - . ,(make-gauge-metric registry "guile_heap_total_allocated")) - (heap-allocated-since-gc - . ,(make-gauge-metric registry "guile_allocated_since_gc")) - (protected-objects - . ,(make-gauge-metric registry "guile_gc_protected_objects")) - (gc-times - . ,(make-gauge-metric registry "guile_gc_times")))) - - (lambda () - (let ((stats (gc-stats))) - (for-each - (match-lambda - ((name . metric) - (let ((value (assq-ref stats name))) - (metric-set metric value)))) - metrics)))) - -(define (get-port-metrics-updater registry) - (let ((ports-metric - (make-gauge-metric registry "guile_ports_total")) - (fds-metric - (make-gauge-metric registry "file_descriptors_total"))) - (lambda () - (let ((count 0)) - (port-for-each - (lambda _ - (set! count (+ 1 count)))) - - (metric-set ports-metric count)) - - (metric-set - fds-metric - (length - ;; In theory 'scandir' cannot return #f, but in practice, - ;; we've seen it before. - (or (scandir "/proc/self/fd" - (lambda (file) - (not (member file '("." ".."))))) - '())))))) - (define (get-guix-memory-metrics-updater registry) (define %memoization-tables (@@ (guix memoization) %memoization-tables)) @@ -1451,22 +1278,18 @@ again." (define (check-locale!) (with-exception-handler (lambda (exn) - (display - (simple-format - #f - "exception when calling setlocale: ~A + (simple-format/safe + (current-error-port) + "exception when calling setlocale: ~A falling back to en_US.utf8\n" - exn) - (current-error-port)) + exn) (with-exception-handler (lambda (exn) - (display - (simple-format - #f - "exception when calling setlocale with en_US.utf8: ~A\n" - exn) - (current-error-port)) + (simple-format/safe + (current-error-port) + "exception when calling setlocale with en_US.utf8: ~A\n" + exn) (exit 1)) (lambda _ @@ -1475,3 +1298,51 @@ falling back to en_US.utf8\n" (lambda _ (setlocale LC_ALL "")) #:unwind? #t)) + +(define* (display/safe obj #:optional (port (current-output-port))) + ;; Try to avoid the dreaded conversion to port encoding failed error #62590 + (put-bytevector + port + (string->utf8 + (call-with-output-string + (lambda (port) + (display obj port))))) + (force-output port)) + +(define (simple-format/safe port s . args) + (let ((str (apply simple-format #f s args))) + (if (eq? #f port) + str + (display/safe + str + (if (eq? #t port) + (current-output-port) + port))))) + +(define (format/safe port s . args) + (let ((str (apply format #f s args))) + (if (eq? #f port) + str + (display/safe + str + (if (eq? #t port) + (current-output-port) + port))))) + +(define-class <custom-port-log> (<log-handler>) + (port #:init-value #f #:accessor port #:init-keyword #:port)) + +(define-method (emit-log (self <custom-port-log>) str) + (when (port self) + (put-bytevector (port self) + (string->utf8 str)) + ;; Even though the port is line buffered, writing to it with + ;; put-bytevector doesn't cause the buffer to be flushed. + (force-output (port self)))) + +(define-method (flush-log (self <custom-port-log>)) + (and=> (port self) force-output)) + +(define-method (close-log! (self <custom-port-log>)) + (and=> (port self) close-port) + (set! (port self) #f)) |