;;; Nar Herder
;;;
;;; Copyright © 2021 Christopher Baines
;;;
;;; This program is free software: you can redistribute it and/or
;;; modify it under the terms of the GNU Affero General Public License
;;; as published by the Free Software Foundation, either version 3 of
;;; the License, or (at your option) any later version.
;;;
;;; This program is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;;; Affero General Public License for more details.
;;;
;;; You should have received a copy of the GNU Affero General Public
;;; License along with this program.  If not, see
;;; <http://www.gnu.org/licenses/>.

;;; Commentary:
;;;
;;; Utilities: chunked HTTP transfer encoding, streaming HTTP requests,
;;; retrying, delay/time logging, a thread-pool work queue, locale setup,
;;; and worker threads driven through fibers channels.

(define-module (nar-herder utils)
  #:use-module (srfi srfi-1)
  #:use-module (srfi srfi-19) ; time
  #:use-module (ice-9 q)
  ;; #:use-module (ice-9 ftw)
  ;; #:use-module (ice-9 popen)
  #:use-module (ice-9 iconv)
  #:use-module (ice-9 match)
  #:use-module (ice-9 format)
  #:use-module (ice-9 threads)
  #:use-module (ice-9 textual-ports)
  #:use-module (ice-9 rdelim)
  #:use-module (ice-9 binary-ports)
  #:use-module (ice-9 exceptions)
  #:use-module (rnrs bytevectors)
  #:use-module (web uri)
  #:use-module (web http)
  #:use-module (web client)
  #:use-module (web request)
  #:use-module (web response)
  #:use-module (fibers)
  #:use-module (fibers channels)
  #:use-module (fibers conditions)
  ;; #:use-module (gcrypt pk-crypto)
  ;; #:use-module (gcrypt hash)
  ;; #:use-module (gcrypt random)
  ;; #:use-module (json)
  ;; #:use-module (guix pki)
  ;; #:use-module (guix utils)
  ;; #:use-module (guix config)
  ;; #:use-module (guix store)
  ;; #:use-module (guix status)
  ;; #:use-module (guix base64)
  ;; #:use-module (guix scripts substitute)
  #:export (call-with-streaming-http-request

            &chunked-input-ended-prematurely
            chunked-input-ended-prematurely-error?
            make-chunked-input-port*

            make-worker-thread-channel
            call-with-worker-thread

            call-with-time-logging
            with-time-logging

            retry-on-error

            create-work-queue

            check-locale!))

;;;
;;; Chunked Responses
;;;

