;;; Nar Herder
;;;
;;; Copyright © 2021 Christopher Baines
;;;
;;; This program is free software: you can redistribute it and/or
;;; modify it under the terms of the GNU Affero General Public License
;;; as published by the Free Software Foundation, either version 3 of
;;; the License, or (at your option) any later version.
;;;
;;; This program is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;;; Affero General Public License for more details.
;;;
;;; You should have received a copy of the GNU Affero General Public
;;; License along with this program.  If not, see
;;; <https://www.gnu.org/licenses/>.

(define-module (nar-herder utils)
  #:use-module (srfi srfi-1)
  #:use-module (srfi srfi-9)
  #:use-module (srfi srfi-19) ; time
  #:use-module (ice-9 q)
  #:use-module (ice-9 iconv)
  #:use-module (ice-9 match)
  #:use-module (ice-9 format)
  #:use-module (ice-9 threads)
  #:use-module (ice-9 atomic)
  #:use-module (ice-9 textual-ports)
  #:use-module (ice-9 rdelim)
  #:use-module (ice-9 binary-ports)
  #:use-module (ice-9 exceptions)
  #:use-module (rnrs bytevectors)
  #:use-module (ice-9 suspendable-ports)
  #:use-module ((ice-9 ports internal) #:select (port-poll
                                                 port-read-wait-fd
                                                 port-write-wait-fd))
  #:use-module (web uri)
  #:use-module (web http)
  #:use-module (web client)
  #:use-module (web request)
  #:use-module (web response)
  #:use-module (fibers)
  #:use-module (fibers timers)
  #:use-module (fibers channels)
  #:use-module (fibers scheduler)
  #:use-module (fibers conditions)
  #:use-module (fibers operations)
  #:use-module ((guix build syscalls)
                #:select (set-thread-name))
  #:export (make-worker-thread-set
            call-with-worker-thread

            call-with-time-logging
            with-time-logging

            retry-on-error

            create-work-queue

            check-locale!

            open-socket-for-uri*

            call-with-sigint
            run-server/patched

            ;; NOTE(review): `timeout-error?' is not defined in the visible
            ;; part of this file, while `port-timeout-error?' is defined
            ;; below but not exported.  Confirm which one is intended.
            timeout-error?

            port-read-timeout-error?
            port-write-timeout-error?
            with-port-timeouts))

