(define-module (guix-build-coordinator utils)
  #:use-module (srfi srfi-1)
  #:use-module (srfi srfi-11)
  #:use-module (srfi srfi-19)
  #:use-module (srfi srfi-26)
  #:use-module (srfi srfi-60)
  #:use-module (ice-9 q)
  #:use-module (ice-9 ftw)
  #:use-module (ice-9 popen)
  #:use-module (ice-9 iconv)
  #:use-module (ice-9 match)
  #:use-module (ice-9 format)
  #:use-module (ice-9 threads)
  #:use-module (ice-9 textual-ports)
  #:use-module (ice-9 rdelim)
  #:use-module (ice-9 binary-ports)
  #:use-module (ice-9 suspendable-ports)
  #:use-module ((ice-9 ports internal) #:select (port-poll))
  #:use-module (ice-9 exceptions)
  #:use-module (rnrs bytevectors)
  #:use-module (web uri)
  #:use-module (web http)
  #:use-module (web client)
  #:use-module (web request)
  #:use-module (web response)
  #:use-module (gcrypt pk-crypto)
  #:use-module (gcrypt hash)
  #:use-module (gcrypt random)
  #:use-module (json)
  #:use-module (prometheus)
  #:use-module (guix pki)
  #:use-module (guix utils)
  #:use-module (guix config)
  #:use-module (guix store)
  #:use-module (guix status)
  #:use-module (guix base64)
  #:use-module (guix progress)
  #:use-module (guix derivations)
  #:use-module ((guix http-client) #:select (http-fetch))
  #:use-module (guix serialization)
  #:use-module ((guix build syscalls) #:select (set-thread-name))
  #:use-module (guix scripts substitute)
  #:export (random-v4-uuid

            &port-timeout
            &port-read-timeout
            &port-write-timeout

            port-timeout-error?
            port-read-timeout-error?
            port-write-timeout-error?

            with-port-timeouts

            request-query-parameters
            call-with-streaming-http-request

            find-missing-substitutes-for-output
            has-substiutes-no-cache?

            read-derivation-from-file*

            non-blocking-port
            ensure-non-blocking-store-connection
            with-store/non-blocking
            substitute-derivation
            read-derivation-through-substitutes

            narinfo-string

            retry-on-error

            s3-list-objects
            s3-cp

            log-delay
            call-with-delay-logging

            with-time-logging

            create-work-queue
            create-thread-pool

            with-timeout
            reset-timeout

            throttle

            get-load-average
            get-uptime

            running-on-the-hurd?

            get-guix-memory-metrics-updater

            open-socket-for-uri*

            check-locale!))