;; Exception raised when chunked input hits end of file before the
;; terminating zero-length chunk has been read.
(define &chunked-input-ended-prematurely
  (make-exception-type '&chunked-input-ended-prematurely
                       &external-error
                       '()))

(define make-chunked-input-ended-prematurely-error
  (record-constructor &chunked-input-ended-prematurely))

(define chunked-input-ended-prematurely-error?
  (record-predicate &chunked-input-ended-prematurely))

(define (read-chunk-header port)
  "Read a chunk header from PORT and return the size in bytes of the
upcoming chunk.  Raise a &chunked-input-ended-prematurely exception if PORT
is at end of file, and an error if the header does not start with a
non-negative hexadecimal size."
  (match (read-line port)
    ((? eof-object?)
     ;; Connection closed prematurely: there's nothing left to read.
     (raise-exception (make-chunked-input-ended-prematurely-error)))
    (str
     ;; The size may be followed by chunk extensions (";...") and by the CR
     ;; that READ-LINE leaves in place; keep only the hex digits before them.
     (let* ((extension-start (string-index str
                                           (lambda (c)
                                             (or (char=? c #\;)
                                                 (char=? c #\return)))))
            (size-str (if extension-start
                          (substring str 0 extension-start)
                          str))
            (size (string->number size-str 16)))
       (if (and (exact-integer? size) (>= size 0))
           size
           (error "invalid chunk header:" str))))))

(define* (make-chunked-input-port* port #:key (keep-alive? #f))
  "Return a binary input port that decodes the chunked transfer-encoded
data read from PORT.  Closing the returned port closes PORT unless
KEEP-ALIVE? is true.  If PORT reaches end of file before the final
zero-length chunk, a &chunked-input-ended-prematurely exception is raised."
  (define (close)
    (unless keep-alive?
      (close-port port)))

  (define remaining 0)                  ;number of bytes left from the current chunk
  (define finished? #f)                 ;did we get all the chunks?

  (define (read! bv idx to-read)
    (define (loop to-read num-read)
      (cond ((or finished? (zero? to-read))
             num-read)
            ((zero? remaining)          ;get a new chunk
             (let ((size (read-chunk-header port)))
               (set! remaining size)
               (cond ((zero? size)
                      (set! finished? #t)
                      (get-bytevector-n port 2) ; \r\n follows the last chunk
                      num-read)
                     (else
                      (loop to-read num-read)))))
            (else                       ;read from the current chunk
             (let* ((ask-for (min to-read remaining))
                    (count (get-bytevector-n! port bv (+ idx num-read)
                                              ask-for)))
               (cond ((eof-object? count) ;premature termination
                      (raise-exception
                       (make-chunked-input-ended-prematurely-error)))
                     (else
                      (let ((left (- remaining count)))
                        (set! remaining left)
                        (when (zero? left)
                          ;; We're done with this chunk; read CR and LF.
                          (get-u8 port)
                          (get-u8 port))
                        (loop (- to-read count) (+ num-read count)))))))))
    (loop to-read 0))

  (make-custom-binary-input-port "chunked input port" read! #f #f close))

(define* (make-chunked-output-port* port #:key (keep-alive? #f)
                                    (buffering 1200)
                                    report-bytes-sent)
  "Return an output port that writes everything written to it to PORT using
chunked transfer encoding; the returned port is block buffered with
BUFFERING bytes.  Each string reaching the port is encoded as ISO-8859-1 and
written as one chunk.  When REPORT-BYTES-SENT is given, it is called with
the size in bytes of each chunk.  Closing the returned port writes the
terminating zero-length chunk, flushes PORT, and closes PORT unless
KEEP-ALIVE? is true."
  (define heap-allocated-limit
    (expt 2 20))                        ;; 1MiB

  (define (%put-string s)
    (unless (string-null? s)
      (let* ((bv (string->bytevector s "ISO-8859-1"))
             (length (bytevector-length bv)))
        (put-string port (number->string length 16))
        (put-string port "\r\n")
        (put-bytevector port bv)
        (put-string port "\r\n")

        (when report-bytes-sent
          (report-bytes-sent length))

        ;; If more than HEAP-ALLOCATED-LIMIT bytes have been allocated since
        ;; the last collection, call GC and wait until the GC count changes
        ;; before returning.
        (let* ((stats (gc-stats))
               (initial-gc-times
                (assq-ref stats 'gc-times)))
          (when (> (assq-ref stats 'heap-allocated-since-gc)
                   heap-allocated-limit)
            (while (let ((updated-stats (gc-stats)))
                     (= (assq-ref updated-stats 'gc-times)
                        initial-gc-times))
              (gc)
              (usleep 50)))))))

  (define (%put-char c)
    (%put-string (list->string (list c))))

  (define (flush) #t)

  (define (close)
    (put-string port "0\r\n\r\n")
    (force-output port)
    (unless keep-alive?
      (close-port port)))

  (let ((ret (make-soft-port
              (vector %put-char %put-string flush #f close) "w")))
    (setvbuf ret 'block buffering)
    ret))

(define* (call-with-streaming-http-request uri callback
                                           #:key (headers '())
                                           (method 'PUT)
                                           report-bytes-sent)
  "Open a connection to URI and send a METHOD request (PUT by default) with
a chunked application/octet-stream body.  HEADERS are appended to the
default request headers.  CALLBACK is called with an output port; whatever
it writes is sent as the request body.  After CALLBACK returns, the body is
terminated and the response is read; return two values, the response and
its body.  REPORT-BYTES-SENT, if given, is called with the size of each
chunk sent.

If an exception is raised, print an error, close the connection and
re-raise it.  Note that this installs a process-wide SIGPIPE handler."
  (let* ((port (open-socket-for-uri uri))
         (request
          (build-request
           uri
           #:method method
           #:version '(1 . 1)
           #:headers `((connection close)
                       (Transfer-Encoding . "chunked")
                       (Content-Type . "application/octet-stream")
                       ,@headers)
           #:port port)))

    (set-port-encoding! port "ISO-8859-1")
    (setvbuf port 'block (expt 2 13))
    (with-exception-handler
        (lambda (exp)
          ;; Report the method actually used, rather than always "PUT".
          (simple-format #t "error: ~A ~A: ~A\n" method (uri-path uri) exp)
          (close-port port)
          (raise-exception exp))
      (lambda ()
        (write-request request port)
        (let ((chunked-output-port
               (make-chunked-output-port*
                port
                #:buffering (expt 2 12)
                #:keep-alive? #t
                #:report-bytes-sent report-bytes-sent)))

          ;; A SIGPIPE will kill Guile, so install a handler that just
          ;; logs a warning instead.
          (sigaction SIGPIPE
            (lambda (arg)
              (simple-format (current-error-port) "warning: SIGPIPE\n")))

          (set-port-encoding! chunked-output-port "ISO-8859-1")
          (callback chunked-output-port)
          ;; Closing writes the final zero-length chunk but leaves PORT
          ;; open (#:keep-alive? #t) so the response can be read.
          (close-port chunked-output-port)

          (let ((response (read-response port)))
            (let ((body (read-response-body response)))
              (close-port port)
              (values response body))))))))

;;;
;;; Retrying
;;;

(define* (retry-on-error f #:key times delay ignore)
  "Call the thunk F and return its values, retrying when it raises an
exception.  At most TIMES attempts are made in total, sleeping DELAY seconds
between them; after the last failed attempt the exception is re-raised.
IGNORE is a predicate, or a list of predicates, on the raised exception;
exceptions it matches are re-raised immediately without retrying."
  (let loop ((attempt 1))
    (match (with-exception-handler
               (lambda (exn)
                 (when (cond
                        ((list? ignore)
                         (any (lambda (test)
                                (test exn))
                              ignore))
                        ((procedure? ignore)
                         (ignore exn))
                        (else #f))
                   (raise-exception exn))

                 (cons #f exn))
             (lambda ()
               (call-with-values f
                 (lambda vals
                   (cons #t vals))))
             #:unwind? #t)
      ((#t . return-values)
       (when (> attempt 1)
         (simple-format
          (current-error-port)
          "retry success: ~A\n on attempt ~A of ~A\n"
          f
          attempt
          times))
       (apply values return-values))
      ((#f . exn)
       (if (>= attempt times)
           (begin
             (simple-format
              (current-error-port)
              "error: ~A:\n ~A,\n giving up after ~A attempts\n"
              f
              exn
              times)
             (raise-exception exn))
           (begin
             (simple-format
              (current-error-port)
              "error: ~A:\n ~A,\n attempt ~A of ~A, retrying in ~A\n"
              f
              exn
              attempt
              times
              delay)
             (sleep delay)
             (loop (+ 1 attempt))))))))

;;;
;;; Delay and time logging
;;;

;; Per-thread recorder procedure installed by the outermost
;; CALL-WITH-DELAY-LOGGING, or #f when none is active.
(define delay-logging-fluid
  (make-thread-local-fluid))

;; Per-thread nesting depth of CALL-WITH-DELAY-LOGGING calls.
(define delay-logging-depth-fluid
  (make-thread-local-fluid 0))

(define (log-delay proc duration)
  "Record that PROC took DURATION seconds, if a delay recorder is active in
the current thread; otherwise return #f."
  (and=> (fluid-ref delay-logging-fluid)
         (lambda (recorder)
           (recorder proc duration))))

(define* (call-with-delay-logging proc #:key (threshold 1) (args '()))
  "Apply PROC to ARGS and return its values.  The outermost such call in a
thread collects the durations reported by nested calls (and by LOG-DELAY);
if it took longer than THRESHOLD seconds, display a warning listing them."
  (let ((start (get-internal-real-time))
        (trace '())
        (root-logger? (eq? #f (fluid-ref delay-logging-fluid))))

    (define (format-seconds seconds)
      (format #f "~4f" seconds))

    (call-with-values
        (lambda ()
          (with-fluid* delay-logging-depth-fluid
            (+ 1 (fluid-ref delay-logging-depth-fluid))
            (lambda ()
              (if root-logger?
                  (with-fluid* delay-logging-fluid
                    (lambda (proc duration)
                      (set! trace
                            (cons (list proc
                                        duration
                                        (fluid-ref delay-logging-depth-fluid))
                                  trace))
                      #t)
                    (lambda ()
                      (apply proc args)))
                  (apply proc args)))))
      (lambda vals
        (let ((elapsed-seconds
               (/ (- (get-internal-real-time)
                     start)
                  internal-time-units-per-second)))
          (if (and (> elapsed-seconds threshold)
                   root-logger?)
              (let ((lines
                     (cons
                      (simple-format #f "warning: delay of ~A seconds: ~A"
                                     (format-seconds elapsed-seconds)
                                     proc)
                      (map (match-lambda
                             ((proc duration depth)
                              (string-append
                               (make-string (* 2 depth) #\space)
                               (simple-format #f "~A: ~A"
                                              (format-seconds duration)
                                              proc))))
                           trace))))
                (display (string-append
                          (string-join lines "\n")
                          "\n")))
              ;; Nested call: report our duration to the outermost logger.
              (unless root-logger?
                ((fluid-ref delay-logging-fluid) proc
                 elapsed-seconds))))
        (apply values vals)))))

(define (call-with-time-logging name thunk)
  "Call THUNK and return its values, displaying how many seconds it took,
labelled with NAME."
  (let ((start (current-time time-utc)))
    (call-with-values
        thunk
      (lambda vals
        (let* ((end (current-time time-utc))
               (elapsed (time-difference end start)))
          (display
           (format #f
                   "~a took ~f seconds~%"
                   name
                   (+ (time-second elapsed)
                      (/ (time-nanosecond elapsed) 1e9))))
          (apply values vals))))))

(define-syntax-rule (with-time-logging name exp ...)
  "Log under NAME the time taken to evaluate EXP."
  (call-with-time-logging name (lambda () exp ...)))

;;;
;;; Work queue
;;;

(define* (create-work-queue thread-count-parameter proc
                            #:key thread-start-delay
                            (thread-stop-delay
                             (make-time time-duration 0 0)))
  "Create a queue of jobs that are processed by applying PROC to each job's
arguments on a pool of threads.

THREAD-COUNT-PARAMETER sets the desired number of threads.  It is a number;
#f, meaning one thread per queued or running job; or a thunk returning a
number.  In the thunk case, a background thread also checks every 15 seconds
whether more threads should be started.  THREAD-START-DELAY, when given, is
the minimum time-duration between starting two threads.  An idle thread
stops once THREAD-STOP-DELAY has passed since it last finished a job, or
when there are at least as many running jobs as desired threads.

Return four values:
  - a procedure that enqueues a job, taking the arguments for PROC;
  - a thunk returning the number of queued plus running jobs;
  - a thunk returning the number of threads;
  - a thunk returning the queued and running jobs' arguments."
  (let ((queue (make-q))
        (queue-mutex (make-mutex))
        (job-available (make-condition-variable))
        ;; Maps each thread index to the arguments of the job it is running,
        ;; or #f while that thread is idle.  Guarded by QUEUE-MUTEX.
        (running-job-args (make-hash-table)))

    (define get-thread-count
      (cond ((number? thread-count-parameter)
             (const thread-count-parameter))
            ((eq? thread-count-parameter #f)
             ;; Run one thread per job
             (lambda ()
               (+ (q-length queue)
                  (hash-count (lambda (index val)
                                (list? val))
                              running-job-args))))
            (else
             thread-count-parameter)))

    (define (process-job . args)
      (with-mutex queue-mutex
        (enq! queue args)
        (start-new-threads-if-necessary (get-thread-count))
        (signal-condition-variable job-available)))

    (define (count-threads)
      (with-mutex queue-mutex
        (hash-count (const #t) running-job-args)))

    (define (count-jobs)
      (with-mutex queue-mutex
        (+ (q-length queue)
           (hash-count (lambda (index val)
                         (list? val))
                       running-job-args))))

    (define (list-jobs)
      (with-mutex queue-mutex
        (append (list-copy
                 (car queue))
                (hash-fold (lambda (key val result)
                             (or (and val
                                      (cons val result))
                                 result))
                           '()
                           running-job-args))))

    ;; Run one job, logging (with a backtrace) rather than propagating any
    ;; exception so that the calling thread keeps going.
    (define (thread-process-job job-args)
      (with-exception-handler
          (lambda (exn)
            (simple-format (current-error-port)
                           "job raised exception: ~A\n"
                           job-args))
        (lambda ()
          (with-throw-handler #t
            (lambda ()
              (apply proc job-args))
            (lambda (key . args)
              (simple-format (current-error-port)
                             "exception when handling job: ~A ~A\n"
                             key args)
              (backtrace))))
        #:unwind? #t))

    (define (start-thread thread-index)
      (define (too-many-threads?)
        (let ((running-jobs-count
               (hash-count (lambda (index val)
                             (list? val))
                           running-job-args))
              (desired-thread-count (get-thread-count)))

          (>= running-jobs-count
              desired-thread-count)))

      (define (thread-idle-for-too-long? last-job-finished-at)
        (time>=?
         (time-difference (current-time time-monotonic)
                          last-job-finished-at)
         thread-stop-delay))

      ;; Called with QUEUE-MUTEX held: free this thread's slot and release
      ;; the mutex; the thread's loop then returns.
      (define (stop-thread)
        (hash-remove! running-job-args
                      thread-index)
        (unlock-mutex queue-mutex))

      (call-with-new-thread
       (lambda ()
         (let loop ((last-job-finished-at (current-time time-monotonic)))
           (lock-mutex queue-mutex)

           (if (too-many-threads?)
               (stop-thread)
               (let ((job-args
                      (if (q-empty? queue)
                          ;; #f from wait-condition-variable indicates a timeout
                          (if (wait-condition-variable
                               job-available
                               queue-mutex
                               (+ 9 (time-second (current-time))))
                              ;; Another thread could have taken
                              ;; the job in the mean time
                              (if (q-empty? queue)
                                  #f
                                  (deq! queue))
                              #f)
                          (deq! queue))))

                 (if job-args
                     (begin
                       (hash-set! running-job-args
                                  thread-index
                                  job-args)

                       (unlock-mutex queue-mutex)
                       (thread-process-job job-args)

                       (with-mutex queue-mutex
                         (hash-set! running-job-args
                                    thread-index
                                    #f))

                       (loop (current-time time-monotonic)))
                     (if (thread-idle-for-too-long? last-job-finished-at)
                         (stop-thread)
                         (begin
                           (unlock-mutex queue-mutex)

                           (loop last-job-finished-at))))))))))

    ;; Called with QUEUE-MUTEX held.  Start threads in free slots among
    ;; indexes 0 .. DESIRED-COUNT - 1, honouring THREAD-START-DELAY.
    (define start-new-threads-if-necessary
      (let ((previous-thread-started-at (make-time time-monotonic 0 0)))
        (lambda (desired-count)
          (let* ((thread-count
                  (hash-count (const #t) running-job-args))
                 (threads-to-start
                  (- desired-count thread-count)))
            (when (> threads-to-start 0)
              (for-each
               (lambda (thread-index)
                 (when (eq? (hash-ref running-job-args
                                      thread-index
                                      'slot-free)
                            'slot-free)
                   (let* ((now (current-time time-monotonic))
                          (elapsed (time-difference now previous-thread-started-at)))
                     (when (or (eq? #f thread-start-delay)
                               (time>=? elapsed thread-start-delay))
                       (set! previous-thread-started-at now)
                       (hash-set! running-job-args
                                  thread-index
                                  #f)
                       (start-thread thread-index)))))
               (iota desired-count)))))))

    (if (procedure? thread-count-parameter)
        (call-with-new-thread
         (lambda ()
           (while #t
             (sleep 15)
             (with-mutex queue-mutex
               (let ((idle-threads (hash-count (lambda (index val)
                                                 (eq? #f val))
                                               running-job-args)))
                 (when (= 0 idle-threads)
                   (start-new-threads-if-necessary (get-thread-count))))))))
        (start-new-threads-if-necessary (get-thread-count)))

    (values process-job count-jobs count-threads list-jobs)))

;;;
;;; Locale
;;;

(define (check-locale!)
  "Set the locale for all categories from the environment.  If that raises
an exception, fall back to en_US.utf8; if that also fails, exit with
status 1."
  (with-exception-handler
      (lambda (exn)
        (display
         (simple-format
          #f
          "exception when calling setlocale: ~A falling back to en_US.utf8\n"
          exn)
         (current-error-port))

        (with-exception-handler
            (lambda (exn)
              (display
               (simple-format
                #f
                "exception when calling setlocale with en_US.utf8: ~A\n"
                exn)
               (current-error-port))

              (exit 1))
          (lambda _
            (setlocale LC_ALL "en_US.utf8"))
          #:unwind? #t))
    (lambda _
      (setlocale LC_ALL ""))
    #:unwind? #t))

;;;
;;; Worker threads
;;;

;; Within a worker thread, the list of arguments returned by its
;; initializer; #f elsewhere.
(define %worker-thread-args
  (make-parameter #f))

(define* (make-worker-thread-channel initializer
                                     #:key (parallelism 1)
                                     (delay-logger (lambda _ #f))
                                     destructor
                                     lifetime
                                     (log-exception? (const #t)))
  "Start PARALLELISM worker threads reading requests from a new channel, and
return that channel; use it with CALL-WITH-WORKER-THREAD.

Each thread calls INITIALIZER to obtain a list of arguments, then applies
every procedure it receives to those arguments.  It replies with the run
time in seconds followed by the procedure's values, or with
(worker-thread-error DURATION EXCEPTION) if the procedure raised.
DELAY-LOGGER is called with the number of seconds each request waited before
being picked up.  Exceptions are printed with a backtrace, except Guile
exception objects for which LOG-EXCEPTION? returns false.

When LIFETIME is a number, a thread periodically re-initializes: it applies
DESTRUCTOR (if given) to its arguments and calls INITIALIZER again."
  (let ((channel (make-channel)))
    (for-each
     (lambda _
       (call-with-new-thread
        (lambda ()
          (let init ((args (initializer)))
            (parameterize ((%worker-thread-args args))
              (let loop ((current-lifetime lifetime))
                (match (get-message channel)
                  (((? channel? reply) sent-time (? procedure? proc))
                   (let ((time-delay
                          (- (get-internal-real-time)
                             sent-time)))
                     (delay-logger (/ time-delay
                                      internal-time-units-per-second))
                     (put-message
                      reply
                      (let ((start-time (get-internal-real-time)))
                        (with-exception-handler
                            (lambda (exn)
                              (list 'worker-thread-error
                                    (/ (- (get-internal-real-time)
                                          start-time)
                                       internal-time-units-per-second)
                                    exn))
                          (lambda ()
                            (with-throw-handler #t
                              (lambda ()
                                (call-with-values
                                    (lambda ()
                                      (apply proc args))
                                  (lambda vals
                                    (cons (/ (- (get-internal-real-time)
                                                start-time)
                                             internal-time-units-per-second)
                                          vals))))
                              (lambda throw-args
                                (when (match throw-args
                                        (('%exception exn)
                                         (log-exception? exn))
                                        (_ #t))
                                  (simple-format
                                   (current-error-port)
                                   "worker-thread: exception: ~A\n"
                                   throw-args)
                                  (backtrace)))))
                          #:unwind? #t))))))
                ;; NOTE(review): counting down from LIFETIME and stopping only
                ;; once the counter is below 0 means LIFETIME + 2 requests are
                ;; handled before re-initializing; confirm this is intended.
                (if (number? current-lifetime)
                    (unless (< current-lifetime 0)
                      (loop (if current-lifetime
                                (- current-lifetime 1)
                                #f)))
                    (loop #f))))

            (when destructor
              (apply destructor args))

            (init (initializer))))))
     (iota parallelism))
    channel))

(define* (call-with-worker-thread channel proc #:key duration-logger)
  "Send PROC to a worker thread through CHANNEL and return the values of
applying PROC to that worker's arguments, re-raising any exception PROC
raised.  DURATION-LOGGER, if given, is called with the number of seconds PROC
ran in the worker.  If already in a worker thread, apply PROC to the current
worker's arguments immediately.  NOTE(review): this happens for any worker
thread, not only one serving CHANNEL."
  (let ((args (%worker-thread-args)))
    (if args
        (apply proc args)
        (let ((reply (make-channel)))
          (put-message channel (list reply (get-internal-real-time) proc))
          (match (get-message reply)
            (('worker-thread-error duration exn)
             (when duration-logger
               (duration-logger duration))
             (raise-exception exn))
            ((duration . result)
             (when duration-logger
               (duration-logger duration))
             (apply values result)))))))