;;; Guix Data Service -- Information about Guix over time ;;; Copyright © 2020 Christopher Baines ;;; ;;; This program is free software: you can redistribute it and/or ;;; modify it under the terms of the GNU Affero General Public License ;;; as published by the Free Software Foundation, either version 3 of ;;; the License, or (at your option) any later version. ;;; ;;; This program is distributed in the hope that it will be useful, ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ;;; Affero General Public License for more details. ;;; ;;; You should have received a copy of the GNU Affero General Public ;;; License along with this program. If not, see ;;; . (define-module (guix-data-service utils) #:use-module (srfi srfi-1) #:use-module (srfi srfi-11) #:use-module (ice-9 match) #:use-module (ice-9 format) #:use-module (ice-9 threads) #:use-module (fibers) #:use-module (fibers channels) #:use-module (fibers operations) #:use-module (fibers timers) #:use-module (fibers conditions) #:use-module (prometheus) #:export (call-with-time-logging with-time-logging prevent-inlining-for-tests %thread-pool-threads %thread-pool-idle-seconds %thread-pool-idle-thunk parallel-via-thread-pool-channel par-map& letpar& chunk chunk! chunk-for-each! delete-duplicates/sort! get-gc-metrics-updater)) (define (call-with-time-logging action thunk) (simple-format #t "debug: Starting ~A\n" action) (let ((start-time (current-time))) (let-values ((result (thunk))) (let ((time-taken (- (current-time) start-time))) (simple-format #t "debug: Finished ~A, took ~A seconds\n" action time-taken)) (apply values result)))) (define-syntax-rule (with-time-logging action exp ...) "Log under NAME the time taken to evaluate EXP." (call-with-time-logging action (lambda () exp ...))) (define-syntax-rule (prevent-inlining-for-tests var) (set! var var)) (define %thread-pool-threads (make-parameter 8)) (define %thread-pool-idle-seconds (make-parameter #f)) (define %thread-pool-idle-thunk (make-parameter #f)) (define* (make-thread-pool-channel threads) (define (delay-logger seconds-delayed) (when (> seconds-delayed 1) (format (current-error-port) "warning: thread pool delayed by ~1,2f seconds~%" seconds-delayed))) (define idle-thunk (%thread-pool-idle-thunk)) (define idle-seconds (%thread-pool-idle-seconds)) (let ((channel (make-channel))) (for-each (lambda _ (call-with-new-thread (lambda () (let loop () (match (if idle-seconds (perform-operation (choice-operation (get-operation channel) (wrap-operation (sleep-operation idle-seconds) (const 'timeout)))) (get-message channel)) ('timeout (when idle-thunk (with-exception-handler (lambda (exn) (simple-format (current-error-port) "worker thread idle thunk exception: ~A\n" exn)) idle-thunk #:unwind? #t)) (loop)) (((? channel? reply) sent-time (? procedure? proc)) (let ((time-delay (- (get-internal-real-time) sent-time))) (delay-logger (/ time-delay internal-time-units-per-second)) (put-message reply (with-exception-handler (lambda (exn) (cons 'worker-thread-error exn)) (lambda () (with-exception-handler (lambda (exn) (simple-format (current-error-port) "worker thread: exception: ~A\n" exn) (backtrace) (raise-exception exn)) (lambda () (call-with-values proc (lambda vals vals))))) #:unwind? #t))) (loop)) (_ #f)))))) (iota threads)) channel)) (define %thread-pool-mutex (make-mutex)) (define %thread-pool-channel #f) (define (make-thread-pool-channel!') (with-mutex %thread-pool-mutex (unless %thread-pool-channel (set! %thread-pool-channel (make-thread-pool-channel (%thread-pool-threads))) (set! make-thread-pool-channel! (lambda () #t))))) (define make-thread-pool-channel! (lambda () (make-thread-pool-channel!'))) (define (defer-to-thread-pool-channel thunk) (make-thread-pool-channel!) (let ((reply (make-channel))) (spawn-fiber (lambda () (put-message %thread-pool-channel (list reply (get-internal-real-time) thunk)))) reply)) (define (fetch-result-of-defered-thunk reply-channel) (match (get-message reply-channel) (('worker-thread-error . exn) (raise-exception exn)) (result (apply values result)))) (define (fetch-result-of-defered-thunks . reply-channels) (let ((responses (map get-message reply-channels))) (map (match-lambda (('worker-thread-error . exn) (raise-exception exn)) (result (apply values result))) responses))) (define-syntax parallel-via-thread-pool-channel (lambda (x) (syntax-case x () ((_ e0 ...) (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...))))) #'(let ((tmp0 (defer-to-thread-pool-channel (lambda () e0))) ...) (apply values (fetch-result-of-defered-thunks tmp0 ...)))))))) (define-syntax-rule (letpar& ((v e) ...) b0 b1 ...) (call-with-values (lambda () (parallel-via-thread-pool-channel e ...)) (lambda (v ...) b0 b1 ...))) (define (par-mapper' mapper cons) (lambda (proc . lists) (let loop ((lists lists)) (match lists (((heads tails ...) ...) (let ((tail (loop tails)) (head (defer-to-thread-pool-channel (lambda () (apply proc heads))))) (cons (fetch-result-of-defered-thunk head) tail))) (_ '()))))) (define par-map& (par-mapper' map cons)) (define (chunk lst max-length) (if (> (length lst) max-length) (call-with-values (lambda () (split-at lst max-length)) (lambda (first-lst rest) (cons first-lst (chunk rest max-length)))) (list lst))) (define (chunk! lst max-length) (if (> (length lst) max-length) (call-with-values (lambda () (split-at! lst max-length)) (lambda (first-lst rest) (cons first-lst (chunk! rest max-length)))) (list lst))) (define* (chunk-for-each! proc chunk-size #:rest lsts) (define (do-one-iteration lsts) (if (> (length (car lsts)) chunk-size) (let ((chunks-and-rest (map (lambda (lst) (call-with-values (lambda () (split-at! lst chunk-size)) (lambda (first-lst rest) (cons first-lst rest)))) lsts))) (apply proc (map car chunks-and-rest)) (do-one-iteration (map cdr chunks-and-rest))) (apply proc lsts))) (let ((list-lengths (map length lsts))) (unless (eq? 1 (length (delete-duplicates list-lengths))) (error "lists not equal length")) (unless (eq? 0 (first list-lengths)) (do-one-iteration lsts))) #t) (define (delete-duplicates/sort! unsorted-lst less) (if (null? unsorted-lst) unsorted-lst (let ((sorted-lst (sort! unsorted-lst less))) (let loop ((lst (cdr sorted-lst)) (last-element (car sorted-lst)) (result (list (car sorted-lst)))) (if (null? lst) result (let ((current-element (car lst))) (if (eq? current-element last-element) (loop (cdr lst) last-element result) (loop (cdr lst) current-element (cons current-element result))))))))) (define (get-gc-metrics-updater registry) (define metrics `((gc-time-taken . ,(make-gauge-metric registry "guile_gc_time_taken")) (heap-size . ,(make-gauge-metric registry "guile_heap_size")) (heap-free-size . ,(make-gauge-metric registry "guile_heap_free_size")) (heap-total-allocated . ,(make-gauge-metric registry "guile_heap_total_allocated")) (heap-allocated-since-gc . ,(make-gauge-metric registry "guile_allocated_since_gc")) (protected-objects . ,(make-gauge-metric registry "guile_gc_protected_objects")) (gc-times . ,(make-gauge-metric registry "guile_gc_times")))) (lambda () (let ((stats (gc-stats))) (for-each (match-lambda ((name . metric) (let ((value (assq-ref stats name))) (metric-set metric value)))) metrics))))