diff options
author | Christopher Baines <mail@cbaines.net> | 2025-01-13 09:28:57 +0000 |
---|---|---|
committer | Christopher Baines <mail@cbaines.net> | 2025-01-13 09:28:57 +0000 |
commit | 69c8f3d26112a97bdf04f2c50634d162bb6eeee3 (patch) | |
tree | 5c8bee05934c538a1c37a46fc0174af6b78abad4 | |
parent | 2d513a2e24a6acd223ca3093c3fb2272865366dc (diff) | |
download | nar-herder-69c8f3d26112a97bdf04f2c50634d162bb6eeee3.tar nar-herder-69c8f3d26112a97bdf04f2c50634d162bb6eeee3.tar.gz |
Use Guile Knots
-rw-r--r-- | guix-dev.scm | 34 | ||||
-rw-r--r-- | nar-herder/cached-compression.scm | 6 | ||||
-rw-r--r-- | nar-herder/database.scm | 1 | ||||
-rw-r--r-- | nar-herder/mirror.scm | 6 | ||||
-rw-r--r-- | nar-herder/server.scm | 9 | ||||
-rw-r--r-- | nar-herder/storage.scm | 7 | ||||
-rw-r--r-- | nar-herder/utils.scm | 626 |
7 files changed, 54 insertions, 635 deletions
diff --git a/guix-dev.scm b/guix-dev.scm index 61e282c..b2e6da3 100644 --- a/guix-dev.scm +++ b/guix-dev.scm @@ -45,6 +45,39 @@ (gnu packages web) (srfi srfi-1)) +(define guile-knots + (let ((commit "dcb56ee2c5ac3e283cb46841766e7282f3c2c52e") + (revision "1")) + (package + (name "guile-knots") + (version (git-version "0" revision commit)) + (source (origin + (method git-fetch) + (uri (git-reference + (url "https://git.cbaines.net/git/guile/knots") + (commit commit))) + (sha256 + (base32 + "04z48572canx35hl0kfli3pf3g3m6184zvmnpyg1rbwla6g5z1fk")) + (file-name (string-append name "-" version "-checkout")))) + (build-system gnu-build-system) + (native-inputs + (list pkg-config + autoconf + automake + guile-3.0 + guile-lib + guile-fibers)) + (inputs + (list guile-3.0)) + (propagated-inputs + (list guile-fibers)) + (home-page "https://git.cbaines.net/guile/knots") + (synopsis "Patterns and functionality to use with Guile Fibers") + (description + "") + (license license:gpl3+)))) + (package (name "nar-herder") (version "0") @@ -54,6 +87,7 @@ `(("guix" ,guix) ("guile-json" ,guile-json-4) ("guile-fibers" ,guile-fibers-1.3) + ("guile-knots" ,guile-knots) ("guile-gcrypt" ,guile-gcrypt) ("guile-readline" ,guile-readline) ("guile-lzlib" ,guile-lzlib) diff --git a/nar-herder/cached-compression.scm b/nar-herder/cached-compression.scm index 5c257dc..1ac6d96 100644 --- a/nar-herder/cached-compression.scm +++ b/nar-herder/cached-compression.scm @@ -31,6 +31,9 @@ #:use-module (fibers timers) #:use-module (fibers channels) #:use-module (fibers operations) + #:use-module (knots timeout) + #:use-module (knots non-blocking) + #:use-module (knots worker-threads) #:use-module (web uri) #:use-module (web client) #:use-module (web response) @@ -662,8 +665,7 @@ (call-with-values (lambda () (let ((port - socket - (open-socket-for-uri* uri))) + (non-blocking-open-socket-for-uri uri))) (http-get uri #:port port #:decode-body? #f diff --git a/nar-herder/database.scm b/nar-herder/database.scm index 63eeac0..7c88c8c 100644 --- a/nar-herder/database.scm +++ b/nar-herder/database.scm @@ -27,6 +27,7 @@ #:use-module (web uri) #:use-module (sqlite3) #:use-module (fibers) + #:use-module (knots worker-threads) #:use-module (prometheus) #:use-module (guix store) #:use-module (guix narinfo) diff --git a/nar-herder/mirror.scm b/nar-herder/mirror.scm index 14e23a9..67d4c00 100644 --- a/nar-herder/mirror.scm +++ b/nar-herder/mirror.scm @@ -32,6 +32,8 @@ #:use-module (json) #:use-module (fibers) #:use-module (fibers channels) + #:use-module (knots timeout) + #:use-module (knots non-blocking) #:use-module (guix narinfo) #:use-module ((guix store) #:select (store-path-hash-part)) #:use-module (nar-herder utils) @@ -77,9 +79,7 @@ latest-recent-change) (with-port-timeouts (lambda () - (let ((port - socket - (open-socket-for-uri* uri))) + (let ((port (non-blocking-open-socket-for-uri uri))) (http-get uri #:port port #:streaming? #t))) diff --git a/nar-herder/server.scm b/nar-herder/server.scm index 97f0567..b904675 100644 --- a/nar-herder/server.scm +++ b/nar-herder/server.scm @@ -38,6 +38,10 @@ #:use-module (fibers scheduler) #:use-module (fibers conditions) #:use-module (fibers operations) + #:use-module (knots) + #:use-module (knots timeout) + #:use-module (knots web-server) + #:use-module (knots non-blocking) #:use-module (prometheus) #:use-module (json) #:use-module ((system foreign) @@ -592,8 +596,7 @@ (simple-format (current-error-port) "starting downloading the database\n") (let ((port - socket - (open-socket-for-uri* database-uri))) + (non-blocking-open-socket-for-uri database-uri))) (http-get database-uri #:port port #:streaming? #t))) @@ -923,7 +926,7 @@ (log-msg 'INFO "starting server (" (getpid) "), listening on " (assq-ref opts 'host) ":" (assq-ref opts 'port)) - (run-server/patched + (run-knots-web-server (make-request-handler database canonical-storage diff --git a/nar-herder/storage.scm b/nar-herder/storage.scm index 55e374e..0e7186d 100644 --- a/nar-herder/storage.scm +++ b/nar-herder/storage.scm @@ -28,6 +28,8 @@ #:use-module (web response) #:use-module (fibers) #:use-module (fibers channels) + #:use-module (knots timeout) + #:use-module (knots non-blocking) #:use-module (logging logger) #:use-module (logging port-log) #:use-module (prometheus) @@ -356,7 +358,7 @@ When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates." ;; Open a new connection to URI and evict old entries from ;; CACHE, if any. (let ((socket - (open-socket-for-uri* + (non-blocking-open-socket-for-uri uri #:verify-certificate? verify-certificate?)) (new-cache evicted @@ -620,8 +622,7 @@ When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates." (call-with-values (lambda () (let ((port - socket - (open-socket-for-uri* uri))) + (non-blocking-open-socket-for-uri uri))) (http-get uri #:port port #:decode-body? #f diff --git a/nar-herder/utils.scm b/nar-herder/utils.scm index 5bac2da..4155ea0 100644 --- a/nar-herder/utils.scm +++ b/nar-herder/utils.scm @@ -31,10 +31,6 @@ #:use-module (ice-9 binary-ports) #:use-module (ice-9 exceptions) #:use-module (rnrs bytevectors) - #:use-module (ice-9 suspendable-ports) - #:use-module ((ice-9 ports internal) #:select (port-poll - port-read-wait-fd - port-write-wait-fd)) #:use-module (web uri) #:use-module (web http) #:use-module (web client) @@ -46,30 +42,12 @@ #:use-module (fibers scheduler) #:use-module (fibers conditions) #:use-module (fibers operations) - #:use-module ((guix build syscalls) - #:select (set-thread-name)) - #:export (make-worker-thread-set - call-with-worker-thread - - call-with-time-logging + #:export (call-with-time-logging with-time-logging retry-on-error - create-work-queue - - check-locale! - - open-socket-for-uri* - - call-with-sigint - run-server/patched - - timeout-error? - - port-read-timeout-error? - port-write-timeout-error? - with-port-timeouts)) + check-locale!)) (define* (retry-on-error f #:key times delay ignore error-hook) (let loop ((attempt 1)) @@ -154,218 +132,6 @@ "Log under NAME the time taken to evaluate EXP." (call-with-time-logging name (lambda () exp ...))) -(define* (create-work-queue thread-count-parameter proc - #:key thread-start-delay - (thread-stop-delay - (make-time time-duration 0 0)) - (name "unnamed") - priority<?) - (let ((queue (make-q)) - (queue-mutex (make-mutex)) - (job-available (make-condition-variable)) - (running-job-args (make-hash-table))) - - (define get-thread-count - (cond - ((number? thread-count-parameter) - (const thread-count-parameter)) - ((eq? thread-count-parameter #f) - ;; Run one thread per job - (lambda () - (+ (q-length queue) - (hash-count (lambda (index val) - (list? val)) - running-job-args)))) - (else - thread-count-parameter))) - - (define process-job - (if priority<? - (lambda* (args #:key priority) - (with-mutex queue-mutex - (enq! queue (cons priority args)) - (set-car! - queue - (stable-sort! (car queue) - (lambda (a b) - (priority<? - (car a) - (car b))))) - (sync-q! queue) - (start-new-threads-if-necessary (get-thread-count)) - (signal-condition-variable job-available))) - (lambda args - (with-mutex queue-mutex - (enq! queue args) - (start-new-threads-if-necessary (get-thread-count)) - (signal-condition-variable job-available))))) - - (define (count-threads) - (with-mutex queue-mutex - (hash-count (const #t) running-job-args))) - - (define (count-jobs) - (with-mutex queue-mutex - (+ (q-length queue) - (hash-count (lambda (index val) - (list? val)) - running-job-args)))) - - (define (list-jobs) - (with-mutex queue-mutex - (append (if priority<? - (map cdr (car queue)) - (list-copy (car queue))) - (hash-fold (lambda (key val result) - (if val - (cons val result) - result)) - '() - running-job-args)))) - - (define (thread-process-job job-args) - (with-exception-handler - (lambda (exn) - (simple-format (current-error-port) - "~A work queue, job raised exception ~A: ~A\n" - name job-args exn)) - (lambda () - (with-throw-handler #t - (lambda () - (apply proc job-args)) - (lambda (key . args) - (simple-format - (current-error-port) - "~A work queue, exception when handling job: ~A ~A\n" - name key args) - (backtrace)))) - #:unwind? #t)) - - (define (start-thread thread-index) - (define (too-many-threads?) - (let ((running-jobs-count - (hash-count (lambda (index val) - (list? val)) - running-job-args)) - (desired-thread-count (get-thread-count))) - - (>= running-jobs-count - desired-thread-count))) - - (define (thread-idle-for-too-long? last-job-finished-at) - (time>=? - (time-difference (current-time time-monotonic) - last-job-finished-at) - thread-stop-delay)) - - (define (stop-thread) - (hash-remove! running-job-args - thread-index) - (unlock-mutex queue-mutex)) - - (call-with-new-thread - (lambda () - (catch 'system-error - (lambda () - (set-thread-name - (string-append name " q t " - (number->string thread-index)))) - (const #t)) - - (let loop ((last-job-finished-at (current-time time-monotonic))) - (lock-mutex queue-mutex) - - (if (too-many-threads?) - (stop-thread) - (let ((job-args - (if (q-empty? queue) - ;; #f from wait-condition-variable indicates a timeout - (if (wait-condition-variable - job-available - queue-mutex - (+ 9 (time-second (current-time)))) - ;; Another thread could have taken - ;; the job in the mean time - (if (q-empty? queue) - #f - (if priority<? - (cdr (deq! queue)) - (deq! queue))) - #f) - (if priority<? - (cdr (deq! queue)) - (deq! queue))))) - - (if job-args - (begin - (hash-set! running-job-args - thread-index - job-args) - - (unlock-mutex queue-mutex) - (thread-process-job job-args) - - (with-mutex queue-mutex - (hash-set! running-job-args - thread-index - #f)) - - (loop (current-time time-monotonic))) - (if (thread-idle-for-too-long? last-job-finished-at) - (stop-thread) - (begin - (unlock-mutex queue-mutex) - - (loop last-job-finished-at)))))))))) - - - (define start-new-threads-if-necessary - (let ((previous-thread-started-at (make-time time-monotonic 0 0))) - (lambda (desired-count) - (let* ((thread-count - (hash-count (const #t) running-job-args)) - (threads-to-start - (- desired-count thread-count))) - (when (> threads-to-start 0) - (for-each - (lambda (thread-index) - (when (eq? (hash-ref running-job-args - thread-index - 'slot-free) - 'slot-free) - (let* ((now (current-time time-monotonic)) - (elapsed (time-difference now - previous-thread-started-at))) - (when (or (eq? #f thread-start-delay) - (time>=? elapsed thread-start-delay)) - (set! previous-thread-started-at now) - (hash-set! running-job-args - thread-index - #f) - (start-thread thread-index))))) - (iota desired-count))))))) - - (if (procedure? thread-count-parameter) - (call-with-new-thread - (lambda () - (catch 'system-error - (lambda () - (set-thread-name - (string-append name " q t"))) - (const #t)) - - (while #t - (sleep 15) - (with-mutex queue-mutex - (let ((idle-threads (hash-count (lambda (index val) - (eq? #f val)) - running-job-args))) - (when (= 0 idle-threads) - (start-new-threads-if-necessary (get-thread-count)))))))) - (start-new-threads-if-necessary (get-thread-count))) - - (values process-job count-jobs count-threads list-jobs))) - (define (check-locale!) (with-exception-handler (lambda (exn) @@ -393,391 +159,3 @@ falling back to en_US.utf8\n" (lambda _ (setlocale LC_ALL "")) #:unwind? #t)) - -(define-record-type <worker-thread-set> - (worker-thread-set channel arguments-parameter) - worker-thread-set? - (channel worker-thread-set-channel) - (arguments-parameter worker-thread-set-arguments-parameter)) - -(define* (make-worker-thread-set initializer - #:key (parallelism 1) - (delay-logger (lambda _ #f)) - (duration-logger (const #f)) - destructor - lifetime - (log-exception? (const #t)) - (expire-on-exception? #f) - (name "unnamed")) - (define param - (make-parameter #f)) - - (define (initializer/safe) - (let ((args - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "exception running initializer in worker thread (~A): ~A:\n ~A\n" - name - initializer - exn) - #f) - (lambda () - (with-throw-handler #t - initializer - (lambda args - (backtrace)))) - #:unwind? #t))) - - (if args - args - ;; never give up, just keep retrying - (begin - (sleep 5) - (initializer/safe))))) - - (define (destructor/safe args) - (let ((success? - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "exception running destructor in worker thread (~A): ~A:\n ~A\n" - name - initializer - exn) - #f) - (lambda () - (with-throw-handler #t - (lambda () - (apply destructor args) - #t) - (lambda _ - (backtrace)))) - #:unwind? #t))) - - (or success? - #t - (begin - (sleep 5) - (destructor/safe args))))) - - (let ((channel (make-channel))) - (for-each - (lambda (thread-index) - (call-with-new-thread - (lambda () - (catch 'system-error - (lambda () - (set-thread-name - (string-append - name " w t " - (number->string thread-index)))) - (const #t)) - - (let init ((args (initializer/safe))) - (parameterize ((param args)) - (let loop ((current-lifetime lifetime)) - (let ((exception? - (match (get-message channel) - (((? channel? reply) sent-time (? procedure? proc)) - (let ((time-delay - (- (get-internal-real-time) - sent-time))) - (delay-logger (/ time-delay - internal-time-units-per-second) - proc) - - (let* ((start-time (get-internal-real-time)) - (response - (with-exception-handler - (lambda (exn) - (list 'worker-thread-error - (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second) - exn)) - (lambda () - (with-throw-handler #t - (lambda () - (call-with-values - (lambda () - (apply proc args)) - (lambda vals - (cons (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second) - vals)))) - (lambda args - (when (match args - (('%exception exn) - (log-exception? exn)) - (_ #t)) - (simple-format - (current-error-port) - "worker-thread: exception: ~A\n" args) - (backtrace))))) - #:unwind? #t))) - (put-message reply - response) - - (match response - (('worker-thread-error duration _) - (when duration-logger - (duration-logger duration proc)) - #t) - ((duration . _) - (when duration-logger - (duration-logger duration proc)) - #f)))))))) - - (unless (and expire-on-exception? - exception?) - (if (number? current-lifetime) - (unless (< current-lifetime 0) - (loop (if current-lifetime - (- current-lifetime 1) - #f))) - (loop #f)))))) - - (when destructor - (destructor/safe args)) - - (init (initializer/safe)))))) - (iota parallelism)) - - (worker-thread-set channel - param))) - -(define* (call-with-worker-thread record proc #:key duration-logger) - "Send PROC to the worker thread through CHANNEL. Return the result of PROC. -If already in the worker thread, call PROC immediately." - (let ((args ((worker-thread-set-arguments-parameter record)))) - (if args - (apply proc args) - (let ((reply (make-channel))) - (put-message (worker-thread-set-channel record) - (list reply (get-internal-real-time) proc)) - (match (get-message reply) - (('worker-thread-error duration exn) - (when duration-logger - (duration-logger duration)) - (raise-exception exn)) - ((duration . result) - (when duration-logger - (duration-logger duration)) - (apply values result))))))) - -(define* (open-socket-for-uri* uri - #:key (verify-certificate? #t)) - (define tls-wrap - (@@ (web client) tls-wrap)) - - (define https? - (eq? 'https (uri-scheme uri))) - - (define plain-uri - (if https? - (build-uri - 'http - #:userinfo (uri-userinfo uri) - #:host (uri-host uri) - #:port (or (uri-port uri) 443) - #:path (uri-path uri) - #:query (uri-query uri) - #:fragment (uri-fragment uri)) - uri)) - - (let ((s (open-socket-for-uri plain-uri))) - (values - (if https? - (let ((port - (tls-wrap s (uri-host uri) - #:verify-certificate? verify-certificate?))) - ;; Guile/guile-gnutls don't handle the handshake happening on a non - ;; blocking socket, so change the behavior here. - (let ((flags (fcntl s F_GETFL))) - (fcntl s F_SETFL (logior O_NONBLOCK flags))) - port) - (let ((flags (fcntl s F_GETFL))) - (fcntl s F_SETFL (logior O_NONBLOCK flags)) - s)) - s))) - -;; Copied from (fibers web server) -(define (call-with-sigint thunk cvar) - (let ((handler #f)) - (dynamic-wind - (lambda () - (set! handler - (sigaction SIGINT (lambda (sig) (signal-condition! cvar))))) - thunk - (lambda () - (if handler - ;; restore Scheme handler, SIG_IGN or SIG_DFL. - (sigaction SIGINT (car handler) (cdr handler)) - ;; restore original C handler. - (sigaction SIGINT #f)))))) - -;; This variant of run-server from the fibers library supports running -;; multiple servers within one process. -(define run-server/patched - (let ((fibers-web-server-module - (resolve-module '(fibers web server)))) - - (define set-nonblocking! - (module-ref fibers-web-server-module 'set-nonblocking!)) - - (define make-default-socket - (module-ref fibers-web-server-module 'make-default-socket)) - - (define socket-loop - (module-ref fibers-web-server-module 'socket-loop)) - - (lambda* (handler - #:key - (host #f) - (family AF_INET) - (addr (if host - (inet-pton family host) - INADDR_LOOPBACK)) - (port 8080) - (socket (make-default-socket family addr port))) - ;; We use a large backlog by default. If the server is suddenly hit - ;; with a number of connections on a small backlog, clients won't - ;; receive confirmation for their SYN, leading them to retry -- - ;; probably successfully, but with a large latency. - (listen socket 1024) - (set-nonblocking! socket) - (sigaction SIGPIPE SIG_IGN) - (spawn-fiber (lambda () (socket-loop socket handler)))))) - -;; These procedure are subject to spurious wakeups. - -(define (readable? port) - "Test if PORT is writable." - (= 1 (port-poll port "r" 0))) - -(define (writable? port) - "Test if PORT is writable." - (= 1 (port-poll port "w" 0))) - -(define (make-wait-operation ready? schedule-when-ready port port-ready-fd this-procedure) - (make-base-operation #f - (lambda _ - (and (ready? port) values)) - (lambda (flag sched resume) - (define (commit) - (match (atomic-box-compare-and-swap! flag 'W 'S) - ('W (resume values)) - ('C (commit)) - ('S #f))) - (schedule-when-ready - sched (port-ready-fd port) commit)))) - -(define (wait-until-port-readable-operation port) - "Make an operation that will succeed when PORT is readable." - (unless (input-port? port) - (error "refusing to wait forever for input on non-input port")) - (make-wait-operation readable? schedule-task-when-fd-readable port - port-read-wait-fd - wait-until-port-readable-operation)) - -(define (wait-until-port-writable-operation port) - "Make an operation that will succeed when PORT is writable." - (unless (output-port? port) - (error "refusing to wait forever for output on non-output port")) - (make-wait-operation writable? schedule-task-when-fd-writable port - port-write-wait-fd - wait-until-port-writable-operation)) - - - -(define &port-timeout - (make-exception-type '&port-timeout - &external-error - '(thunk port))) - -(define make-port-timeout-error - (record-constructor &port-timeout)) - -(define port-timeout-error? - (record-predicate &port-timeout)) - -(define &port-read-timeout - (make-exception-type '&port-read-timeout - &port-timeout - '())) - -(define make-port-read-timeout-error - (record-constructor &port-read-timeout)) - -(define port-read-timeout-error? - (record-predicate &port-read-timeout)) - -(define &port-write-timeout - (make-exception-type '&port-write-timeout - &port-timeout - '())) - -(define make-port-write-timeout-error - (record-constructor &port-write-timeout)) - -(define port-write-timeout-error? - (record-predicate &port-write-timeout)) - -(define* (with-port-timeouts thunk - #:key timeout - (read-timeout timeout) - (write-timeout timeout)) - (define (no-fibers-wait thunk port mode timeout) - (define poll-timeout-ms 200) - - ;; When the GC runs, it restarts the poll syscall, but the timeout - ;; remains unchanged! When the timeout is longer than the time - ;; between the syscall restarting, I think this renders the - ;; timeout useless. Therefore, this code uses a short timeout, and - ;; repeatedly calls poll while watching the clock to see if it has - ;; timed out overall. - (let ((timeout-internal - (+ (get-internal-real-time) - (* timeout internal-time-units-per-second)))) - (let loop ((poll-value - (port-poll port mode poll-timeout-ms))) - (if (= poll-value 0) - (if (> (get-internal-real-time) - timeout-internal) - (raise-exception - (if (string=? mode "r") - (make-port-read-timeout-error thunk port) - (make-port-write-timeout-error thunk port))) - (loop (port-poll port mode poll-timeout-ms))) - poll-value)))) - - (parameterize - ((current-read-waiter - (lambda (port) - (if (current-scheduler) - (perform-operation - (choice-operation - (wait-until-port-readable-operation port) - (wrap-operation - (sleep-operation read-timeout) - (lambda () - (raise-exception - (make-port-read-timeout-error thunk port)))))) - (no-fibers-wait thunk port "r" read-timeout)))) - (current-write-waiter - (lambda (port) - (if (current-scheduler) - (perform-operation - (choice-operation - (wait-until-port-writable-operation port) - (wrap-operation - (sleep-operation write-timeout) - (lambda () - (raise-exception - (make-port-write-timeout-error thunk port)))))) - (no-fibers-wait thunk port "w" write-timeout))))) - (thunk))) |