diff options
author | Christopher Baines <mail@cbaines.net> | 2021-12-11 10:27:24 +0000 |
---|---|---|
committer | Christopher Baines <mail@cbaines.net> | 2021-12-12 16:35:38 +0000 |
commit | f9ff69e1c79f024ed188ad51642cca443aedfee2 (patch) | |
tree | 609b37ff8d6fc3d557d339a67ba6641522b0a977 /nar-herder/utils.scm | |
parent | 7e280ca951e8ffa7c86224843075e65266911617 (diff) | |
download | nar-herder-f9ff69e1c79f024ed188ad51642cca443aedfee2.tar nar-herder-f9ff69e1c79f024ed188ad51642cca443aedfee2.tar.gz |
Get most of the functionality sort of working
At least working enough to start trying this out, and finding the
problems.
Diffstat (limited to 'nar-herder/utils.scm')
-rw-r--r-- | nar-herder/utils.scm | 651 |
1 files changed, 651 insertions, 0 deletions
diff --git a/nar-herder/utils.scm b/nar-herder/utils.scm new file mode 100644 index 0000000..a0a5171 --- /dev/null +++ b/nar-herder/utils.scm @@ -0,0 +1,651 @@ +;;; Nar Herder +;;; +;;; Copyright © 2021 Christopher Baines <mail@cbaines.net> +;;; +;;; This program is free software: you can redistribute it and/or +;;; modify it under the terms of the GNU Affero General Public License +;;; as published by the Free Software Foundation, either version 3 of +;;; the License, or (at your option) any later version. +;;; +;;; This program is distributed in the hope that it will be useful, +;;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;;; Affero General Public License for more details. +;;; +;;; You should have received a copy of the GNU Affero General Public +;;; License along with this program. If not, see +;;; <http://www.gnu.org/licenses/>. + +(define-module (nar-herder utils) + #:use-module (srfi srfi-1) + #:use-module (srfi srfi-19) ; time + #:use-module (ice-9 q) + ;; #:use-module (ice-9 ftw) + ;; #:use-module (ice-9 popen) + #:use-module (ice-9 iconv) + #:use-module (ice-9 match) + #:use-module (ice-9 format) + #:use-module (ice-9 threads) + #:use-module (ice-9 textual-ports) + #:use-module (ice-9 rdelim) + #:use-module (ice-9 binary-ports) + #:use-module (ice-9 exceptions) + #:use-module (rnrs bytevectors) + #:use-module (web uri) + #:use-module (web http) + #:use-module (web client) + #:use-module (web request) + #:use-module (web response) + #:use-module (fibers) + #:use-module (fibers channels) + #:use-module (fibers conditions) + ;; #:use-module (gcrypt pk-crypto) + ;; #:use-module (gcrypt hash) + ;; #:use-module (gcrypt random) + ;; #:use-module (json) + ;; #:use-module (guix pki) + ;; #:use-module (guix utils) + ;; #:use-module (guix config) + ;; #:use-module (guix store) + ;; #:use-module (guix status) + ;; #:use-module (guix base64) + ;; #:use-module (guix scripts substitute) + #:export (call-with-streaming-http-request + &chunked-input-ended-prematurely + chunked-input-ended-prematurely-error? + make-chunked-input-port* + + make-worker-thread-channel + call-with-worker-thread + + retry-on-error + + create-work-queue + + check-locale!)) + +;; Chunked Responses +(define (read-chunk-header port) + "Read a chunk header from PORT and return the size in bytes of the +upcoming chunk." + (match (read-line port) + ((? eof-object?) + ;; Connection closed prematurely: there's nothing left to read. + (error "chunked input ended prematurely")) + (str + (let ((extension-start (string-index str + (lambda (c) + (or (char=? c #\;) + (char=? c #\return)))))) + (string->number (if extension-start ; unnecessary? + (substring str 0 extension-start) + str) + 16))))) + +(define &chunked-input-ended-prematurely + (make-exception-type '&chunked-input-error-prematurely + &external-error + '())) + +(define make-chunked-input-ended-prematurely-error + (record-constructor &chunked-input-ended-prematurely)) + +(define chunked-input-ended-prematurely-error? + (record-predicate &chunked-input-ended-prematurely)) + +(define* (make-chunked-input-port* port #:key (keep-alive? #f)) + (define (close) + (unless keep-alive? + (close-port port))) + + (define chunk-size 0) ;size of the current chunk + (define remaining 0) ;number of bytes left from the current chunk + (define finished? #f) ;did we get all the chunks? + + (define (read! bv idx to-read) + (define (loop to-read num-read) + (cond ((or finished? (zero? to-read)) + num-read) + ((zero? remaining) ;get a new chunk + (let ((size (read-chunk-header port))) + (set! chunk-size size) + (set! remaining size) + (cond + ((zero? size) + (set! finished? #t) + (get-bytevector-n port 2) ; \r\n follows the last chunk + num-read) + (else + (loop to-read num-read))))) + (else ;read from the current chunk + (let* ((ask-for (min to-read remaining)) + (read (get-bytevector-n! port bv (+ idx num-read) + ask-for))) + (cond + ((eof-object? read) ;premature termination + (raise-exception + (make-chunked-input-ended-prematurely-error))) + (else + (let ((left (- remaining read))) + (set! remaining left) + (when (zero? left) + ;; We're done with this chunk; read CR and LF. + (get-u8 port) (get-u8 port)) + (loop (- to-read read) + (+ num-read read))))))))) + (loop to-read 0)) + + (make-custom-binary-input-port "chunked input port" read! #f #f close)) + +(define* (make-chunked-output-port* port #:key (keep-alive? #f) + (buffering 1200) + report-bytes-sent) + (define heap-allocated-limit + (expt 2 20)) ;; 1MiB + + (define (%put-string s) + (unless (string-null? s) + (let* ((bv (string->bytevector s "ISO-8859-1")) + (length (bytevector-length bv))) + (put-string port (number->string length 16)) + (put-string port "\r\n") + (put-bytevector port bv) + (put-string port "\r\n") + + (when report-bytes-sent + (report-bytes-sent length)) + (let* ((stats (gc-stats)) + (initial-gc-times + (assq-ref stats 'gc-times))) + (when (> (assq-ref stats 'heap-allocated-since-gc) + heap-allocated-limit) + (while (let ((updated-stats (gc-stats))) + (= (assq-ref updated-stats 'gc-times) + initial-gc-times)) + (gc) + (usleep 50))))))) + + (define (%put-char c) + (%put-string (list->string (list c)))) + + (define (flush) #t) + (define (close) + (put-string port "0\r\n\r\n") + (force-output port) + (unless keep-alive? + (close-port port))) + (let ((ret (make-soft-port + (vector %put-char %put-string flush #f close) "w"))) + (setvbuf ret 'block buffering) + ret)) + +(define* (call-with-streaming-http-request uri callback + #:key (headers '()) + (method 'PUT) + report-bytes-sent) + (let* ((port (open-socket-for-uri uri)) + (request + (build-request + uri + #:method method + #:version '(1 . 1) + #:headers `((connection close) + (Transfer-Encoding . "chunked") + (Content-Type . "application/octet-stream") + ,@headers) + #:port port))) + + (set-port-encoding! port "ISO-8859-1") + (setvbuf port 'block (expt 2 13)) + (with-exception-handler + (lambda (exp) + (simple-format #t "error: PUT ~A: ~A\n" (uri-path uri) exp) + (close-port port) + (raise-exception exp)) + (lambda () + (let ((request (write-request request port))) + (let* ((chunked-output-port + (make-chunked-output-port* + port + #:buffering (expt 2 12) + #:keep-alive? #t + #:report-bytes-sent report-bytes-sent))) + + ;; A SIGPIPE will kill Guile, so ignore it + (sigaction SIGPIPE + (lambda (arg) + (simple-format (current-error-port) "warning: SIGPIPE\n"))) + + (set-port-encoding! chunked-output-port "ISO-8859-1") + (callback chunked-output-port) + (close-port chunked-output-port) + + (let ((response (read-response port))) + (let ((body (read-response-body response))) + (close-port port) + (values response + body))))))))) + +(define* (retry-on-error f #:key times delay ignore) + (let loop ((attempt 1)) + (match (with-exception-handler + (lambda (exn) + (when (cond + ((list? ignore) + (any (lambda (test) + (test exn)) + ignore)) + ((procedure? ignore) + (ignore exn)) + (else #f)) + (raise-exception exn)) + + (cons #f exn)) + (lambda () + (call-with-values f + (lambda vals + (cons #t vals)))) + #:unwind? #t) + ((#t . return-values) + (when (> attempt 1) + (simple-format + (current-error-port) + "retry success: ~A\n on attempt ~A of ~A\n" + f + attempt + times)) + (apply values return-values)) + ((#f . exn) + (if (>= attempt times) + (begin + (simple-format + (current-error-port) + "error: ~A:\n ~A,\n giving up after ~A attempts\n" + f + exn + times) + (raise-exception exn)) + (begin + (simple-format + (current-error-port) + "error: ~A:\n ~A,\n attempt ~A of ~A, retrying in ~A\n" + f + exn + attempt + times + delay) + (sleep delay) + (loop (+ 1 attempt)))))))) + +(define delay-logging-fluid + (make-thread-local-fluid)) +(define delay-logging-depth-fluid + (make-thread-local-fluid 0)) + +(define (log-delay proc duration) + (and=> (fluid-ref delay-logging-fluid) + (lambda (recorder) + (recorder proc duration)))) + +(define* (call-with-delay-logging proc #:key (threshold 1) (args '())) + (let ((start (get-internal-real-time)) + (trace '()) + (root-logger? (eq? #f (fluid-ref delay-logging-fluid)))) + + (define (format-seconds seconds) + (format #f "~4f" seconds)) + + (call-with-values + (lambda () + (with-fluid* delay-logging-depth-fluid + (+ 1 (fluid-ref delay-logging-depth-fluid)) + (lambda () + (if root-logger? + (with-fluid* delay-logging-fluid + (lambda (proc duration) + (set! trace + (cons (list proc + duration + (fluid-ref delay-logging-depth-fluid)) + trace)) + #t) + (lambda () + (apply proc args))) + (apply proc args))))) + (lambda vals + (let ((elapsed-seconds + (/ (- (get-internal-real-time) + start) + internal-time-units-per-second))) + (if (and (> elapsed-seconds threshold) + root-logger?) + (let ((lines + (cons + (simple-format #f "warning: delay of ~A seconds: ~A" + (format-seconds elapsed-seconds) + proc) + (map (match-lambda + ((proc duration depth) + (string-append + (make-string (* 2 depth) #\space) + (simple-format #f "~A: ~A" + (format-seconds duration) + proc)))) + trace)))) + (display (string-append + (string-join lines "\n") + "\n"))) + (unless root-logger? + ((fluid-ref delay-logging-fluid) proc elapsed-seconds)))) + (apply values vals))))) + +(define (call-with-time-logging name thunk) + (let ((start (current-time time-utc))) + (call-with-values + thunk + (lambda vals + (let* ((end (current-time time-utc)) + (elapsed (time-difference end start))) + (display + (format #f + "~a took ~f seconds~%" + name + (+ (time-second elapsed) + (/ (time-nanosecond elapsed) 1e9)))) + (apply values vals)))))) + +(define-syntax-rule (with-time-logging name exp ...) + "Log under NAME the time taken to evaluate EXP." + (call-with-time-logging name (lambda () exp ...))) + +(define* (create-work-queue thread-count-parameter proc + #:key thread-start-delay + (thread-stop-delay + (make-time time-duration 0 0))) + (let ((queue (make-q)) + (queue-mutex (make-mutex)) + (job-available (make-condition-variable)) + (running-job-args (make-hash-table))) + + (define get-thread-count + (cond + ((number? thread-count-parameter) + (const thread-count-parameter)) + ((eq? thread-count-parameter #f) + ;; Run one thread per job + (lambda () + (+ (q-length queue) + (hash-count (lambda (index val) + (list? val)) + running-job-args)))) + (else + thread-count-parameter))) + + (define (process-job . args) + (with-mutex queue-mutex + (enq! queue args) + (start-new-threads-if-necessary (get-thread-count)) + (signal-condition-variable job-available))) + + (define (count-threads) + (with-mutex queue-mutex + (hash-count (const #t) running-job-args))) + + (define (count-jobs) + (with-mutex queue-mutex + (+ (q-length queue) + (hash-count (lambda (index val) + (list? val)) + running-job-args)))) + + (define (list-jobs) + (with-mutex queue-mutex + (append (list-copy + (car queue)) + (hash-fold (lambda (key val result) + (or (and val + (cons val result)) + result)) + '() + running-job-args)))) + + (define (thread-process-job job-args) + (with-exception-handler + (lambda (exn) + (simple-format (current-error-port) + "job raised exception: ~A\n" + job-args)) + (lambda () + (with-throw-handler #t + (lambda () + (apply proc job-args)) + (lambda (key . args) + (simple-format (current-error-port) + "exception when handling job: ~A ~A\n" + key args) + (backtrace)))) + #:unwind? #t)) + + (define (start-thread thread-index) + (define (too-many-threads?) + (let ((running-jobs-count + (hash-count (lambda (index val) + (list? val)) + running-job-args)) + (desired-thread-count (get-thread-count))) + + (>= running-jobs-count + desired-thread-count))) + + (define (thread-idle-for-too-long? last-job-finished-at) + (time>=? + (time-difference (current-time time-monotonic) + last-job-finished-at) + thread-stop-delay)) + + (define (stop-thread) + (hash-remove! running-job-args + thread-index) + (unlock-mutex queue-mutex)) + + (call-with-new-thread + (lambda () + (let loop ((last-job-finished-at (current-time time-monotonic))) + (lock-mutex queue-mutex) + + (if (too-many-threads?) + (stop-thread) + (let ((job-args + (if (q-empty? queue) + ;; #f from wait-condition-variable indicates a timeout + (if (wait-condition-variable + job-available + queue-mutex + (+ 9 (time-second (current-time)))) + ;; Another thread could have taken + ;; the job in the mean time + (if (q-empty? queue) + #f + (deq! queue)) + #f) + (deq! queue)))) + + (if job-args + (begin + (hash-set! running-job-args + thread-index + job-args) + + (unlock-mutex queue-mutex) + (thread-process-job job-args) + + (with-mutex queue-mutex + (hash-set! running-job-args + thread-index + #f)) + + (loop (current-time time-monotonic))) + (if (thread-idle-for-too-long? last-job-finished-at) + (stop-thread) + (begin + (unlock-mutex queue-mutex) + + (loop last-job-finished-at)))))))))) + + + (define start-new-threads-if-necessary + (let ((previous-thread-started-at (make-time time-monotonic 0 0))) + (lambda (desired-count) + (let* ((thread-count + (hash-count (const #t) running-job-args)) + (threads-to-start + (- desired-count thread-count))) + (when (> threads-to-start 0) + (for-each + (lambda (thread-index) + (when (eq? (hash-ref running-job-args + thread-index + 'slot-free) + 'slot-free) + (let* ((now (current-time time-monotonic)) + (elapsed (time-difference now + previous-thread-started-at))) + (when (or (eq? #f thread-start-delay) + (time>=? elapsed thread-start-delay)) + (set! previous-thread-started-at now) + (hash-set! running-job-args + thread-index + #f) + (start-thread thread-index))))) + (iota desired-count))))))) + + (if (procedure? thread-count-parameter) + (call-with-new-thread + (lambda () + (while #t + (sleep 15) + (with-mutex queue-mutex + (let ((idle-threads (hash-count (lambda (index val) + (eq? #f val)) + running-job-args))) + (when (= 0 idle-threads) + (start-new-threads-if-necessary (get-thread-count)))))))) + (start-new-threads-if-necessary (get-thread-count))) + + (values process-job count-jobs count-threads list-jobs))) + +(define (check-locale!) + (with-exception-handler + (lambda (exn) + (display + (simple-format + #f + "exception when calling setlocale: ~A +falling back to en_US.utf8\n" + exn) + (current-error-port)) + + (with-exception-handler + (lambda (exn) + (display + (simple-format + #f + "exception when calling setlocale with en_US.utf8: ~A\n" + exn) + (current-error-port)) + + (exit 1)) + (lambda _ + (setlocale LC_ALL "en_US.utf8")) + #:unwind? #t)) + (lambda _ + (setlocale LC_ALL "")) + #:unwind? #t)) + +(define %worker-thread-args + (make-parameter #f)) + +(define* (make-worker-thread-channel initializer + #:key (parallelism 1) + (delay-logger (lambda _ #f)) + destructor + lifetime + (log-exception? (const #t))) + "Return a channel used to offload work to a dedicated thread. ARGS are the +arguments of the worker thread procedure." + (parameterize (((@@ (fibers internal) current-fiber) #f)) + (let ((channel (make-channel))) + (for-each + (lambda _ + (call-with-new-thread + (lambda () + (let init ((args (initializer))) + (parameterize ((%worker-thread-args args)) + (let loop ((current-lifetime lifetime)) + (match (get-message channel) + (((? channel? reply) sent-time (? procedure? proc)) + (let ((time-delay + (- (get-internal-real-time) + sent-time))) + (delay-logger (/ time-delay + internal-time-units-per-second)) + (put-message + reply + (let ((start-time (get-internal-real-time))) + (with-exception-handler + (lambda (exn) + (list 'worker-thread-error + (/ (- (get-internal-real-time) + start-time) + internal-time-units-per-second) + exn)) + (lambda () + (with-throw-handler #t + (lambda () + (call-with-values + (lambda () + (apply proc args)) + (lambda vals + (cons (/ (- (get-internal-real-time) + start-time) + internal-time-units-per-second) + vals)))) + (lambda args + (when (match args + (('%exception exn) + (log-exception? exn)) + (_ #t)) + (simple-format + (current-error-port) + "worker-thread: exception: ~A\n" args) + (backtrace))))) + #:unwind? #t)))))) + (if (number? current-lifetime) + (unless (< current-lifetime 0) + (loop (if current-lifetime + (- current-lifetime 1) + #f))) + (loop #f)))) + (when destructor + (apply destructor args)) + (init (initializer)))))) + (iota parallelism)) + channel))) + +(define* (call-with-worker-thread channel proc #:key duration-logger) + "Send PROC to the worker thread through CHANNEL. Return the result of PROC. +If already in the worker thread, call PROC immediately." + (let ((args (%worker-thread-args))) + (if args + (apply proc args) + (let ((reply (make-channel))) + (put-message channel (list reply (get-internal-real-time) proc)) + (match (get-message reply) + (('worker-thread-error duration exn) + (when duration-logger + (duration-logger duration)) + (raise-exception exn)) + ((duration . result) + (when duration-logger + (duration-logger duration)) + (apply values result))))))) |