aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2023-02-01 13:46:24 +0000
committerChristopher Baines <mail@cbaines.net>2023-02-01 13:46:24 +0000
commitb9fcb0f34bea6fa2a8a712178cde3c528a3e88fe (patch)
treedbe657051d15bda17d91915276f60d6afa47e5bc
parent010c926656b64d23e7a7908e0c75713ff17e24e4 (diff)
downloadnar-herder-b9fcb0f34bea6fa2a8a712178cde3c528a3e88fe.tar
nar-herder-b9fcb0f34bea6fa2a8a712178cde3c528a3e88fe.tar.gz
Cleanup the utils module
-rw-r--r--nar-herder/utils.scm244
1 files changed, 1 insertions, 243 deletions
diff --git a/nar-herder/utils.scm b/nar-herder/utils.scm
index ab42a4a..af78c0a 100644
--- a/nar-herder/utils.scm
+++ b/nar-herder/utils.scm
@@ -21,8 +21,6 @@
#:use-module (srfi srfi-9)
#:use-module (srfi srfi-19) ; time
#:use-module (ice-9 q)
- ;; #:use-module (ice-9 ftw)
- ;; #:use-module (ice-9 popen)
#:use-module (ice-9 iconv)
#:use-module (ice-9 match)
#:use-module (ice-9 format)
@@ -42,23 +40,7 @@
#:use-module (fibers)
#:use-module (fibers channels)
#:use-module (fibers conditions)
- ;; #:use-module (gcrypt pk-crypto)
- ;; #:use-module (gcrypt hash)
- ;; #:use-module (gcrypt random)
- ;; #:use-module (json)
- ;; #:use-module (guix pki)
- ;; #:use-module (guix utils)
- ;; #:use-module (guix config)
- ;; #:use-module (guix store)
- ;; #:use-module (guix status)
- ;; #:use-module (guix base64)
- ;; #:use-module (guix scripts substitute)
- #:export (call-with-streaming-http-request
- &chunked-input-ended-prematurely
- chunked-input-ended-prematurely-error?
- make-chunked-input-port*
-
- make-worker-thread-set
+ #:export (make-worker-thread-set
call-with-worker-thread
call-with-time-logging
@@ -72,168 +54,6 @@
with-port-timeouts))
-;; Chunked Responses
-(define (read-chunk-header port)
- "Read a chunk header from PORT and return the size in bytes of the
-upcoming chunk."
- (match (read-line port)
- ((? eof-object?)
- ;; Connection closed prematurely: there's nothing left to read.
- (error "chunked input ended prematurely"))
- (str
- (let ((extension-start (string-index str
- (lambda (c)
- (or (char=? c #\;)
- (char=? c #\return))))))
- (string->number (if extension-start ; unnecessary?
- (substring str 0 extension-start)
- str)
- 16)))))
-
-(define &chunked-input-ended-prematurely
- (make-exception-type '&chunked-input-error-prematurely
- &external-error
- '()))
-
-(define make-chunked-input-ended-prematurely-error
- (record-constructor &chunked-input-ended-prematurely))
-
-(define chunked-input-ended-prematurely-error?
- (record-predicate &chunked-input-ended-prematurely))
-
-(define* (make-chunked-input-port* port #:key (keep-alive? #f))
- (define (close)
- (unless keep-alive?
- (close-port port)))
-
- (define chunk-size 0) ;size of the current chunk
- (define remaining 0) ;number of bytes left from the current chunk
- (define finished? #f) ;did we get all the chunks?
-
- (define (read! bv idx to-read)
- (define (loop to-read num-read)
- (cond ((or finished? (zero? to-read))
- num-read)
- ((zero? remaining) ;get a new chunk
- (let ((size (read-chunk-header port)))
- (set! chunk-size size)
- (set! remaining size)
- (cond
- ((zero? size)
- (set! finished? #t)
- (get-bytevector-n port 2) ; \r\n follows the last chunk
- num-read)
- (else
- (loop to-read num-read)))))
- (else ;read from the current chunk
- (let* ((ask-for (min to-read remaining))
- (read (get-bytevector-n! port bv (+ idx num-read)
- ask-for)))
- (cond
- ((eof-object? read) ;premature termination
- (raise-exception
- (make-chunked-input-ended-prematurely-error)))
- (else
- (let ((left (- remaining read)))
- (set! remaining left)
- (when (zero? left)
- ;; We're done with this chunk; read CR and LF.
- (get-u8 port) (get-u8 port))
- (loop (- to-read read)
- (+ num-read read)))))))))
- (loop to-read 0))
-
- (make-custom-binary-input-port "chunked input port" read! #f #f close))
-
-(define* (make-chunked-output-port* port #:key (keep-alive? #f)
- (buffering 1200)
- report-bytes-sent)
- (define heap-allocated-limit
- (expt 2 20)) ;; 1MiB
-
- (define (%put-string s)
- (unless (string-null? s)
- (let* ((bv (string->bytevector s "ISO-8859-1"))
- (length (bytevector-length bv)))
- (put-string port (number->string length 16))
- (put-string port "\r\n")
- (put-bytevector port bv)
- (put-string port "\r\n")
-
- (when report-bytes-sent
- (report-bytes-sent length))
- (let* ((stats (gc-stats))
- (initial-gc-times
- (assq-ref stats 'gc-times)))
- (when (> (assq-ref stats 'heap-allocated-since-gc)
- heap-allocated-limit)
- (while (let ((updated-stats (gc-stats)))
- (= (assq-ref updated-stats 'gc-times)
- initial-gc-times))
- (gc)
- (usleep 50)))))))
-
- (define (%put-char c)
- (%put-string (list->string (list c))))
-
- (define (flush) #t)
- (define (close)
- (put-string port "0\r\n\r\n")
- (force-output port)
- (unless keep-alive?
- (close-port port)))
- (let ((ret (make-soft-port
- (vector %put-char %put-string flush #f close) "w")))
- (setvbuf ret 'block buffering)
- ret))
-
-(define* (call-with-streaming-http-request uri callback
- #:key (headers '())
- (method 'PUT)
- report-bytes-sent)
- (let* ((port (open-socket-for-uri uri))
- (request
- (build-request
- uri
- #:method method
- #:version '(1 . 1)
- #:headers `((connection close)
- (Transfer-Encoding . "chunked")
- (Content-Type . "application/octet-stream")
- ,@headers)
- #:port port)))
-
- (set-port-encoding! port "ISO-8859-1")
- (setvbuf port 'block (expt 2 13))
- (with-exception-handler
- (lambda (exp)
- (simple-format #t "error: PUT ~A: ~A\n" (uri-path uri) exp)
- (close-port port)
- (raise-exception exp))
- (lambda ()
- (let ((request (write-request request port)))
- (let* ((chunked-output-port
- (make-chunked-output-port*
- port
- #:buffering (expt 2 12)
- #:keep-alive? #t
- #:report-bytes-sent report-bytes-sent)))
-
- ;; A SIGPIPE will kill Guile, so ignore it
- (sigaction SIGPIPE
- (lambda (arg)
- (simple-format (current-error-port) "warning: SIGPIPE\n")))
-
- (set-port-encoding! chunked-output-port "ISO-8859-1")
- (callback chunked-output-port)
- (close-port chunked-output-port)
-
- (let ((response (read-response port)))
- (let ((body (read-response-body response)))
- (close-port port)
- (values response
- body)))))))))
-
(define* (retry-on-error f #:key times delay ignore)
(let loop ((attempt 1))
(match (with-exception-handler
@@ -285,68 +105,6 @@ upcoming chunk."
(sleep delay)
(loop (+ 1 attempt))))))))
-(define delay-logging-fluid
- (make-thread-local-fluid))
-(define delay-logging-depth-fluid
- (make-thread-local-fluid 0))
-
-(define (log-delay proc duration)
- (and=> (fluid-ref delay-logging-fluid)
- (lambda (recorder)
- (recorder proc duration))))
-
-(define* (call-with-delay-logging proc #:key (threshold 1) (args '()))
- (let ((start (get-internal-real-time))
- (trace '())
- (root-logger? (eq? #f (fluid-ref delay-logging-fluid))))
-
- (define (format-seconds seconds)
- (format #f "~4f" seconds))
-
- (call-with-values
- (lambda ()
- (with-fluid* delay-logging-depth-fluid
- (+ 1 (fluid-ref delay-logging-depth-fluid))
- (lambda ()
- (if root-logger?
- (with-fluid* delay-logging-fluid
- (lambda (proc duration)
- (set! trace
- (cons (list proc
- duration
- (fluid-ref delay-logging-depth-fluid))
- trace))
- #t)
- (lambda ()
- (apply proc args)))
- (apply proc args)))))
- (lambda vals
- (let ((elapsed-seconds
- (/ (- (get-internal-real-time)
- start)
- internal-time-units-per-second)))
- (if (and (> elapsed-seconds threshold)
- root-logger?)
- (let ((lines
- (cons
- (simple-format #f "warning: delay of ~A seconds: ~A"
- (format-seconds elapsed-seconds)
- proc)
- (map (match-lambda
- ((proc duration depth)
- (string-append
- (make-string (* 2 depth) #\space)
- (simple-format #f "~A: ~A"
- (format-seconds duration)
- proc))))
- trace))))
- (display (string-append
- (string-join lines "\n")
- "\n")))
- (unless root-logger?
- ((fluid-ref delay-logging-fluid) proc elapsed-seconds))))
- (apply values vals)))))
-
(define (call-with-time-logging name thunk)
(let ((start (current-time time-utc)))
(call-with-values