(define-module (guix-build-coordinator utils)
  #:use-module (srfi srfi-1)
  #:use-module (srfi srfi-11)
  #:use-module (srfi srfi-19)
  #:use-module (srfi srfi-26)
  #:use-module (srfi srfi-60)
  #:use-module (ice-9 q)
  #:use-module (ice-9 ftw)
  #:use-module (ice-9 popen)
  #:use-module (ice-9 iconv)
  #:use-module (ice-9 match)
  #:use-module (ice-9 format)
  #:use-module (ice-9 threads)
  #:use-module (ice-9 textual-ports)
  #:use-module (ice-9 rdelim)
  #:use-module (ice-9 binary-ports)
  #:use-module (ice-9 suspendable-ports)
  #:use-module ((ice-9 ports internal) #:select (port-poll))
  #:use-module (ice-9 exceptions)
  #:use-module (rnrs bytevectors)
  #:use-module (web uri)
  #:use-module (web http)
  #:use-module (web client)
  #:use-module (web request)
  #:use-module (web response)
  #:use-module (gcrypt pk-crypto)
  #:use-module (gcrypt hash)
  #:use-module (gcrypt random)
  #:use-module (json)
  #:use-module (prometheus)
  #:use-module (guix pki)
  #:use-module (guix utils)
  #:use-module (guix config)
  #:use-module (guix store)
  #:use-module (guix status)
  #:use-module (guix base64)
  #:use-module (guix progress)
  #:use-module (guix derivations)
  #:use-module ((guix http-client) #:select (http-fetch))
  #:use-module (guix serialization)
  #:use-module ((guix build download)
                #:select ((open-connection-for-uri
                           . guix:open-connection-for-uri)))
  #:use-module ((guix build syscalls)
                #:select (set-thread-name))
  #:use-module (guix scripts substitute)
  #:export (random-v4-uuid
            make-base64-output-port

            use-gc-protection?
            with-gc-protection

            with-port-timeouts
            set-store-connection-timeout

            request-query-parameters
            call-with-streaming-http-request

            &chunked-input-ended-prematurely
            chunked-input-ended-prematurely-error?
            make-chunked-input-port*

            find-missing-substitutes-for-output
            has-substiutes-no-cache?
            substitute-derivation
            read-derivation-through-substitutes
            narinfo-string

            retry-on-error

            s3-list-objects
            s3-cp

            log-delay
            call-with-delay-logging
            with-time-logging

            create-work-queue
            create-thread-pool

            with-timeout
            reset-timeout

            throttle

            get-load-average
            running-on-the-hurd?

            get-gc-metrics-updater
            get-guix-memory-metrics-updater

            check-locale!
            core-guile-sleep))

