(define-module (guix-build-coordinator utils)
  #:use-module (srfi srfi-1)
  #:use-module (srfi srfi-19)
  #:use-module (srfi srfi-26)
  #:use-module (srfi srfi-60)
  #:use-module (ice-9 q)
  #:use-module (ice-9 ftw)
  #:use-module (ice-9 popen)
  #:use-module (ice-9 iconv)
  #:use-module (ice-9 match)
  #:use-module (ice-9 format)
  #:use-module (ice-9 threads)
  #:use-module (ice-9 textual-ports)
  #:use-module (ice-9 rdelim)
  #:use-module (ice-9 binary-ports)
  #:use-module (ice-9 exceptions)
  #:use-module (rnrs bytevectors)
  #:use-module (web uri)
  #:use-module (web http)
  #:use-module (web client)
  #:use-module (web request)
  #:use-module (web response)
  #:use-module (gcrypt pk-crypto)
  #:use-module (gcrypt hash)
  #:use-module (gcrypt random)
  #:use-module (json)
  #:use-module (prometheus)
  #:use-module (guix pki)
  #:use-module (guix utils)
  #:use-module (guix config)
  #:use-module (guix store)
  #:use-module (guix status)
  #:use-module (guix base64)
  #:use-module (guix scripts substitute)
  #:export (random-v4-uuid
            make-base64-output-port
            with-gc-protection
            request-query-parameters
            call-with-streaming-http-request
            make-chunked-input-port*
            find-missing-substitutes-for-output
            has-substiutes-no-cache?
            substitute-derivation
            narinfo-string
            retry-on-error
            s3-list-objects
            s3-cp
            with-time-logging
            create-work-queue
            with-timeout
            get-load-average
            running-on-the-hurd?
            get-gc-metrics-updater))

;; Pull in whichever Guix modules provide the narinfo procedures used below;
;; their location differs between Guix versions.
(eval-when (eval load compile)
  (begin
    (when (defined? 'narinfo-references (resolve-module '(guix narinfo)))
      ;; This module contains narinfo-references in newer versions of Guix
      (use-modules (guix narinfo)))
    (when (defined? 'lookup-narinfos (resolve-module '(guix substitutes)))
      ;; This module may contain lookup-narinfos in newer versions of Guix
      (use-modules (guix substitutes)))))