(define* (retry-on-error f #:key times delay ignore error-hook)
  "Call the thunk F, retrying it if it raises an exception.

After each failure the exception is logged to the current error port,
ERROR-HOOK (when given) is called with the attempt number and the
exception, and then DELAY is passed to `sleep' before trying again.
Once the attempt number reaches TIMES - 1, one final call to F is made
outside of the exception handler, so any exception it raises propagates
to the caller.  (With TIMES of at least 2 this makes TIMES calls in
total.)

IGNORE may be a predicate, or a list of predicates, applied to each
exception; an exception matching it is re-raised straight away instead
of being retried.

Returns the values returned by the successful call to F."
  (let loop ((attempt 1))
    (match (with-exception-handler
               (lambda (exn)
                 (when (cond
                        ((list? ignore)
                         (any (lambda (test)
                                (test exn))
                              ignore))
                        ((procedure? ignore)
                         (ignore exn))
                        (else #f))
                   (raise-exception exn))
                 (cons #f exn))
             (lambda ()
               (call-with-values f
                 (lambda vals
                   (cons #t vals))))
             #:unwind? #t)
      ((#t . return-values)
       (when (> attempt 1)
         (simple-format
          (current-error-port)
          "retry success: ~A\n on attempt ~A of ~A\n"
          f
          attempt
          times))
       (apply values return-values))
      ((#f . exn)
       (if (>= attempt (- times 1))
           (begin
             (simple-format
              (current-error-port)
              "error: ~A:\n ~A,\n attempt ~A of ~A, last retry in ~A\n"
              f
              exn
              attempt
              times
              delay)
             (when error-hook
               (error-hook attempt exn))
             (sleep delay)
             (simple-format
              (current-error-port)
              "running last retry of ~A after ~A failed attempts\n"
              f
              attempt)
             ;; Deliberately outside the handler: the last failure propagates.
             (f))
           (begin
             (simple-format
              (current-error-port)
              "error: ~A:\n ~A,\n attempt ~A of ~A, retrying in ~A\n"
              f
              exn
              attempt
              times
              delay)
             (when error-hook
               (error-hook attempt exn))
             (sleep delay)
             (loop (+ 1 attempt))))))))

(define (call-with-time-logging name thunk)
  "Call THUNK, then display on the current output port how many seconds
it took, labelled with NAME.  Returns the values returned by THUNK."
  (let ((start (current-time time-utc)))
    (call-with-values
        thunk
      (lambda vals
        (let* ((end     (current-time time-utc))
               (elapsed (time-difference end start)))
          (display
           (format #f
                   "~a took ~f seconds~%"
                   name
                   (+ (time-second elapsed)
                      (/ (time-nanosecond elapsed) 1e9))))
          (apply values vals))))))

(define-syntax-rule (with-time-logging name exp ...)
  "Log under NAME the time taken to evaluate EXP."
  (call-with-time-logging name (lambda () exp ...)))

;; NOTE(review): this definition of `create-work-queue' is damaged in
;; this copy of the file and does not parse.  Two spans of text are
;; missing:
;;   1. one after `priority' in the keyword list, which resumes at
;;      `= running-jobs-count desired-thread-count)))';
;;   2. one after `(if priority' in the worker loop, which resumes at
;;      `threads-to-start 0)'.
;; The surviving text is kept verbatim rather than reconstructed by
;; guesswork.  Restore this definition from version control.
(define* (create-work-queue thread-count-parameter proc
                            #:key thread-start-delay
                            (thread-stop-delay
                             (make-time time-duration 0 0))
                            (name "unnamed")
                            ;; NOTE(review): text missing after `priority'.
                            priority= running-jobs-count
                            desired-thread-count)))

    (define (thread-idle-for-too-long? last-job-finished-at)
      (time>=?
       (time-difference
        (current-time time-monotonic)
        last-job-finished-at)
       thread-stop-delay))

    (define (stop-thread)
      (hash-remove! running-job-args
                    thread-index)
      (unlock-mutex queue-mutex))

    (call-with-new-thread
     (lambda ()
       (catch 'system-error
         (lambda ()
           (set-thread-name
            (string-append name " q t "
                           (number->string thread-index))))
         (const #t))

       (let loop ((last-job-finished-at (current-time time-monotonic)))
         (lock-mutex queue-mutex)

         (if (too-many-threads?)
             (stop-thread)
             (let ((job-args
                    (if (q-empty? queue)
                        ;; #f from wait-condition-variable indicates a timeout
                        (if (wait-condition-variable
                             job-available
                             queue-mutex
                             (+ 9 (time-second (current-time))))
                            ;; Another thread could have taken
                            ;; the job in the mean time
                            (if (q-empty? queue)
                                #f
                                ;; NOTE(review): text missing after
                                ;; `(if priority'.
                                (if priority threads-to-start 0)
                    (for-each
                     (lambda (thread-index)
                       (when (eq? (hash-ref running-job-args thread-index 'slot-free)
                                  'slot-free)
                         (let* ((now (current-time time-monotonic))
                                (elapsed (time-difference now previous-thread-started-at)))
                           (when (or (eq? #f thread-start-delay)
                                     (time>=? elapsed thread-start-delay))
                             (set! previous-thread-started-at now)
                             (hash-set! running-job-args
                                        thread-index
                                        #f)
                             (start-thread thread-index)))))
                     (iota desired-count)))))))

    (if (procedure? thread-count-parameter)
        (call-with-new-thread
         (lambda ()
           (catch 'system-error
             (lambda ()
               (set-thread-name
                (string-append name " q t")))
             (const #t))

           (while #t
             (sleep 15)
             (with-mutex queue-mutex
               (let ((idle-threads (hash-count (lambda (index val)
                                                 (eq? #f val))
                                               running-job-args)))
                 (when (= 0 idle-threads)
                   (start-new-threads-if-necessary (get-thread-count))))))))
        (start-new-threads-if-necessary (get-thread-count)))

    (values process-job count-jobs count-threads list-jobs)))

(define (check-locale!)
  "Set the program's locale (LC_ALL) from the environment.

If that raises an exception, report it on the current error port and
fall back to en_US.utf8.  If the fallback also raises an exception,
report it and exit with status 1."
  (with-exception-handler
      (lambda (exn)
        (display
         (simple-format
          #f
          "exception when calling setlocale: ~A falling back to en_US.utf8\n"
          exn)
         (current-error-port))

        (with-exception-handler
            (lambda (exn)
              (display
               (simple-format
                #f
                "exception when calling setlocale with en_US.utf8: ~A\n"
                exn)
               (current-error-port))

              (exit 1))
          (lambda _
            (setlocale LC_ALL "en_US.utf8"))
          #:unwind? #t))
    (lambda _
      (setlocale LC_ALL ""))
    #:unwind? #t))

;; The record type name was missing in this copy of the file (apparently
;; stripped along with other <...> text).  `<worker-thread-set>' restores
;; a valid SRFI-9 form.  Only the constructor, predicate and accessors
;; below are referenced in the visible code.
(define-record-type <worker-thread-set>
  (worker-thread-set channel arguments-parameter)
  worker-thread-set?
  (channel             worker-thread-set-channel)
  (arguments-parameter worker-thread-set-arguments-parameter))

(define* (make-worker-thread-set initializer
                                 #:key (parallelism 1)
                                 (delay-logger (lambda _ #f))
                                 (duration-logger (const #f))
                                 destructor
                                 lifetime
                                 (log-exception? (const #t))
                                 (expire-on-exception? #f)
                                 (name "unnamed"))
  "Start PARALLELISM threads that run procedures sent to them with
`call-with-worker-thread', and return a worker thread set record.

Each thread calls INITIALIZER to obtain a list of arguments, retrying
every 5 seconds until it returns a true value.  It then repeatedly
receives a procedure and applies it to those arguments.

DELAY-LOGGER is called with the seconds a request waited before being
picked up, and the procedure.  DURATION-LOGGER is called with the
seconds the procedure ran for, and the procedure.  LOG-EXCEPTION? is
applied to exceptions raised by a procedure to decide whether they are
logged with a backtrace.

When LIFETIME is a number, a thread stops serving requests after a
number of jobs determined by LIFETIME.  When EXPIRE-ON-EXCEPTION? is
true, a job that raises an exception also ends the thread's current
lifetime.  Either way, DESTRUCTOR (if given) is then applied to the
arguments and the thread re-initializes.  NAME is used in thread names
and log messages."
  (define param
    (make-parameter #f))

  (define (initializer/safe)
    ;; Call INITIALIZER, logging any exception.  Never give up: keep
    ;; retrying every 5 seconds until it returns a true value.
    (let ((args
           (with-exception-handler
               (lambda (exn)
                 (simple-format
                  (current-error-port)
                  "exception running initializer in worker thread (~A): ~A:\n ~A\n"
                  name
                  initializer
                  exn)
                 #f)
             (lambda ()
               (with-throw-handler #t
                 initializer
                 (lambda _
                   (backtrace))))
             #:unwind? #t)))
      (or args
          (begin
            (sleep 5)
            (initializer/safe)))))

  (define (destructor/safe args)
    ;; Apply DESTRUCTOR to ARGS.  Failures are logged (with a backtrace)
    ;; and not retried.
    (with-exception-handler
        (lambda (exn)
          (simple-format
           (current-error-port)
           "exception running destructor in worker thread (~A): ~A:\n ~A\n"
           name
           destructor
           exn)
          #f)
      (lambda ()
        (with-throw-handler #t
          (lambda ()
            (apply destructor args)
            #t)
          (lambda _
            (backtrace))))
      #:unwind? #t)
    #t)

  (define (run-job args reply sent-time proc)
    ;; Apply PROC to ARGS for a caller waiting on the REPLY channel, send
    ;; the response back, and return #t if PROC raised an exception.
    (delay-logger (/ (- (get-internal-real-time) sent-time)
                     internal-time-units-per-second)
                  proc)

    (let* ((start-time (get-internal-real-time))
           (seconds-elapsed
            (lambda ()
              (/ (- (get-internal-real-time) start-time)
                 internal-time-units-per-second)))
           (response
            (with-exception-handler
                (lambda (exn)
                  (list 'worker-thread-error (seconds-elapsed) exn))
              (lambda ()
                (with-throw-handler #t
                  (lambda ()
                    (call-with-values
                        (lambda ()
                          (apply proc args))
                      (lambda vals
                        (cons (seconds-elapsed) vals))))
                  (lambda throw-args
                    (when (match throw-args
                            (('%exception exn)
                             (log-exception? exn))
                            (_ #t))
                      (simple-format
                       (current-error-port)
                       "worker-thread: exception: ~A\n" throw-args)
                      (backtrace)))))
              #:unwind? #t)))
      (put-message reply response)

      (match response
        (('worker-thread-error duration _)
         (when duration-logger
           (duration-logger duration proc))
         #t)
        ((duration . _)
         (when duration-logger
           (duration-logger duration proc))
         #f))))

  (let ((channel (make-channel)))
    (for-each
     (lambda (thread-index)
       (call-with-new-thread
        (lambda ()
          (catch 'system-error
            (lambda ()
              (set-thread-name
               (string-append name " w t "
                              (number->string thread-index))))
            (const #t))

          (let init ((args (initializer/safe)))
            ;; PARAM marks this thread as a worker, so nested
            ;; call-with-worker-thread calls run inline.
            (parameterize ((param args))
              (let loop ((current-lifetime lifetime))
                (let ((exception?
                       (match (get-message channel)
                         (((? channel? reply) sent-time (? procedure? proc))
                          (run-job args reply sent-time proc)))))
                  (unless (and expire-on-exception?
                               exception?)
                    (if (number? current-lifetime)
                        (unless (< current-lifetime 0)
                          (loop (- current-lifetime 1)))
                        (loop #f))))))

            (when destructor
              (destructor/safe args))

            (init (initializer/safe))))))
     (iota parallelism))

    (worker-thread-set channel param)))

(define* (call-with-worker-thread record proc #:key duration-logger)
  "Apply PROC to the arguments of one of RECORD's worker threads and return
its values.

If the caller is already running inside one of RECORD's worker threads,
PROC is applied directly.  Otherwise the request is sent over RECORD's
channel, and an exception raised by PROC is re-raised in the caller.
DURATION-LOGGER, when given, is called with the seconds PROC ran for."
  (let ((args ((worker-thread-set-arguments-parameter record))))
    (if args
        (apply proc args)
        (let ((reply (make-channel)))
          (put-message (worker-thread-set-channel record)
                       (list reply (get-internal-real-time) proc))
          (match (get-message reply)
            (('worker-thread-error duration exn)
             (when duration-logger
               (duration-logger duration))
             (raise-exception exn))
            ((duration . result)
             (when duration-logger
               (duration-logger duration))
             (apply values result)))))))

(define* (open-socket-for-uri* uri
                               #:key (verify-certificate? #t))
  "Open a connection to URI like `open-socket-for-uri', but leave the
underlying socket in non-blocking mode.

For https URIs, a plain socket is opened to the same host (port
defaulting to 443) and wrapped with the (web client) internal `tls-wrap',
passing VERIFY-CERTIFICATE?.  Returns two values: the port to use for
I/O (the TLS port for https, otherwise the socket) and the underlying
socket."
  (define tls-wrap
    (@@ (web client) tls-wrap))

  (define https?
    (eq? 'https (uri-scheme uri)))

  (define plain-uri
    (if https?
        (build-uri 'http
                   #:userinfo (uri-userinfo uri)
                   #:host     (uri-host uri)
                   #:port     (or (uri-port uri) 443)
                   #:path     (uri-path uri)
                   #:query    (uri-query uri)
                   #:fragment (uri-fragment uri))
        uri))

  (define (make-nonblocking! sock)
    (let ((flags (fcntl sock F_GETFL)))
      (fcntl sock F_SETFL (logior O_NONBLOCK flags))))

  (let* ((s (open-socket-for-uri plain-uri))
         (port
          (if https?
              (tls-wrap s (uri-host uri)
                        #:verify-certificate? verify-certificate?)
              s)))
    ;; Guile/guile-gnutls don't handle the handshake happening on a non
    ;; blocking socket, so the socket is only made non-blocking after
    ;; tls-wrap has returned.
    (make-nonblocking! s)
    (values port s)))

;; Copied from (fibers web server)
(define (call-with-sigint thunk cvar)
  "Call THUNK with a SIGINT handler installed that signals the fibers
condition CVAR.  The previous SIGINT handler is restored when THUNK's
dynamic extent is left."
  (let ((handler #f))
    (dynamic-wind
      (lambda ()
        (set! handler
              (sigaction SIGINT (lambda (sig) (signal-condition! cvar)))))
      thunk
      (lambda ()
        (if handler
            ;; restore Scheme handler, SIG_IGN or SIG_DFL.
            (sigaction SIGINT (car handler) (cdr handler))
            ;; restore original C handler.
            (sigaction SIGINT #f))))))

;; This variant of run-server from the fibers library supports running
;; multiple servers within one process: it only sets up SOCKET and spawns
;; a fiber running the (fibers web server) socket loop, so it must be
;; called from within a running fibers scheduler.
(define run-server/patched
  (let ((fibers-web-server-module
         (resolve-module '(fibers web server))))

    (define set-nonblocking!
      (module-ref fibers-web-server-module 'set-nonblocking!))

    (define make-default-socket
      (module-ref fibers-web-server-module 'make-default-socket))

    (define socket-loop
      (module-ref fibers-web-server-module 'socket-loop))

    (lambda* (handler
              #:key
              (host #f)
              (family AF_INET)
              (addr (if host
                        (inet-pton family host)
                        INADDR_LOOPBACK))
              (port 8080)
              (socket (make-default-socket family addr port)))
      ;; We use a large backlog by default.  If the server is suddenly hit
      ;; with a number of connections on a small backlog, clients won't
      ;; receive confirmation for their SYN, leading them to retry --
      ;; probably successfully, but with a large latency.
      (listen socket 1024)

      (set-nonblocking! socket)
      (sigaction SIGPIPE SIG_IGN)

      (spawn-fiber (lambda () (socket-loop socket handler))))))

;; These procedures are subject to spurious wakeups.
(define (readable? port)
  "Test, without blocking, if PORT (a port or file descriptor) is readable."
  (match (select (vector port) #() #() 0)
    ((#() #() #()) #f)
    ((#(_) #() #()) #t)))

(define (writable? port)
  "Test, without blocking, if PORT (a port or file descriptor) is writable."
  (match (select #() (vector port) #() 0)
    ((#() #() #()) #f)
    ((#() #(_) #()) #t)))

(define (make-wait-operation ready? schedule-when-ready port port-ready-fd
                             this-procedure)
  "Make a fibers operation that succeeds once (READY? (PORT-READY-FD PORT))
holds, using SCHEDULE-WHEN-READY to be woken by the scheduler.
THIS-PROCEDURE is accepted but not used here."
  (make-base-operation
   #f
   ;; Try: succeed immediately (returning `values' as the result thunk)
   ;; if the file descriptor is already ready, otherwise #f.
   (lambda _
     (and (ready? (port-ready-fd port)) values))
   ;; Block: ask the scheduler to call COMMIT once the descriptor is ready.
   (lambda (flag sched resume)
     (define (commit)
       ;; NOTE(review): presumably W = waiting, C = claimed, S = synced,
       ;; per the (fibers operations) protocol -- confirm there.
       (match (atomic-box-compare-and-swap! flag 'W 'S)
         ('W (resume values))
         ('C (commit))
         ('S #f)))
     (schedule-when-ready sched (port-ready-fd port) commit))))

(define (wait-until-port-readable-operation port)
  "Make an operation that will succeed when PORT is readable."
  (unless (input-port? port)
    (error "refusing to wait forever for input on non-input port"))
  (make-wait-operation readable? schedule-task-when-fd-readable port
                       port-read-wait-fd
                       wait-until-port-readable-operation))

(define (wait-until-port-writable-operation port)
  "Make an operation that will succeed when PORT is writable."
  (unless (output-port? port)
    (error "refusing to wait forever for output on non-output port"))
  (make-wait-operation writable? schedule-task-when-fd-writable port
                       port-write-wait-fd
                       wait-until-port-writable-operation))

;; Base type for the timeout exceptions raised by `with-port-timeouts'.
;; Its single field is the port that timed out; the read and write
;; subtypes add no fields, so their constructors take just the port.
(define &port-timeout
  (make-exception-type '&port-timeout
                       &external-error
                       '(port)))

(define make-port-timeout-error
  (record-constructor &port-timeout))

(define port-timeout-error?
  (record-predicate &port-timeout))

(define &port-read-timeout
  (make-exception-type '&port-read-timeout
                       &port-timeout
                       '()))

(define make-port-read-timeout-error
  (record-constructor &port-read-timeout))

(define port-read-timeout-error?
  (record-predicate &port-read-timeout))

(define &port-write-timeout
  (make-exception-type '&port-write-timeout
                       &port-timeout
                       '()))

(define make-port-write-timeout-error
  (record-constructor &port-write-timeout))

(define port-write-timeout-error?
  (record-predicate &port-write-timeout))

(define* (with-port-timeouts thunk
                             #:key timeout
                             (read-timeout timeout)
                             (write-timeout timeout))
  "Call THUNK with the current read and write waiters replaced, so that
waiting on a port raises an exception after a timeout.

Waiting to read for longer than READ-TIMEOUT seconds raises a
&port-read-timeout exception.  Waiting to write for longer than
WRITE-TIMEOUT seconds raises a &port-write-timeout exception.  Either
exception carries the port.  TIMEOUT sets both.

Inside a fibers scheduler, the wait is a fibers operation raced against
`sleep-operation'.  Otherwise the port is polled.  Both paths interpret
the timeout in seconds."
  (define (no-fibers-wait port mode timeout make-timeout-error)
    (define poll-timeout-ms 200)

    ;; When the GC runs, it restarts the poll syscall, but the timeout
    ;; remains unchanged! When the timeout is longer than the time
    ;; between the syscall restarting, I think this renders the
    ;; timeout useless. Therefore, this code uses a short timeout, and
    ;; repeatedly calls poll while watching the clock to see if it has
    ;; timed out overall.
    (let ((timeout-internal
           (+ (get-internal-real-time)
              ;; TIMEOUT is in seconds, matching `sleep-operation'.
              (* internal-time-units-per-second timeout))))
      (let loop ((poll-value
                  (port-poll port mode poll-timeout-ms)))
        (if (= poll-value 0)
            (if (> (get-internal-real-time)
                   timeout-internal)
                (raise-exception (make-timeout-error port))
                (loop (port-poll port mode poll-timeout-ms)))
            poll-value))))

  (define (make-waiter wait-until-ready-operation make-timeout-error
                       mode timeout)
    ;; Build a read/write waiter procedure for `current-read-waiter' or
    ;; `current-write-waiter'.
    (lambda (port)
      (if (current-scheduler)
          (perform-operation
           (choice-operation
            (wait-until-ready-operation port)
            (wrap-operation (sleep-operation timeout)
                            (lambda ()
                              ;; The timeout exception types have a
                              ;; single field: the port.
                              (raise-exception
                               (make-timeout-error port))))))
          (no-fibers-wait port mode timeout make-timeout-error))))

  (parameterize ((current-read-waiter
                  (make-waiter wait-until-port-readable-operation
                               make-port-read-timeout-error
                               "r"
                               read-timeout))
                 (current-write-waiter
                  (make-waiter wait-until-port-writable-operation
                               make-port-write-timeout-error
                               "w"
                               write-timeout)))
    (thunk)))