;;; Guix Data Service -- Information about Guix over time
;;; Copyright © 2020 Christopher Baines
;;;
;;; This program is free software: you can redistribute it and/or
;;; modify it under the terms of the GNU Affero General Public License
;;; as published by the Free Software Foundation, either version 3 of
;;; the License, or (at your option) any later version.
;;;
;;; This program is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;;; Affero General Public License for more details.
;;;
;;; You should have received a copy of the GNU Affero General Public
;;; License along with this program.  If not, see
;;; <http://www.gnu.org/licenses/>.

(define-module (guix-data-service utils)
  #:use-module (srfi srfi-1)
  #:use-module (srfi srfi-11)
  #:use-module (ice-9 ftw)
  #:use-module (ice-9 match)
  #:use-module (ice-9 atomic)
  #:use-module (ice-9 format)
  #:use-module (ice-9 threads)
  #:use-module (ice-9 exceptions)
  #:use-module (ice-9 ports internal)
  #:use-module (ice-9 suspendable-ports)
  #:use-module (lzlib)
  #:use-module (fibers)
  #:use-module (fibers channels)
  #:use-module (fibers operations)
  #:use-module (fibers timers)
  #:use-module (fibers conditions)
  #:use-module (fibers scheduler)
  #:use-module (prometheus)
  #:export (call-with-time-logging
            with-time-logging
            prevent-inlining-for-tests

            resource-pool-default-timeout
            %resource-pool-timeout-handler
            make-resource-pool
            destroy-resource-pool
            call-with-resource-from-pool
            with-resource-from-pool
            resource-pool-stats

            parallel-via-fibers
            par-map&
            letpar&

            chunk
            chunk!
            chunk-for-each!

            delete-duplicates/sort!

            get-guix-metrics-updater

            call-with-sigint

            run-server/patched

            spawn-port-monitoring-fiber))