(define (random-v4-uuid)
  "Return a random version 4 UUID as a hexadecimal string in the
8-4-4-4-12 character layout."
  ;; https://tools.ietf.org/html/rfc4122#page-14
  ;;
  ;; The pattern in characters is: 8, 4, 4, 4, 12
  ;; The pattern in bytes is: 4, 2, 2, 2, 6
  ;;
  ;; time-low "-" time-mid "-" time-high-and-version "-"
  ;; clock-seq-and-reserved clock-seq-low "-" node
  ;;
  ;; - Set the two most significant bits (bits 6 and 7) of the
  ;;   clock_seq_hi_and_reserved to zero and one, respectively.
  ;; - Set the four most significant bits (bits 12 through 15) of the
  ;;   time_hi_and_version field to the 4-bit version number from
  ;;   Section 4.1.3.
  (let* ((bytes 16)
         (bv (gen-random-bv bytes)))
    (let ((version 4)
          (6th-byte (array-ref bv 6))   ; Most significant byte in
                                        ; time_hi_and_version
          (8th-byte (array-ref bv 8)))  ; Most significant byte in
                                        ; clock_seq_hi_and_reserved
      ;; Keep the low nibble, put the version (4) in the high nibble.
      (array-set!
       bv
       (logior (logand #b00001111 6th-byte)
               (rotate-bit-field version 4 0 8))
       6)
      (array-set! bv
                  ;; Set bits 6 and 7 to 0 and 1 respectively
                  (logior #b10000000
                          (logand #b00111111
                                  8th-byte))
                  8))
    (let* ((int (bytevector-uint-ref bv 0 (endianness big) bytes))
           (hex-string (format #f "~32,'0x" int)))
      ;; Cut the 32 hex digits into groups of 8, 4, 4, 4 and 12.
      (string-join
       (fold (lambda (part-length result)
               (let ((start (string-length
                             (string-join result ""))))
                 (append
                  result
                  (list (substring hex-string
                                   start
                                   (+ start part-length))))))
             '()
             (list 8 4 4 4 12))
       "-"))))

(define (make-base64-output-port port)
  "Return a binary output port that base64-encodes the bytes written to it
and writes the encoded text to PORT, passing a line length of 72 to
base64-encode.  Input is buffered and only encoded when the buffer fills up
or when the returned port is closed; closing it does not close PORT."
  (define line-length 72) ; must be a multiple of 4
  (define buffer-length
    ;; Each output line of LINE-LENGTH characters encodes 3/4 as many input
    ;; bytes.  Making the buffer a whole number of lines means each
    ;; intermediate flush emits only full lines and never any padding, so
    ;; the concatenated output matches encoding all the data at once.
    (let ((bytes-per-line (* 3 (/ line-length 4)))
          (lines-per-buffer 65536))
      (* bytes-per-line lines-per-buffer)))

  (define buffer (make-bytevector buffer-length))
  (define buffered-bytes 0)

  (define (write! bv start count)
    (let ((remaining-buffer-space (- buffer-length buffered-bytes)))
      (if (<= count remaining-buffer-space)
          (begin
            (bytevector-copy! bv start buffer buffered-bytes count)
            (set! buffered-bytes (+ buffered-bytes count))
            count)
          (begin
            ;; Fill the buffer, encode it, and report a partial write; the
            ;; port machinery calls write! again with the rest.
            (bytevector-copy! bv start buffer buffered-bytes
                              remaining-buffer-space)
            (base64-encode buffer 0 buffer-length line-length #f
                           base64-alphabet port)
            (set! buffered-bytes 0)
            remaining-buffer-space))))

  (define (close)
    ;; Encode whatever is left (with padding if needed) and drop the buffer.
    (base64-encode buffer 0 buffered-bytes line-length #f
                   base64-alphabet port)
    (set! buffer #f)
    (set! buffered-bytes 0)
    #t)

  (make-custom-binary-output-port "base64-output" write! #f #f close))

;; Chunked Responses
(define (read-chunk-header port)
  "Read a chunk header from PORT and return the size in bytes of the
upcoming chunk.  Raise an error if PORT is at end of file, or if the header
does not start with a non-negative hexadecimal size."
  (match (read-line port)
    ((? eof-object?)
     ;; Connection closed prematurely: there's nothing left to read.
     (error "chunked input ended prematurely"))
    (str
     ;; read-line only strips the LF, so cut at the first #\; (start of a
     ;; chunk extension) or the trailing #\return, whichever comes first.
     (let* ((extension-start (string-index str
                                           (lambda (c)
                                             (or (char=? c #\;)
                                                 (char=? c #\return)))))
            (size (string->number (if extension-start
                                      (substring str 0 extension-start)
                                      str)
                                  16)))
       (if (and size (exact-integer? size) (>= size 0))
           size
           (error "invalid chunk header" str))))))

(define* (make-chunked-input-port* port #:key (keep-alive? #f))
  "Return a binary input port that decodes HTTP/1.1 chunked transfer coding
read from PORT.  Closing the returned port closes PORT unless KEEP-ALIVE? is
true.  Reading raises an error if PORT ends before the final chunk."
  (define (close)
    (unless keep-alive?
      (close-port port)))

  (define remaining 0)  ;number of bytes left from the current chunk
  (define finished? #f) ;did we get all the chunks?

  (define (read! bv idx to-read)
    (define (loop to-read num-read)
      (cond ((or finished? (zero? to-read))
             num-read)
            ((zero? remaining)          ;get a new chunk
             (let ((size (read-chunk-header port)))
               (set! remaining size)
               (cond ((zero? size)
                      (set! finished? #t)
                      (get-bytevector-n port 2) ; \r\n follows the last chunk
                      num-read)
                     (else
                      (loop to-read num-read)))))
            (else                       ;read from the current chunk
             (let* ((ask-for (min to-read remaining))
                    (read    (get-bytevector-n! port bv (+ idx num-read)
                                                ask-for)))
               (cond ((eof-object? read) ;premature termination
                      (error "chunked input ended prematurely"))
                     (else
                      (let ((left (- remaining read)))
                        (set! remaining left)
                        (when (zero? left)
                          ;; We're done with this chunk; read CR and LF.
                          (get-u8 port) (get-u8 port))
                        (loop (- to-read read)
                              (+ num-read read)))))))))
    (loop to-read 0))

  (make-custom-binary-input-port "chunked input port" read! #f #f close))

(define (request-query-parameters request)
  "Return the query parameters of REQUEST's URI as an alist mapping symbols
to uri-decoded string values, or the empty list when there is no query.  A
parameter written without \"=\" gets the empty string as its value, empty
segments are skipped, and only the first \"=\" of a segment separates the
name from the value."
  (define (parse-query-string query)
    "Parse and decode the URI query string QUERY and return an alist."
    ;; Split on #\& first and only then on the first #\= of each segment:
    ;; splitting on both characters at once lets a missing value or an
    ;; embedded #\= shift every later name/value pair.  Decoding happens
    ;; after splitting so that encoded %26/%3D stay part of the data.
    (filter-map
     (lambda (segment)
       (and (not (string-null? segment))
            (let ((separator (string-index segment #\=)))
              (if separator
                  (cons (uri-decode (substring segment 0 separator))
                        (uri-decode (substring segment (+ separator 1))))
                  (cons (uri-decode segment) "")))))
     (string-split query #\&)))

  (let ((query (uri-query (request-uri request))))
    (if (and query
             (not (string-null? query)))
        (map (match-lambda
               ((name . value)
                (cons (string->symbol name) value)))
             (parse-query-string query))
        '())))

(define (with-gc-protection thunk)
  "Call THUNK with garbage collection disabled, re-enabling it however THUNK
exits, and return THUNK's result."
  (dynamic-wind
    gc-disable
    thunk
    gc-enable))

(define* (make-chunked-output-port* port #:key (keep-alive? #f)
                                    (buffering 1200)
                                    report-bytes-sent)
  "Return a block-buffered output port (buffer size BUFFERING) that writes
to PORT using HTTP/1.1 chunked transfer coding: each non-empty string the
soft port hands to its put-string procedure becomes one chunk.  When
REPORT-BYTES-SENT is a procedure it is called with the byte size of every
chunk written.  Closing the returned port writes the terminating empty
chunk, flushes PORT, and closes PORT unless KEEP-ALIVE? is true."
  (define heap-allocated-limit
    (expt 2 20)) ;; 1MiB

  (define (%put-string s)
    (unless (string-null? s)
      ;; ISO-8859-1 maps each character to exactly one byte.
      (let* ((bv (string->bytevector s "ISO-8859-1"))
             (length (bytevector-length bv)))
        (with-gc-protection
         (lambda ()
           (put-string port (number->string length 16))
           (put-string port "\r\n")
           (put-bytevector port bv)
           (put-string port "\r\n")))

        (when report-bytes-sent
          (report-bytes-sent length))

        ;; NOTE(review): once more than HEAP-ALLOCATED-LIMIT bytes have been
        ;; allocated since the last collection, force collections until the
        ;; gc-times counter moves, presumably to bound memory growth while
        ;; streaming.
        (let* ((stats (gc-stats))
               (initial-gc-times
                (assq-ref stats 'gc-times)))
          (when (> (assq-ref stats 'heap-allocated-since-gc)
                   heap-allocated-limit)
            (while (let ((updated-stats (gc-stats)))
                     (= (assq-ref updated-stats 'gc-times)
                        initial-gc-times))
              (gc)
              (usleep 50)))))))

  (define (%put-char c)
    (%put-string (list->string (list c))))

  (define (flush)
    #t)

  (define (close)
    (with-gc-protection
     (lambda ()
       (put-string port "0\r\n\r\n")
       (force-output port)
       (unless keep-alive?
         (close-port port)))))

  (let ((ret (make-soft-port
              (vector %put-char %put-string flush #f close) "w")))
    (setvbuf ret 'block buffering)
    ret))

(define* (call-with-streaming-http-request uri callback
                                           #:key (headers '())
                                           report-bytes-sent)
  "Send an HTTP/1.1 PUT request to URI whose body is streamed with chunked
transfer coding.  CALLBACK is called with an output port; what it writes
becomes the request body.  HEADERS are appended to the request headers and
REPORT-BYTES-SENT, if given, is called with the size of each chunk sent.
Return two values: the response and the response body.  Any exception is
logged, the connection is closed, and the exception is re-raised.  Note that
this installs a process-wide SIGPIPE handler that only prints a warning."
  (let* ((port (open-socket-for-uri uri))
         (request (build-request
                   uri
                   #:method 'PUT
                   #:version '(1 . 1)
                   #:headers `((connection close)
                               (Transfer-Encoding . "chunked")
                               (Content-Type . "application/octet-stream")
                               ,@headers)
                   #:port port)))
    (set-port-encoding! port "ISO-8859-1")
    (setvbuf port 'block (expt 2 13))
    (with-exception-handler
        (lambda (exp)
          (simple-format #t "error: PUT ~A: ~A\n" (uri-path uri) exp)
          (close-port port)
          (raise-exception exp))
      (lambda ()
        (let ((request (write-request request port)))
          (let* ((chunked-output-port
                  (make-chunked-output-port*
                   port
                   #:buffering (expt 2 12)
                   #:keep-alive? #t
                   #:report-bytes-sent report-bytes-sent)))

            ;; A SIGPIPE will kill Guile, so ignore it
            (sigaction SIGPIPE
              (lambda (arg)
                (simple-format (current-error-port) "warning: SIGPIPE\n")))

            (set-port-encoding! chunked-output-port "ISO-8859-1")
            (callback chunked-output-port)
            ;; Closing writes the final empty chunk but keeps PORT open
            ;; (#:keep-alive? #t) so the response can be read from it.
            (close-port chunked-output-port)

            (with-gc-protection
             (lambda ()
               (let ((response (read-response port)))
                 (let ((body (read-response-body response)))
                   (close-port port)
                   (values response body)))))))))))

(define (find-missing-substitutes-for-output store substitute-urls output)
  "Return the store items reachable from OUTPUT, following the references
listed in substitute narinfos, that are neither valid in STORE nor have a
narinfo on any of SUBSTITUTE-URLS.  Items that are substitutable are not
included themselves, only their missing references.  The result may contain
duplicates."
  (if (valid-path? store output)
      '()
      (let ((narinfo
             ;; First narinfo found, trying each substitute server in turn.
             (any (lambda (substitute-url)
                    (let ((result (lookup-narinfos substitute-url
                                                   (list output))))
                      (if (null? result)
                          #f
                          (first result))))
                  substitute-urls)))
        (if narinfo
            (append-map
             (lambda (reference)
               (let ((referenced-output
                      (string-append (%store-prefix) "/" reference)))
                 ;; Skip self-references to avoid infinite recursion.
                 (if (string=? referenced-output output)
                     '()
                     (find-missing-substitutes-for-output store
                                                          substitute-urls
                                                          referenced-output))))
             (narinfo-references narinfo))
            (list output)))))

;; Work around Guix holding on to broken connections to substitute servers
;; (because of mishandling gnutls errors).
(let ((mod (resolve-module '(guix scripts substitute))))
  (when (module-variable mod '%max-cached-connections)
    (module-set! mod '%max-cached-connections 0)))

(define (has-substiutes-no-cache? substitute-urls file)
  "Return the subset of SUBSTITUTE-URLS that currently have a narinfo for
the store item FILE.  Any narinfo cache files for FILE are deleted first so
that a cached answer (including a cached 404) is not reused.  An exception
from a lookup is logged, together with the lookup's captured error output,
and then propagates to the caller."
  (define %narinfo-cache-directory
    (if (zero? (getuid))
        (or (and=> (getenv "XDG_CACHE_HOME")
                   (cut string-append <> "/guix/substitute"))
            (string-append %state-directory "/substitute/cache"))
        (string-append (cache-directory #:ensure? #f) "/substitute")))

  ;; Because there's no control over the caching of 404 lookups, and I'd
  ;; rather not reach inside and monkey patch the Guix code, just delete any
  ;; cache files
  (let ((hash-part (store-path-hash-part file))
        (directories
         ;; NOTE(review): 52-character names are assumed to be the
         ;; per-substitute-server cache directories; confirm against Guix.
         (scandir %narinfo-cache-directory
                  (lambda (s) (= (string-length s) 52)))))

    (for-each (lambda (directory)
                (let ((cache-file
                       (string-append
                        %narinfo-cache-directory "/"
                        directory "/" hash-part)))
                  ;; Use monitor to avoid multiple threads trying to delete
                  ;; the same file at the same time
                  (monitor
                   (when (file-exists? cache-file)
                     (with-exception-handler
                         (lambda (exn)
                           (simple-format
                            (current-error-port)
                            "error: when deleting substitute cache file: ~A\n"
                            exn))
                       (lambda ()
                         (delete-file cache-file))
                       #:unwind? #t)))))
              ;; scandir returns #f when the cache directory is missing.
              (or directories '())))

  (let ((substitute-urls
         (append-map (lambda (substitute-url)
                       (let ((log-port (open-output-string)))
                         (with-throw-handler #t
                           (lambda ()
                             (if (null?
                                  ;; I doubt the caching is thread safe, so
                                  ;; only make one request at a time.
                                  (monitor
                                   (parameterize
                                       ((current-error-port log-port))
                                     (lookup-narinfos substitute-url
                                                      (list file)))))
                                 '()
                                 (list substitute-url)))
                           (lambda (key . args)
                             (simple-format
                              (current-error-port)
                              "exception in has-substiutes-no-cache? (~A) ~A: ~A\n"
                              substitute-url key args)
                             (display (string-append
                                       (get-output-string log-port)
                                       "\n")
                                      (current-error-port))
                             (close-output-port log-port)))))
                     substitute-urls)))
    substitute-urls))

(define* (substitute-derivation derivation-name
                                #:key substitute-urls)
  "Ask the store daemon to make DERIVATION-NAME available, restricting the
substitute servers to SUBSTITUTE-URLS when given.  If anything raises, return
#t when DERIVATION-NAME is nevertheless valid in the store; otherwise
display the captured build output and raise an error."
  (let ((log-port (open-output-string)))
    (catch #t
      (lambda ()
        (if (defined? 'ensure-path (resolve-module '(guix store)))
            (with-store store
              (apply set-build-options
                     store
                     `(,@(if substitute-urls
                             `(#:substitute-urls ,substitute-urls)
                             '())
                       #:max-silent-time 60
                       #:timeout ,(* 10 60)))

              (parameterize ((current-build-output-port log-port))
                (ensure-path store derivation-name)))
            ;; Older Guix without ensure-path: build the item and close the
            ;; store connection as soon as the substituter reports success.
            (with-store store
              (apply set-build-options
                     store
                     `(#:print-extended-build-trace? #t
                       #:multiplexed-build-output? #t
                       ,@(if substitute-urls
                             `(#:substitute-urls ,substitute-urls)
                             '())))

              (with-status-report
                  (lambda (event status new)
                    (print-build-event event status new)
                    (match event
                      (('substituter-succeeded substituted-drv)
                       (when (string=? derivation-name substituted-drv)
                         (close-connection store)))
                      (_ #t)))
                (build-things store (list derivation-name))))))
      (lambda (key . args)
        (let ((log-string (get-output-string log-port)))
          (close-output-port log-port)

          ;; This is a hack, to ignore errors relating to closing the store
          ;; connection.
          (if (with-store store
                (valid-path? store derivation-name))
              #t
              (begin
                (display log-string)
                (error
                 (simple-format #f "could not substitute ~A\n"
                                derivation-name)))))))))

(define* (narinfo-string store-path hash size references
                         compressed-files
                         #:key (nar-path "nar")
                         system derivation
                         public-key private-key)
  "Return the text of a narinfo for STORE-PATH, signed with PRIVATE-KEY and
PUBLIC-KEY.  HASH and SIZE fill the NarHash (as sha256:HASH) and NarSize
fields, and REFERENCES is a list of strings joined by spaces.
COMPRESSED-FILES is a list of (COMPRESSION FILE-SIZE) entries, each rendered
as a URL/Compression/FileSize stanza; COMPRESSION is a symbol or #f (shown
as \"none\"), and FileSize is omitted when FILE-SIZE is #f.  The System and
Deriver lines are added only when DERIVATION is given."
  (define (signed-string s)
    (let* ((hash (bytevector->hash-data (sha256 (string->utf8 s))
                                        #:key-type (key-type public-key))))
      (signature-sexp hash private-key public-key)))

  (define base64-encode-string
    (compose base64-encode string->utf8))

  (define* (store-item->recutils compression file-size)
    (let ((url (encode-and-join-uri-path
                `(,@(split-and-decode-uri-path nar-path)
                  ,@(if compression
                        (list (symbol->string compression))
                        '())
                  ,(basename store-path)))))
      (format #f "URL: ~a~%Compression: ~a~%~@[FileSize: ~a~%~]"
              url
              (or compression "none")
              file-size)))

  (let* ((base-info
          ;; Written with explicit ~% so the field layout does not depend on
          ;; literal newlines inside the string.
          (format #f
                  "StorePath: ~a~%~{~a~}NarHash: sha256:~a~%NarSize: ~d~%References: ~a~%"
                  store-path
                  (map (match-lambda
                         ((compression compressed-size)
                          (store-item->recutils
                           compression
                           compressed-size)))
                       compressed-files)
                  hash size
                  (string-join references " ")))
         ;; Only render the "System" and "Deriver" lines when DERIVATION is
         ;; given.
         (info (if derivation
                   (format #f "~aSystem: ~a~%Deriver: ~a~%"
                           base-info system
                           (basename derivation))
                   base-info))
         (signature (base64-encode-string
                     (canonical-sexp->string (signed-string info)))))
    (format #f "~aSignature: 1;~a;~a~%" info (gethostname) signature)))

(define* (retry-on-error f #:key times delay ignore)
  "Call the thunk F, retrying when it raises, for at most TIMES attempts and
sleeping DELAY seconds between attempts; return F's (single) value.  TIMES
and DELAY have no defaults and must be supplied.  IGNORE is a predicate or a
list of predicates: an exception for which one returns true is re-raised at
once without retrying.  After the final failed attempt the last exception is
re-raised.  Failures and late successes are logged to the current error
port."
  (let loop ((attempt 1))
    (match (with-exception-handler
               (lambda (exn)
                 (when (cond
                        ((list? ignore)
                         (any (lambda (test)
                                (test exn))
                              ignore))
                        ((procedure? ignore)
                         (ignore exn))
                        (else #f))
                   (raise-exception exn))

                 (cons #f exn))
             (lambda ()
               (cons #t (f)))
             #:unwind? #t)
      ((#t . return-value)
       (when (> attempt 1)
         (simple-format
          (current-error-port)
          "retry success: ~A\n on attempt ~A of ~A\n"
          f attempt times))
       return-value)
      ((#f . exn)
       (if (>= attempt times)
           (begin
             (simple-format
              (current-error-port)
              "error: ~A:\n ~A,\n giving up after ~A attempts\n"
              f exn times)
             (raise-exception exn))
           (begin
             (simple-format
              (current-error-port)
              "error: ~A:\n ~A,\n attempt ~A of ~A, retrying in ~A\n"
              f exn attempt times delay)
             (sleep delay)
             (loop (+ 1 attempt))))))))

(define* (s3-list-objects s3-bucket prefix
                          #:key (command "aws")
                          (command-line-arguments '()))
  "Run COMMAND s3api list-objects for S3-BUCKET and PREFIX and return its
output parsed as JSON, or the empty list when the command prints nothing.
If the command does not exit with status 0 (including when it is killed by a
signal), raise an exception whose message is the command's output."
  (let ((command
         `(,command
           "s3api"
           ,@command-line-arguments
           "list-objects"
           "--bucket" ,s3-bucket
           "--prefix" ,prefix)))
    (simple-format #t "running: ~A\n" (string-join command))
    ;; LET* rather than LET: the output must be read before the pipe is
    ;; closed, and LET does not specify the order its bindings are evaluated.
    (let* ((pipe (apply open-pipe* OPEN_READ command))
           (output (get-string-all pipe))
           (exit-code (status:exit-val (close-pipe pipe))))
      ;; status:exit-val is #f for a child killed by a signal, so compare
      ;; with eqv? instead of calling zero? on it.
      (if (eqv? exit-code 0)
          (if (string-null? output)
              '()
              (json-string->scm output))
          (raise-exception
           (make-exception-with-message output))))))

(define* (s3-cp s3-bucket source destination
                #:key (command "aws")
                (command-line-arguments '()))
  "Run COMMAND s3 cp to copy SOURCE to s3://S3-BUCKET/DESTINATION and
return #t.  Raise an exception if the command does not exit with status 0,
including when it is killed by a signal."
  (let ((command
         `(,command
           "s3"
           ,@command-line-arguments
           "cp"
           ,source
           ,(simple-format #f "s3://~A/~A" s3-bucket destination))))

    (simple-format #t "running: ~A\n" (string-join command))

    (let ((exit-code
           (status:exit-val
            (apply system* command))))
      ;; status:exit-val is #f for a child killed by a signal; treat that as
      ;; a failure rather than letting zero? raise a type error.
      (unless (eqv? exit-code 0)
        (raise-exception
         (make-exception-with-message
          (simple-format #f "error: command failed (~A): ~A\n"
                         exit-code command))))
      #t)))

(define (call-with-time-logging name thunk)
  "Call THUNK, display on the current output port how many seconds it took
(prefixed by NAME), and return THUNK's values."
  (let ((start (current-time time-utc)))
    (call-with-values
        thunk
      (lambda vals
        (let* ((end (current-time time-utc))
               (elapsed (time-difference end start)))
          (display
           (format #f
                   "~a took ~f seconds~%"
                   name
                   (+ (time-second elapsed)
                      (/ (time-nanosecond elapsed) 1e9))))
          (apply values vals))))))

(define-syntax-rule (with-time-logging name exp ...)
  "Log under NAME the time taken to evaluate EXP."
  (call-with-time-logging name (lambda () exp ...)))

(define* (create-work-queue thread-count-parameter proc
                            #:key thread-start-delay
                            (thread-stop-delay
                             (make-time time-duration 0 0)))
  "Create a queue of jobs processed by worker threads that apply PROC to
each job's arguments.

THREAD-COUNT-PARAMETER sets the desired number of threads: a number is a
fixed count, #f means one thread per queued or running job, and anything
else is called as a thunk returning the count (in that case a background
thread re-checks every 15 seconds whenever no thread is idle).
THREAD-START-DELAY, unless #f, is the minimum SRFI-19 duration between
starting two threads.  THREAD-STOP-DELAY is how long an idle thread may
wait for work before it exits.

Return four values: PROCESS-JOB, which enqueues its arguments as a job;
COUNT-JOBS, the number of queued plus running jobs; COUNT-THREADS, the
number of entries in the thread table; and LIST-JOBS, the argument lists of
queued and running jobs.  Exceptions raised by PROC are logged and do not
stop the worker thread."
  (let ((queue (make-q))
        (queue-mutex (make-mutex))
        (job-available (make-condition-variable))
        ;; thread-index -> job arguments (a list) while running a job, #f
        ;; while idle; an absent key means the slot is free.
        (running-job-args (make-hash-table)))

    (define get-thread-count
      (cond
       ((number? thread-count-parameter)
        (const thread-count-parameter))
       ((eq? thread-count-parameter #f)
        ;; Run one thread per job
        (lambda ()
          (+ (q-length queue)
             (hash-count (lambda (index val)
                           (list? val))
                         running-job-args))))
       (else
        thread-count-parameter)))

    (define (process-job . args)
      (with-mutex queue-mutex
        (enq! queue args)
        (start-new-threads-if-necessary (get-thread-count))
        (signal-condition-variable job-available)))

    (define (count-threads)
      (with-mutex queue-mutex
        (hash-count (const #t) running-job-args)))

    (define (count-jobs)
      (with-mutex queue-mutex
        (+ (q-length queue)
           (hash-count (lambda (index val)
                         (list? val))
                       running-job-args))))

    (define (list-jobs)
      (with-mutex queue-mutex
        ;; (car queue) is the list of queued elements in an (ice-9 q) queue.
        (append (list-copy
                 (car queue))
                (hash-fold (lambda (key val result)
                             (or (and val
                                      (cons val result))
                                 result))
                           '()
                           running-job-args))))

    (define (thread-process-job job-args)
      ;; The throw handler prints a backtrace while the stack is still
      ;; intact; the outer unwinding handler then swallows the exception.
      (with-exception-handler
          (lambda (exn)
            (simple-format (current-error-port)
                           "job raised exception: ~A\n"
                           job-args))
        (lambda ()
          (with-throw-handler #t
            (lambda ()
              (apply proc job-args))
            (lambda (key . args)
              (simple-format (current-error-port)
                             "exception when handling job: ~A ~A\n"
                             key args)
              (backtrace))))
        #:unwind? #t))

    (define (start-thread thread-index)
      (define (too-many-threads?)
        (let ((running-jobs-count
               (hash-count (lambda (index val)
                             (list? val))
                           running-job-args))
              (desired-thread-count (get-thread-count)))

          (>= running-jobs-count
              desired-thread-count)))

      (define (thread-idle-for-too-long? last-job-finished-at)
        (time>=?
         (time-difference (current-time time-monotonic)
                          last-job-finished-at)
         thread-stop-delay))

      ;; Called with QUEUE-MUTEX held: free this slot and release the lock.
      (define (stop-thread)
        (hash-remove! running-job-args
                      thread-index)
        (unlock-mutex queue-mutex))

      (call-with-new-thread
       (lambda ()
         (let loop ((last-job-finished-at (current-time time-monotonic)))
           (lock-mutex queue-mutex)

           (if (too-many-threads?)
               (stop-thread)
               (let ((job-args
                      (if (q-empty? queue)
                          ;; #f from wait-condition-variable indicates a timeout
                          (if (wait-condition-variable
                               job-available
                               queue-mutex
                               (+ 9 (time-second (current-time))))
                              ;; Another thread could have taken
                              ;; the job in the mean time
                              (if (q-empty? queue)
                                  #f
                                  (deq! queue))
                              #f)
                          (deq! queue))))

                 (if job-args
                     (begin
                       (hash-set! running-job-args
                                  thread-index
                                  job-args)

                       (unlock-mutex queue-mutex)
                       (thread-process-job job-args)

                       (with-mutex queue-mutex
                         (hash-set! running-job-args
                                    thread-index
                                    #f))

                       (loop (current-time time-monotonic)))
                     (if (thread-idle-for-too-long? last-job-finished-at)
                         (stop-thread)
                         (begin
                           (unlock-mutex queue-mutex)

                           (loop last-job-finished-at))))))))))

    ;; Expected to be called with QUEUE-MUTEX held.  Starts threads in free
    ;; slots below DESIRED-COUNT, honouring THREAD-START-DELAY.
    (define start-new-threads-if-necessary
      (let ((previous-thread-started-at (make-time time-monotonic 0 0)))
        (lambda (desired-count)
          (let* ((thread-count
                  (hash-count (const #t) running-job-args))
                 (threads-to-start
                  (- desired-count thread-count)))
            (when (> threads-to-start 0)
              (for-each
               (lambda (thread-index)
                 (when (eq? (hash-ref running-job-args
                                      thread-index
                                      'slot-free)
                            'slot-free)
                   (let* ((now (current-time time-monotonic))
                          (elapsed (time-difference now
                                                    previous-thread-started-at)))
                     (when (or (eq? #f thread-start-delay)
                               (time>=? elapsed thread-start-delay))
                       (set! previous-thread-started-at now)
                       (hash-set! running-job-args
                                  thread-index
                                  #f)
                       (start-thread thread-index)))))
               (iota desired-count)))))))

    (if (procedure? thread-count-parameter)
        (call-with-new-thread
         (lambda ()
           (while #t
             (sleep 15)
             (with-mutex queue-mutex
               (let ((idle-threads (hash-count (lambda (index val)
                                                 (eq? #f val))
                                               running-job-args)))
                 (when (= 0 idle-threads)
                   (start-new-threads-if-necessary (get-thread-count))))))))
        (start-new-threads-if-necessary (get-thread-count)))

    (values process-job count-jobs count-threads list-jobs)))

(define %random-state
  (seed->random-state
   (+ (ash (cdr (gettimeofday)) 32) (getpid))))

;; copied from (guix scripts substitute)
(define-syntax-rule (with-timeout duration handler body ...)
  "Run BODY; when DURATION seconds have expired, evaluate HANDLER from the
SIGALRM handler.  If BODY fails with EINTR it is run again."
  (begin
    (sigaction SIGALRM
      (lambda (signum)
        (sigaction SIGALRM SIG_DFL)
        handler))
    (alarm duration)
    (call-with-values
        (lambda ()
          (let try ()
            (catch 'system-error
              (lambda ()
                body ...)
              (lambda args
                ;; Before Guile v2.0.9-39-gfe51c7b, the SIGALRM triggers
                ;; EINTR because of a Guile bug.  When that happens, try
                ;; again.  SA_RESTART cannot be used instead; see the
                ;; original comment in (guix scripts substitute) for the
                ;; bug references.
                (if (= EINTR (system-error-errno args))
                    (begin
                      ;; Wait a little to avoid bursts.
                      (usleep (random 3000000 %random-state))
                      (try))
                    (apply throw args))))))
      (lambda result
        (alarm 0)
        (sigaction SIGALRM SIG_DFL)
        (apply values result)))))

(define* (get-load-average #:key (period 5))
  "Return the load average over the last PERIOD minutes (1, 5 or 15) as read
from /proc/loadavg, or #f when that file does not exist.  Raise an error for
any other PERIOD."
  (if (file-exists? "/proc/loadavg")
      (let ((line (call-with-input-file "/proc/loadavg" get-line)))
        (match (string-split line #\space)
          ((1min 5min 15min _ _)
           (string->number
            (cond
             ((= period 1) 1min)
             ((= period 5) 5min)
             ((= period 15) 15min)
             (else
              (error "get-load-average: unsupported period" period)))))))
      #f))

;; The cache lives outside the procedure so that uname is only called once;
;; a binding created inside the body would be fresh (and empty) every call.
(define running-on-the-hurd?
  (let ((cached-system #f))
    (lambda ()
      "Return #t if uname reports the system name \"GNU\" (the Hurd)."
      (unless cached-system
        (set! cached-system (utsname:sysname (uname))))
      (string=? cached-system "GNU"))))

(define (get-gc-metrics-updater registry)
  "Create gauge metrics in REGISTRY for several fields of (gc-stats) and
return a thunk that sets each gauge to the current value of its field."
  (define metrics
    `((gc-time-taken
       . ,(make-gauge-metric registry "guile_gc_time_taken"))
      (heap-size
       . ,(make-gauge-metric registry "guile_heap_size"))
      (heap-free-size
       . ,(make-gauge-metric registry "guile_heap_free_size"))
      (heap-total-allocated
       . ,(make-gauge-metric registry "guile_heap_total_allocated"))
      (heap-allocated-since-gc
       . ,(make-gauge-metric registry "guile_allocated_since_gc"))
      (protected-objects
       . ,(make-gauge-metric registry "guile_gc_protected_objects"))
      (gc-times
       . ,(make-gauge-metric registry "guile_gc_times"))))

  (lambda ()
    (let ((stats (gc-stats)))
      (for-each
       (match-lambda
         ((name . metric)
          (let ((value (assq-ref stats name)))
            (metric-set metric value))))
       metrics))))