(define-module (guix-build-coordinator utils)
  #:use-module (srfi srfi-1)
  #:use-module (srfi srfi-60)
  #:use-module (ice-9 match)
  #:use-module (ice-9 format)
  #:use-module (ice-9 threads)
  #:use-module (ice-9 rdelim)
  #:use-module (ice-9 binary-ports)
  #:use-module (rnrs bytevectors)
  #:use-module (web uri)
  #:use-module (web http)
  #:use-module (web client)
  #:use-module (web request)
  #:use-module (web response)
  #:use-module (gcrypt pk-crypto)
  #:use-module (gcrypt hash)
  #:use-module (gcrypt random)
  #:use-module (fibers)
  #:use-module (fibers channels)
  #:use-module (guix pki)
  #:use-module (guix store)
  #:use-module (guix status)
  #:use-module (guix base64)
  #:export (make-worker-thread-channel
            call-with-worker-thread

            random-v4-uuid

            make-base64-output-port

            call-with-streaming-http-request
            make-chunked-input-port*

            substitute-derivation

            narinfo-string))

;;;
;;; Worker threads
;;;

;; Inside a worker thread this holds the argument list returned by the
;; thread's initializer; it is #f everywhere else.
(define %worker-thread-args
  (make-parameter #f))

(define* (make-worker-thread-channel initializer
                                     #:key (parallelism 1))
  "Return a channel used to offload work to dedicated threads.  PARALLELISM
threads are started.  For each of them INITIALIZER is called once with no
arguments and must return the list of arguments that every procedure
received over the channel is applied to.

Messages sent on the channel must be pairs of the form (REPLY . PROC), where
REPLY is a channel and PROC a procedure.  The result of PROC is sent back on
REPLY; if PROC throws, a list starting with the symbol worker-thread-error
followed by the key and arguments of the exception is sent instead."
  ;; NOTE(review): current-fiber is forced to #f while the threads are
  ;; created, presumably so that they are not associated with the calling
  ;; fiber -- confirm against the fibers internals.
  (parameterize (((@@ (fibers internal) current-fiber) #f))
    (let ((channel (make-channel)))
      (for-each
       (lambda _
         (let ((args (initializer)))
           (call-with-new-thread
            (lambda ()
              (parameterize ((%worker-thread-args args))
                (let loop ()
                  (match (get-message channel)
                    (((? channel? reply) . (? procedure? proc))
                     (put-message
                      reply
                      (catch #t
                        (lambda ()
                          (apply proc args))
                        (lambda (key . error-args)
                          ;; Hand the exception over to the caller, which
                          ;; re-throws it in its own context.
                          (cons* 'worker-thread-error key error-args))))))
                  (loop)))))))
       (iota parallelism))
      channel)))

(define (call-with-worker-thread channel proc)
  "Send PROC to the worker thread through CHANNEL.  Return the result of PROC.
If already in the worker thread, call PROC immediately.  An exception thrown
by PROC in the worker thread is re-thrown in the calling context."
  (let ((args (%worker-thread-args)))
    (if args
        (apply proc args)
        (let ((reply (make-channel)))
          (put-message channel (cons reply proc))
          (match (get-message reply)
            (('worker-thread-error key args ...)
             (apply throw key args))
            (result result))))))

;;;
;;; UUIDs
;;;

(define (random-v4-uuid)
  "Return a random version 4 UUID as a string of 32 lowercase hexadecimal
digits, grouped 8-4-4-4-12 and separated by hyphens."
  ;; https://tools.ietf.org/html/rfc4122#page-14
  ;;
  ;; The pattern in characters is: 8, 4, 4, 4, 12
  ;; The pattern in bytes is:      4, 2, 2, 2, 6
  ;;
  ;; time-low "-" time-mid "-" time-high-and-version "-"
  ;; clock-seq-and-reserved clock-seq-low "-" node
  ;;
  ;; - Set the two most significant bits (bits 6 and 7) of the
  ;;   clock_seq_hi_and_reserved to zero and one, respectively.
  ;; - Set the four most significant bits (bits 12 through 15) of the
  ;;   time_hi_and_version field to the 4-bit version number from
  ;;   Section 4.1.3.
  (let* ((bytes 16)
         (bv (gen-random-bv bytes)))
    (let ((version 4)
          ;; Most significant byte in time_hi_and_version
          (time-hi-byte (array-ref bv 6))
          ;; Most significant byte in clock_seq_hi_and_reserved
          (clock-seq-byte (array-ref bv 8)))
      (array-set! bv
                  ;; Keep the low nibble and put the version number in the
                  ;; high nibble.
                  (logior (logand #b00001111 time-hi-byte)
                          (rotate-bit-field version 4 0 8))
                  6)
      (array-set! bv
                  ;; Set bits 6 and 7 to 0 and 1 respectively
                  (logior #b10000000
                          (logand #b00111111 clock-seq-byte))
                  8))
    (let* ((int (bytevector-uint-ref bv 0 (endianness big) bytes))
           (hex-string (format #f "~32,'0x" int)))
      (string-join
       ;; Start and end offsets of the 8, 4, 4, 4 and 12 character groups.
       (map (match-lambda
              ((start . end)
               (substring hex-string start end)))
            '((0 . 8) (8 . 12) (12 . 16) (16 . 20) (20 . 32)))
       "-"))))

;;;
;;; Base64 output
;;;

(define (make-base64-output-port port)
  "Return a binary output port which base64-encodes the bytes written to it
and writes the encoded text to PORT.  Bytes are collected in an internal
buffer and encoded each time the buffer fills up.  Closing the returned port
encodes the bytes still buffered; PORT itself is left open."
  (define line-length 72)               ; must be a multiple of 4

  ;; A whole number of encoded lines per buffer: 3 input bytes produce 4
  ;; output characters.
  (define buffer-length
    (let ((bytes-per-line (* 3 (/ line-length 4)))
          (lines-per-buffer 128))
      (* bytes-per-line lines-per-buffer)))

  (define buffer
    (make-bytevector buffer-length))

  (define buffered-bytes 0)

  (define (write! bv start count)
    (let ((remaining-buffer-space (- buffer-length buffered-bytes)))
      (if (<= count remaining-buffer-space)
          (begin
            (bytevector-copy! bv start buffer buffered-bytes count)
            (set! buffered-bytes (+ buffered-bytes count))
            count)
          (begin
            ;; Fill the buffer, encode it as a whole and report a partial
            ;; write: only REMAINING-BUFFER-SPACE bytes were consumed.
            (bytevector-copy! bv start buffer buffered-bytes
                              remaining-buffer-space)
            (base64-encode buffer
                           0 buffer-length
                           line-length
                           #f
                           base64-alphabet
                           port)
            (set! buffered-bytes 0)
            remaining-buffer-space))))

  (define (close)
    ;; Encode whatever is left; padding is emitted here if needed.
    (base64-encode buffer
                   0 buffered-bytes
                   line-length
                   #f
                   base64-alphabet
                   port)
    (set! buffer #f)
    (set! buffered-bytes 0)
    #t)

  (make-custom-binary-output-port "base64-output"
                                  write!
                                  #f
                                  #f
                                  close))

;;;
;;; Chunked Responses
;;;

(define (read-chunk-header port)
  "Read a chunk header from PORT and return the size in bytes of the
upcoming chunk."
  (match (read-line port)
    ((? eof-object?)
     ;; Connection closed prematurely: there's nothing left to read.
     0)
    (str
     ;; The size is followed either by a chunk extension (after a
     ;; semicolon) or by the carriage return ending the line.
     (let ((extension-start (string-index str
                                          (lambda (c)
                                            (or (char=? c #\;)
                                                (char=? c #\return))))))
       (string->number (if extension-start ; unnecessary?
                           (substring str 0 extension-start)
                           str)
                       16)))))

(define* (make-chunked-input-port* port #:key (keep-alive? #f))
  "Return a binary input port which decodes the chunked transfer encoding
read from PORT.  Closing the returned port closes PORT as well, unless
KEEP-ALIVE? is true."
  (define (close)
    (unless keep-alive?
      (close-port port)))

  (define remaining 0)      ;number of bytes left from the current chunk
  (define finished? #f)     ;did we get all the chunks?

  (define (read! bv idx to-read)
    (define (loop to-read num-read)
      (cond ((or finished? (zero? to-read))
             num-read)
            ((zero? remaining)                    ;get a new chunk
             (let ((size (read-chunk-header port)))
               (set! remaining size)
               (cond
                ((zero? size)
                 (set! finished? #t)
                 (get-bytevector-n port 2)        ; \r\n follows the last chunk
                 num-read)
                (else
                 (loop to-read num-read)))))
            (else                                 ;read from the current chunk
             (let* ((ask-for (min to-read remaining))
                    (read    (get-bytevector-n! port bv (+ idx num-read)
                                                ask-for)))
               (cond
                ((eof-object? read)               ;premature termination
                 (set! finished? #t)
                 num-read)
                (else
                 (let ((left (- remaining read)))
                   (set! remaining left)
                   (when (zero? left)
                     ;; We're done with this chunk; read CR and LF.
                     (get-u8 port) (get-u8 port))
                   (loop (- to-read read) (+ num-read read)))))))))
    (loop to-read 0))

  (make-custom-binary-input-port "chunked input port" read! #f #f close))

;;;
;;; Streaming requests
;;;

(define* (call-with-streaming-http-request uri callback
                                           #:key (headers '()))
  "Send a chunked PUT request to URI.  CALLBACK is called with one argument,
an output port; the bytes it writes there are base64-encoded and sent as the
request body.  HEADERS are added after the default request headers.  Return
two values: the response and its body.

The connection is closed before returning, and also when an exception is
thrown, in which case the exception is re-thrown."
  (let* ((port (open-socket-for-uri uri))
         ;; The response and body are carried out of `catch' as a pair and
         ;; turned into two values afterwards.
         (result
          (catch #t
            (lambda ()
              (let ((request
                     (write-request
                      (build-request
                       uri
                       #:method 'PUT
                       #:version '(1 . 1)
                       #:headers `((connection close)
                                   (Transfer-Encoding . "chunked")
                                   (Content-Type . "application/octet-stream")
                                   ,@headers)
                       #:port port)
                      port)))
                (let* ((chunked-output-port
                        (make-chunked-output-port
                         (request-port request)
                         ;; The number of bytes produced when the base64
                         ;; port flushes its buffer
                         #:buffering 9343
                         #:keep-alive? #t))
                       (base64-output-port
                        (make-base64-output-port chunked-output-port)))
                  (callback base64-output-port)
                  (close-port base64-output-port)
                  (close-port chunked-output-port)
                  (display "\r\n" port)
                  (force-output port))

                (let* ((response (read-response port))
                       (body (read-response-body response)))
                  (close-port port)
                  (cons response body))))
            (lambda (key . args)
              ;; Do not leak the socket when the request fails part way
              ;; through; then let the caller see the original exception.
              (false-if-exception (close-port port))
              (apply throw key args)))))
    (values (car result) (cdr result))))

;;;
;;; Substitutes
;;;

(define* (substitute-derivation derivation-name
                                #:key substitute-urls)
  "Ask the store to build DERIVATION-NAME with SUBSTITUTE-URLS set as the
substitute URLs, printing build events as they happen.  The store connection
is closed as soon as a substituter-succeeded event for DERIVATION-NAME is
seen.  Any exception is swallowed, in which case #f is returned."
  (catch #t
    (lambda ()
      (with-store store
        (set-build-options store
                           #:print-extended-build-trace? #t
                           #:multiplexed-build-output? #t
                           #:substitute-urls substitute-urls)

        (with-status-report
            (lambda (event status new)
              (print-build-event event status new)
              (match event
                (('substituter-succeeded substituted-drv)
                 (when (string=? derivation-name substituted-drv)
                   (close-connection store)))
                (_ #t)))
          (build-things store (list derivation-name)))))
    (lambda (key . args)
      ;; This is a hack, to ignore errors relating to closing the store
      ;; connection.
      #f)))

;;;
;;; Narinfo
;;;

(define* (narinfo-string store-path hash size references
                         compressed-files
                         #:key (nar-path "nar")
                         system derivation
                         public-key private-key)
  "Return the signed narinfo text for STORE-PATH.  HASH and SIZE fill the
NarHash and NarSize fields, and REFERENCES is a list of strings joined with
spaces.  COMPRESSED-FILES is a list of two-element lists (COMPRESSION
FILE-SIZE), where COMPRESSION is a symbol or #f; each one yields a URL block
located under NAR-PATH.  When DERIVATION is given, System and Deriver lines
are added using SYSTEM and the base name of DERIVATION.  The text is signed
with PRIVATE-KEY and PUBLIC-KEY."
  (define (signed-string s)
    (let* ((hash (bytevector->hash-data (sha256 (string->utf8 s))
                                        #:key-type (key-type public-key))))
      (signature-sexp hash private-key public-key)))

  (define base64-encode-string
    (compose base64-encode string->utf8))

  (define* (store-item->recutils compression file-size)
    (let ((url (encode-and-join-uri-path
                `(,@(split-and-decode-uri-path nar-path)
                  ,@(if compression
                        (list (symbol->string compression))
                        '())
                  ,(basename store-path)))))
      ;; The FileSize line is only rendered when FILE-SIZE is true.
      (format #f "URL: ~a~%Compression: ~a~%~@[FileSize: ~a~%~]"
              url
              (or compression "none")
              file-size)))

  (let* ((base-info
          ;; Line breaks are written as ~% so that the output does not
          ;; depend on how this source file is laid out.
          (format #f
                  "StorePath: ~a~%~{~a~}NarHash: sha256:~a~%NarSize: ~d~%References: ~a~%"
                  store-path
                  (map (match-lambda
                         ((compression compressed-size)
                          (store-item->recutils compression
                                                compressed-size)))
                       compressed-files)
                  hash
                  size
                  (string-join references " ")))
         ;; Only render the "System" and "Deriver" lines when DERIVATION
         ;; was supplied.
         (info (if derivation
                   (format #f "~aSystem: ~a~%Deriver: ~a~%"
                           base-info
                           system
                           (basename derivation))
                   base-info))
         (signature (base64-encode-string
                     (canonical-sexp->string (signed-string info)))))
    (format #f "~aSignature: 1;~a;~a~%" info (gethostname) signature)))