;; Pull in optional Guix modules, but only when they provide the bindings
;; this file needs.
(eval-when (eval load compile)
  (begin
    (when (defined? 'narinfo-references
            (resolve-module '(guix narinfo)))
      ;; This module contains narinfo-references in newer version of Guix
      (use-modules (guix narinfo)))

    (when (defined? 'lookup-narinfos
            (resolve-module '(guix substitutes)))
      ;; This module may contain lookup-narinfos in newer version of Guix
      (use-modules (guix substitutes)))))

;; Return a random version 4 UUID as a string of hexadecimal digits grouped
;; 8-4-4-4-12 and joined with "-".
(define (random-v4-uuid)
  ;; https://tools.ietf.org/html/rfc4122#page-14
  ;;
  ;; The pattern in characters is: 8, 4, 4, 4, 12
  ;; The pattern in bytes is: 4, 2, 2, 2, 6
  ;;
  ;; time-low "-" time-mid "-" time-high-and-version "-"
  ;; clock-seq-and-reserved clock-seq-low "-" node
  ;;
  ;; - Set the two most significant bits (bits 6 and 7) of the
  ;;   clock_seq_hi_and_reserved to zero and one, respectively.
  ;; - Set the four most significant bits (bits 12 through 15) of the
  ;;   time_hi_and_version field to the 4-bit version number from
  ;;   Section 4.1.3.

  (let* ((bytes 16)
         (bv (gen-random-bv bytes)))

    (let ((version 4)
          (6th-byte (array-ref bv 6))  ; Most significant byte in
                                       ; time_hi_and_version
          (8th-byte (array-ref bv 8))) ; Most significant byte in
                                       ; clock_seq_hi_and_reserved

      (array-set!
       bv
       (logior (logand #b00001111 6th-byte)
               (rotate-bit-field version 4 0 8))
       6)

      (array-set!
       bv
       ;; Set bits 6 and 7 to 0 and 1 respectively
       (logior #b10000000
               (logand #b00111111 8th-byte))
       8))

    (let* ((int
            (bytevector-uint-ref bv 0 (endianness big) bytes))
           (hex-string
            (format #f "~32,'0x" int)))
      (string-join
       ;; Each step appends the next group; the start offset is the total
       ;; length of the groups collected so far.
       (fold (lambda (part-length result)
               (let ((start (string-length
                             (string-join result ""))))
                 (append
                  result
                  (list (substring hex-string
                                   start
                                   (+ start part-length))))))
             '()
             (list 8 4 4 4 12))
       "-"))))

;; Return a binary output port which buffers the bytes written to it and
;; writes them base64 encoded to PORT, both when the buffer fills up and
;; when the returned port is closed.
(define (make-base64-output-port port)
  (define line-length 72) ; must be a multiple of 4

  ;; The buffer holds a whole number of output lines, so every intermediate
  ;; flush encodes complete lines.  4 base64 characters encode 3 bytes.
  (define buffer-length
    (let ((bytes-per-line (* 3 (/ line-length 4)))
          (lines-per-buffer 65536))
      (* bytes-per-line lines-per-buffer)))

  (define buffer
    (make-bytevector buffer-length))

  (define buffered-bytes 0)

  (define (write! bv start count)
    (let ((remaining-buffer-space
           (- buffer-length buffered-bytes)))
      (if (<= count remaining-buffer-space)
          (begin
            (bytevector-copy! bv start
                              buffer buffered-bytes
                              count)
            (set! buffered-bytes (+ buffered-bytes count))
            count)
          (begin
            ;; Top up the buffer, encode all of it, and report only the
            ;; bytes consumed so that the rest is offered again.
            (bytevector-copy! bv start
                              buffer buffered-bytes
                              remaining-buffer-space)
            (base64-encode buffer
                           0
                           buffer-length
                           line-length
                           #f
                           base64-alphabet
                           port)
            (set! buffered-bytes 0)
            remaining-buffer-space))))

  (define (close)
    ;; Encode whatever is still buffered.
    (base64-encode buffer 0 buffered-bytes line-length #f base64-alphabet port)
    (set! buffer #f)
    (set! buffered-bytes 0)
    #t)

  (make-custom-binary-output-port "base64-output" write! #f #f close))

;; Exception raised when the underlying port reaches end of file while
;; chunked data is still expected.
(define &chunked-input-ended-prematurely
  (make-exception-type '&chunked-input-ended-prematurely
                       &external-error
                       '()))

(define make-chunked-input-ended-prematurely-error
  (record-constructor &chunked-input-ended-prematurely))

(define chunked-input-ended-prematurely-error?
  (record-predicate &chunked-input-ended-prematurely))

;; Chunked Responses
(define (read-chunk-header port)
  "Read a chunk header from PORT and return the size in bytes of the upcoming
chunk."
  (match (read-line port)
    ((? eof-object?)
     ;; Connection closed prematurely: there's nothing left to read.
     (raise-exception
      (make-chunked-input-ended-prematurely-error)))
    (str
     ;; The size is the hexadecimal prefix, up to any chunk extension (";")
     ;; or the carriage return.
     (let ((extension-start (string-index str
                                          (lambda (c)
                                            (or (char=? c #\;)
                                                (char=? c #\return))))))
       (string->number (if extension-start ; unnecessary?
                           (substring str 0 extension-start)
                           str)
                       16)))))

;; Return a binary input port that decodes the chunked data read from PORT.
;; Closing the returned port closes PORT unless KEEP-ALIVE? is true.  An
;; &chunked-input-ended-prematurely exception is raised if PORT reaches end
;; of file before the final, zero sized, chunk.
(define* (make-chunked-input-port* port #:key (keep-alive? #f))
  (define (close)
    (unless keep-alive?
      (close-port port)))

  (define chunk-size 0)     ;size of the current chunk
  (define remaining 0)      ;number of bytes left from the current chunk
  (define finished? #f)     ;did we get all the chunks?

  (define (read! bv idx to-read)
    (define (loop to-read num-read)
      (cond ((or finished? (zero? to-read))
             num-read)
            ((zero? remaining)                    ;get a new chunk
             (let ((size (read-chunk-header port)))
               (set! chunk-size size)
               (set! remaining size)
               (cond
                ((zero? size)
                 (set! finished? #t)
                 (get-bytevector-n port 2) ; \r\n follows the last chunk
                 num-read)
                (else
                 (loop to-read num-read)))))
            (else                           ;read from the current chunk
             (let* ((ask-for (min to-read remaining))
                    (read    (get-bytevector-n! port bv (+ idx num-read)
                                                ask-for)))
               (cond
                ((eof-object? read)               ;premature termination
                 (raise-exception
                  (make-chunked-input-ended-prematurely-error)))
                (else
                 (let ((left (- remaining read)))
                   (set! remaining left)
                   (when (zero? left)
                     ;; We're done with this chunk; read CR and LF.
                     (get-u8 port) (get-u8 port))
                   (loop (- to-read read)
                         (+ num-read read)))))))))
    (loop to-read 0))

  (make-custom-binary-input-port "chunked input port" read! #f #f close))

;; Return the query parameters of REQUEST as an alist of
;; (symbol . decoded-string) pairs, or the empty list when there is no query.
(define (request-query-parameters request)
  (define (parse-query-string query)
    "Parse and decode the URI query string QUERY and return an alist."
    ;; Split into fields on "&" first, then split each field on its first
    ;; "=" only.  Empty fields (as in "a=1&" or "a=1&&b=2") are skipped, and
    ;; a field without "=" gets the empty string as its value.
    (filter-map
     (lambda (field)
       (if (string-null? field)
           #f
           (let ((separator (string-index field #\=)))
             (if separator
                 (cons (uri-decode (substring field 0 separator))
                       (uri-decode (substring field (+ 1 separator))))
                 (cons (uri-decode field) "")))))
     (string-split query #\&)))

  (let ((query (uri-query (request-uri request))))
    (if (and query
             (not (string-null? query)))
        (map (match-lambda
               ((name . value)
                (cons (string->symbol name) value)))
             (parse-query-string query))
        '())))

;; When true, with-gc-protection disables garbage collection around the
;; thunk it is given.
(define use-gc-protection?
  (make-parameter #t))

;; Call THUNK, with garbage collection disabled for its dynamic extent when
;; the use-gc-protection? parameter is true.
(define (with-gc-protection thunk)
  (if (use-gc-protection?)
      (dynamic-wind
        gc-disable
        thunk
        gc-enable)
      (thunk)))

;; Exception raised by the waiters that with-port-timeouts installs.
(define &port-timeout
  (make-exception-type '&port-timeout
                       &external-error
                       '()))

(define make-port-timeout-error
  (record-constructor &port-timeout))

(define port-timeout-error?
  (record-predicate &port-timeout))

;; Call THUNK with read and write waiters that poll the port with TIMEOUT
;; and raise a &port-timeout exception when the poll returns 0.
(define* (with-port-timeouts thunk #:key (timeout (* 120 1000)))
  (parameterize
      ((current-read-waiter
        (lambda (port)
          (when (= (port-poll port "r" timeout) 0)
            (raise-exception
             (make-port-timeout-error)))))
       (current-write-waiter
        (lambda (port)
          (when (= (port-poll port "w" timeout) 0)
            (raise-exception
             (make-port-timeout-error))))))
    (thunk)))

;; Set the receive and send timeouts on the socket of STORE to TIMEOUT
;; seconds.  Nothing is done when SO_RCVTIMEO is not defined, or on the Hurd.
(define* (set-store-connection-timeout store #:key (timeout 120))
  (define raw-port
    (store-connection-socket store))

  (when (and (defined? 'SO_RCVTIMEO)
             (not (running-on-the-hurd?)))
    ;; This is only supported on Guile 3.0.9 and later
    (setsockopt raw-port SOL_SOCKET SO_RCVTIMEO `(,timeout . 0))
    (setsockopt raw-port SOL_SOCKET SO_SNDTIMEO `(,timeout . 0))))

;; Return a soft port which writes what it is given to PORT using the
;; chunked transfer encoding.  BUFFERING is the block buffer size of the
;; returned port.  REPORT-BYTES-SENT, if given, is called with the size of
;; each chunk written.  Closing the returned port writes the final chunk,
;; and closes PORT unless KEEP-ALIVE? is true.
(define* (make-chunked-output-port* port #:key (keep-alive? #f)
                                    (buffering 1200)
                                    report-bytes-sent)
  (define heap-allocated-limit
    (expt 2 20)) ;; 1MiB

  (define (%put-string s)
    (unless (string-null? s)
      (let* ((bv (string->bytevector s "ISO-8859-1"))
             (length (bytevector-length bv)))
        (with-gc-protection
         (lambda ()
           (put-string port (number->string length 16))
           (put-string port "\r\n")
           (put-bytevector port bv)
           (put-string port "\r\n")))

        (when report-bytes-sent
          (report-bytes-sent length))

        ;; Once more than the limit has been allocated since the last
        ;; collection, keep requesting a collection until the count of
        ;; collections changes.
        (let* ((stats (gc-stats))
               (initial-gc-times
                (assq-ref stats 'gc-times)))
          (when (> (assq-ref stats 'heap-allocated-since-gc)
                   heap-allocated-limit)
            (while (let ((updated-stats (gc-stats)))
                     (= (assq-ref updated-stats 'gc-times)
                        initial-gc-times))
              (gc)
              (usleep 50)))))))

  (define (%put-char c)
    (%put-string (list->string (list c))))

  (define (flush) #t)

  (define (close)
    (with-gc-protection
     (lambda ()
       (put-string port "0\r\n\r\n")
       (force-output port)
       (unless keep-alive?
         (close-port port)))))

  (let ((ret (make-soft-port
              (vector %put-char %put-string flush #f close)
              "w")))
    (setvbuf ret 'block buffering)
    ret))

;; Make a METHOD request to URI with a chunked request body.  CALLBACK is
;; called with an output port to write the body to.  HEADERS are added after
;; the connection, Transfer-Encoding and Content-Type headers.  Return two
;; values, the response and the response body.  The whole exchange runs
;; within with-port-timeouts.
(define* (call-with-streaming-http-request uri callback
                                           #:key (headers '())
                                           (method 'PUT)
                                           report-bytes-sent)
  (with-port-timeouts
   (lambda ()
     (let* ((port (open-socket-for-uri uri))
            (request
             (build-request
              uri
              #:method method
              #:version '(1 . 1)
              #:headers `((connection close)
                          (Transfer-Encoding . "chunked")
                          (Content-Type . "application/octet-stream")
                          ,@headers)
              #:port port)))

       (set-port-encoding! port "ISO-8859-1")
       (setvbuf port 'block (expt 2 13))
       (with-exception-handler
           (lambda (exp)
             ;; Log using the actual request method, close the socket and
             ;; pass the exception on.
             (simple-format #t "error: ~A ~A: ~A\n"
                            method (uri-path uri) exp)
             (close-port port)
             (raise-exception exp))
         (lambda ()
           (let ((request (write-request request port)))
             (let* ((chunked-output-port
                     (make-chunked-output-port*
                      port
                      #:buffering (expt 2 12)
                      #:keep-alive? #t
                      #:report-bytes-sent report-bytes-sent)))

               (set-port-encoding! chunked-output-port "ISO-8859-1")
               (callback chunked-output-port)
               (close-port chunked-output-port)

               (with-gc-protection
                (lambda ()
                  (let ((response (read-response port)))
                    (let ((body (read-response-body response)))
                      (close-port port)
                      (values response
                              body)))))))))))))

;; Return the list of store paths, from OUTPUT and the references found
;; through its narinfos, that are not valid in STORE and for which none of
;; SUBSTITUTE-URLS provides a narinfo.
(define (find-missing-substitutes-for-output store substitute-urls output)
  (if (valid-path? store output)
      '()
      (let ((narinfo
             ;; The first narinfo any of the substitute servers has
             (any (lambda (substitute-url)
                    (let ((result (lookup-narinfos substitute-url
                                                   (list output))))
                      (if (null? result)
                          #f
                          (first result))))
                  substitute-urls)))
        (if narinfo
            (append-map
             (lambda (reference)
               (let ((referenced-output
                      (string-append (%store-prefix) "/" reference)))
                 ;; Skip the self reference
                 (if (string=? referenced-output output)
                     '()
                     (find-missing-substitutes-for-output
                      store
                      substitute-urls
                      referenced-output))))
             (narinfo-references narinfo))
            (list output)))))

;; Work around Guix holding on to broken connections to substitute servers
;; (because of mishandling gnutls errors).
(let ((mod (resolve-module '(guix scripts substitute))))
  ;; Only touch the variable when this version of Guix defines it.
  (when (module-variable mod '%max-cached-connections)
    (module-set! mod '%max-cached-connections 0)))

;; Return the elements of SUBSTITUTE-URLS for which a narinfo lookup for FILE
;; returns something.  Matching files in the local narinfo cache are deleted
;; first.
(define (has-substiutes-no-cache? substitute-urls file)
  (define %narinfo-cache-directory
    (if (zero? (getuid))
        (or (and=> (getenv "XDG_CACHE_HOME")
                   (cut string-append <> "/guix/substitute"))
            (string-append %state-directory "/substitute/cache"))
        (string-append (cache-directory #:ensure? #f) "/substitute")))

  ;; Because there's no control over the caching of 404 lookups, and I'd
  ;; rather not reach inside and monkey patch the Guix code, just delete any
  ;; cache files
  (let ((hash-part (store-path-hash-part file))
        (directories
         (scandir %narinfo-cache-directory
                  (lambda (s) (= (string-length s) 52)))))

    (for-each (lambda (directory)
                (let ((cache-file
                       (string-append
                        %narinfo-cache-directory "/"
                        directory "/" hash-part)))
                  ;; Use monitor to avoid multiple threads trying to delete
                  ;; the same file at the same time
                  (monitor
                   (when (file-exists? cache-file)
                     (with-exception-handler
                         (lambda (exn)
                           (simple-format
                            (current-error-port)
                            "error: when deleting substitute cache file: ~A\n"
                            exn))
                       (lambda ()
                         (delete-file cache-file))
                       #:unwind? #t)))))
              ;; scandir result may be #f, treat that as no directories
              (or directories '())))

  (let ((substitute-urls
         (append-map
          (lambda (substitute-url)
            (let ((log-port (open-output-string)))
              (with-throw-handler
                  #t
                (lambda ()
                  (if (null?
                       ;; I doubt the caching is thread safe, so
                       ;; only make one request at a time.
                       (monitor
                        (parameterize ((current-error-port log-port))
                          (lookup-narinfos substitute-url
                                           (list file)))))
                      '()
                      (list substitute-url)))
                (lambda (key . args)
                  (simple-format
                   (current-error-port)
                   "exception in has-substiutes-no-cache? (~A) ~A: ~A\n"
                   substitute-url key args)
                  ;; Show what was captured from current-error-port
                  (display (string-append
                            (get-output-string log-port)
                            "\n")
                           (current-error-port))
                  (close-output-port log-port)))))
          substitute-urls)))
    substitute-urls))

;; Get DERIVATION-NAME in to the store through substitutes, optionally
;; restricted to SUBSTITUTE-URLS.  ensure-path is used when (guix store)
;; defines it, build-things otherwise.  An exception is treated as success
;; (#t) if DERIVATION-NAME is a valid path afterwards; otherwise the captured
;; log is printed and an error raised.
(define* (substitute-derivation derivation-name
                                #:key substitute-urls)
  (let ((log-port (open-output-string)))
    (catch #t
      (lambda ()
        (if (defined? 'ensure-path (resolve-module '(guix store)))
            (with-store store
              (apply set-build-options store
                     `(,@(if substitute-urls
                             `(#:substitute-urls ,substitute-urls)
                             '())
                       #:max-silent-time 60
                       #:timeout ,(* 10 60)))

              (parameterize ((current-build-output-port log-port))
                (with-port-timeouts
                 (lambda ()
                   (set-store-connection-timeout store)
                   (ensure-path store derivation-name))
                 #:timeout (* 120 1000))))
            (with-store store
              (apply set-build-options store
                     `(#:print-extended-build-trace? #t
                       #:multiplexed-build-output? #t
                       ,@(if substitute-urls
                             `(#:substitute-urls ,substitute-urls)
                             '())))

              (with-status-report
                  (lambda (event status new)
                    (print-build-event event status new)
                    (match event
                      (('substituter-succeeded substituted-drv)
                       ;; Closing the connection stops the daemon from
                       ;; carrying on once the derivation has been fetched.
                       (when (string=? derivation-name substituted-drv)
                         (close-connection store)))
                      (_ #t)))
                (build-things store (list derivation-name))))))
      (lambda (key . args)
        (let ((log-string (get-output-string log-port)))
          (close-output-port log-port)

          ;; This is a hack, to ignore errors relating to closing the store
          ;; connection.
          (if (with-store store
                (valid-path? store derivation-name))
              #t
              (begin
                (simple-format
                 (current-error-port)
                 "exception when substituting derivation: ~A ~A:\n ~A\n"
                 key args log-string)
                (error
                 (simple-format #f "could not substitute ~A\n"
                                derivation-name)))))))))

;; Derivations read by read-derivation-through-substitutes, keyed by name.
(define %derivation-cache
  (make-doubly-weak-hash-table))

;; Return the derivation DERIVATION-NAME.  It is taken from
;; %derivation-cache, or read from the local store when valid there, or
;; fetched from one of SUBSTITUTE-URLS.  The same procedure is passed to
;; read-derivation to read the input derivations.
(define (read-derivation-through-substitutes derivation-name
                                             substitute-urls)
  (define %fetch-timeout 5)

  ;; Like guix:open-connection-for-uri, but keep one socket per
  ;; (host scheme port) for the duration of this call and reuse it, unless
  ;; FRESH? is true or the socket is closed.
  (define open-connection-for-uri/cached
    (let ((cache '()))
      (lambda* (uri #:key fresh? (timeout %fetch-timeout) verify-certificate?)
        (define host (uri-host uri))
        (define scheme (uri-scheme uri))
        (define key (list host scheme (uri-port uri)))

        (and (not (memq scheme '(file #f)))
             (match (assoc-ref cache key)
               (#f
                (let ((socket
                       (guix:open-connection-for-uri
                        uri
                        #:verify-certificate? verify-certificate?
                        #:timeout timeout)))
                  (set! cache (alist-cons key socket cache))
                  socket))
               (socket
                (if (or fresh? (port-closed? socket))
                    (begin
                      (false-if-exception (close-port socket))
                      (set! cache (alist-delete key cache))
                      (open-connection-for-uri/cached
                       uri
                       #:timeout timeout
                       #:verify-certificate? verify-certificate?))
                    (begin
                      ;; Drain input left from the previous use.
                      (drain-input socket)
                      socket))))))))

  ;; Fetch the nar for DERIVATION-NAME from the first substitute server with
  ;; a narinfo for it, and return the contents read from the archive.
  (define (derivation-name->bytevector derivation-name)
    (let ((narinfo
           (any (lambda (substitute-url)
                  (let ((result (lookup-narinfos
                                 substitute-url
                                 (list derivation-name)
                                 #:open-connection
                                 open-connection-for-uri/cached)))
                    (if (null? result)
                        #f
                        (first result))))
                substitute-urls)))
      (if narinfo
          (let*-values (((uri compression file-size)
                         (narinfo-best-uri narinfo))
                        ((raw download-size)
                         ;; Fetch the URI that COMPRESSION belongs to, so
                         ;; that the data is decompressed with the matching
                         ;; method.
                         (http-fetch uri
                                     #:open-connection
                                     open-connection-for-uri/cached
                                     #:keep-alive? #t))
                        ((progress)
                         (let* ((dl-size (or download-size
                                             (and (equal? compression "none")
                                                  (narinfo-size narinfo)))))
                           ;; Keep RAW open upon completion so we can later
                           ;; reuse the underlying connection.  Pass the
                           ;; download size so that this procedure won't block
                           ;; reading from RAW.
                           (progress-report-port
                            progress-reporter/silent
                            raw
                            #:close? #f
                            #:download-size dl-size)))
                        ((input pids)
                         ;; NOTE: This 'progress' port of current process will be
                         ;; closed here, while the child process doing the
                         ;; reporting will close it upon exit.
                         (decompressed-port (string->symbol compression)
                                            progress)))
            ;; The decompressor can be an external program, so wait for it to
            ;; exit
            (every (compose zero? cdr waitpid) pids)

            (fold-archive
             (lambda (file type contents result)
               (match contents
                 ((port . bytes)
                  (get-bytevector-n port bytes))))
             #f
             input
             #f))
          (error "could not fetch narinfo"))))

  (with-store store
    (define (read-derivation-from-file* derivation-name)
      (or (hash-ref %derivation-cache derivation-name)
          ;; Try the local store
          (and (file-exists? derivation-name)
               (valid-path? store derivation-name)
               (read-derivation-from-file derivation-name))
          ;; Otherwise try the network
          (let ((drv
                 (call-with-input-bytevector
                     (derivation-name->bytevector derivation-name)
                   (lambda (port)
                     (set-port-filename! port derivation-name)
                     (read-derivation port read-derivation-from-file*)))))
            (hash-set! %derivation-cache derivation-name drv)
            drv)))

    (read-derivation-from-file* derivation-name)))

;; Return the signed narinfo text for STORE-PATH.  HASH, SIZE and REFERENCES
;; fill in the NarHash, NarSize and References fields.  COMPRESSED-FILES is a
;; list of (compression compressed-size) lists, each rendered as a URL,
;; Compression and optional FileSize entry with the URL built from NAR-PATH.
;; The System and Deriver fields are included when DERIVATION is given.
;; PUBLIC-KEY and PRIVATE-KEY are used for the signature.
(define* (narinfo-string store-path hash size references
                         compressed-files
                         #:key (nar-path "nar")
                         system derivation
                         public-key private-key)
  (define (signed-string s)
    (let* ((hash (bytevector->hash-data (sha256 (string->utf8 s))
                                        #:key-type (key-type public-key))))
      (signature-sexp hash private-key public-key)))

  (define base64-encode-string
    (compose base64-encode string->utf8))

  (define* (store-item->recutils compression file-size)
    (let ((url (encode-and-join-uri-path
                `(,@(split-and-decode-uri-path nar-path)
                  ,@(if compression
                        (list (symbol->string compression))
                        '())
                  ,(basename store-path)))))
      ;; ~@[ ... ~] only renders the FileSize line when FILE-SIZE is true
      (format #f "URL: ~a~%Compression: ~a~%~@[FileSize: ~a~%~]"
              url
              (or compression "none")
              file-size)))

  (let* ((base-info
          ;; One field per line
          (format #f
                  "StorePath: ~a~%NarHash: sha256:~a~%NarSize: ~d~%References: ~a~%"
                  store-path hash size
                  (string-join references " ")))
         ;; Only render the "System" and "Deriver" lines when DERIVATION is
         ;; given.
         (info (if derivation
                   (format #f "~aSystem: ~a~%Deriver: ~a~%"
                           base-info system
                           (basename derivation))
                   base-info))
         (signature (base64-encode-string
                     (canonical-sexp->string (signed-string info)))))
    (format #f "~aSignature: 1;~a;~a~%~{~a~}"
            info
            (gethostname)
            signature
            (map (match-lambda
                   ((compression compressed-size)
                    (store-item->recutils compression compressed-size)))
                 compressed-files))))

;; Call F with no arguments and return its values, making at most TIMES
;; attempts with a sleep of DELAY seconds between them.  TIMES and DELAY must
;; be given.  IGNORE is a predicate, or list of predicates; an exception
;; matching one of them is raised again immediately rather than retried.
;; ERROR-HOOK, if given, is called with the attempt number and the exception
;; after each failure that will be retried.  The last attempt calls F with no
;; handler in place, so its exception reaches the caller directly.
(define* (retry-on-error f #:key times delay ignore error-hook)
  (define (ignored? exn)
    (cond
     ((list? ignore)
      (any (lambda (test) (test exn))
           ignore))
     ((procedure? ignore)
      (ignore exn))
     (else #f)))

  ;; Return (#t . values) on success, or (#f . exception) on failure.
  (define (attempt-once)
    (with-exception-handler
        (lambda (exn)
          (when (ignored? exn)
            (raise-exception exn))

          (cons #f exn))
      (lambda ()
        (call-with-values f
          (lambda vals
            (cons #t vals))))
      #:unwind? #t))

  (let loop ((attempt 1))
    (if (>= attempt times)
        ;; Last attempt.  With #:times 1 this is also the first one, so F
        ;; is called exactly once.
        (begin
          (when (> attempt 1)
            (simple-format
             (current-error-port)
             "running last retry of ~A after ~A failed attempts\n"
             f
             (- attempt 1)))
          (f))
        (match (attempt-once)
          ((#t . return-values)
           (when (> attempt 1)
             (simple-format
              (current-error-port)
              "retry success: ~A\n on attempt ~A of ~A\n"
              f
              attempt
              times))
           (apply values return-values))
          ((#f . exn)
           (simple-format
            (current-error-port)
            "error: ~A:\n ~A,\n attempt ~A of ~A, ~A in ~A\n"
            f
            exn
            attempt
            times
            (if (>= (+ 1 attempt) times)
                "last retry"
                "retrying")
            delay)
           (when error-hook
             (error-hook attempt exn))
           (sleep delay)
           (loop (+ 1 attempt)))))))

;; Run COMMAND with "s3api list-objects" for S3-BUCKET and PREFIX, and return
;; the parsed JSON output, or the empty list when there is no output.  An
;; exception with the output as the message is raised unless the exit value
;; is 0.
(define* (s3-list-objects s3-bucket prefix
                          #:key (command "aws")
                          (command-line-arguments '()))
  (let ((command
         `(,command
           "s3api"
           ,@command-line-arguments
           "list-objects"
           "--bucket" ,s3-bucket
           "--prefix" ,prefix)))

    (simple-format #t "running: ~A\n" (string-join command))
    (let ((pipe (apply open-pipe* OPEN_READ command)))
      ;; let* so that the output is read before the pipe is closed
      (let* ((output (get-string-all pipe))
             (exit-code (status:exit-val (close-pipe pipe))))
        ;; exit-code is #f if the command didn't exit normally
        (if (eqv? 0 exit-code)
            (if (string-null? output)
                '()
                (json-string->scm output))
            (raise-exception
             (make-exception-with-message
              output)))))))

;; Run COMMAND with "s3 cp" to copy SOURCE to DESTINATION in S3-BUCKET.
;; Return #t, or raise an exception unless the exit value is 0.
(define* (s3-cp s3-bucket source destination
                #:key (command "aws")
                (command-line-arguments '()))
  (let ((command
         `(,command
           "s3"
           ,@command-line-arguments
           "cp"
           ,source
           ,(simple-format #f "s3://~A/~A" s3-bucket destination))))

    (simple-format #t "running: ~A\n" (string-join command))
    (let ((exit-code
           (status:exit-val (apply system* command))))
      ;; exit-code is #f if the command didn't exit normally
      (unless (eqv? 0 exit-code)
        (raise-exception
         (make-exception-with-message
          (simple-format
           #f "error: command failed (~A): ~A\n"
           exit-code
           command))))
      #t)))

;; Holds the procedure used to record delays while within
;; call-with-delay-logging.
(define delay-logging-fluid
  (make-thread-local-fluid))

;; The nesting depth of call-with-delay-logging calls.
(define delay-logging-depth-fluid
  (make-thread-local-fluid 0))

;; Record that PROC took DURATION, if a recorder is currently set.
(define (log-delay proc duration)
  (and=> (fluid-ref delay-logging-fluid)
         (lambda (recorder)
           (recorder proc duration))))

;; Apply PROC to ARGS and return its values.  The outermost call collects
;; the durations recorded within it, and prints them as a warning if it took
;; longer than THRESHOLD seconds.  Nested calls record their own duration
;; with the recorder of the outermost call.
(define* (call-with-delay-logging proc #:key (threshold 1) (args '()))
  (let ((start (get-internal-real-time))
        (trace '())
        (root-logger? (eq? #f (fluid-ref delay-logging-fluid))))

    (define (format-seconds seconds)
      (format #f "~5f" seconds))

    (call-with-values
        (lambda ()
          (with-fluid* delay-logging-depth-fluid
            (+ 1 (fluid-ref delay-logging-depth-fluid))
            (lambda ()
              (if root-logger?
                  (with-fluid* delay-logging-fluid
                    (lambda (proc duration)
                      (set! trace
                            (cons (list proc
                                        duration
                                        (fluid-ref delay-logging-depth-fluid))
                                  trace))
                      #t)
                    (lambda ()
                      (apply proc args)))
                  (apply proc args)))))
      (lambda vals
        (let ((elapsed-seconds
               (/ (- (get-internal-real-time)
                     start)
                  internal-time-units-per-second)))
          (if (and (> elapsed-seconds threshold)
                   root-logger?)
              (let ((lines
                     (cons
                      (simple-format #f "warning: delay of ~A seconds: ~A"
                                     (format-seconds elapsed-seconds)
                                     proc)
                      ;; One line for each recorded delay of at least a
                      ;; millisecond, indented by the depth
                      (filter-map
                       (match-lambda
                         ((proc duration depth)
                          (if (>= duration 0.001)
                              (string-append
                               (make-string (* 2 depth) #\space)
                               (simple-format #f "~A: ~A"
                                              (format-seconds duration)
                                              proc))
                              #f)))
                       trace))))
                (display (string-append
                          (string-join lines "\n")
                          "\n")))
              (unless root-logger?
                ((fluid-ref delay-logging-fluid) proc elapsed-seconds))))
        (apply values vals)))))

;; Call THUNK, display how long it took labelled with NAME, and return the
;; values from THUNK.
(define (call-with-time-logging name thunk)
  (let ((start (current-time time-utc)))
    (call-with-values
        thunk
      (lambda vals
        (let* ((end (current-time time-utc))
               (elapsed (time-difference end start)))
          (display
           (format #f
                   "~a took ~f seconds~%"
                   name
                   (+ (time-second elapsed)
                      (/ (time-nanosecond elapsed) 1e9))))
          (apply values vals))))))

(define-syntax-rule (with-time-logging name exp ...)
  "Log under NAME the time taken to evaluate EXP."
  (call-with-time-logging name (lambda () exp ...)))

;; NOTE(review): the text of create-work-queue is incomplete as received.
;; Content is missing at the two places marked below, so the parentheses
;; don't balance and several of the names used have no visible definition.
;; The remaining tokens are reproduced unchanged, only the layout differs.
;; Restore this definition from version control rather than editing it here.
(define* (create-work-queue thread-count-parameter proc
                            #:key thread-start-delay
                            (thread-stop-delay
                             (make-time time-duration 0 0))
                            (name "unnamed")
                            ;; NOTE(review): text is missing after "priority"
                            priority= running-jobs-count
              desired-thread-count)))

      (define (thread-idle-for-too-long? last-job-finished-at)
        (time>=?
         (time-difference (current-time time-monotonic)
                          last-job-finished-at)
         thread-stop-delay))

      (define (stop-thread)
        (hash-remove! running-job-args
                      thread-index)
        (unlock-mutex queue-mutex))

      (call-with-new-thread
       (lambda ()
         (catch 'system-error
           (lambda ()
             (set-thread-name
              (string-append "gbc " name " q t "
                             (number->string thread-index))))
           (const #t))

         (let loop ((last-job-finished-at (current-time time-monotonic)))
           (lock-mutex queue-mutex)

           (if (too-many-threads?)
               (stop-thread)
               (let ((job-args
                      (if (q-empty? queue)
                          ;; #f from wait-condition-variable indicates a timeout
                          (if (wait-condition-variable
                               job-available
                               queue-mutex
                               (+ 9 (time-second (current-time))))
                              ;; Another thread could have taken
                              ;; the job in the mean time
                              (if (q-empty? queue)
                                  #f
                                  ;; NOTE(review): text is missing after
                                  ;; "priority"
                                  (if priority threads-to-start 0)
              (for-each
               (lambda (thread-index)
                 (when (eq? (hash-ref running-job-args
                                      thread-index
                                      'slot-free)
                            'slot-free)
                   (let* ((now (current-time time-monotonic))
                          (elapsed (time-difference now
                                                    previous-thread-started-at)))
                     (when (or (eq? #f thread-start-delay)
                               (time>=? elapsed thread-start-delay))
                       (set! previous-thread-started-at now)
                       (hash-set! running-job-args
                                  thread-index
                                  #f)
                       (start-thread thread-index)))))
               (iota desired-count)))))))

    (if (procedure? thread-count-parameter)
        (call-with-new-thread
         (lambda ()
           (while #t
             (sleep 15)
             (with-mutex queue-mutex
               (let ((idle-threads
                      (hash-count (lambda (index val)
                                    (eq? #f val))
                                  running-job-args)))
                 (when (= 0 idle-threads)
                   (start-new-threads-if-necessary (get-thread-count))))))))
        (start-new-threads-if-necessary (get-thread-count)))

    (values process-job count-jobs count-threads list-jobs)))

;; Start threads which call GET-JOB, with the list of arguments for the jobs
;; currently running, to find a job, and then apply PROC to the arguments it
;; returns.  THREAD-COUNT-PARAMETER is the number of threads, or a procedure
;; returning it.  THREAD-START-DELAY, if given, is the minimum time between
;; starting threads, and threads idle for THREAD-STOP-DELAY stop.  Return
;; four values: the pool mutex, the condition variable the threads wait on
;; for jobs, a procedure to count the threads and a procedure to list the
;; running jobs.
(define* (create-thread-pool thread-count-parameter get-job proc
                             #:key thread-start-delay
                             (thread-stop-delay
                              (make-time time-duration 0 0))
                             (name "unnamed"))
  ;; running-job-args maps a thread index to the arguments of the job that
  ;; thread is running, or to #f when the thread is idle.
  (let ((pool-mutex (make-mutex))
        (job-available (make-condition-variable))
        (running-job-args (make-hash-table)))

    (define get-thread-count
      (cond
       ((number? thread-count-parameter)
        (const thread-count-parameter))
       (else
        thread-count-parameter)))

    (define (count-threads)
      (hash-count (const #t) running-job-args))

    (define (list-jobs)
      (append (hash-fold (lambda (key val result)
                           (or (and val (cons val result))
                               result))
                         '()
                         running-job-args)))

    (define (thread-process-job job-args)
      (with-exception-handler
          (lambda (exn)
            (with-exception-handler
                (lambda _
                  #f)
              (lambda ()
                ;; Logging may raise an exception, so try and just keep going.
                (simple-format (current-error-port)
                               "job raised exception: ~A\n"
                               job-args))
              #:unwind? #t))
        (lambda ()
          (apply proc job-args))
        #:unwind? #t))

    (define (start-thread thread-index)
      (define (too-many-threads?)
        (let ((running-jobs-count
               (hash-count (lambda (index val)
                             (list? val))
                           running-job-args))
              (desired-thread-count (get-thread-count)))

          (>= running-jobs-count
              desired-thread-count)))

      (define (thread-idle-for-too-long? last-job-finished-at)
        (time>=?
         (time-difference (current-time time-monotonic)
                          last-job-finished-at)
         thread-stop-delay))

      ;; Called with pool-mutex locked
      (define (stop-thread)
        (hash-remove! running-job-args
                      thread-index)
        (unlock-mutex pool-mutex))

      (call-with-new-thread
       (lambda ()
         (catch 'system-error
           (lambda ()
             (set-thread-name
              (string-append "gbc " name " p t "
                             (number->string thread-index))))
           (const #t))

         (let loop ((last-job-finished-at (current-time time-monotonic)))
           (lock-mutex pool-mutex)

           (if (too-many-threads?)
               (stop-thread)
               (let* ((running-jobs (list-jobs))
                      (job-args
                       (or (get-job running-jobs)
                           ;; #f from wait-condition-variable indicates a timeout
                           (if (wait-condition-variable
                                job-available
                                pool-mutex
                                (+ 9 (time-second (current-time))))
                               (get-job running-jobs)
                               #f))))

                 (if job-args
                     (begin
                       (hash-set! running-job-args
                                  thread-index
                                  job-args)

                       ;; The job runs without the mutex held
                       (unlock-mutex pool-mutex)
                       (thread-process-job job-args)

                       (with-mutex pool-mutex
                         (hash-set! running-job-args
                                    thread-index
                                    #f))

                       (loop (current-time time-monotonic)))
                     (if (thread-idle-for-too-long? last-job-finished-at)
                         (stop-thread)
                         (begin
                           (unlock-mutex pool-mutex)

                           (loop last-job-finished-at))))))))))

    (define start-new-threads-if-necessary
      (let ((previous-thread-started-at (make-time time-monotonic 0 0)))
        (lambda (desired-count)
          (let* ((thread-count
                  (hash-count (const #t) running-job-args))
                 (threads-to-start
                  (- desired-count thread-count)))
            (when (> threads-to-start 0)
              (for-each
               (lambda (thread-index)
                 ;; Only use indexes with no entry in the table
                 (when (eq? (hash-ref running-job-args
                                      thread-index
                                      'slot-free)
                            'slot-free)
                   (let* ((now (current-time time-monotonic))
                          (elapsed (time-difference now
                                                    previous-thread-started-at)))
                     (when (or (eq? #f thread-start-delay)
                               (time>=? elapsed thread-start-delay))
                       (set! previous-thread-started-at now)
                       (hash-set! running-job-args
                                  thread-index
                                  #f)
                       (start-thread thread-index)))))
               (iota desired-count)))))))

    (if (procedure? thread-count-parameter)
        (call-with-new-thread
         (lambda ()
           (lock-mutex pool-mutex)
           (while #t
             (with-exception-handler
                 (lambda _
                   ;; Things are going really wrong, we probably can't even
                   ;; log without risking another exception, so just sleep and
                   ;; try again.
                   (sleep 10))
               (lambda ()
                 (let ((idle-threads
                        (hash-count (lambda (index val)
                                      (eq? #f val))
                                    running-job-args)))
                   (let ((thread-info
                          (hash-fold
                           (lambda (k v result)
                             (string-append
                              result
                              (simple-format #f " ~A: ~A" k v)))
                           ""
                           running-job-args)))
                     (unless (string-null? thread-info)
                       (display
                        (string-append
                         (simple-format #f "~A thread pool: " name)
                         thread-info
                         "\n\n"))))
                   (when (= 0 idle-threads)
                     (start-new-threads-if-necessary (get-thread-count))))

                 (wait-condition-variable
                  job-available
                  pool-mutex
                  (+ 15 (time-second (current-time)))))
               #:unwind? #t))))
        (start-new-threads-if-necessary (get-thread-count)))

    (values pool-mutex job-available count-threads list-jobs)))

;; copied from (guix scripts substitute)
(define-syntax-rule (with-timeout duration handler body ...)
  "Run BODY and return its values.  If SIGALRM is delivered, which is
scheduled for DURATION seconds from the start, restore the default SIGALRM
handler and evaluate HANDLER from within the signal handler.  The alarm is
cancelled when BODY returns."
  (begin
    (sigaction SIGALRM
      (lambda (signum)
        (sigaction SIGALRM SIG_DFL)
        handler))
    (alarm duration)
    (call-with-values
        (lambda ()
          body ...)
      (lambda result
        (alarm 0)
        (sigaction SIGALRM SIG_DFL)
        (apply values result)))))

;; Schedule the alarm for DURATION seconds from now.
(define (reset-timeout duration)
  (alarm duration))

;; Return a procedure which calls THUNK and returns its result, unless it
;; last did so less than MIN-DURATION seconds ago, in which case it returns
;; #f.
(define (throttle min-duration thunk)
  (let ((next-min-runtime 0))
    (lambda ()
      (if (> (get-internal-real-time) next-min-runtime)
          (begin
            (set! next-min-runtime
                  (+ (get-internal-real-time)
                     (* min-duration
                        internal-time-units-per-second)))
            (thunk))
          #f))))

;; Return the load average from /proc/loadavg for PERIOD, which is 1, 5 or
;; 15, or #f if that file doesn't exist.
(define* (get-load-average #:key (period 5))
  (if (file-exists? "/proc/loadavg")
      (let ((line (call-with-input-file "/proc/loadavg" get-line)))
        (match (string-split line #\space)
          ((1min 5min 15min _ _)
           (string->number
            (cond
             ((= period 1) 1min)
             ((= period 5) 5min)
             ((= period 15) 15min))))))
      #f))

;; Return #t if the system name from uname is "GNU".
(define running-on-the-hurd?
  ;; The system name is kept outside of the procedure, so uname is only
  ;; called until a value has been stored.
  (let ((cached-system #f))
    (lambda ()
      (unless cached-system
        (set! cached-system (utsname:sysname (uname))))

      (string=? cached-system "GNU"))))

;; Create gauge metrics in REGISTRY for values from gc-stats, and return a
;; procedure which sets them to the current values.
(define (get-gc-metrics-updater registry)
  (define metrics
    `((gc-time-taken
       . ,(make-gauge-metric registry "guile_gc_time_taken"))
      (heap-size
       . ,(make-gauge-metric registry "guile_heap_size"))
      (heap-free-size
       . ,(make-gauge-metric registry "guile_heap_free_size"))
      (heap-total-allocated
       . ,(make-gauge-metric registry "guile_heap_total_allocated"))
      (heap-allocated-since-gc
       . ,(make-gauge-metric registry "guile_allocated_since_gc"))
      (protected-objects
       . ,(make-gauge-metric registry "guile_gc_protected_objects"))
      (gc-times
       . ,(make-gauge-metric registry "guile_gc_times"))))

  (lambda ()
    (let ((stats (gc-stats)))
      (for-each
       (match-lambda
         ((name . metric)
          (let ((value (assq-ref stats name)))
            (metric-set metric value))))
       metrics))))

;; Create gauge metrics in REGISTRY for the number of entries in the Guix
;; derivation cache and in each Guix memoization table, and return a
;; procedure which sets them to the current values.
(define (get-guix-memory-metrics-updater registry)
  (define %memoization-tables
    (@@ (guix memoization) %memoization-tables))

  (define memoization-table-entry-count-metric
    (make-gauge-metric registry "guix_memoization_table_entry_count"
                       #:labels '(procedure)))

  ;; This is the cache in (guix derivations), not the one in this module
  (define %derivation-cache
    (@@ (guix derivations) %derivation-cache))

  (define derivation-cache-entry-count-metric
    (make-gauge-metric registry "guix_derivation_cache_entry_count"))

  (lambda ()
    (metric-set derivation-cache-entry-count-metric
                ;; hash-count doesn't work for weak tables?
                (hash-fold
                 (lambda (k v result)
                   (+ 1 result))
                 0
                 %derivation-cache))

    (hash-for-each
     (lambda (proc table)
       (metric-set memoization-table-entry-count-metric
                   (hash-count (const #t) table)
                   #:label-values
                   `((procedure . ,(simple-format #f "~A" proc)))))
     %memoization-tables)))

;; Set the locale from the environment.  If that raises an exception, log it
;; and try en_US.utf8 instead, exiting with status 1 if that fails as well.
(define (check-locale!)
  (with-exception-handler
      (lambda (exn)
        (display
         (simple-format
          #f
          "exception when calling setlocale: ~A falling back to en_US.utf8\n"
          exn)
         (current-error-port))

        (with-exception-handler
            (lambda (exn)
              (display
               (simple-format
                #f
                "exception when calling setlocale with en_US.utf8: ~A\n"
                exn)
               (current-error-port))

              (exit 1))
          (lambda _
            (setlocale LC_ALL "en_US.utf8"))
          #:unwind? #t))
    (lambda _
      (setlocale LC_ALL ""))
    #:unwind? #t))

;; Another name for the sleep procedure visible in this module.
(define core-guile-sleep sleep)