(eval-when (eval load compile)
  (begin
    (when (defined? 'narinfo-references
            (resolve-module '(guix narinfo)))
      ;; This module contains narinfo-references in newer version of Guix
      (use-modules (guix narinfo)))

    (when (defined? 'lookup-narinfos
            (resolve-module '(guix substitutes)))
      ;; This module may contain lookup-narinfos in newer version of Guix
      (use-modules (guix substitutes)))))

(define (random-v4-uuid)
  "Return a random version 4 UUID as a lowercase hexadecimal string in the
8-4-4-4-12 layout."
  ;; https://tools.ietf.org/html/rfc4122#page-14
  ;;
  ;; The pattern in characters is: 8, 4, 4, 4, 12
  ;; The pattern in bytes is:      4, 2, 2, 2, 6
  ;;
  ;; time-low "-" time-mid "-" time-high-and-version "-"
  ;; clock-seq-and-reserved clock-seq-low "-" node
  ;;
  ;; - Set the two most significant bits (bits 6 and 7) of the
  ;;   clock_seq_hi_and_reserved to zero and one, respectively.
  ;; - Set the four most significant bits (bits 12 through 15) of the
  ;;   time_hi_and_version field to the 4-bit version number from
  ;;   Section 4.1.3.
  (let* ((bytes 16)
         (bv (gen-random-bv bytes)))

    (let ((version 4)
          (6th-byte (array-ref bv 6))   ; Most significant byte in
                                        ; time_hi_and_version
          (8th-byte (array-ref bv 8)))  ; Most significant byte in
                                        ; clock_seq_hi_and_reserved
      (array-set!
       bv
       (logior (logand #b00001111 6th-byte)
               (rotate-bit-field version 4 0 8))
       6)

      (array-set!
       bv
       ;; Set bits 6 and 7 to 0 and 1 respectively
       (logior #b10000000
               (logand #b00111111 8th-byte))
       8))

    (let* ((int (bytevector-uint-ref bv 0 (endianness big) bytes))
           (hex-string (format #f "~32,'0x" int)))
      ;; Walk the group lengths while tracking the running offset into
      ;; HEX-STRING, rather than re-joining the accumulated parts to find it.
      (let loop ((lengths '(8 4 4 4 12))
                 (start 0)
                 (parts '()))
        (match lengths
          (()
           (string-join (reverse parts) "-"))
          ((part-length . rest)
           (let ((end (+ start part-length)))
             (loop rest
                   end
                   (cons (substring hex-string start end) parts)))))))))

(define (request-query-parameters request)
  "Return the query parameters of REQUEST as an alist of (SYMBOL . STRING)
pairs, in the order they appear in the URI.  Names and values are URI
decoded.  A parameter without a \"=\" gets the empty string as its value, and
empty parameters (as produced by a trailing \"&\") are ignored.  Return the
empty list when REQUEST's URI has no query."
  (define (parse-parameter parameter)
    ;; Split on the first "=" only, so a value may itself contain "=" and a
    ;; bare key doesn't shift the pairing of the parameters that follow it.
    (match (string-index parameter #\=)
      (#f
       (cons (uri-decode parameter) ""))
      (index
       (cons (uri-decode (substring parameter 0 index))
             (uri-decode (substring parameter (+ index 1)))))))

  (define (parse-query-string query)
    "Parse and decode the URI query string QUERY and return an alist."
    (map parse-parameter
         (remove string-null?
                 (string-split query #\&))))

  (let ((query (uri-query (request-uri request))))
    (if (and query
             (not (string-null? query)))
        (map (match-lambda
               ((name . value)
                (cons (string->symbol name) value)))
             (parse-query-string query))
        '())))

;; Exception raised by with-port-timeouts when waiting on a port takes too
;; long.  The PORT field holds the port that was being waited on.
(define &port-timeout
  (make-exception-type '&port-timeout
                       &external-error
                       '(port)))

(define make-port-timeout-error
  (record-constructor &port-timeout))

(define port-timeout-error?
  (record-predicate &port-timeout))

;; Subtype of &port-timeout raised when waiting to read.
(define &port-read-timeout
  (make-exception-type '&port-read-timeout
                       &port-timeout
                       '()))

(define make-port-read-timeout-error
  (record-constructor &port-read-timeout))

(define port-read-timeout-error?
  (record-predicate &port-read-timeout))

;; Subtype of &port-timeout raised when waiting to write.
(define &port-write-timeout
  (make-exception-type '&port-write-timeout
                       &port-timeout
                       '()))

(define make-port-write-timeout-error
  (record-constructor &port-write-timeout))

(define port-write-timeout-error?
  (record-predicate &port-write-timeout))

(define* (with-port-timeouts thunk #:key (timeout (* 120 1000)))
  "Call THUNK with the current read and write waiters replaced by ones that
raise a &port-read-timeout or &port-write-timeout exception if a single wait
on a port lasts longer than TIMEOUT milliseconds."

  ;; When the GC runs, it restarts the poll syscall, but the timeout remains
  ;; unchanged! When the timeout is longer than the time between the syscall
  ;; restarting, I think this renders the timeout useless. Therefore, this
  ;; code uses a short timeout, and repeatedly calls poll while watching the
  ;; clock to see if it has timed out overall.
  (define poll-timeout-ms 200)

  (define (wait port mode)
    ;; The deadline is computed afresh for each wait, in internal time units.
    (let ((timeout-internal
           (+ (get-internal-real-time)
              (* internal-time-units-per-second
                 (/ timeout 1000)))))
      (let loop ((poll-value
                  (port-poll port mode poll-timeout-ms)))
        (if (= poll-value 0)
            (if (> (get-internal-real-time)
                   timeout-internal)
                (raise-exception
                 (if (string=? mode "r")
                     (make-port-read-timeout-error port)
                     (make-port-write-timeout-error port)))
                (loop (port-poll port mode poll-timeout-ms)))
            poll-value))))

  (parameterize
      ((current-read-waiter
        (lambda (port)
          (wait port "r")))
       (current-write-waiter
        (lambda (port)
          (wait port "w"))))
    (thunk)))

(define* (call-with-streaming-http-request uri content-length callback
                                           #:key (headers '())
                                           (method 'PUT)
                                           streaming?)
  "Open a connection to URI, send a METHOD request with HEADERS and the given
CONTENT-LENGTH, and call CALLBACK with the port so it can write the request
body.  Return two values: the response, and either the response body port
when STREAMING? is true, or the response body read in full, in which case the
port is closed first.  The whole exchange runs inside with-port-timeouts.  If
an exception is raised while writing or reading, it is logged, the port is
closed and the exception is re-raised."
  (with-port-timeouts
   (lambda ()
     (let* ((port (open-socket-for-uri* uri))
            (request
             (build-request
              uri
              #:method method
              #:version '(1 . 1)
              #:headers `((connection close)
                          (content-length . ,content-length)
                          (Content-Type . "application/octet-stream")
                          ;; read-request-body/patch looks for this header
                          (Stream-Body . "true")
                          ,@headers)
              #:port port)))

       (set-port-encoding! port "ISO-8859-1")
       (setvbuf port 'block (expt 2 13))
       (with-exception-handler
           (lambda (exp)
             (simple-format #t "error: ~A ~A: ~A\n"
                            method (uri-path uri) exp)
             (close-port port)
             (raise-exception exp))
         (lambda ()
           (let ((request (write-request request port)))
             (callback port)
             (force-output port)
             (let ((response (read-response port)))
               (if streaming?
                   (values response
                           (response-body-port response))
                   (let ((body (read-response-body response)))
                     (close-port port)
                     (values response
                             body)))))))))))

(define (find-missing-substitutes-for-output store substitute-urls output)
  "Return a list of store paths, reachable from OUTPUT through narinfo
references, for which no narinfo could be found on any of SUBSTITUTE-URLS.
Return the empty list if OUTPUT is already a valid path in STORE.  References
are only followed for paths that do have a narinfo, and a path's reference to
itself is skipped."
  (if (valid-path? store output)
      '()
      (let ((narinfo
             ;; Use the narinfo from the first substitute URL that has one
             (any (lambda (substitute-url)
                    (let ((result (lookup-narinfos substitute-url
                                                   (list output))))
                      (if (null? result)
                          #f
                          (first result))))
                  substitute-urls)))
        (if narinfo
            (append-map
             (lambda (reference)
               (let ((referenced-output
                      (string-append (%store-prefix) "/" reference)))
                 (if (string=? referenced-output output)
                     '()
                     (find-missing-substitutes-for-output store
                                                          substitute-urls
                                                          referenced-output))))
             (narinfo-references narinfo))
            (list output)))))

;; Work around Guix holding on to broken connections to substitute servers
;; (because of mishandling gnutls errors).
(let ((mod (resolve-module '(guix scripts substitute))))
  (when (module-variable mod '%max-cached-connections)
    (module-set! mod '%max-cached-connections 0)))

(define (has-substiutes-no-cache? substitute-urls file)
  "Return the elements of SUBSTITUTE-URLS for which a narinfo lookup of FILE
returns a result.  Any narinfo cache file for FILE is deleted first, so that
the lookups aren't answered from the cache."
  (define %narinfo-cache-directory
    (if (zero? (getuid))
        (or (and=> (getenv "XDG_CACHE_HOME")
                   (cut string-append <> "/guix/substitute"))
            (string-append %state-directory "/substitute/cache"))
        (string-append (cache-directory #:ensure? #f) "/substitute")))

  ;; Because there's no control over the caching of 404 lookups, and I'd
  ;; rather not reach inside and monkey patch the Guix code, just delete any
  ;; cache files
  (let ((hash-part (store-path-hash-part file))
        (directories
         (scandir %narinfo-cache-directory
                  (lambda (s) (= (string-length s) 52)))))

    (for-each (lambda (directory)
                (let ((cache-file
                       (string-append
                        %narinfo-cache-directory "/"
                        directory "/" hash-part)))
                  ;; Use monitor to avoid multiple threads trying to delete
                  ;; the same file at the same time
                  (monitor
                   (when (file-exists? cache-file)
                     (with-exception-handler
                         (lambda (exn)
                           (simple-format
                            (current-error-port)
                            "error: when deleting substitute cache file: ~A\n"
                            exn))
                       (lambda ()
                         (delete-file cache-file))
                       #:unwind? #t)))))
              ;; scandir's result is #f-guarded here so a missing cache
              ;; directory is treated as having no entries
              (or directories '())))

  (let ((substitute-urls
         (append-map
          (lambda (substitute-url)
            (let ((log-port (open-output-string)))
              (with-throw-handler
                  #t
                (lambda ()
                  (if (null?
                       ;; I doubt the caching is thread safe, so
                       ;; only make one request at a time.
                       (monitor
                        (parameterize ((current-error-port log-port))
                          (lookup-narinfos substitute-url
                                           (list file)))))
                      '()
                      (list substitute-url)))
                (lambda (key . args)
                  (simple-format
                   (current-error-port)
                   "exception in has-substiutes-no-cache? (~A) ~A: ~A\n"
                   substitute-url key args)
                  (display (string-append
                            (get-output-string log-port)
                            "\n")
                           (current-error-port))
                  (close-output-port log-port)))))
          substitute-urls)))
    substitute-urls))

(define (non-blocking-port port)
  "Make PORT non-blocking and return it."
  (let ((flags (fcntl port F_GETFL)))
    (when (zero? (logand O_NONBLOCK flags))
      (fcntl port F_SETFL (logior O_NONBLOCK flags)))
    port))

(define (ensure-non-blocking-store-connection store)
  "Mark the file descriptor that backs STORE, a store connection, as
O_NONBLOCK.  Return #f when the socket of STORE is not a file port."
  (match (store-connection-socket store)
    ((? file-port? port)
     (non-blocking-port port))
    (_ #f)))

(define-syntax-rule (with-store/non-blocking store exp ...)
  "Like 'with-store', bind STORE to a connection to the store, but ensure that
said connection is non-blocking (O_NONBLOCK).  Evaluate EXP... in that
context."
  (with-store store
    (ensure-non-blocking-store-connection store)
    (let ()
      exp ...)))

(define* (substitute-derivation store derivation-name
                                #:key substitute-urls)
  "Call ensure-path on DERIVATION-NAME using STORE, after setting the build
options of STORE (including SUBSTITUTE-URLS when given).  Build output is
captured; if an exception is raised, up to the last 10 lines of that output
are written to the current error port and the exception is re-raised."
  (let ((log-port (open-output-string)))
    (apply set-build-options
           store
           `(,@(if substitute-urls
                   `(#:substitute-urls ,substitute-urls)
                   '())
             #:max-silent-time 60
             #:timeout ,(* 10 60)))

    (with-exception-handler
        (lambda (exn)
          (let* ((log-string (get-output-string log-port))
                 (lines (string-split log-string #\newline))
                 (last-n-lines
                  (if (< 10 (length lines))
                      (take-right lines 10)
                      lines)))
            (close-output-port log-port)
            (simple-format
             (current-error-port)
             "exception when substituting derivation: ~A:\n ~A\n"
             exn (string-join last-n-lines "\n"))
            (raise-exception exn)))
      (lambda ()
        (parameterize ((current-build-output-port log-port))
          (ensure-path store derivation-name)))
      #:unwind? #t)))

;; Read the derivation in FILE, consulting and updating the %derivation-cache
;; table taken from (guix derivations).
(define read-derivation-from-file*
  (let ((%derivation-cache
         (@@ (guix derivations) %derivation-cache)))
    (lambda (file)
      (or (and file (hash-ref %derivation-cache file))
          (let ((drv
                 ;; read-derivation can call read-derivation-from-file, so to
                 ;; avoid having many open files when reading a derivation with
                 ;; inputs, read it in to a string first.
                 (call-with-input-string
                     ;; Avoid calling scm_i_relativize_path in
                     ;; fport_canonicalize_filename since this leads to lots
                     ;; of readlink calls
                     (with-fluids ((%file-port-name-canonicalization 'none))
                       (call-with-input-file file
                         get-string-all))
                   (lambda (port)
                     (set-port-filename! port file)
                     (read-derivation port read-derivation-from-file*)))))
            (hash-set! %derivation-cache file drv)
            drv)))))

(define (read-derivation-through-substitutes derivation-name
                                             substitute-urls)
  "Read and return the derivation DERIVATION-NAME.  If the file exists and is
a valid path in the local store it is read from there, otherwise it is fetched
from the first of SUBSTITUTE-URLS that has a narinfo for it.  The same applies
to the derivations it refers to.  Raise an error if no narinfo can be found."
  (define %fetch-timeout 5)

  ;; Return a connection for URI, reusing a previously opened one for the
  ;; same host, scheme and port unless FRESH? is true or it has been closed.
  ;; Return #f for file URIs and URIs without a scheme.
  (define open-connection-for-uri/cached
    (let ((cache '()))
      (lambda* (uri #:key fresh? (timeout %fetch-timeout) verify-certificate?)
        (define host (uri-host uri))
        (define scheme (uri-scheme uri))
        (define key (list host scheme (uri-port uri)))

        (and (not (memq scheme '(file #f)))
             (match (assoc-ref cache key)
               (#f
                (let ((socket
                       (open-socket-for-uri*
                        uri
                        #:verify-certificate? verify-certificate?)))
                  (set! cache (alist-cons key socket cache))
                  socket))
               (socket
                (if (or fresh? (port-closed? socket))
                    (begin
                      (false-if-exception (close-port socket))
                      (set! cache (alist-delete key cache))
                      (open-connection-for-uri/cached
                       uri
                       #:timeout timeout
                       #:verify-certificate? verify-certificate?))
                    (begin
                      ;; Drain input left from the previous use.
                      (drain-input socket)
                      socket))))))))

  (define (derivation-name->bytevector derivation-name)
    (let ((narinfo
           (any (lambda (substitute-url)
                  (let ((result
                         (lookup-narinfos
                          substitute-url
                          (list derivation-name)
                          #:open-connection open-connection-for-uri/cached)))
                    (if (null? result)
                        #f
                        (first result))))
                substitute-urls)))
      (if narinfo
          (let*-values (((uri compression file-size)
                         (narinfo-best-uri narinfo))
                        ((raw download-size)
                         ;; Fetch the URI that narinfo-best-uri selected, so
                         ;; that it matches COMPRESSION, which is used below
                         ;; to pick the decompressor.
                         (http-fetch uri
                                     #:open-connection
                                     open-connection-for-uri/cached
                                     #:keep-alive? #t))
                        ((progress)
                         (let* ((dl-size (or download-size
                                             (and (equal? compression "none")
                                                  (narinfo-size narinfo)))))
                           ;; Keep RAW open upon completion so we can later
                           ;; reuse the underlying connection.  Pass the
                           ;; download size so that this procedure won't block
                           ;; reading from RAW.
                           (progress-report-port progress-reporter/silent
                                                 raw
                                                 #:close? #f
                                                 #:download-size dl-size)))
                        ((input pids)
                         ;; NOTE: This 'progress' port of current process will be
                         ;; closed here, while the child process doing the
                         ;; reporting will close it upon exit.
                         (decompressed-port (string->symbol compression)
                                            progress)))

            ;; The decompressor can be an external program, so wait for it to
            ;; exit
            (every (compose zero? cdr waitpid) pids)

            (fold-archive (lambda (file type contents result)
                            (match contents
                              ((port . bytes)
                               (get-bytevector-n port bytes))))
                          #f
                          input
                          #f))
          (error "could not fetch narinfo"))))

  (define (read-derivation-from-file/custom derivation-name)
    ;; Try the local store
    (or (and (file-exists? derivation-name)
             (with-store/non-blocking store
               (valid-path? store derivation-name))
             (read-derivation-from-file* derivation-name))
        ;; Otherwise try the network
        (call-with-input-bytevector
         (derivation-name->bytevector derivation-name)
         (lambda (port)
           (set-port-filename! port derivation-name)
           (read-derivation port read-derivation-from-file/custom)))))

  (read-derivation-from-file/custom derivation-name))

(define* (narinfo-string store-path hash size references
                         compressed-files
                         #:key (nar-path "nar")
                         system derivation
                         public-key private-key)
  "Return a signed narinfo, as a string, for STORE-PATH with the given HASH,
SIZE and REFERENCES.  COMPRESSED-FILES is a list of (COMPRESSION SIZE) lists,
each rendered as a URL/Compression/FileSize entry below NAR-PATH.  System and
Deriver lines are included when DERIVATION is given.  The signature is made
with PRIVATE-KEY and PUBLIC-KEY."
  (define (signed-string s)
    (let* ((hash (bytevector->hash-data (sha256 (string->utf8 s))
                                        #:key-type (key-type public-key))))
      (signature-sexp hash private-key public-key)))

  (define base64-encode-string
    (compose base64-encode string->utf8))

  (define* (store-item->recutils compression file-size)
    (let ((url (encode-and-join-uri-path
                `(,@(split-and-decode-uri-path nar-path)
                  ,@(if compression
                        (list (symbol->string compression))
                        '())
                  ,(basename store-path)))))
      (format #f "URL: ~a~%Compression: ~a~%~@[FileSize: ~a~%~]"
              url
              (or compression "none")
              file-size)))

  (let* ((base-info  (format #f
                             ;; One field per line; ~% is used for every
                             ;; line break rather than relying on newlines
                             ;; embedded in the literal.
                             "StorePath: ~a~%NarHash: sha256:~a~%NarSize: ~d~%References: ~a~%"
                             store-path
                             hash
                             size
                             (string-join references " ")))
         ;; Only render the "System" and "Deriver" lines when DERIVATION is
         ;; given.
         (info       (if derivation
                         (format #f "~aSystem: ~a~%Deriver: ~a~%"
                                 base-info system
                                 (basename derivation))
                         base-info))
         (signature  (base64-encode-string
                      (canonical-sexp->string (signed-string info)))))
    (format #f "~aSignature: 1;~a;~a~%~{~a~}"
            info
            (gethostname)
            signature
            (map (match-lambda
                   ((compression compressed-size)
                    (store-item->recutils compression compressed-size)))
                 compressed-files))))

(define* (retry-on-error f #:key times delay ignore no-retry error-hook
                         (sleep-impl sleep))
  "Call the thunk F and return its values, retrying when it raises an
exception.  Between attempts, ERROR-HOOK (if given) is called with the attempt
number and the exception, then SLEEP-IMPL is called with DELAY.  The final
retry calls F directly, so an exception from it propagates to the caller.

IGNORE and NO-RETRY are each either a predicate or a list of predicates
applied to the exception.  If IGNORE matches, the exception is returned as
the result instead of retrying.  If NO-RETRY matches, the exception is
re-raised immediately."
  (let loop ((attempt 1))
    (match (with-exception-handler
               (lambda (exn)
                 (if (cond
                      ((list? ignore)
                       (any (lambda (test)
                              (test exn))
                            ignore))
                      ((procedure? ignore)
                       (ignore exn))
                      (else #f))
                     `(#t . (,exn))
                     (begin
                       (when (cond
                              ((list? no-retry)
                               (any (lambda (test)
                                      (test exn))
                                    no-retry))
                              ((procedure? no-retry)
                               (no-retry exn))
                              (else #f))
                         (raise-exception exn))

                       (cons #f exn))))
             (lambda ()
               (call-with-values f
                 (lambda vals
                   (cons #t vals))))
             #:unwind? #t)
      ((#t . return-values)
       (when (> attempt 1)
         (simple-format
          (current-error-port)
          "retry success: ~A\n on attempt ~A of ~A\n"
          f
          attempt
          times))
       (apply values return-values))
      ((#f . exn)
       (if (>= attempt
               (- times 1))
           (begin
             (simple-format
              (current-error-port)
              "error: ~A:\n ~A,\n attempt ~A of ~A, last retry in ~A\n"
              f
              exn
              attempt
              times
              delay)
             (when error-hook
               (error-hook attempt exn))
             (sleep-impl delay)
             (simple-format
              (current-error-port)
              "running last retry of ~A after ~A failed attempts\n"
              f
              attempt)
             ;; Not wrapped in a handler: a failure here reaches the caller
             (f))
           (begin
             (simple-format
              (current-error-port)
              "error: ~A:\n ~A,\n attempt ~A of ~A, retrying in ~A\n"
              f
              exn
              attempt
              times
              delay)
             (when error-hook
               (error-hook attempt exn))
             (sleep-impl delay)
             (loop (+ 1 attempt))))))))

(define* (s3-list-objects s3-bucket prefix
                          #:key
                          (command "aws")
                          (command-line-arguments '()))
  "Run COMMAND's \"s3api list-objects\" for S3-BUCKET and PREFIX, with
COMMAND-LINE-ARGUMENTS inserted after \"s3api\".  Return the parsed JSON
output, or the empty list when there is no output.  Raise an exception with
the output as its message if the command exits with a non-zero status."
  (let ((command
         `(,command
           "s3api"
           ,@command-line-arguments
           "list-objects"
           "--bucket" ,s3-bucket
           "--prefix" ,prefix)))

    (simple-format #t "running: ~A\n" (string-join command))
    (let ((pipe (apply open-pipe* OPEN_READ command)))
      ;; let* so that the output is always read before the pipe is closed;
      ;; the evaluation order of plain let bindings is unspecified.
      (let* ((output (get-string-all pipe))
             (exit-code
              (status:exit-val
               (close-pipe pipe))))
        (if (zero? exit-code)
            (if (string-null? output)
                '()
                (json-string->scm output))
            (raise-exception
             (make-exception-with-message
              output)))))))

(define* (s3-cp s3-bucket source destination
                #:key
                (command "aws")
                (command-line-arguments '()))
  "Run COMMAND's \"s3 cp\" to copy SOURCE to DESTINATION in S3-BUCKET, with
COMMAND-LINE-ARGUMENTS inserted after \"s3\".  Return #t on success, and raise
an exception if the command exits with a non-zero status."
  (let ((command
         `(,command
           "s3"
           ,@command-line-arguments
           "cp"
           ,source
           ,(simple-format #f "s3://~A/~A"
                           s3-bucket destination))))
    (simple-format #t "running: ~A\n" (string-join command))
    (let ((exit-code
           (status:exit-val
            (apply system* command))))
      (unless (zero? exit-code)
        (raise-exception
         (make-exception-with-message
          (simple-format
           #f "error: command failed (~A): ~A\n"
           exit-code
           command))))

      #t)))

;; Holds the recorder procedure installed by the outermost
;; call-with-delay-logging in the current thread, or #f outside of one.
(define delay-logging-fluid
  (make-thread-local-fluid))
;; Nesting depth of call-with-delay-logging calls in the current thread.
(define delay-logging-depth-fluid
  (make-thread-local-fluid 0))

(define (log-delay proc duration)
  "Pass PROC and DURATION to the current delay logging recorder, if there is
one."
  (and=> (fluid-ref delay-logging-fluid)
         (lambda (recorder)
           (recorder proc duration))))

(define* (call-with-delay-logging proc #:key (threshold 1) (args '()))
  "Apply PROC to ARGS and return its values.  The outermost call in a thread
records the durations reported by nested calls and by log-delay, and if it
takes more than THRESHOLD seconds, displays a warning followed by the
recorded entries lasting at least 0.001 seconds.  Nested calls report their
own duration to the outermost one."
  (let ((start (get-internal-real-time))
        (trace '())
        (root-logger? (eq? #f (fluid-ref delay-logging-fluid))))

    (define (format-seconds seconds)
      (format #f "~5f" seconds))

    (call-with-values
        (lambda ()
          (with-fluid* delay-logging-depth-fluid
            (+ 1 (fluid-ref delay-logging-depth-fluid))
            (lambda ()
              (if root-logger?
                  (with-fluid* delay-logging-fluid
                    (lambda (proc duration)
                      (set! trace
                            (cons (list proc
                                        duration
                                        (fluid-ref delay-logging-depth-fluid))
                                  trace))
                      #t)
                    (lambda ()
                      (apply proc args)))
                  (apply proc args)))))
      (lambda vals
        (let ((elapsed-seconds
               (/ (- (get-internal-real-time)
                     start)
                  internal-time-units-per-second)))
          (if (and (> elapsed-seconds threshold)
                   root-logger?)
              (let ((lines
                     (cons
                      (simple-format #f "warning: delay of ~A seconds: ~A"
                                     (format-seconds elapsed-seconds)
                                     proc)
                      (filter-map
                       (match-lambda
                         ((proc duration depth)
                          (if (>= duration 0.001)
                              (string-append
                               (make-string (* 2 depth) #\space)
                               (simple-format #f "~A: ~A"
                                              (format-seconds duration)
                                              proc))
                              #f)))
                       trace))))
                (display (string-append
                          (string-join lines "\n")
                          "\n")))
              (unless root-logger?
                ((fluid-ref delay-logging-fluid) proc elapsed-seconds))))
        (apply values vals)))))

(define (call-with-time-logging name thunk)
  "Call THUNK, display how long it took labelled with NAME, and return the
values THUNK returned."
  (let ((start (current-time time-utc)))
    (call-with-values
        thunk
      (lambda vals
        (let* ((end     (current-time time-utc))
               (elapsed (time-difference end start)))
          (display
           (format #f
                   "~a took ~f seconds~%"
                   name
                   (+ (time-second elapsed)
                      (/ (time-nanosecond elapsed) 1e9))))
          (apply values vals))))))

(define-syntax-rule (with-time-logging name exp ...)
  "Log under NAME the time taken to evaluate EXP."
  (call-with-time-logging name (lambda () exp ...)))

;; NOTE(review): the text of create-work-queue below appears to be truncated
;; in two places (around "priority= running-jobs-count" and "(if priority
;; threads-to-start 0)"), leaving it unbalanced.  It is reproduced as found,
;; token for token; restore the missing text from version control before
;; relying on this definition.
(define* (create-work-queue thread-count-parameter proc
                            #:key thread-start-delay
                            (thread-stop-delay
                             (make-time time-duration 0 0))
                            (name "unnamed")
                            priority= running-jobs-count
              desired-thread-count)))

      (define (thread-idle-for-too-long? last-job-finished-at)
        (time>=?
         (time-difference (current-time time-monotonic)
                          last-job-finished-at)
         thread-stop-delay))

      (define (stop-thread)
        (hash-remove! running-job-args
                      thread-index)
        (unlock-mutex queue-mutex))

      (call-with-new-thread
       (lambda ()
         (catch 'system-error
           (lambda ()
             (set-thread-name
              (string-append name " q t "
                             (number->string thread-index))))
           (const #t))

         (let loop ((last-job-finished-at (current-time time-monotonic)))
           (lock-mutex queue-mutex)

           (if (too-many-threads?)
               (stop-thread)
               (let ((job-args
                      (if (q-empty? queue)
                          ;; #f from wait-condition-variable indicates a timeout
                          (if (wait-condition-variable
                               job-available
                               queue-mutex)
                              ;; Another thread could have taken
                              ;; the job in the mean time
                              (if (q-empty? queue)
                                  #f
                                  (if priority threads-to-start 0)
            (for-each
             (lambda (thread-index)
               (when (eq? (hash-ref running-job-args
                                    thread-index
                                    'slot-free)
                          'slot-free)
                 (let* ((now (current-time time-monotonic))
                        (elapsed (time-difference now
                                                  previous-thread-started-at)))
                   (when (or (eq? #f thread-start-delay)
                             (time>=? elapsed thread-start-delay))
                     (set! previous-thread-started-at now)
                     (hash-set! running-job-args
                                thread-index
                                #f)
                     (start-thread thread-index)))))
             (iota desired-count)))))))

    (if (procedure? thread-count-parameter)
        (call-with-new-thread
         (lambda ()
           (catch 'system-error
             (lambda ()
               (set-thread-name
                (string-append name " q t")))
             (const #t))

           (while #t
             (sleep 15)
             (with-mutex queue-mutex
               (let ((idle-threads (hash-count (lambda (index val)
                                                 (eq? #f val))
                                               running-job-args)))
                 (when (= 0 idle-threads)
                   (start-new-threads-if-necessary (get-thread-count))))))))
        (start-new-threads-if-necessary (get-thread-count)))

    (values process-job count-jobs count-threads list-jobs)))

(define* (create-thread-pool thread-count-parameter get-job proc
                             #:key thread-start-delay
                             (thread-stop-delay
                              (make-time time-duration 0 0))
                             (name "unnamed"))
  "Create a pool of threads which obtain jobs by calling GET-JOB with the list
of currently running jobs, and process each job by applying PROC to it.
THREAD-COUNT-PARAMETER is either a number or a procedure returning the desired
number of threads; when it is a procedure, an extra thread is started that
starts more workers when none are idle.  Return four values: the pool mutex,
the job-available condition variable, a procedure returning the number of
threads, and a procedure returning the list of running jobs."
  (let ((pool-mutex (make-mutex))
        (job-available (make-condition-variable))
        (running-job-args (make-hash-table)))

    (define get-thread-count
      (cond
       ((number? thread-count-parameter)
        (const thread-count-parameter))
       (else
        (lambda ()
          (with-exception-handler
              ;; If the count can't be obtained, keep the current number of
              ;; threads
              (lambda (exn)
                (count-threads))
            thread-count-parameter
            #:unwind? #t)))))

    (define (get-job/safe running-jobs)
      ;; Treat an exception from GET-JOB as there being no job
      (with-exception-handler
          (lambda (exn)
            #f)
        (lambda ()
          (get-job running-jobs))
        #:unwind? #t))

    (define (count-threads)
      (hash-count (const #t) running-job-args))

    (define (list-jobs)
      ;; Entries with a #f value are threads without a job, so skip them
      (append (hash-fold (lambda (key val result)
                           (or (and val
                                    (cons val result))
                               result))
                         '()
                         running-job-args)))

    (define (thread-process-job job-args)
      (with-exception-handler
          (lambda (exn)
            (with-exception-handler
                (lambda _
                  #f)
              (lambda ()
                ;; Logging may raise an exception, so try and just keep going.
                (display
                 (simple-format
                  #f
                  "~A thread pool, job raised exception ~A: ~A\n"
                  name job-args exn)
                 (current-error-port)))
              #:unwind? #t))
        (lambda ()
          (with-throw-handler #t
            (lambda ()
              (apply proc job-args))
            (lambda (key . args)
              (simple-format
               (current-error-port)
               "~A thread pool, exception when handling job: ~A ~A\n"
               name key args)
              (let* ((stack (make-stack #t 3))
                     (backtrace
                      (call-with-output-string
                        (lambda (port)
                          (display-backtrace stack port)
                          (newline port)))))
                (display
                 backtrace
                 (current-error-port))))))
        #:unwind? #t))

    (define (start-thread thread-index)
      (define (too-many-threads?)
        (let ((running-jobs-count
               (hash-count
                (lambda (index val)
                  (list? val))
                running-job-args))
              (desired-thread-count (get-thread-count)))

          (>= running-jobs-count
              desired-thread-count)))

      (define (thread-idle-for-too-long? last-job-finished-at)
        (time>=?
         (time-difference (current-time time-monotonic)
                          last-job-finished-at)
         thread-stop-delay))

      (define (stop-thread)
        (hash-remove! running-job-args
                      thread-index)
        (unlock-mutex pool-mutex))

      (call-with-new-thread
       (lambda ()
         (catch 'system-error
           (lambda ()
             (set-thread-name
              (string-append name " p t "
                             (number->string thread-index))))
           (const #t))

         (let loop ((last-job-finished-at (current-time time-monotonic)))
           (lock-mutex pool-mutex)

           ;; Mark this thread as idle while it looks for a job
           (hash-set! running-job-args
                      thread-index
                      #f)

           (let ((job-args
                  (or (get-job/safe (list-jobs))
                      ;; #f from wait-condition-variable indicates a timeout
                      ;;
                      ;; NOTE(review): no timeout argument is passed here,
                      ;; so this presumably only returns once the condition
                      ;; variable is signalled -- confirm that is intended.
                      (if (wait-condition-variable
                           job-available
                           pool-mutex)
                          (get-job/safe (list-jobs))
                          #f))))

             (if job-args
                 (begin
                   (hash-set! running-job-args
                              thread-index
                              job-args)

                   (unlock-mutex pool-mutex)
                   (thread-process-job job-args)
                   (loop (current-time time-monotonic)))
                 (if (or (thread-idle-for-too-long? last-job-finished-at)
                         (too-many-threads?))
                     (stop-thread)
                     (begin
                       (unlock-mutex pool-mutex)

                       (loop last-job-finished-at)))))))))

    (define start-new-threads-if-necessary
      (let ((previous-thread-started-at (make-time time-monotonic 0 0)))
        (lambda (desired-count)
          (let* ((thread-count
                  (hash-count (const #t) running-job-args))
                 (threads-to-start
                  (- desired-count thread-count)))
            (when (> threads-to-start 0)
              (for-each
               (lambda (thread-index)
                 ;; Only use indexes that have no entry in the table
                 (when (eq? (hash-ref running-job-args
                                      thread-index
                                      'slot-free)
                            'slot-free)
                   (let* ((now (current-time time-monotonic))
                          (elapsed (time-difference now
                                                    previous-thread-started-at)))
                     (when (or (eq? #f thread-start-delay)
                               (time>=? elapsed thread-start-delay))
                       (set! previous-thread-started-at now)
                       (hash-set! running-job-args
                                  thread-index
                                  #f)
                       (start-thread thread-index)))))
               (iota desired-count)))))))

    (if (procedure? thread-count-parameter)
        (call-with-new-thread
         (lambda ()
           (catch 'system-error
             (lambda ()
               (set-thread-name
                (string-append name " p t ")))
             (const #t))

           (lock-mutex pool-mutex)
           (while #t
             (with-exception-handler
                 (lambda _
                   ;; Things are going really wrong, we probably can't even
                   ;; log without risking another exception, so just sleep and
                   ;; try again.
                   (sleep 10))
               (lambda ()
                 (let ((idle-threads (hash-count (lambda (index val)
                                                   (eq? #f val))
                                                 running-job-args)))
                   (when (= 0 idle-threads)
                     (start-new-threads-if-necessary (get-thread-count))))

                 ;; Wake a worker thread to check if a job is available
                 (signal-condition-variable job-available)

                 (wait-condition-variable job-available
                                          pool-mutex))
               #:unwind? #t))))
        (start-new-threads-if-necessary (get-thread-count)))

    (values pool-mutex job-available count-threads list-jobs)))

;; copied from (guix scripts substitute)
(define-syntax-rule (with-timeout duration handler body ...)
  "Run BODY; when DURATION seconds have expired, call HANDLER, and run BODY
again."
  (begin
    (sigaction SIGALRM
      (lambda (signum)
        (sigaction SIGALRM SIG_DFL)
        handler))
    (alarm duration)
    (call-with-values
        (lambda ()
          body ...)
      (lambda result
        ;; Cancel the pending alarm and restore the default handler
        (alarm 0)
        (sigaction SIGALRM SIG_DFL)
        (apply values result)))))

(define (reset-timeout duration)
  "Set the alarm to go off in DURATION seconds."
  (alarm duration))

(define (throttle min-duration thunk)
  "Return a procedure that calls THUNK and returns its result, unless it was
last run less than MIN-DURATION seconds ago, in which case it returns #f."
  (let ((next-min-runtime 0))
    (lambda ()
      (if (> (get-internal-real-time) next-min-runtime)
          (begin
            (set! next-min-runtime
                  (+ (get-internal-real-time)
                     (* min-duration internal-time-units-per-second)))
            (thunk))
          #f))))

(define* (get-load-average #:key (period 5))
  "Return the load average from /proc/loadavg for PERIOD, which is 1, 5 or 15,
as a number.  Return #f if /proc/loadavg doesn't exist."
  (if (file-exists? "/proc/loadavg")
      (let ((line (call-with-input-file "/proc/loadavg" get-line)))
        (match (string-split line #\space)
          ((1min 5min 15min _ _)
           (string->number
            (cond
             ((= period 1) 1min)
             ((= period 5) 5min)
             ((= period 15) 15min))))))
      #f))

(define (get-uptime)
  "Return the first field of /proc/uptime as a number, or #f if that file
doesn't exist."
  (if (file-exists? "/proc/uptime")
      (let ((line (call-with-input-file "/proc/uptime" get-line)))
        (match (string-split line #\space)
          ((uptime _)
           (string->number uptime))))
      #f))

;; Return #t if the system name reported by uname is "GNU".  The system name
;; is kept in the closure so that uname is only called the first time.
(define running-on-the-hurd?
  (let ((cached-system #f))
    (lambda ()
      (unless cached-system
        (set! cached-system (utsname:sysname (uname))))

      (string=? cached-system "GNU"))))

(define (get-guix-memory-metrics-updater registry)
  "Register gauges in REGISTRY for the number of entries in Guix's derivation
cache and memoization tables, and return a thunk that updates them."
  (define %memoization-tables
    (@@ (guix memoization) %memoization-tables))

  (define memoization-table-entry-count-metric
    (make-gauge-metric registry
                       "guix_memoization_table_entry_count"
                       #:labels '(procedure)))

  (define %derivation-cache
    (@@ (guix derivations) %derivation-cache))

  (define derivation-cache-entry-count-metric
    (make-gauge-metric registry
                       "guix_derivation_cache_entry_count"))

  (lambda ()
    (metric-set derivation-cache-entry-count-metric
                ;; hash-count doesn't work for weak tables?
                (hash-fold (lambda (k v result)
                             (+ 1 result))
                           0
                           %derivation-cache))

    (hash-for-each
     (lambda (proc table)
       (metric-set memoization-table-entry-count-metric
                   (hash-count (const #t) table)
                   #:label-values
                   `((procedure . ,(simple-format #f "~A" proc)))))
     %memoization-tables)))

;; Returns the port as well as the raw socket
(define* (open-socket-for-uri* uri
                               #:key (verify-certificate? #t)
                               (non-blocking? #t))
  (define tls-wrap
    (@@ (web client) tls-wrap))

  (define https?
    (eq? 'https (uri-scheme uri)))

  ;; For https, open the socket through an http URI with the same
  ;; components (defaulting the port to 443); TLS is added below.
  (define plain-uri
    (if https?
        (build-uri
         'http
         #:userinfo (uri-userinfo uri)
         #:host (uri-host uri)
         #:port (or (uri-port uri) 443)
         #:path (uri-path uri)
         #:query (uri-query uri)
         #:fragment (uri-fragment uri))
        uri))

  (let* ((s (open-socket-for-uri plain-uri))
         (port
          (if https?
              (tls-wrap s (uri-host uri)
                        #:verify-certificate? verify-certificate?)
              s)))
    (values port
            (if non-blocking?
                ;; Guile/guile-gnutls don't handle the handshake happening on
                ;; a non blocking socket, so change the behavior here.
                (non-blocking-port s)
                s))))

(define (check-locale!)
  "Call setlocale for LC_ALL using the environment's locale.  If that raises
an exception, log it and try en_US.utf8; if that fails too, log it and exit
with status 1."
  (with-exception-handler
      (lambda (exn)
        (display
         (simple-format
          #f
          "exception when calling setlocale: ~A falling back to en_US.utf8\n"
          exn)
         (current-error-port))

        (with-exception-handler
            (lambda (exn)
              (display
               (simple-format
                #f
                "exception when calling setlocale with en_US.utf8: ~A\n"
                exn)
               (current-error-port))

              (exit 1))
          (lambda _
            (setlocale LC_ALL "en_US.utf8"))
          #:unwind? #t))
    (lambda _
      (setlocale LC_ALL ""))
    #:unwind? #t))