;;; Guix QA Frontpage
;;;
;;; Copyright © 2023 Christopher Baines
;;;
;;; This program is free software: you can redistribute it and/or
;;; modify it under the terms of the GNU Affero General Public License
;;; as published by the Free Software Foundation, either version 3 of
;;; the License, or (at your option) any later version.
;;;
;;; This program is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;;; Affero General Public License for more details.
;;;
;;; You should have received a copy of the GNU Affero General Public
;;; License along with this program.  If not, see
;;; <http://www.gnu.org/licenses/>.

(define-module (guix-qa-frontpage utils)
  #:use-module (srfi srfi-1)
  #:use-module (srfi srfi-71)
  #:use-module (ice-9 match)
  #:use-module (ice-9 threads)
  #:use-module (fibers)
  #:use-module (fibers channels)
  #:use-module ((guix-build-coordinator utils)
                #:select (with-port-timeouts))
  #:use-module (guix-build-coordinator utils fibers)
  #:export (fiberize
            fibers-map
            fibers-batch-for-each
            fibers-for-each

            non-blocking)
  #:re-export (with-fibers-port-timeouts))

(define* (fiberize proc #:key (parallelism 1))
  "Return a procedure that forwards its arguments over a channel to one of
PARALLELISM worker fibers, each of which applies PROC to them.  The returned
procedure waits for the reply and returns the values PROC returned; if PROC
raised an exception, that exception is re-raised in the caller."
  (let ((channel (make-channel)))
    ;; Spawn the workers.  Each one loops forever, taking a
    ;; (reply-channel . args) request and answering with either
    ;; (result . values) or (exception . exn).
    (for-each
     (lambda _
       (spawn-fiber
        (lambda ()
          (while #t
            (let ((reply-channel args (car+cdr (get-message channel))))
              (put-message
               reply-channel
               (with-exception-handler
                   (lambda (exn)
                     (cons 'exception exn))
                 (lambda ()
                   ;; Print a backtrace before the stack is unwound by the
                   ;; outer handler.
                   (with-throw-handler #t
                     (lambda ()
                       (call-with-values
                           (lambda ()
                             (apply proc args))
                         (lambda vals
                           (cons 'result vals))))
                     (lambda _
                       (backtrace))))
                 #:unwind? #t)))))
        #:parallel? #t))
     (iota parallelism))

    (lambda args
      (let ((reply-channel (make-channel)))
        (put-message channel (cons reply-channel args))
        (match (get-message reply-channel)
          (('result . vals)
           (apply values vals))
          (('exception . exn)
           (raise-exception exn)))))))

(define (fibers-map proc . lists)
  "Apply PROC to the corresponding elements of LISTS, running each
application in its own fiber, and return the results in order.  Each element
of the returned list is the list of values returned by that call to PROC.  If
a call raised an exception, it is re-raised once all fibers have replied."
  ;; NOTE(review): because the results are collected with (lambda val ...),
  ;; a PROC returning a single value X yields (X) in the result list, not X.
  ;; Confirm callers expect this before changing it.
  (let ((channels
         (apply
          map
          (lambda args
            (let ((channel (make-channel)))
              (spawn-fiber
               (lambda ()
                 (put-message
                  channel
                  (with-exception-handler
                      (lambda (exn)
                        (cons 'exception exn))
                    (lambda ()
                      (with-throw-handler #t
                        (lambda ()
                          (call-with-values
                              (lambda ()
                                (apply proc args))
                            (lambda val
                              (cons 'result val))))
                        (lambda _
                          (backtrace))))
                    #:unwind? #t))))
              channel))
          lists)))
    (map
     (match-lambda
       (('result . val) val)
       (('exception . exn)
        (raise-exception exn)))
     (map get-message channels))))

(define (fibers-batch-for-each proc batch-size . lists)
  "Apply PROC to the corresponding elements of LISTS for side effects,
processing at most BATCH-SIZE elements at a time through fibers-map.  The
return value is unspecified."
  ;; Like split-at, but don't error if the list is shorter than i elements.
  ;; Returns two values: the first (up to) i elements, in order, and the
  ;; remainder of the list.
  (define (split-at* lst i)
    (let lp ((l lst) (n i) (acc '()))
      (if (or (<= n 0) (null? l))
          (values (reverse! acc) l)
          (lp (cdr l) (- n 1) (cons (car l) acc)))))

  ;; As this can be called with lists with tens of thousands of items in them,
  ;; batch the calls to fibers-map, so that a fiber isn't spawned for every
  ;; item at once.  Returns two values: the list of batches (one per input
  ;; list) and the list of remainders.
  (define (get-batch lists)
    (let ((split-lists
           (map (lambda (lst)
                  (let ((batch rest (split-at* lst batch-size)))
                    (cons batch rest)))
                lists)))
      (values (map car split-lists)
              (map cdr split-lists))))

  (let loop ((lists lists))
    (call-with-values
        (lambda ()
          (get-batch lists))
      (lambda (batch rest)
        (apply fibers-map proc batch)
        ;; Stop as soon as any list is exhausted: map stops at the shortest
        ;; list, so further batches would not call PROC.
        (unless (any null? rest)
          (loop rest)))))

  *unspecified*)

(define (fibers-for-each proc . lists)
  "Like fibers-batch-for-each, with a batch size of 20."
  (apply fibers-batch-for-each proc 20 lists))

(define (non-blocking thunk)
  "Call THUNK in a new thread and wait on a channel for it to finish.  Return
the values THUNK returned; if THUNK raised an exception, print a backtrace to
the current error port and re-raise that exception in the caller."
  (let ((channel (make-channel)))
    (call-with-new-thread
     (lambda ()
       (with-exception-handler
           (lambda (exn)
             ;; Send a pair, so that the receiver's ('exception . exn)
             ;; pattern binds the exception object itself rather than a
             ;; one-element list containing it.
             (put-message channel (cons 'exception exn)))
         (lambda ()
           (with-throw-handler #t
             (lambda ()
               (call-with-values
                   (lambda ()
                     ;; This is mostly to set non fibers IO waiters
                     (with-port-timeouts
                      thunk
                      #:timeout (* 300 1000)))
                 (lambda results
                   (put-message channel `(values ,@results)))))
             (lambda args
               ;; backtrace writes to the current output port and returns
               ;; an unspecified value, so write the stack to the error
               ;; port explicitly instead.
               (display-backtrace (make-stack #t) (current-error-port))
               (newline (current-error-port)))))
         #:unwind? #t)))

    (match (get-message channel)
      (('values . results)
       (apply values results))
      (('exception . exn)
       (raise-exception exn)))))