diff options
author | Christopher Baines <mail@cbaines.net> | 2023-02-01 13:46:24 +0000 |
---|---|---|
committer | Christopher Baines <mail@cbaines.net> | 2023-02-01 13:46:24 +0000 |
commit | b9fcb0f34bea6fa2a8a712178cde3c528a3e88fe (patch) | |
tree | dbe657051d15bda17d91915276f60d6afa47e5bc | |
parent | 010c926656b64d23e7a7908e0c75713ff17e24e4 (diff) | |
download | nar-herder-b9fcb0f34bea6fa2a8a712178cde3c528a3e88fe.tar nar-herder-b9fcb0f34bea6fa2a8a712178cde3c528a3e88fe.tar.gz |
Cleanup the utils module
-rw-r--r-- | nar-herder/utils.scm | 244 |
1 files changed, 1 insertions, 243 deletions
diff --git a/nar-herder/utils.scm b/nar-herder/utils.scm index ab42a4a..af78c0a 100644 --- a/nar-herder/utils.scm +++ b/nar-herder/utils.scm @@ -21,8 +21,6 @@ #:use-module (srfi srfi-9) #:use-module (srfi srfi-19) ; time #:use-module (ice-9 q) - ;; #:use-module (ice-9 ftw) - ;; #:use-module (ice-9 popen) #:use-module (ice-9 iconv) #:use-module (ice-9 match) #:use-module (ice-9 format) @@ -42,23 +40,7 @@ #:use-module (fibers) #:use-module (fibers channels) #:use-module (fibers conditions) - ;; #:use-module (gcrypt pk-crypto) - ;; #:use-module (gcrypt hash) - ;; #:use-module (gcrypt random) - ;; #:use-module (json) - ;; #:use-module (guix pki) - ;; #:use-module (guix utils) - ;; #:use-module (guix config) - ;; #:use-module (guix store) - ;; #:use-module (guix status) - ;; #:use-module (guix base64) - ;; #:use-module (guix scripts substitute) - #:export (call-with-streaming-http-request - &chunked-input-ended-prematurely - chunked-input-ended-prematurely-error? - make-chunked-input-port* - - make-worker-thread-set + #:export (make-worker-thread-set call-with-worker-thread call-with-time-logging @@ -72,168 +54,6 @@ with-port-timeouts)) -;; Chunked Responses -(define (read-chunk-header port) - "Read a chunk header from PORT and return the size in bytes of the -upcoming chunk." - (match (read-line port) - ((? eof-object?) - ;; Connection closed prematurely: there's nothing left to read. - (error "chunked input ended prematurely")) - (str - (let ((extension-start (string-index str - (lambda (c) - (or (char=? c #\;) - (char=? c #\return)))))) - (string->number (if extension-start ; unnecessary? - (substring str 0 extension-start) - str) - 16))))) - -(define &chunked-input-ended-prematurely - (make-exception-type '&chunked-input-error-prematurely - &external-error - '())) - -(define make-chunked-input-ended-prematurely-error - (record-constructor &chunked-input-ended-prematurely)) - -(define chunked-input-ended-prematurely-error? - (record-predicate &chunked-input-ended-prematurely)) - -(define* (make-chunked-input-port* port #:key (keep-alive? #f)) - (define (close) - (unless keep-alive? - (close-port port))) - - (define chunk-size 0) ;size of the current chunk - (define remaining 0) ;number of bytes left from the current chunk - (define finished? #f) ;did we get all the chunks? - - (define (read! bv idx to-read) - (define (loop to-read num-read) - (cond ((or finished? (zero? to-read)) - num-read) - ((zero? remaining) ;get a new chunk - (let ((size (read-chunk-header port))) - (set! chunk-size size) - (set! remaining size) - (cond - ((zero? size) - (set! finished? #t) - (get-bytevector-n port 2) ; \r\n follows the last chunk - num-read) - (else - (loop to-read num-read))))) - (else ;read from the current chunk - (let* ((ask-for (min to-read remaining)) - (read (get-bytevector-n! port bv (+ idx num-read) - ask-for))) - (cond - ((eof-object? read) ;premature termination - (raise-exception - (make-chunked-input-ended-prematurely-error))) - (else - (let ((left (- remaining read))) - (set! remaining left) - (when (zero? left) - ;; We're done with this chunk; read CR and LF. - (get-u8 port) (get-u8 port)) - (loop (- to-read read) - (+ num-read read))))))))) - (loop to-read 0)) - - (make-custom-binary-input-port "chunked input port" read! #f #f close)) - -(define* (make-chunked-output-port* port #:key (keep-alive? #f) - (buffering 1200) - report-bytes-sent) - (define heap-allocated-limit - (expt 2 20)) ;; 1MiB - - (define (%put-string s) - (unless (string-null? s) - (let* ((bv (string->bytevector s "ISO-8859-1")) - (length (bytevector-length bv))) - (put-string port (number->string length 16)) - (put-string port "\r\n") - (put-bytevector port bv) - (put-string port "\r\n") - - (when report-bytes-sent - (report-bytes-sent length)) - (let* ((stats (gc-stats)) - (initial-gc-times - (assq-ref stats 'gc-times))) - (when (> (assq-ref stats 'heap-allocated-since-gc) - heap-allocated-limit) - (while (let ((updated-stats (gc-stats))) - (= (assq-ref updated-stats 'gc-times) - initial-gc-times)) - (gc) - (usleep 50))))))) - - (define (%put-char c) - (%put-string (list->string (list c)))) - - (define (flush) #t) - (define (close) - (put-string port "0\r\n\r\n") - (force-output port) - (unless keep-alive? - (close-port port))) - (let ((ret (make-soft-port - (vector %put-char %put-string flush #f close) "w"))) - (setvbuf ret 'block buffering) - ret)) - -(define* (call-with-streaming-http-request uri callback - #:key (headers '()) - (method 'PUT) - report-bytes-sent) - (let* ((port (open-socket-for-uri uri)) - (request - (build-request - uri - #:method method - #:version '(1 . 1) - #:headers `((connection close) - (Transfer-Encoding . "chunked") - (Content-Type . "application/octet-stream") - ,@headers) - #:port port))) - - (set-port-encoding! port "ISO-8859-1") - (setvbuf port 'block (expt 2 13)) - (with-exception-handler - (lambda (exp) - (simple-format #t "error: PUT ~A: ~A\n" (uri-path uri) exp) - (close-port port) - (raise-exception exp)) - (lambda () - (let ((request (write-request request port))) - (let* ((chunked-output-port - (make-chunked-output-port* - port - #:buffering (expt 2 12) - #:keep-alive? #t - #:report-bytes-sent report-bytes-sent))) - - ;; A SIGPIPE will kill Guile, so ignore it - (sigaction SIGPIPE - (lambda (arg) - (simple-format (current-error-port) "warning: SIGPIPE\n"))) - - (set-port-encoding! chunked-output-port "ISO-8859-1") - (callback chunked-output-port) - (close-port chunked-output-port) - - (let ((response (read-response port))) - (let ((body (read-response-body response))) - (close-port port) - (values response - body))))))))) - (define* (retry-on-error f #:key times delay ignore) (let loop ((attempt 1)) (match (with-exception-handler @@ -285,68 +105,6 @@ upcoming chunk." (sleep delay) (loop (+ 1 attempt)))))))) -(define delay-logging-fluid - (make-thread-local-fluid)) -(define delay-logging-depth-fluid - (make-thread-local-fluid 0)) - -(define (log-delay proc duration) - (and=> (fluid-ref delay-logging-fluid) - (lambda (recorder) - (recorder proc duration)))) - -(define* (call-with-delay-logging proc #:key (threshold 1) (args '())) - (let ((start (get-internal-real-time)) - (trace '()) - (root-logger? (eq? #f (fluid-ref delay-logging-fluid)))) - - (define (format-seconds seconds) - (format #f "~4f" seconds)) - - (call-with-values - (lambda () - (with-fluid* delay-logging-depth-fluid - (+ 1 (fluid-ref delay-logging-depth-fluid)) - (lambda () - (if root-logger? - (with-fluid* delay-logging-fluid - (lambda (proc duration) - (set! trace - (cons (list proc - duration - (fluid-ref delay-logging-depth-fluid)) - trace)) - #t) - (lambda () - (apply proc args))) - (apply proc args))))) - (lambda vals - (let ((elapsed-seconds - (/ (- (get-internal-real-time) - start) - internal-time-units-per-second))) - (if (and (> elapsed-seconds threshold) - root-logger?) - (let ((lines - (cons - (simple-format #f "warning: delay of ~A seconds: ~A" - (format-seconds elapsed-seconds) - proc) - (map (match-lambda - ((proc duration depth) - (string-append - (make-string (* 2 depth) #\space) - (simple-format #f "~A: ~A" - (format-seconds duration) - proc)))) - trace)))) - (display (string-append - (string-join lines "\n") - "\n"))) - (unless root-logger? - ((fluid-ref delay-logging-fluid) proc elapsed-seconds)))) - (apply values vals))))) - (define (call-with-time-logging name thunk) (let ((start (current-time time-utc))) (call-with-values |