aboutsummaryrefslogtreecommitdiff
path: root/nar-herder/utils.scm
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2021-12-11 10:27:24 +0000
committerChristopher Baines <mail@cbaines.net>2021-12-12 16:35:38 +0000
commitf9ff69e1c79f024ed188ad51642cca443aedfee2 (patch)
tree609b37ff8d6fc3d557d339a67ba6641522b0a977 /nar-herder/utils.scm
parent7e280ca951e8ffa7c86224843075e65266911617 (diff)
downloadnar-herder-f9ff69e1c79f024ed188ad51642cca443aedfee2.tar
nar-herder-f9ff69e1c79f024ed188ad51642cca443aedfee2.tar.gz
Get most of the functionality sort of working
At least working enough to start trying this out, and finding the problems.
Diffstat (limited to 'nar-herder/utils.scm')
-rw-r--r--nar-herder/utils.scm651
1 files changed, 651 insertions, 0 deletions
diff --git a/nar-herder/utils.scm b/nar-herder/utils.scm
new file mode 100644
index 0000000..a0a5171
--- /dev/null
+++ b/nar-herder/utils.scm
@@ -0,0 +1,651 @@
+;;; Nar Herder
+;;;
+;;; Copyright © 2021 Christopher Baines <mail@cbaines.net>
+;;;
+;;; This program is free software: you can redistribute it and/or
+;;; modify it under the terms of the GNU Affero General Public License
+;;; as published by the Free Software Foundation, either version 3 of
+;;; the License, or (at your option) any later version.
+;;;
+;;; This program is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;; Affero General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU Affero General Public
+;;; License along with this program. If not, see
+;;; <http://www.gnu.org/licenses/>.
+
+(define-module (nar-herder utils)
+ #:use-module (srfi srfi-1)
+ #:use-module (srfi srfi-19) ; time
+ #:use-module (ice-9 q)
+ ;; #:use-module (ice-9 ftw)
+ ;; #:use-module (ice-9 popen)
+ #:use-module (ice-9 iconv)
+ #:use-module (ice-9 match)
+ #:use-module (ice-9 format)
+ #:use-module (ice-9 threads)
+ #:use-module (ice-9 textual-ports)
+ #:use-module (ice-9 rdelim)
+ #:use-module (ice-9 binary-ports)
+ #:use-module (ice-9 exceptions)
+ #:use-module (rnrs bytevectors)
+ #:use-module (web uri)
+ #:use-module (web http)
+ #:use-module (web client)
+ #:use-module (web request)
+ #:use-module (web response)
+ #:use-module (fibers)
+ #:use-module (fibers channels)
+ #:use-module (fibers conditions)
+ ;; #:use-module (gcrypt pk-crypto)
+ ;; #:use-module (gcrypt hash)
+ ;; #:use-module (gcrypt random)
+ ;; #:use-module (json)
+ ;; #:use-module (guix pki)
+ ;; #:use-module (guix utils)
+ ;; #:use-module (guix config)
+ ;; #:use-module (guix store)
+ ;; #:use-module (guix status)
+ ;; #:use-module (guix base64)
+ ;; #:use-module (guix scripts substitute)
+ #:export (call-with-streaming-http-request
+ &chunked-input-ended-prematurely
+ chunked-input-ended-prematurely-error?
+ make-chunked-input-port*
+
+ make-worker-thread-channel
+ call-with-worker-thread
+
+ retry-on-error
+
+ create-work-queue
+
+ check-locale!))
+
+;; Chunked Responses
+(define (read-chunk-header port)
+  "Consume one chunk header line from PORT and return the chunk size it
+announces, parsed as a hexadecimal number.  Everything from the first ';' or
+carriage return onwards is discarded before parsing."
+  (define (size-field-end? c)
+    (or (char=? c #\;)
+        (char=? c #\return)))
+
+  (let ((line (read-line port)))
+    (if (eof-object? line)
+        ;; Connection closed prematurely: there's nothing left to read.
+        (error "chunked input ended prematurely")
+        (let* ((cut (string-index line size-field-end?))
+               (digits (if cut
+                           (substring line 0 cut)
+                           line)))
+          (string->number digits 16)))))
+
+;; Exception type raised by make-chunked-input-port* when the underlying
+;; port reaches end-of-file in the middle of a chunk.  It is a subtype of
+;; &external-error and has no fields of its own.  The record type name now
+;; matches the binding (it previously read '&chunked-input-error-prematurely).
+(define &chunked-input-ended-prematurely
+  (make-exception-type '&chunked-input-ended-prematurely
+                       &external-error
+                       '()))
+
+;; Zero-argument constructor for the exception above.
+(define make-chunked-input-ended-prematurely-error
+  (record-constructor &chunked-input-ended-prematurely))
+
+;; Predicate recognising instances of the exception above.
+(define chunked-input-ended-prematurely-error?
+  (record-predicate &chunked-input-ended-prematurely))
+
+(define* (make-chunked-input-port* port #:key (keep-alive? #f))
+  "Return a custom binary input port that yields the payload of the chunked
+data read from PORT.  Closing the returned port also closes PORT, unless
+KEEP-ALIVE? is true."
+  (define chunk-size 0)                 ;size of the current chunk
+  (define remaining 0)                  ;bytes of the current chunk not yet read
+  (define finished? #f)                 ;has the zero-sized last chunk been seen?
+
+  (define (close)
+    (unless keep-alive?
+      (close-port port)))
+
+  (define (start-next-chunk!)
+    ;; Read the next chunk header, record its size and return it.
+    (let ((size (read-chunk-header port)))
+      (set! chunk-size size)
+      (set! remaining size)
+      size))
+
+  (define (read! bv idx to-read)
+    ;; WANTED is how many bytes may still be stored, FILLED how many have
+    ;; been stored into BV so far.
+    (let fill ((wanted to-read)
+               (filled 0))
+      (cond
+       ((or finished? (zero? wanted))
+        filled)
+       ((zero? remaining)               ;get a new chunk
+        (if (zero? (start-next-chunk!))
+            (begin
+              (set! finished? #t)
+              (get-bytevector-n port 2) ; \r\n follows the last chunk
+              filled)
+            (fill wanted filled)))
+       (else                            ;read from the current chunk
+        (let ((got (get-bytevector-n! port bv
+                                      (+ idx filled)
+                                      (min wanted remaining))))
+          (if (eof-object? got)         ;premature termination
+              (raise-exception
+               (make-chunked-input-ended-prematurely-error))
+              (begin
+                (set! remaining (- remaining got))
+                (when (zero? remaining)
+                  ;; We're done with this chunk; read CR and LF.
+                  (get-u8 port)
+                  (get-u8 port))
+                (fill (- wanted got)
+                      (+ filled got)))))))))
+
+  (make-custom-binary-input-port "chunked input port" read! #f #f close))
+
+;; Return a soft output port that writes everything it receives to PORT
+;; using chunked framing: each non-empty string becomes one chunk made of
+;; its byte length in hexadecimal, "\r\n", the bytes, and "\r\n".
+;;
+;; KEEP-ALIVE?: when true, closing the returned port leaves PORT open.
+;; BUFFERING: buffer size given to setvbuf on the returned port.
+;; REPORT-BYTES-SENT: optional procedure called with the byte length of
+;; each chunk after it has been written.
+(define* (make-chunked-output-port* port #:key (keep-alive? #f)
+ (buffering 1200)
+ report-bytes-sent)
+ ;; Threshold on 'heap-allocated-since-gc above which a collection is
+ ;; forced after writing a chunk.
+ (define heap-allocated-limit
+ (expt 2 20)) ;; 1MiB
+
+ ;; Write S as a single chunk; empty strings are skipped, since a
+ ;; zero-length chunk is what close writes to end the stream.
+ (define (%put-string s)
+ (unless (string-null? s)
+ (let* ((bv (string->bytevector s "ISO-8859-1"))
+ (length (bytevector-length bv)))
+ (put-string port (number->string length 16))
+ (put-string port "\r\n")
+ (put-bytevector port bv)
+ (put-string port "\r\n")
+
+ (when report-bytes-sent
+ (report-bytes-sent length))
+ ;; When more than heap-allocated-limit has been allocated since the
+ ;; last collection, call (gc) and pause 50 microseconds repeatedly
+ ;; until the 'gc-times counter changes.
+ (let* ((stats (gc-stats))
+ (initial-gc-times
+ (assq-ref stats 'gc-times)))
+ (when (> (assq-ref stats 'heap-allocated-since-gc)
+ heap-allocated-limit)
+ (while (let ((updated-stats (gc-stats)))
+ (= (assq-ref updated-stats 'gc-times)
+ initial-gc-times))
+ (gc)
+ (usleep 50)))))))
+
+ ;; A single character is written as a one-character chunk.
+ (define (%put-char c)
+ (%put-string (list->string (list c))))
+
+ ;; Flushing the soft port does nothing to PORT; PORT is only forced on close.
+ (define (flush) #t)
+ ;; Write the terminating zero-length chunk, force PORT's output, then
+ ;; close PORT unless KEEP-ALIVE? is true.
+ (define (close)
+ (put-string port "0\r\n\r\n")
+ (force-output port)
+ (unless keep-alive?
+ (close-port port)))
+ (let ((ret (make-soft-port
+ (vector %put-char %put-string flush #f close) "w")))
+ (setvbuf ret 'block buffering)
+ ret))
+
+(define* (call-with-streaming-http-request uri callback
+                                           #:key (headers '())
+                                           (method 'PUT)
+                                           report-bytes-sent)
+  "Open a connection to URI and send an HTTP/1.1 METHOD request (PUT by
+default) whose body uses chunked transfer encoding.  CALLBACK is called with
+an output port; whatever it writes there is sent as the request body.  Once
+CALLBACK returns, the body is terminated and the response is read.
+
+HEADERS are appended to the default connection, Transfer-Encoding and
+Content-Type headers.  REPORT-BYTES-SENT, when given, is called with the
+byte length of each chunk written.
+
+Return two values: the response and its body.  If an exception is raised, a
+message is printed, the connection is closed and the exception is re-raised."
+  (let* ((port (open-socket-for-uri uri))
+         (request
+          (build-request
+           uri
+           #:method method
+           #:version '(1 . 1)
+           #:headers `((connection close)
+                       (Transfer-Encoding . "chunked")
+                       (Content-Type . "application/octet-stream")
+                       ,@headers)
+           #:port port)))
+
+    (set-port-encoding! port "ISO-8859-1")
+    (setvbuf port 'block (expt 2 13))
+    (with-exception-handler
+        (lambda (exp)
+          ;; Report the method actually used; this message used to say
+          ;; "PUT" whatever METHOD was.
+          (simple-format #t "error: ~A ~A: ~A\n" method (uri-path uri) exp)
+          (close-port port)
+          (raise-exception exp))
+      (lambda ()
+        ;; Send the request line and headers; the body follows through the
+        ;; chunked output port below.
+        (write-request request port)
+        (let ((chunked-output-port
+               (make-chunked-output-port*
+                port
+                #:buffering (expt 2 12)
+                ;; PORT must stay open so the response can be read from it.
+                #:keep-alive? #t
+                #:report-bytes-sent report-bytes-sent)))
+
+          ;; A SIGPIPE will kill Guile, so ignore it
+          (sigaction SIGPIPE
+            (lambda (arg)
+              (simple-format (current-error-port) "warning: SIGPIPE\n")))
+
+          (set-port-encoding! chunked-output-port "ISO-8859-1")
+          (callback chunked-output-port)
+          ;; Closing writes the terminating chunk and flushes PORT.
+          (close-port chunked-output-port)
+
+          (let* ((response (read-response port))
+                 (body (read-response-body response)))
+            (close-port port)
+            (values response
+                    body)))))))
+
+(define* (retry-on-error f #:key times delay ignore)
+  "Call the thunk F and return its values.  When F raises an exception, call
+it again after sleeping DELAY seconds, making at most TIMES attempts; the
+exception from the last attempt is re-raised.  IGNORE is a predicate or a
+list of predicates: an exception satisfying one of them is re-raised
+immediately rather than retried."
+  (define (ignored? exn)
+    (cond
+     ((list? ignore)
+      (any (lambda (test)
+             (test exn))
+           ignore))
+     ((procedure? ignore)
+      (ignore exn))
+     (else #f)))
+
+  (define (attempt-once)
+    ;; Return (#t . VALUES) when F succeeds and (#f . EXN) when it raises.
+    (with-exception-handler
+        (lambda (exn)
+          (if (ignored? exn)
+              (raise-exception exn)
+              (cons #f exn)))
+      (lambda ()
+        (call-with-values f
+          (lambda results
+            (cons #t results))))
+      #:unwind? #t))
+
+  (let next-attempt ((attempt 1))
+    (let* ((outcome (attempt-once))
+           (succeeded? (car outcome))
+           (payload (cdr outcome)))
+      (cond
+       (succeeded?
+        (when (< 1 attempt)
+          (simple-format
+           (current-error-port)
+           "retry success: ~A\n on attempt ~A of ~A\n"
+           f
+           attempt
+           times))
+        (apply values payload))
+       ((>= attempt times)
+        (simple-format
+         (current-error-port)
+         "error: ~A:\n ~A,\n giving up after ~A attempts\n"
+         f
+         payload
+         times)
+        (raise-exception payload))
+       (else
+        (simple-format
+         (current-error-port)
+         "error: ~A:\n ~A,\n attempt ~A of ~A, retrying in ~A\n"
+         f
+         payload
+         attempt
+         times
+         delay)
+        (sleep delay)
+        (next-attempt (+ 1 attempt)))))))
+
+;; Thread-local fluid holding the procedure that records delays, called as
+;; (recorder proc duration).  No default is given here, and
+;; call-with-delay-logging treats #f as "no recorder installed".
+(define delay-logging-fluid
+ (make-thread-local-fluid))
+;; Thread-local fluid holding the current nesting depth of
+;; call-with-delay-logging calls; it starts at 0.
+(define delay-logging-depth-fluid
+ (make-thread-local-fluid 0))
+
+(define (log-delay proc duration)
+  "Pass PROC and DURATION to the recorder stored in delay-logging-fluid and
+return its result; return #f when no recorder is installed."
+  (let ((recorder (fluid-ref delay-logging-fluid)))
+    (and recorder
+         (recorder proc duration))))
+
+;; Apply PROC to ARGS and return its values, timing the call.
+;;
+;; The outermost call on a thread (no recorder yet in delay-logging-fluid)
+;; is the "root logger": it installs a recorder that collects
+;; (proc duration depth) entries and, if the whole call took more than
+;; THRESHOLD seconds, prints a warning followed by the collected entries.
+;; A nested call instead reports its own elapsed time to the recorder
+;; already installed.
+(define* (call-with-delay-logging proc #:key (threshold 1) (args '()))
+ (let ((start (get-internal-real-time))
+ (trace '())
+ ;; True when no enclosing call has installed a recorder.
+ (root-logger? (eq? #f (fluid-ref delay-logging-fluid))))
+
+ (define (format-seconds seconds)
+ (format #f "~4f" seconds))
+
+ (call-with-values
+ (lambda ()
+ ;; Run PROC with the depth fluid incremented for its dynamic extent.
+ (with-fluid* delay-logging-depth-fluid
+ (+ 1 (fluid-ref delay-logging-depth-fluid))
+ (lambda ()
+ (if root-logger?
+ (with-fluid* delay-logging-fluid
+ ;; Recorder: push each report onto TRACE, so the most recently
+ ;; recorded entry comes first.
+ (lambda (proc duration)
+ (set! trace
+ (cons (list proc
+ duration
+ (fluid-ref delay-logging-depth-fluid))
+ trace))
+ #t)
+ (lambda ()
+ (apply proc args)))
+ (apply proc args)))))
+ (lambda vals
+ (let ((elapsed-seconds
+ (/ (- (get-internal-real-time)
+ start)
+ internal-time-units-per-second)))
+ (if (and (> elapsed-seconds threshold)
+ root-logger?)
+ ;; Root call that exceeded the threshold: print the warning and
+ ;; one line per trace entry, indented by two spaces per depth
+ ;; level, to the current output port.
+ (let ((lines
+ (cons
+ (simple-format #f "warning: delay of ~A seconds: ~A"
+ (format-seconds elapsed-seconds)
+ proc)
+ (map (match-lambda
+ ((proc duration depth)
+ (string-append
+ (make-string (* 2 depth) #\space)
+ (simple-format #f "~A: ~A"
+ (format-seconds duration)
+ proc))))
+ trace))))
+ (display (string-append
+ (string-join lines "\n")
+ "\n")))
+ ;; Nested call: hand the elapsed time to the enclosing recorder.
+ ;; A root call under the threshold logs nothing.
+ (unless root-logger?
+ ((fluid-ref delay-logging-fluid) proc elapsed-seconds))))
+ (apply values vals)))))
+
+(define (call-with-time-logging name thunk)
+  "Call THUNK and return its values.  After it returns, print a line to the
+current output port saying how many seconds NAME took."
+  (define started-at (current-time time-utc))
+
+  (define (report-and-return . results)
+    (let* ((took (time-difference (current-time time-utc) started-at))
+           (seconds (+ (time-second took)
+                       (/ (time-nanosecond took) 1e9))))
+      (display
+       (format #f
+               "~a took ~f seconds~%"
+               name
+               seconds))
+      (apply values results)))
+
+  (call-with-values thunk report-and-return))
+
+;; Syntactic wrapper around call-with-time-logging: the EXP forms are
+;; packaged into a thunk so they are evaluated inside the timed call.
+(define-syntax with-time-logging
+  (syntax-rules ()
+    "Log under NAME the time taken to evaluate EXP."
+    ((_ name exp ...)
+     (call-with-time-logging name (lambda () exp ...)))))
+
+;; Create a queue of jobs processed by a pool of threads, each job being a
+;; list of arguments to which PROC is applied.
+;;
+;; THREAD-COUNT-PARAMETER selects the desired number of threads: a number
+;; is used as is, #f means one thread per queued or running job, and
+;; anything else is called as a thunk returning the count.
+;; THREAD-START-DELAY, when not #f, is the minimum time between two thread
+;; starts.  THREAD-STOP-DELAY is how long a thread may go without a job
+;; before it exits (by default it exits at its first idle check).
+;;
+;; Return four procedures as multiple values: process-job (enqueue a job),
+;; count-jobs, count-threads and list-jobs.
+(define* (create-work-queue thread-count-parameter proc
+ #:key thread-start-delay
+ (thread-stop-delay
+ (make-time time-duration 0 0)))
+ (let ((queue (make-q))
+ (queue-mutex (make-mutex))
+ (job-available (make-condition-variable))
+ ;; Maps a thread index to the argument list of the job that thread is
+ ;; running, or to #f while the thread is idle.  A missing key means
+ ;; no thread occupies that slot.
+ (running-job-args (make-hash-table)))
+
+ ;; Thunk returning the number of threads wanted right now.
+ (define get-thread-count
+ (cond
+ ((number? thread-count-parameter)
+ (const thread-count-parameter))
+ ((eq? thread-count-parameter #f)
+ ;; Run one thread per job
+ (lambda ()
+ (+ (q-length queue)
+ (hash-count (lambda (index val)
+ (list? val))
+ running-job-args))))
+ (else
+ thread-count-parameter)))
+
+ ;; Enqueue ARGS as a job, start more threads if needed, and wake one
+ ;; waiting thread.
+ (define (process-job . args)
+ (with-mutex queue-mutex
+ (enq! queue args)
+ (start-new-threads-if-necessary (get-thread-count))
+ (signal-condition-variable job-available)))
+
+ ;; Number of occupied thread slots, idle or busy.
+ (define (count-threads)
+ (with-mutex queue-mutex
+ (hash-count (const #t) running-job-args)))
+
+ ;; Number of jobs queued plus jobs currently running.
+ (define (count-jobs)
+ (with-mutex queue-mutex
+ (+ (q-length queue)
+ (hash-count (lambda (index val)
+ (list? val))
+ running-job-args))))
+
+ ;; List the argument lists of queued and running jobs.
+ ;; NOTE(review): (car queue) assumes the (ice-9 q) representation keeps
+ ;; the list of queued items in the car of the queue -- confirm.
+ (define (list-jobs)
+ (with-mutex queue-mutex
+ (append (list-copy
+ (car queue))
+ (hash-fold (lambda (key val result)
+ (or (and val
+ (cons val result))
+ result))
+ '()
+ running-job-args))))
+
+ ;; Apply PROC to JOB-ARGS.  An exception is logged (with a backtrace
+ ;; from the throw handler) and then swallowed, so the calling thread
+ ;; carries on with the next job.
+ (define (thread-process-job job-args)
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format (current-error-port)
+ "job raised exception: ~A\n"
+ job-args))
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (apply proc job-args))
+ (lambda (key . args)
+ (simple-format (current-error-port)
+ "exception when handling job: ~A ~A\n"
+ key args)
+ (backtrace))))
+ #:unwind? #t))
+
+ ;; Start the thread occupying slot THREAD-INDEX.
+ (define (start-thread thread-index)
+ ;; True when at least as many jobs are running as threads are wanted.
+ ;; Called with queue-mutex held.
+ (define (too-many-threads?)
+ (let ((running-jobs-count
+ (hash-count (lambda (index val)
+ (list? val))
+ running-job-args))
+ (desired-thread-count (get-thread-count)))
+
+ (>= running-jobs-count
+ desired-thread-count)))
+
+ (define (thread-idle-for-too-long? last-job-finished-at)
+ (time>=?
+ (time-difference (current-time time-monotonic)
+ last-job-finished-at)
+ thread-stop-delay))
+
+ ;; Free this thread's slot and release queue-mutex, which the caller
+ ;; holds; the thread's loop ends because nothing calls loop afterwards.
+ (define (stop-thread)
+ (hash-remove! running-job-args
+ thread-index)
+ (unlock-mutex queue-mutex))
+
+ (call-with-new-thread
+ (lambda ()
+ (let loop ((last-job-finished-at (current-time time-monotonic)))
+ ;; Every iteration starts by taking queue-mutex; each branch below
+ ;; releases it exactly once.
+ (lock-mutex queue-mutex)
+
+ (if (too-many-threads?)
+ (stop-thread)
+ (let ((job-args
+ (if (q-empty? queue)
+ ;; #f from wait-condition-variable indicates a timeout
+ ;; NOTE(review): the timeout is an absolute time about 9
+ ;; seconds ahead; this presumably uses the srfi-19
+ ;; current-time imported by this module -- confirm.
+ (if (wait-condition-variable
+ job-available
+ queue-mutex
+ (+ 9 (time-second (current-time))))
+ ;; Another thread could have taken
+ ;; the job in the mean time
+ (if (q-empty? queue)
+ #f
+ (deq! queue))
+ #f)
+ (deq! queue))))
+
+ (if job-args
+ (begin
+ ;; Mark the slot busy, then run the job without the mutex.
+ (hash-set! running-job-args
+ thread-index
+ job-args)
+
+ (unlock-mutex queue-mutex)
+ (thread-process-job job-args)
+
+ ;; Mark the slot idle again.
+ (with-mutex queue-mutex
+ (hash-set! running-job-args
+ thread-index
+ #f))
+
+ (loop (current-time time-monotonic)))
+ (if (thread-idle-for-too-long? last-job-finished-at)
+ (stop-thread)
+ (begin
+ (unlock-mutex queue-mutex)
+
+ (loop last-job-finished-at))))))))))
+
+
+ ;; Start threads in free slots below DESIRED-COUNT when fewer than
+ ;; DESIRED-COUNT slots are occupied.  With a THREAD-START-DELAY, a slot
+ ;; is only filled once that much time has passed since the previous
+ ;; thread was started.
+ (define start-new-threads-if-necessary
+ (let ((previous-thread-started-at (make-time time-monotonic 0 0)))
+ (lambda (desired-count)
+ (let* ((thread-count
+ (hash-count (const #t) running-job-args))
+ (threads-to-start
+ (- desired-count thread-count)))
+ (when (> threads-to-start 0)
+ (for-each
+ (lambda (thread-index)
+ ;; 'slot-free is a sentinel default: idle threads are stored
+ ;; as #f, so #f cannot be used to mean "no entry".
+ (when (eq? (hash-ref running-job-args
+ thread-index
+ 'slot-free)
+ 'slot-free)
+ (let* ((now (current-time time-monotonic))
+ (elapsed (time-difference now
+ previous-thread-started-at)))
+ (when (or (eq? #f thread-start-delay)
+ (time>=? elapsed thread-start-delay))
+ (set! previous-thread-started-at now)
+ ;; Reserve the slot (as idle) before the thread runs.
+ (hash-set! running-job-args
+ thread-index
+ #f)
+ (start-thread thread-index)))))
+ (iota desired-count)))))))
+
+ ;; With a procedure as THREAD-COUNT-PARAMETER, a background thread
+ ;; re-evaluates the desired count every 15 seconds and starts threads
+ ;; when none is idle.  Otherwise the threads are started right away.
+ (if (procedure? thread-count-parameter)
+ (call-with-new-thread
+ (lambda ()
+ (while #t
+ (sleep 15)
+ (with-mutex queue-mutex
+ (let ((idle-threads (hash-count (lambda (index val)
+ (eq? #f val))
+ running-job-args)))
+ (when (= 0 idle-threads)
+ (start-new-threads-if-necessary (get-thread-count))))))))
+ (start-new-threads-if-necessary (get-thread-count)))
+
+ (values process-job count-jobs count-threads list-jobs)))
+
+(define (check-locale!)
+  "Install the locale named by the environment.  If setlocale raises an
+exception, report it and try en_US.utf8 instead; if that fails as well,
+report the second exception and exit with status 1."
+  (define (report message exn)
+    (display (simple-format #f message exn)
+             (current-error-port)))
+
+  (define (use-fallback-locale)
+    (with-exception-handler
+        (lambda (exn)
+          (report "exception when calling setlocale with en_US.utf8: ~A\n"
+                  exn)
+
+          (exit 1))
+      (lambda ()
+        (setlocale LC_ALL "en_US.utf8"))
+      #:unwind? #t))
+
+  (with-exception-handler
+      (lambda (exn)
+        (report "exception when calling setlocale: ~A
+falling back to en_US.utf8\n"
+                exn)
+
+        (use-fallback-locale))
+    (lambda ()
+      (setlocale LC_ALL ""))
+    #:unwind? #t))
+
+;; Parameter holding the argument list of the current worker thread, or #f
+;; (the default) outside one.  make-worker-thread-channel binds it in each
+;; worker thread; call-with-worker-thread reads it to decide whether to
+;; call the procedure directly.
+(define %worker-thread-args
+ (make-parameter #f))
+
+(define* (make-worker-thread-channel initializer
+ #:key (parallelism 1)
+ (delay-logger (lambda _ #f))
+ destructor
+ lifetime
+ (log-exception? (const #t)))
+ "Return a channel used to offload work to dedicated threads. PARALLELISM
+threads are started; each calls INITIALIZER to obtain the list of arguments
+that every procedure received on the channel is applied to. DELAY-LOGGER is
+called with the seconds each message waited before a thread picked it up.
+When LIFETIME is a number it is counted down after each message and, once it
+has dropped below zero, DESTRUCTOR (if given) is applied to the arguments and
+INITIALIZER is called again. LOG-EXCEPTION? decides whether an exception
+raised by a procedure is printed with a backtrace."
+ ;; The threads are created with the fibers current-fiber parameter set to
+ ;; #f.  NOTE(review): presumably so the worker threads use blocking
+ ;; channel operations rather than fiber suspension -- confirm.
+ (parameterize (((@@ (fibers internal) current-fiber) #f))
+ (let ((channel (make-channel)))
+ (for-each
+ (lambda _
+ (call-with-new-thread
+ (lambda ()
+ ;; init: one life cycle of the thread, from INITIALIZER to DESTRUCTOR.
+ (let init ((args (initializer)))
+ ;; Expose ARGS so call-with-worker-thread can detect that it is
+ ;; already running inside a worker thread.
+ (parameterize ((%worker-thread-args args))
+ (let loop ((current-lifetime lifetime))
+ ;; Messages have the shape (reply-channel sent-time procedure).
+ (match (get-message channel)
+ (((? channel? reply) sent-time (? procedure? proc))
+ (let ((time-delay
+ (- (get-internal-real-time)
+ sent-time)))
+ (delay-logger (/ time-delay
+ internal-time-units-per-second))
+ ;; The reply is (duration . values) on success, or
+ ;; ('worker-thread-error duration exn) when PROC raised.
+ (put-message
+ reply
+ (let ((start-time (get-internal-real-time)))
+ (with-exception-handler
+ (lambda (exn)
+ (list 'worker-thread-error
+ (/ (- (get-internal-real-time)
+ start-time)
+ internal-time-units-per-second)
+ exn))
+ (lambda ()
+ ;; The throw handler only logs; the exception then
+ ;; reaches the handler above, which builds the reply.
+ (with-throw-handler #t
+ (lambda ()
+ (call-with-values
+ (lambda ()
+ (apply proc args))
+ (lambda vals
+ (cons (/ (- (get-internal-real-time)
+ start-time)
+ internal-time-units-per-second)
+ vals))))
+ ;; This ARGS shadows the worker arguments: here it is
+ ;; the throw key followed by the throw arguments.
+ (lambda args
+ (when (match args
+ (('%exception exn)
+ (log-exception? exn))
+ (_ #t))
+ (simple-format
+ (current-error-port)
+ "worker-thread: exception: ~A\n" args)
+ (backtrace)))))
+ #:unwind? #t))))))
+ ;; Without a numeric lifetime, loop forever.  With one, keep
+ ;; looping with the counter decremented until it is negative.
+ (if (number? current-lifetime)
+ (unless (< current-lifetime 0)
+ (loop (if current-lifetime
+ (- current-lifetime 1)
+ #f)))
+ (loop #f))))
+ ;; Lifetime used up: release the arguments and start a new cycle.
+ (when destructor
+ (apply destructor args))
+ (init (initializer))))))
+ (iota parallelism))
+ channel)))
+
+(define* (call-with-worker-thread channel proc #:key duration-logger)
+  "Have a worker thread listening on CHANNEL apply PROC to its arguments,
+and return the values PROC returns; an exception raised by PROC in the
+worker is re-raised here.  DURATION-LOGGER, when given, is called with the
+duration reported by the worker.  When called from within a worker thread,
+PROC is applied directly instead."
+  (define (log-duration duration)
+    (when duration-logger
+      (duration-logger duration)))
+
+  (define (run-in-worker)
+    ;; Send PROC with a fresh reply channel and the send time, then wait
+    ;; for the worker's answer.
+    (let ((reply (make-channel)))
+      (put-message channel (list reply (get-internal-real-time) proc))
+      (match (get-message reply)
+        (('worker-thread-error duration exn)
+         (log-duration duration)
+         (raise-exception exn))
+        ((duration . result)
+         (log-duration duration)
+         (apply values result)))))
+
+  (let ((args (%worker-thread-args)))
+    (if args
+        (apply proc args)
+        (run-in-worker))))