(define (call-with-time-logging action thunk)
  "Call THUNK, printing debug messages naming ACTION when it starts and
when it finishes (including the number of seconds taken, as measured
by current-time).  Return the values returned by THUNK."
  (simple-format #t "debug: Starting ~A\n" action)
  (let ((start-time (current-time)))
    (let-values ((result (thunk)))
      (let ((time-taken (- (current-time) start-time)))
        (simple-format #t "debug: Finished ~A, took ~A seconds\n"
                       action time-taken))
      (apply values result))))

(define-syntax-rule (with-time-logging action exp ...)
  "Log under ACTION the time taken to evaluate EXP."
  (call-with-time-logging action (lambda () exp ...)))

;; Assigning VAR to itself makes it a mutable binding, which stops the
;; compiler from inlining it (so tests can replace it).
(define-syntax-rule (prevent-inlining-for-tests var)
  (set! var var))

(define* (make-resource-pool initializer max-size
                             #:key (min-size max-size)
                             (idle-seconds #f)
                             (delay-logger (const #f))
                             (duration-logger (const #f))
                             destructor
                             lifetime
                             (name "unnamed"))
  "Create a pool of at most MAX-SIZE resources, each created on demand
by calling the thunk INITIALIZER, and return the channel used to talk
to the fiber managing the pool.

The pool fiber understands the messages (checkout REPLY),
(return RESOURCE), (stats REPLY) and (destroy REPLY).  If INITIALIZER
raises an exception or returns #f, the error is logged and the
requester is queued as a waiter.  When IDLE-SECONDS is a number,
roughly every 10 seconds any available resource unused for more than
IDLE-SECONDS is passed to DESTRUCTOR and dropped from the pool.  A
successful destroy passes every resource to DESTRUCTOR and stops the
pool fiber.  NAME is used in log messages.

MIN-SIZE, DELAY-LOGGER, DURATION-LOGGER and LIFETIME are accepted but
not currently used."
  (define (initializer/safe)
    ;; Return a new resource, or #f if INITIALIZER raised.
    (with-exception-handler
        (lambda (exn)
          (simple-format
           (current-error-port)
           "exception running ~A resource pool initializer: ~A:\n ~A\n"
           name
           initializer
           exn)
          #f)
      (lambda ()
        (with-throw-handler #t
          initializer
          (lambda args
            (backtrace))))
      #:unwind? #t))

  (define (destructor/safe resource)
    ;; Pass RESOURCE to DESTRUCTOR (if there is one).  Failures are
    ;; logged with a backtrace and not retried.  Always returns #t.
    (when destructor
      (with-exception-handler
          (lambda (exn)
            (simple-format
             (current-error-port)
             "exception running resource pool destructor (~A): ~A:\n ~A\n"
             name
             destructor
             exn)
            #f)
        (lambda ()
          (with-throw-handler #t
            (lambda ()
              (destructor resource)
              #t)
            (lambda _
              (backtrace))))
        #:unwind? #t))
    #t)

  (define (put-with-timeout target value seconds)
    ;; Offer VALUE on the channel TARGET for up to SECONDS seconds.
    ;; Return #t if it was taken, #f otherwise.
    (perform-operation
     (choice-operation
      (wrap-operation (put-operation target value)
                      (const #t))
      (wrap-operation (sleep-operation seconds)
                      (const #f)))))

  (let ((channel (make-channel))
        (checkout-failure-count 0))

    (define (pool-loop)
      ;; Serve messages until a successful destroy, then return the
      ;; symbol destroyed.  RESOURCES and RESOURCES-LAST-USED are
      ;; parallel lists; AVAILABLE is the subset of RESOURCES not
      ;; checked out.
      (let loop ((resources '())
                 (available '())
                 (waiters '())
                 (resources-last-used '()))
        (match (if idle-seconds
                   (perform-operation
                    (choice-operation
                     (get-operation channel)
                     (wrap-operation
                      ;; TODO Do something smarter
                      (sleep-operation 10)
                      (const '(check-for-idle-resources)))))
                   (get-message channel))
          (('checkout reply)
           (if (null? available)
               (if (= (length resources) max-size)
                   ;; The pool is full, wait for a resource to be
                   ;; returned.
                   (loop resources available
                         (cons reply waiters)
                         resources-last-used)
                   (let ((new-resource (initializer/safe)))
                     (if new-resource
                         (let ((checkout-success?
                                (put-with-timeout reply new-resource 1)))
                           (unless checkout-success?
                             (set! checkout-failure-count
                                   (+ 1 checkout-failure-count)))
                           (loop (cons new-resource resources)
                                 ;; If the requester has gone away, keep
                                 ;; the new resource for someone else.
                                 (if checkout-success?
                                     available
                                     (cons new-resource available))
                                 waiters
                                 (cons (get-internal-real-time)
                                       resources-last-used)))
                         (loop resources available
                               (cons reply waiters)
                               resources-last-used))))
               (let ((checkout-success?
                      (put-with-timeout reply (car available) 1)))
                 (unless checkout-success?
                   (set! checkout-failure-count
                         (+ 1 checkout-failure-count)))
                 (if checkout-success?
                     (loop resources (cdr available)
                           waiters resources-last-used)
                     (loop resources available
                           waiters resources-last-used)))))

          (('return resource)
           (let ((index (list-index (lambda (x) (eq? x resource))
                                    resources)))
             (if (not index)
                 (begin
                   ;; Updating the bookkeeping for a resource this pool
                   ;; does not know about would fail and reset the whole
                   ;; pool, so log and ignore it instead.
                   (simple-format
                    (current-error-port)
                    "~A resource pool: ignoring return of unknown resource: ~A\n"
                    name resource)
                   (loop resources available waiters resources-last-used))
                 (begin
                   ;; When a resource is returned, prompt all the waiters
                   ;; to request again.  This is to avoid the pool waiting
                   ;; on channels that may be dead.
                   (for-each
                    (lambda (waiter)
                      (spawn-fiber
                       (lambda ()
                         (put-with-timeout waiter
                                           'resource-pool-retry-checkout
                                           0.2))))
                    waiters)
                   (list-set! resources-last-used
                              index
                              (get-internal-real-time))
                   (loop resources
                         (cons resource available)
                         ;; Clear waiters, as they've been notified.
                         '()
                         resources-last-used)))))

          (('stats reply)
           (let ((stats `((resources . ,(length resources))
                          (available . ,(length available))
                          (waiters . ,(length waiters))
                          (checkout-failure-count
                           . ,checkout-failure-count))))
             ;; Reply from a separate fiber so a caller that has stopped
             ;; listening cannot block the pool.
             (spawn-fiber
              (lambda ()
                (put-with-timeout reply stats 1))))
           (loop resources available waiters resources-last-used))

          (('check-for-idle-resources)
           (let* ((resources-last-used-seconds
                   (map (lambda (internal-time)
                          (/ (- (get-internal-real-time) internal-time)
                             internal-time-units-per-second))
                        resources-last-used))
                  (resources-to-destroy
                   (filter-map (lambda (resource last-used-seconds)
                                 (if (and (memq resource available)
                                          (> last-used-seconds idle-seconds))
                                     resource
                                     #f))
                               resources
                               resources-last-used-seconds)))
             (for-each destructor/safe resources-to-destroy)
             (loop (lset-difference eq? resources resources-to-destroy)
                   (lset-difference eq? available resources-to-destroy)
                   waiters
                   (filter-map (lambda (resource last-used)
                                 (if (memq resource resources-to-destroy)
                                     #f
                                     last-used))
                               resources
                               resources-last-used))))

          (('destroy reply)
           (if (= (length resources) (length available))
               (begin
                 (for-each destructor/safe resources)
                 (put-message reply 'destroy-success)
                 ;; Stop the pool fiber rather than restarting it with
                 ;; an empty pool.
                 'destroyed)
               (begin
                 (spawn-fiber
                  (lambda ()
                    (put-with-timeout reply
                                      'resource-pool-destroy-failed
                                      10)))
                 (loop resources available waiters resources-last-used))))

          (unknown
           (simple-format (current-error-port)
                          "unrecognised message to ~A resource pool channel: ~A\n"
                          name
                          unknown)
           (loop resources available waiters resources-last-used)))))

    (spawn-fiber
     (lambda ()
       ;; After an unexpected exception the pool restarts with empty
       ;; state.  Only a successful destroy ends the fiber.
       (let restart ()
         (let ((outcome
                (with-exception-handler
                    (lambda (exn)
                      (simple-format (current-error-port)
                                     "exception in the ~A pool fiber: ~A\n"
                                     name
                                     exn)
                      #f)
                  pool-loop
                  #:unwind? #t)))
           (unless (eq? outcome 'destroyed)
             (restart))))))

    channel))

(define (destroy-resource-pool pool)
  "Ask POOL to destroy all of its resources and stop.  Raise an error if
the pool does not reply with destroy-success (for example because some
resources are still checked out)."
  (let ((reply (make-channel)))
    (put-message pool (list 'destroy reply))
    (let ((msg (get-message reply)))
      (unless (eq? msg 'destroy-success)
        (error msg)))))

;; Number of seconds call-with-resource-from-pool waits by default, or
;; #f to wait indefinitely.
(define resource-pool-default-timeout
  (make-parameter #f))

(define &resource-pool-timeout
  (make-exception-type '&resource-pool-timeout
                       &error
                       '()))

(define make-resource-pool-timeout-error
  (record-constructor &resource-pool-timeout))

(define resource-pool-timeout-error?
  (record-predicate &resource-pool-timeout))

;; Default TIMEOUT-HANDLER for call-with-resource-from-pool.
(define %resource-pool-timeout-handler
  (make-parameter #f))

(define* (call-with-resource-from-pool
          pool proc #:key (timeout 'default)
          (timeout-handler (%resource-pool-timeout-handler)))
  "Call PROC with a resource from POOL, blocking until a resource becomes
available, and return the values returned by PROC.  The resource is
returned to POOL once PROC has returned or raised an exception.

TIMEOUT is the number of seconds to wait for a resource in total, #f to
wait indefinitely, or 'default to use resource-pool-default-timeout.
If no resource is obtained in time, TIMEOUT-HANDLER (when true) is
called with POOL, PROC and TIMEOUT, then a resource pool timeout error
is raised."
  (define timeout-or-default
    (if (eq? timeout 'default)
        (resource-pool-default-timeout)
        timeout))

  (define (checkout/timeout reply)
    ;; Request a resource, re-requesting when the pool asks us to, but
    ;; never waiting longer than TIMEOUT-OR-DEFAULT seconds in total.
    ;; Return the resource, or #f on timeout.
    (let ((start-time (get-internal-real-time)))
      (define (time-remaining)
        (- timeout-or-default
           (/ (- (get-internal-real-time) start-time)
              internal-time-units-per-second)))

      (let loop ()
        (let ((put-remaining (time-remaining)))
          (and (> put-remaining 0)
               (perform-operation
                (choice-operation
                 (wrap-operation (put-operation pool `(checkout ,reply))
                                 (const #t))
                 (wrap-operation (sleep-operation put-remaining)
                                 (const #f))))
               (let ((reply-remaining (time-remaining)))
                 (and (> reply-remaining 0)
                      (let ((response
                             (perform-operation
                              (choice-operation
                               (get-operation reply)
                               (wrap-operation (sleep-operation reply-remaining)
                                               (const #f))))))
                        (if (eq? response 'resource-pool-retry-checkout)
                            (loop)
                            response)))))))))

  (define (checkout/blocking reply)
    ;; Request a resource and wait as long as it takes.
    (let loop ()
      (put-message pool `(checkout ,reply))
      (let ((response (get-message reply)))
        (if (eq? response 'resource-pool-retry-checkout)
            (loop)
            response))))

  (let ((resource
         (let ((reply (make-channel)))
           (if timeout-or-default
               (checkout/timeout reply)
               (checkout/blocking reply)))))

    (unless resource
      (when timeout-handler
        (timeout-handler pool proc timeout))
      (raise-exception (make-resource-pool-timeout-error)))

    (with-exception-handler
        (lambda (exception)
          (put-message pool `(return ,resource))
          (raise-exception exception))
      (lambda ()
        (call-with-values
            (lambda ()
              (with-throw-handler #t
                (lambda ()
                  (proc resource))
                (lambda _
                  (backtrace))))
          (lambda vals
            (put-message pool `(return ,resource))
            (apply values vals))))
      #:unwind? #t)))

(define-syntax-rule (with-resource-from-pool pool resource exp ...)
  (call-with-resource-from-pool
   pool
   (lambda (resource) exp ...)))

(define* (resource-pool-stats pool #:key (timeout 5))
  "Return an alist of statistics (resources, available, waiters,
checkout-failure-count) for POOL.  Raise a resource pool timeout error
if POOL does not reply within TIMEOUT seconds."
  (let ((reply (make-channel))
        (start-time (get-internal-real-time)))
    (define (time-remaining)
      (- timeout
         (/ (- (get-internal-real-time) start-time)
            internal-time-units-per-second)))

    (let* ((sent? (perform-operation
                   (choice-operation
                    (wrap-operation (put-operation pool `(stats ,reply))
                                    (const #t))
                    (wrap-operation (sleep-operation timeout)
                                    (const #f)))))
           (remaining (time-remaining))
           (response
            (and sent?
                 (> remaining 0)
                 (perform-operation
                  (choice-operation
                   (get-operation reply)
                   (wrap-operation (sleep-operation remaining)
                                   (const #f)))))))
      (or response
          (raise-exception (make-resource-pool-timeout-error))))))

(define (defer-to-parallel-fiber thunk)
  "Run THUNK in a new parallel fiber and return a channel on which
either the list of THUNK's values, or (exception . EXN), will be put."
  (let ((reply (make-channel)))
    (spawn-fiber
     (lambda ()
       (with-exception-handler
           (lambda (exn)
             (put-message reply (cons 'exception exn)))
         (lambda ()
           (call-with-values
               (lambda ()
                 (with-throw-handler #t
                   thunk
                   (lambda _
                     (backtrace))))
             (lambda vals
               (put-message reply vals))))
         #:unwind? #t))
     #:parallel? #t)
    reply))

(define (fetch-result-of-defered-thunks . reply-channels)
  "Wait for a reply on each of REPLY-CHANNELS (as returned by
defer-to-parallel-fiber) and return the list of results, re-raising
the first exception encountered in order."
  (let ((responses (map get-message
                        reply-channels)))
    (map
     (match-lambda
       (('exception . exn)
        (raise-exception exn))
       (result
        (apply values result)))
     responses)))

;; Evaluate each expression in its own parallel fiber and return their
;; results as multiple values.
(define-syntax parallel-via-fibers
  (lambda (x)
    (syntax-case x ()
      ((_ e0 ...)
       (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
         #'(let ((tmp0 (defer-to-parallel-fiber (lambda () e0)))
                 ...)
             (apply values (fetch-result-of-defered-thunks tmp0 ...))))))))

;; Like let, but the binding expressions are evaluated in parallel
;; fibers.
(define-syntax-rule (letpar& ((v e) ...) b0 b1 ...)
  (call-with-values
      (lambda () (parallel-via-fibers e ...))
    (lambda (v ...)
      b0 b1 ...)))

(define (par-mapper' mapper cons)
  "Return a procedure like map that applies PROC to each set of list
elements in a parallel fiber.  CONS is used to build the list of reply
channels; MAPPER is not used."
  (lambda (proc . lists)
    (apply
     fetch-result-of-defered-thunks
     (let loop ((lists lists))
       (match lists
         (((heads tails ...) ...)
          (let ((tail (loop tails))
                (head (defer-to-parallel-fiber
                       (lambda ()
                         (apply proc heads)))))
            (cons head tail)))
         (_
          '()))))))

(define par-map& (par-mapper' map cons))

(define (chunk lst max-length)
  "Split LST into a list of lists of at most MAX-LENGTH elements each.
An empty LST yields a list containing one empty list."
  (if (> (length lst) max-length)
      (call-with-values (lambda ()
                          (split-at lst max-length))
        (lambda (first-lst rest)
          (cons first-lst
                (chunk rest max-length))))
      (list lst)))

(define (chunk! lst max-length)
  "Like chunk, but destructively splits LST."
  (if (> (length lst) max-length)
      (call-with-values (lambda ()
                          (split-at! lst max-length))
        (lambda (first-lst rest)
          (cons first-lst
                (chunk! rest max-length))))
      (list lst)))

(define* (chunk-for-each! proc chunk-size #:rest lsts)
  "Call PROC with successive chunks of at most CHUNK-SIZE elements, one
chunk taken from each of the lists LSTS in step.  The lists are split
destructively and must all have the same length, otherwise an error is
raised.  PROC is not called when the lists are empty.  Return #t."
  (define (do-one-iteration lsts)
    (if (> (length (car lsts))
           chunk-size)
        (let ((chunks-and-rest
               (map (lambda (lst)
                      (call-with-values (lambda ()
                                          (split-at! lst chunk-size))
                        (lambda (first-lst rest)
                          (cons first-lst
                                rest))))
                    lsts)))
          (apply proc
                 (map car chunks-and-rest))
          (do-one-iteration
           (map cdr chunks-and-rest)))
        (apply proc lsts)))

  (let ((list-lengths (map length lsts)))
    (unless (= 1 (length (delete-duplicates list-lengths)))
      (error "lists not equal length"))

    (unless (= 0 (first list-lengths))
      (do-one-iteration lsts)))

  #t)

(define* (delete-duplicates/sort! unsorted-lst less #:optional (equal? equal?))
  "Destructively sort UNSORTED-LST with LESS and return a list with
adjacent EQUAL? duplicates removed.  The result is in the reverse of
the sorted order."
  (if (null? unsorted-lst)
      unsorted-lst
      (let ((sorted-lst (sort! unsorted-lst less)))

        (let loop ((lst (cdr sorted-lst))
                   (last-element (car sorted-lst))
                   (result (list (car sorted-lst))))
          (if (null? lst)
              result
              (let ((current-element (car lst)))
                (if (equal? current-element last-element)
                    (loop (cdr lst)
                          last-element
                          result)
                    (loop (cdr lst)
                          current-element
                          (cons current-element
                                result)))))))))

(define (get-guix-metrics-updater registry)
  "Return a thunk that sets the guix_db_bytes and guix_db_wal_bytes
gauges in REGISTRY to the sizes of /var/guix/db/db.sqlite and its -wal
file (0 if the WAL file does not exist).  Any exception raised while
updating is ignored."
  (define guix-db "/var/guix/db/db.sqlite")
  (define guix-db-wal
    (string-append guix-db "-wal"))

  (let ((guix-db-bytes-metric
         (make-gauge-metric registry "guix_db_bytes"))
        (guix-db-wal-bytes-metric
         (make-gauge-metric registry "guix_db_wal_bytes")))
    (lambda ()
      (with-exception-handler
          (lambda _
            #f)
        (lambda ()
          (metric-set guix-db-bytes-metric
                      (stat:size (stat guix-db)))
          (metric-set guix-db-wal-bytes-metric
                      (if (file-exists? guix-db-wal)
                          (stat:size (stat guix-db-wal))
                          0)))
        #:unwind? #t))))

;; This variant of run-server from the fibers library supports running
;; multiple servers within one process.
(define run-server/patched
  (let ((fibers-web-server-module
         (resolve-module '(fibers web server))))

    (define set-nonblocking!
      (module-ref fibers-web-server-module 'set-nonblocking!))

    (define make-default-socket
      (module-ref fibers-web-server-module 'make-default-socket))

    (define socket-loop
      (module-ref fibers-web-server-module 'socket-loop))

    (lambda* (handler
              #:key
              (host #f)
              (family AF_INET)
              (addr (if host
                        (inet-pton family host)
                        INADDR_LOOPBACK))
              (port 8080)
              (socket (make-default-socket family addr port)))
      ;; We use a large backlog by default.  If the server is suddenly hit
      ;; with a number of connections on a small backlog, clients won't
      ;; receive confirmation for their SYN, leading them to retry --
      ;; probably successfully, but with a large latency.
      (listen socket 1024)
      (set-nonblocking! socket)
      (sigaction SIGPIPE SIG_IGN)
      (spawn-fiber (lambda () (socket-loop socket handler))))))

(define &port-timeout
  (make-exception-type '&port-timeout
                       &external-error
                       '(port)))

(define make-port-timeout-error
  (record-constructor &port-timeout))

(define port-timeout-error?
  (record-predicate &port-timeout))

(define &port-read-timeout
  (make-exception-type '&port-read-timeout
                       &port-timeout
                       '()))

(define make-port-read-timeout-error
  (record-constructor &port-read-timeout))

(define port-read-timeout-error?
  (record-predicate &port-read-timeout))

(define &port-write-timeout
  (make-exception-type '&port-write-timeout
                       &port-timeout
                       '()))

(define make-port-write-timeout-error
  (record-constructor &port-write-timeout))

(define port-write-timeout-error?
  (record-predicate &port-write-timeout))

;; These procedures are subject to spurious wakeups.

(define (readable? port)
  "Test if PORT is readable."
  (match (select (vector port) #() #() 0)
    ((#() #() #()) #f)
    ((#(_) #() #()) #t)))

(define (writable? port)
  "Test if PORT is writable."
  (match (select #() (vector port) #() 0)
    ((#() #() #()) #f)
    ((#() #(_) #()) #t)))

;; Build a fibers operation that succeeds once READY? holds for the fd
;; returned by PORT-READY-FD, using SCHEDULE-WHEN-READY to be woken up.
;; THIS-PROCEDURE is currently unused.
(define (make-wait-operation ready? schedule-when-ready port port-ready-fd
                             this-procedure)
  (make-base-operation #f
                       (lambda _
                         (and (ready? (port-ready-fd port)) values))
                       (lambda (flag sched resume)
                         (define (commit)
                           (match (atomic-box-compare-and-swap! flag 'W 'S)
                             ('W (resume values))
                             ('C (commit))
                             ('S #f)))
                         (schedule-when-ready
                          sched (port-ready-fd port) commit))))

(define (wait-until-port-readable-operation port)
  "Make an operation that will succeed when PORT is readable."
  (unless (input-port? port)
    (error "refusing to wait forever for input on non-input port"))
  (make-wait-operation readable? schedule-task-when-fd-readable port
                       port-read-wait-fd
                       wait-until-port-readable-operation))

(define (wait-until-port-writable-operation port)
  "Make an operation that will succeed when PORT is writable."
  (unless (output-port? port)
    (error "refusing to wait forever for output on non-output port"))
  (make-wait-operation writable? schedule-task-when-fd-writable port
                       port-write-wait-fd
                       wait-until-port-writable-operation))

(define* (with-fibers-port-timeouts thunk
                                    #:key timeout
                                    (read-timeout timeout)
                                    (write-timeout timeout))
  "Call THUNK with the current read and write waiters replaced, so that
waiting for a port to become readable (writable) for longer than
READ-TIMEOUT (WRITE-TIMEOUT) seconds raises a port read (write) timeout
error.  TIMEOUT is the default for both; a timeout of #f means wait
without a limit.  Works both inside fibers and outside them."
  (define (no-fibers-wait port mode timeout)
    (define poll-timeout-ms 200)

    ;; When the GC runs, it restarts the poll syscall, but the timeout
    ;; remains unchanged!  When the timeout is longer than the time
    ;; between the syscall restarting, I think this renders the
    ;; timeout useless.  Therefore, this code uses a short timeout, and
    ;; repeatedly calls poll while watching the clock to see if it has
    ;; timed out overall.
    (let ((timeout-internal
           (and timeout
                ;; TIMEOUT is in seconds, as in the fibers path below.
                (+ (get-internal-real-time)
                   (* internal-time-units-per-second timeout)))))
      (let loop ((poll-value
                  (port-poll port mode poll-timeout-ms)))
        (cond
         ((not (= poll-value 0))
          poll-value)
         ((and timeout-internal
               (> (get-internal-real-time) timeout-internal))
          (raise-exception
           (if (string=? mode "r")
               (make-port-read-timeout-error port)
               (make-port-write-timeout-error port))))
         (else
          (loop (port-poll port mode poll-timeout-ms)))))))

  (define (fibers-wait port wait-operation timeout make-timeout-error)
    (perform-operation
     (if timeout
         (choice-operation
          (wait-operation port)
          (wrap-operation (sleep-operation timeout)
                          (lambda ()
                            (raise-exception
                             (make-timeout-error port)))))
         (wait-operation port))))

  (parameterize
      ((current-read-waiter
        (lambda (port)
          (if (current-scheduler)
              (fibers-wait port
                           wait-until-port-readable-operation
                           read-timeout
                           make-port-read-timeout-error)
              (no-fibers-wait port "r" read-timeout))))
       (current-write-waiter
        (lambda (port)
          (if (current-scheduler)
              (fibers-wait port
                           wait-until-port-writable-operation
                           write-timeout
                           make-port-write-timeout-error)
              (no-fibers-wait port "w" write-timeout)))))
    (thunk)))

(define (spawn-port-monitoring-fiber port error-condition)
  "Spawn a fiber that, every 20 seconds, opens (and closes) a TCP
connection to PORT on the loopback interface, logging the failure and
signalling ERROR-CONDITION whenever this raises an exception."
  (spawn-fiber
   (lambda ()
     (while #t
       (sleep 20)
       (with-exception-handler
           (lambda (exn)
             (simple-format (current-error-port)
                            "port monitoring fiber failed to connect to ~A: ~A\n"
                            port exn)
             (signal-condition! error-condition))
         (lambda ()
           (with-fibers-port-timeouts
            (lambda ()
              (let ((sock (socket PF_INET SOCK_STREAM 0)))
                ;; Close the socket on failure too, otherwise a failing
                ;; connection leaks a file descriptor every iteration.
                (with-exception-handler
                    (lambda (exn)
                      (close-port sock)
                      (raise-exception exn))
                  (lambda ()
                    (connect sock AF_INET INADDR_LOOPBACK port)
                    (close-port sock))
                  #:unwind? #t)))
            #:timeout 20))
         #:unwind? #t)))))

;; Copied from (fibers web server)
;; Call THUNK with a SIGINT handler that signals the condition CVAR,
;; restoring the previous handler afterwards.
(define (call-with-sigint thunk cvar)
  (let ((handler #f))
    (dynamic-wind
      (lambda ()
        (set! handler
          (sigaction SIGINT (lambda (sig) (signal-condition! cvar)))))
      thunk
      (lambda ()
        (if handler
            ;; restore Scheme handler, SIG_IGN or SIG_DFL.
            (sigaction SIGINT (car handler) (cdr handler))
            ;; restore original C handler.
            (sigaction SIGINT #f))))))