diff options
-rw-r--r-- | guix-data-service/jobs/load-new-guix-revision.scm | 22 | ||||
-rw-r--r-- | guix-data-service/utils.scm | 1120 | ||||
-rw-r--r-- | guix-data-service/web/build-server/controller.scm | 1 | ||||
-rw-r--r-- | guix-data-service/web/build/controller.scm | 6 | ||||
-rw-r--r-- | guix-data-service/web/compare/controller.scm | 38 | ||||
-rw-r--r-- | guix-data-service/web/controller.scm | 18 | ||||
-rw-r--r-- | guix-data-service/web/jobs/controller.scm | 4 | ||||
-rw-r--r-- | guix-data-service/web/nar/controller.scm | 4 | ||||
-rw-r--r-- | guix-data-service/web/package/controller.scm | 4 | ||||
-rw-r--r-- | guix-data-service/web/repository/controller.scm | 36 | ||||
-rw-r--r-- | guix-data-service/web/revision/controller.scm | 62 | ||||
-rw-r--r-- | guix-data-service/web/server.scm | 11 | ||||
-rw-r--r-- | guix-dev.scm | 33 |
13 files changed, 154 insertions, 1205 deletions
diff --git a/guix-data-service/jobs/load-new-guix-revision.scm b/guix-data-service/jobs/load-new-guix-revision.scm index dfa41ec..e18528a 100644 --- a/guix-data-service/jobs/load-new-guix-revision.scm +++ b/guix-data-service/jobs/load-new-guix-revision.scm @@ -38,6 +38,12 @@ #:use-module (fibers timers) #:use-module (fibers channels) #:use-module (fibers operations) + #:use-module (knots) + #:use-module (knots queue) + #:use-module (knots promise) + #:use-module (knots parallelism) + #:use-module (knots resource-pool) + #:use-module (knots worker-threads) #:use-module (guix monads) #:use-module (guix base32) #:use-module (guix store) @@ -1127,7 +1133,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" (insert-derivations))) (unless (null? derivations) - (parallel-via-fibers + (fibers-parallel (insert-sources derivations derivation-ids) (with-time-logging @@ -1906,7 +1912,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" (inferior-lint-checkers inferior))))) (when inferior-lint-checkers-data - (letpar& ((lint-checker-ids + (fibers-let ((lint-checker-ids (with-resource-from-pool postgresql-connection-pool conn (lint-checkers->lint-checker-ids conn @@ -2181,7 +2187,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" (with-time-logging (simple-format #f "extract-information-from: ~A\n" store-item) - (parallel-via-fibers + (fibers-parallel (begin (fibers-force package-ids-promise) #f) @@ -2267,7 +2273,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" extra-inferior-environment-variables) (define utility-thread-channel ;; There might be high demand for this, so order the requests - (make-queueing-channel + (spawn-queueing-fiber (call-with-default-io-waiters (lambda () (make-worker-thread-channel @@ -2791,6 +2797,12 @@ SKIP LOCKED") (exec-query conn "BEGIN") + ;; (spawn-fiber + ;; (lambda () + ;; (while #t + ;; (sleep (* 60 5)) + ;; (profile-heap)))) + (spawn-fiber (lambda () (while (perform-operation @@ -2864,7 +2876,7 @@ SKIP LOCKED") id)))))) (when result - (parallel-via-fibers + (fibers-parallel (with-postgresql-connection (simple-format #f "post load-new-guix-revision ~A counts" id) (lambda (conn) diff --git a/guix-data-service/utils.scm b/guix-data-service/utils.scm index b53f33f..a447a9c 100644 --- a/guix-data-service/utils.scm +++ b/guix-data-service/utils.scm @@ -38,43 +38,14 @@ #:use-module (fibers timers) #:use-module (fibers conditions) #:use-module (fibers scheduler) + #:use-module (knots timeout) #:use-module (prometheus) #:export (call-with-time-logging with-time-logging prevent-inlining-for-tests - resource-pool-default-timeout - resource-pool-retry-checkout-timeout - %resource-pool-timeout-handler - resource-pool-timeout-error? - make-resource-pool - destroy-resource-pool - call-with-resource-from-pool - with-resource-from-pool - resource-pool-stats - - call-with-default-io-waiters - make-worker-thread-channel - %worker-thread-default-timeout - call-with-worker-thread - worker-thread-timeout-error? - fiberize - fibers-delay - fibers-force - fibers-promise-reset - - fibers-batch-for-each - fibers-for-each - fibers-batch-map - fibers-map - - parallel-via-fibers - par-map& - letpar& - fibers-map-with-progress - chunk chunk! chunk-for-each! @@ -84,7 +55,6 @@ get-guix-metrics-updater call-with-sigint - run-server/patched spawn-port-monitoring-fiber @@ -107,674 +77,6 @@ (define-syntax-rule (prevent-inlining-for-tests var) (set! var var)) -(define-record-type <resource-pool> - (make-resource-pool-record name channel) - resource-pool? - (name resource-pool-name) - (channel resource-pool-channel)) - -(define* (make-resource-pool initializer max-size - #:key (min-size max-size) - (idle-seconds #f) - (delay-logger (const #f)) - (duration-logger (const #f)) - destructor - lifetime - scheduler - (name "unnamed")) - (define (initializer/safe) - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "exception running ~A resource pool initializer: ~A:\n ~A\n" - name - initializer - exn) - #f) - (lambda () - (with-throw-handler #t - initializer - (lambda args - (backtrace)))) - #:unwind? #t)) - - (define (destructor/safe args) - (let ((success? - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "exception running resource pool destructor (~A): ~A:\n ~A\n" - name - destructor - exn) - #f) - (lambda () - (with-throw-handler #t - (lambda () - (destructor args) - #t) - (lambda _ - (backtrace)))) - #:unwind? #t))) - - (or success? - #t - (begin - (sleep 5) - (destructor/safe args))))) - - (let ((channel (make-channel)) - (checkout-failure-count 0)) - (spawn-fiber - (lambda () - (when idle-seconds - (spawn-fiber - (lambda () - (while #t - (sleep idle-seconds) - (put-message channel '(check-for-idle-resources)))))) - - (while #t - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "exception in the ~A pool fiber: ~A\n" - name - exn)) - (lambda () - (let loop ((resources '()) - (available '()) - (waiters '()) - (resources-last-used '())) - - (match (get-message channel) - (('checkout reply) - (if (null? available) - (if (= (length resources) max-size) - (loop resources - available - (cons reply waiters) - resources-last-used) - (let ((new-resource (initializer/safe))) - (if new-resource - (let ((checkout-success? - (perform-operation - (choice-operation - (wrap-operation - (put-operation reply new-resource) - (const #t)) - (wrap-operation (sleep-operation 1) - (const #f)))))) - (unless checkout-success? - (set! checkout-failure-count - (+ 1 checkout-failure-count))) - - (loop (cons new-resource resources) - (if checkout-success? - available - (cons new-resource available)) - waiters - (cons (get-internal-real-time) - resources-last-used))) - (loop resources - available - (cons reply waiters) - resources-last-used)))) - (let ((checkout-success? - (perform-operation - (choice-operation - (wrap-operation - (put-operation reply (car available)) - (const #t)) - (wrap-operation (sleep-operation 1) - (const #f)))))) - (unless checkout-success? - (set! checkout-failure-count - (+ 1 checkout-failure-count))) - - (if checkout-success? - (loop resources - (cdr available) - waiters - resources-last-used) - (loop resources - available - waiters - resources-last-used))))) - (('return resource) - (if (null? waiters) - (loop resources - (cons resource available) - waiters - (begin - (list-set! - resources-last-used - (list-index (lambda (x) - (eq? x resource)) - resources) - (get-internal-real-time)) - resources-last-used)) - (let ((checkout-success? - (perform-operation - (choice-operation - (wrap-operation - (put-operation (last waiters) - resource) - (const #t)) - (wrap-operation (sleep-operation 1) - (const #f)))))) - (unless checkout-success? - (set! checkout-failure-count - (+ 1 checkout-failure-count))) - - (if checkout-success? - (loop resources - available - (drop-right! waiters 1) - (begin - (list-set! - resources-last-used - (list-index (lambda (x) - (eq? x resource)) - resources) - (get-internal-real-time)) - resources-last-used)) - (begin - (for-each - (lambda (waiter) - (spawn-fiber - (lambda () - (perform-operation - (choice-operation - (put-operation waiter 'resource-pool-retry-checkout) - (sleep-operation 10)))))) - waiters) - - (loop resources - (cons resource available) - '() - (begin - (list-set! - resources-last-used - (list-index (lambda (x) - (eq? x resource)) - resources) - (get-internal-real-time)) - resources-last-used))))))) - (('stats reply) - (let ((stats - `((resources . ,(length resources)) - (available . ,(length available)) - (waiters . ,(length waiters)) - (checkout-failure-count . ,checkout-failure-count)))) - - (spawn-fiber - (lambda () - (perform-operation - (choice-operation - (wrap-operation - (put-operation reply stats) - (const #t)) - (wrap-operation (sleep-operation 1) - (const #f))))))) - - (loop resources - available - waiters - resources-last-used)) - (('check-for-idle-resources) - (let* ((resources-last-used-seconds - (map - (lambda (internal-time) - (/ (- (get-internal-real-time) internal-time) - internal-time-units-per-second)) - resources-last-used)) - (resources-to-destroy - (filter-map - (lambda (resource last-used-seconds) - (if (and (member resource available) - (> last-used-seconds idle-seconds)) - resource - #f)) - resources - resources-last-used-seconds))) - - (for-each - (lambda (resource) - (destructor/safe resource)) - resources-to-destroy) - - (loop (lset-difference eq? resources resources-to-destroy) - (lset-difference eq? available resources-to-destroy) - waiters - (filter-map - (lambda (resource last-used) - (if (memq resource resources-to-destroy) - #f - last-used)) - resources - resources-last-used)))) - (('destroy reply) - (if (= (length resources) (length available)) - (begin - (for-each - (lambda (resource) - (destructor/safe resource)) - resources) - (put-message reply 'destroy-success)) - (begin - (spawn-fiber - (lambda () - (perform-operation - (choice-operation - (put-operation reply 'resource-pool-destroy-failed) - (sleep-operation 10))))) - (loop resources - available - waiters - resources-last-used)))) - (unknown - (simple-format - (current-error-port) - "unrecognised message to ~A resource pool channel: ~A\n" - name - unknown) - (loop resources - available - waiters - resources-last-used))))) - #:unwind? #t))) - (or scheduler - (current-scheduler))) - - (make-resource-pool-record name channel))) - -(define (destroy-resource-pool pool) - (let ((reply (make-channel))) - (put-message (resource-pool-channel pool) - (list 'destroy reply)) - (let ((msg (get-message reply))) - (unless (eq? msg 'destroy-success) - (error msg))))) - -(define resource-pool-default-timeout - (make-parameter #f)) - -(define resource-pool-retry-checkout-timeout - (make-parameter 5)) - -(define &resource-pool-timeout - (make-exception-type '&recource-pool-timeout - &error - '(name))) - -(define make-resource-pool-timeout-error - (record-constructor &resource-pool-timeout)) - -(define resource-pool-timeout-error? - (record-predicate &resource-pool-timeout)) - -(define %resource-pool-timeout-handler - (make-parameter #f)) - -(define* (call-with-resource-from-pool pool proc #:key (timeout 'default) - (timeout-handler (%resource-pool-timeout-handler))) - "Call PROC with a resource from POOL, blocking until a resource becomes -available. Return the resource once PROC has returned." - - (define retry-timeout - (resource-pool-retry-checkout-timeout)) - - (define timeout-or-default - (if (eq? timeout 'default) - (resource-pool-default-timeout) - timeout)) - - (let ((resource - (let ((reply (make-channel))) - (let loop ((start-time (get-internal-real-time))) - (let ((request-success? - (perform-operation - (choice-operation - (wrap-operation - (put-operation (resource-pool-channel pool) - `(checkout ,reply)) - (const #t)) - (wrap-operation (sleep-operation (or timeout-or-default - retry-timeout)) - (const #f)))))) - (if request-success? - (let ((time-remaining - (- (or timeout-or-default - retry-timeout) - (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second)))) - (if (> time-remaining 0) - (let ((response - (perform-operation - (choice-operation - (get-operation reply) - (wrap-operation (sleep-operation time-remaining) - (const #f)))))) - (if (or (not response) - (eq? response 'resource-pool-retry-checkout)) - (if (> (- (or timeout-or-default - retry-timeout) - (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second)) - 0) - (loop start-time) - (if (eq? timeout-or-default #f) - (loop (get-internal-real-time)) - #f)) - response)) - (if (eq? timeout-or-default #f) - (loop (get-internal-real-time)) - #f))) - (if (eq? timeout-or-default #f) - (loop (get-internal-real-time)) - #f))))))) - - (when (or (not resource) - (eq? resource 'resource-pool-retry-checkout)) - (when timeout-handler - (timeout-handler pool proc timeout)) - - (raise-exception - (make-resource-pool-timeout-error (resource-pool-name pool)))) - - (with-exception-handler - (lambda (exception) - (put-message (resource-pool-channel pool) - `(return ,resource)) - (raise-exception exception)) - (lambda () - (call-with-values - (lambda () - (with-throw-handler #t - (lambda () - (proc resource)) - (lambda _ - (backtrace)))) - (lambda vals - (put-message (resource-pool-channel pool) - `(return ,resource)) - (apply values vals)))) - #:unwind? #t))) - -(define-syntax-rule (with-resource-from-pool pool resource exp ...) - (call-with-resource-from-pool - pool - (lambda (resource) exp ...))) - -(define* (resource-pool-stats pool #:key (timeout 5)) - (let ((reply (make-channel)) - (start-time (get-internal-real-time))) - (perform-operation - (choice-operation - (wrap-operation - (put-operation (resource-pool-channel pool) - `(stats ,reply)) - (const #t)) - (wrap-operation (sleep-operation timeout) - (lambda _ - (raise-exception - (make-resource-pool-timeout-error)))))) - - (let ((time-remaining - (- timeout - (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second)))) - (if (> time-remaining 0) - (perform-operation - (choice-operation - (get-operation reply) - (wrap-operation (sleep-operation time-remaining) - (lambda _ - (raise-exception - (make-resource-pool-timeout-error)))))) - (raise-exception - (make-resource-pool-timeout-error)))))) - -(define (call-with-default-io-waiters thunk) - (parameterize - ((current-read-waiter (@@ (ice-9 suspendable-ports) - default-read-waiter)) - (current-write-waiter (@@ (ice-9 suspendable-ports) - default-write-waiter))) - (thunk))) - -(define %worker-thread-args - (make-parameter #f)) - -(define* (make-worker-thread-channel initializer - #:key (parallelism 1) - (delay-logger (lambda _ #f)) - (duration-logger (const #f)) - destructor - lifetime - (log-exception? (const #t)) - (expire-on-exception? #f) - (name "unnamed")) - "Return a channel used to offload work to a dedicated thread. ARGS are the -arguments of the worker thread procedure." - (define thread-proc-vector - (make-vector parallelism #f)) - - (define (initializer/safe) - (let ((args - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "exception running initializer in worker thread (~A): ~A:\n ~A\n" - name - initializer - exn) - #f) - (lambda () - (with-throw-handler #t - initializer - (lambda args - (backtrace)))) - #:unwind? #t))) - - (if args - args - ;; never give up, just keep retrying - (begin - (sleep 1) - (initializer/safe))))) - - (define (destructor/safe args) - (let ((success? - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "exception running destructor in worker thread (~A): ~A:\n ~A\n" - name - destructor - exn) - #f) - (lambda () - (with-throw-handler #t - (lambda () - (apply destructor args) - #t) - (lambda _ - (backtrace)))) - #:unwind? #t))) - - (or success? - #t - (begin - (sleep 1) - (destructor/safe args))))) - - (define (process thread-index channel args) - (let loop ((current-lifetime lifetime)) - (let ((exception? - (match (get-message channel) - (((? channel? reply) sent-time (? procedure? proc)) - (let ((time-delay - (- (get-internal-real-time) - sent-time))) - (delay-logger (/ time-delay - internal-time-units-per-second)) - - (let* ((start-time (get-internal-real-time)) - (response - (with-exception-handler - (lambda (exn) - (list 'worker-thread-error - (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second) - exn)) - (lambda () - (vector-set! thread-proc-vector - thread-index - proc) - (with-throw-handler #t - (lambda () - (call-with-values - (lambda () - (start-stack - 'worker-thread - (apply proc args))) - (lambda vals - (cons (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second) - vals)))) - (lambda args - (when (match args - (('%exception exn) - (log-exception? exn)) - (_ #t)) - (simple-format - (current-error-port) - "worker-thread: exception: ~A\n" args) - (backtrace))))) - #:unwind? #t))) - (put-message reply - response) - - (vector-set! thread-proc-vector - thread-index - #f) - - (match response - (('worker-thread-error duration _) - (when duration-logger - (duration-logger duration proc)) - #t) - ((duration . _) - (when duration-logger - (duration-logger duration proc)) - #f)))))))) - (unless (and expire-on-exception? - exception?) - (if (number? current-lifetime) - (unless (< current-lifetime 0) - (loop (if current-lifetime - (- current-lifetime 1) - #f))) - (loop #f)))))) - - (let ((channel (make-channel))) - (for-each - (lambda (thread-index) - (call-with-new-thread - (lambda () - (catch 'system-error - (lambda () - (set-thread-name - (string-append - name " w t " - (number->string thread-index)))) - (const #t)) - - (let init ((args (initializer/safe))) - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "worker-thread-channel: exception: ~A\n" exn)) - (lambda () - (parameterize ((%worker-thread-args args)) - (process thread-index channel args))) - #:unwind? #t) - - (when destructor - (destructor/safe args)) - - (init (initializer/safe)))))) - (iota parallelism)) - - (values channel - thread-proc-vector))) - -(define &worker-thread-timeout - (make-exception-type '&worker-thread-timeout - &error - '())) - -(define make-worker-thread-timeout-error - (record-constructor &worker-thread-timeout)) - -(define worker-thread-timeout-error? - (record-predicate &worker-thread-timeout)) - -(define %worker-thread-default-timeout - (make-parameter 30)) - -(define* (call-with-worker-thread channel proc #:key duration-logger - (timeout (%worker-thread-default-timeout))) - "Send PROC to the worker thread through CHANNEL. Return the result of PROC. -If already in the worker thread, call PROC immediately." - (let ((args (%worker-thread-args))) - (if args - (apply proc args) - (let* ((reply (make-channel)) - (operation-success? - (perform-operation - (let ((put - (wrap-operation - (put-operation channel - (list reply - (get-internal-real-time) - proc)) - (const #t)))) - - (if timeout - (choice-operation - put - (wrap-operation (sleep-operation timeout) - (const #f))) - put))))) - - (unless operation-success? - (raise-exception - (make-worker-thread-timeout-error))) - - (match (get-message reply) - (('worker-thread-error duration exn) - (when duration-logger - (duration-logger duration)) - (raise-exception exn)) - ((duration . result) - (when duration-logger - (duration-logger duration)) - (apply values result))))))) - (define* (fiberize proc #:key (parallelism 1)) (let ((channel (make-channel))) (for-each @@ -810,235 +112,6 @@ If already in the worker thread, call PROC immediately." (('result . vals) (apply values vals)) (('exception . exn) (raise-exception exn))))))) -(define-record-type <fibers-promise> - (make-fibers-promise thunk values-box evaluated-condition) - fibers-promise? - (thunk fibers-promise-thunk) - (values-box fibers-promise-values-box) - (evaluated-condition fibers-promise-evaluated-condition)) - -(define (fibers-delay thunk) - (make-fibers-promise - thunk - (make-atomic-box #f) - (make-condition))) - -(define (fibers-force fp) - (let ((res (atomic-box-compare-and-swap! - (fibers-promise-values-box fp) - #f - 'started))) - (if (eq? #f res) - (call-with-values - (lambda () - (with-exception-handler - (lambda (exn) - (atomic-box-set! (fibers-promise-values-box fp) - exn) - (signal-condition! - (fibers-promise-evaluated-condition fp)) - (raise-exception exn)) - (fibers-promise-thunk fp) - #:unwind? #t)) - (lambda vals - (atomic-box-set! (fibers-promise-values-box fp) - vals) - (signal-condition! - (fibers-promise-evaluated-condition fp)) - (apply values vals))) - (if (eq? res 'started) - (begin - (wait (fibers-promise-evaluated-condition fp)) - (let ((result (atomic-box-ref (fibers-promise-values-box fp)))) - (if (exception? result) - (raise-exception result) - (apply values result)))) - (if (exception? res) - (raise-exception res) - (apply values res)))))) - -(define (fibers-promise-reset fp) - (atomic-box-set! (fibers-promise-values-box fp) - #f)) - -;; Like split-at, but don't care about the order of the resulting lists, and -;; don't error if the list is shorter than i elements -(define (split-at* lst i) - (let lp ((l lst) (n i) (acc '())) - (if (or (<= n 0) (null? l)) - (values (reverse! acc) l) - (lp (cdr l) (- n 1) (cons (car l) acc))))) - -;; As this can be called with lists with tens of thousands of items in them, -;; batch the -(define (get-batch batch-size lists) - (let ((split-lists - (map (lambda (lst) - (let ((batch rest (split-at* lst batch-size))) - (cons batch rest))) - lists))) - (values (map car split-lists) - (map cdr split-lists)))) - -(define (defer-to-parallel-fiber thunk) - (let ((reply (make-channel))) - (spawn-fiber - (lambda () - (with-exception-handler - (lambda (exn) - (put-message reply (cons 'exception exn))) - (lambda () - (call-with-values - (lambda () - (with-throw-handler #t - thunk - (lambda _ - (backtrace)))) - (lambda vals - (put-message reply vals)))) - #:unwind? #t)) - #:parallel? #t) - reply)) - -(define (fetch-result-of-defered-thunks . reply-channels) - (let ((responses (map get-message - reply-channels))) - (map - (match-lambda - (('exception . exn) - (raise-exception exn)) - (result - (apply values result))) - responses))) - -(define (fibers-batch-map proc batch-size . lists) - (let loop ((lists lists) - (result '())) - (let ((batch - rest - (get-batch batch-size lists))) - (if (any null? batch) - result - (let ((response-channels - (apply map - (lambda args - (defer-to-parallel-fiber - (lambda () - (apply proc args)))) - batch))) - (loop rest - (append! result - (apply fetch-result-of-defered-thunks - response-channels)))))))) - -(define (fibers-map proc . lists) - (apply fibers-batch-map proc 20 lists)) - -(define (fibers-batch-for-each proc batch-size . lists) - (let loop ((lists lists)) - (let ((batch - rest - (get-batch batch-size lists))) - (if (any null? batch) - *unspecified* - (let ((response-channels - (apply map - (lambda args - (defer-to-parallel-fiber - (lambda () - (apply proc args)))) - batch))) - (apply fetch-result-of-defered-thunks - response-channels) - (loop rest)))))) - -(define (fibers-for-each proc . lists) - (apply fibers-batch-for-each proc 20 lists)) - -(define-syntax parallel-via-fibers - (lambda (x) - (syntax-case x () - ((_ e0 ...) - (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...))))) - #'(let ((tmp0 (defer-to-parallel-fiber - (lambda () - e0))) - ...) - (apply values (fetch-result-of-defered-thunks tmp0 ...)))))))) - -(define-syntax-rule (letpar& ((v e) ...) b0 b1 ...) - (call-with-values - (lambda () (parallel-via-fibers e ...)) - (lambda (v ...) - b0 b1 ...))) - -(define (par-mapper' mapper cons) - (lambda (proc . lists) - (apply - fetch-result-of-defered-thunks - (let loop ((lists lists)) - (match lists - (((heads tails ...) ...) - (let ((tail (loop tails)) - (head (defer-to-parallel-fiber - (lambda () - (apply proc heads))))) - (cons head tail))) - (_ - '())))))) - -(define par-map& (par-mapper' map cons)) - -(define* (fibers-map-with-progress proc lists #:key report) - (let loop ((channels-to-results - (apply map - (lambda args - (cons (defer-to-parallel-fiber - (lambda () - (apply proc args))) - #f)) - lists))) - (let ((active-channels - (filter-map car channels-to-results))) - (when report - (report (apply map - list - (map cdr channels-to-results) - lists))) - (if (null? active-channels) - (map - (match-lambda - ((#f . ('exception . exn)) - (raise-exception exn)) - ((#f . ('result . val)) - val)) - channels-to-results) - (loop - (perform-operation - (apply - choice-operation - (filter-map - (lambda (p) - (match p - ((channel . _) - (if channel - (wrap-operation - (get-operation channel) - (lambda (result) - (map (match-lambda - ((c . r) - (if (eq? channel c) - (cons #f - (match result - (('exception . exn) - result) - (_ - (cons 'result result)))) - (cons c r)))) - channels-to-results))) - #f)))) - channels-to-results)))))))) - (define (chunk lst max-length) (if (> (length lst) max-length) @@ -1126,173 +199,6 @@ If already in the worker thread, call PROC immediately." 0))) #:unwind? #t)))) -;; This variant of run-server from the fibers library supports running -;; multiple servers within one process. -(define run-server/patched - (let ((fibers-web-server-module - (resolve-module '(fibers web server)))) - - (define set-nonblocking! - (module-ref fibers-web-server-module 'set-nonblocking!)) - - (define make-default-socket - (module-ref fibers-web-server-module 'make-default-socket)) - - (define socket-loop - (module-ref fibers-web-server-module 'socket-loop)) - - (lambda* (handler - #:key - (host #f) - (family AF_INET) - (addr (if host - (inet-pton family host) - INADDR_LOOPBACK)) - (port 8080) - (socket (make-default-socket family addr port))) - ;; We use a large backlog by default. If the server is suddenly hit - ;; with a number of connections on a small backlog, clients won't - ;; receive confirmation for their SYN, leading them to retry -- - ;; probably successfully, but with a large latency. - (listen socket 1024) - (set-nonblocking! socket) - (sigaction SIGPIPE SIG_IGN) - (spawn-fiber (lambda () (socket-loop socket handler)))))) - -(define &port-timeout - (make-exception-type '&port-timeout - &external-error - '(port))) - -(define make-port-timeout-error - (record-constructor &port-timeout)) - -(define port-timeout-error? - (record-predicate &port-timeout)) - -(define &port-read-timeout - (make-exception-type '&port-read-timeout - &port-timeout - '())) - -(define make-port-read-timeout-error - (record-constructor &port-read-timeout)) - -(define port-read-timeout-error? - (record-predicate &port-read-timeout)) - -(define &port-write-timeout - (make-exception-type '&port-write-timeout - &port-timeout - '())) - -(define make-port-write-timeout-error - (record-constructor &port-write-timeout)) - -(define port-write-timeout-error? - (record-predicate &port-write-timeout)) - -;; These procedure are subject to spurious wakeups. - -(define (readable? port) - "Test if PORT is writable." - (match (select (vector port) #() #() 0) - ((#() #() #()) #f) - ((#(_) #() #()) #t))) - -(define (writable? port) - "Test if PORT is writable." - (match (select #() (vector port) #() 0) - ((#() #() #()) #f) - ((#() #(_) #()) #t))) - -(define (make-wait-operation ready? schedule-when-ready port - port-ready-fd this-procedure) - (make-base-operation - #f - (lambda _ - (and (ready? (port-ready-fd port)) values)) - (lambda (flag sched resume) - (define (commit) - (match (atomic-box-compare-and-swap! flag 'W 'S) - ('W (resume values)) - ('C (commit)) - ('S #f))) - (schedule-when-ready - sched (port-ready-fd port) commit)))) - -(define (wait-until-port-readable-operation port) - "Make an operation that will succeed when PORT is readable." - (unless (input-port? port) - (error "refusing to wait forever for input on non-input port")) - (make-wait-operation readable? schedule-task-when-fd-readable port - port-read-wait-fd - wait-until-port-readable-operation)) - -(define (wait-until-port-writable-operation port) - "Make an operation that will succeed when PORT is writable." - (unless (output-port? port) - (error "refusing to wait forever for output on non-output port")) - (make-wait-operation writable? schedule-task-when-fd-writable port - port-write-wait-fd - wait-until-port-writable-operation)) - -(define* (with-fibers-port-timeouts thunk - #:key timeout - (read-timeout timeout) - (write-timeout timeout)) - (define (no-fibers-wait port mode timeout) - (define poll-timeout-ms 200) - - ;; When the GC runs, it restarts the poll syscall, but the timeout - ;; remains unchanged! When the timeout is longer than the time - ;; between the syscall restarting, I think this renders the - ;; timeout useless. Therefore, this code uses a short timeout, and - ;; repeatedly calls poll while watching the clock to see if it has - ;; timed out overall. - (let ((timeout-internal - (+ (get-internal-real-time) - (* internal-time-units-per-second - (/ timeout 1000))))) - (let loop ((poll-value - (port-poll port mode poll-timeout-ms))) - (if (= poll-value 0) - (if (> (get-internal-real-time) - timeout-internal) - (raise-exception - (if (string=? mode "r") - (make-port-read-timeout-error port) - (make-port-write-timeout-error port))) - (loop (port-poll port mode poll-timeout-ms))) - poll-value)))) - - (parameterize - ((current-read-waiter - (lambda (port) - (if (current-scheduler) - (perform-operation - (choice-operation - (wait-until-port-readable-operation port) - (wrap-operation - (sleep-operation read-timeout) - (lambda () - (raise-exception - (make-port-read-timeout-error thunk port)))))) - (no-fibers-wait port "r" read-timeout)))) - (current-write-waiter - (lambda (port) - (if (current-scheduler) - (perform-operation - (choice-operation - (wait-until-port-writable-operation port) - (wrap-operation - (sleep-operation write-timeout) - (lambda () - (raise-exception - (make-port-write-timeout-error thunk port)))))) - (no-fibers-wait port "w" write-timeout))))) - (thunk))) - (define (spawn-port-monitoring-fiber port error-condition) (spawn-fiber (lambda () @@ -1305,7 +211,7 @@ If already in the worker thread, call PROC immediately." port exn) (signal-condition! error-condition)) (lambda () - (with-fibers-port-timeouts + (with-port-timeouts (lambda () (let ((sock (socket PF_INET SOCK_STREAM 0))) (connect sock AF_INET INADDR_LOOPBACK port) @@ -1327,25 +233,3 @@ If already in the worker thread, call PROC immediately." (sigaction SIGINT (car handler) (cdr handler)) ;; restore original C handler. (sigaction SIGINT #f)))))) - -(define (make-queueing-channel channel) - (define queue (make-q)) - - (let ((queue-channel (make-channel))) - (spawn-fiber - (lambda () - (while #t - (if (q-empty? queue) - (enq! queue - (perform-operation - (get-operation queue-channel))) - (let ((front (q-front queue))) - (perform-operation - (choice-operation - (wrap-operation (get-operation queue-channel) - (lambda (val) - (enq! queue val))) - (wrap-operation (put-operation channel front) - (lambda _ - (q-pop! queue)))))))))) - queue-channel)) diff --git a/guix-data-service/web/build-server/controller.scm b/guix-data-service/web/build-server/controller.scm index 22088b1..7d2bd24 100644 --- a/guix-data-service/web/build-server/controller.scm +++ b/guix-data-service/web/build-server/controller.scm @@ -22,6 +22,7 @@ #:use-module (json) #:use-module (squee) #:use-module (fibers) + #:use-module (knots resource-pool) #:use-module (prometheus) #:use-module (guix-data-service utils) #:use-module (guix-data-service database) diff --git a/guix-data-service/web/build/controller.scm b/guix-data-service/web/build/controller.scm index bf77e03..7924dbb 100644 --- a/guix-data-service/web/build/controller.scm +++ b/guix-data-service/web/build/controller.scm @@ -18,6 +18,8 @@ (define-module (guix-data-service web build controller) #:use-module (srfi srfi-1) #:use-module (ice-9 match) + #:use-module (knots parallelism) + #:use-module (knots resource-pool) #:use-module (guix-data-service utils) #:use-module (guix-data-service database) #:use-module (guix-data-service web render) @@ -41,7 +43,7 @@ (define parse-build-server (lambda (v) - (letpar& ((build-servers + (fibers-let ((build-servers (call-with-resource-from-pool (connection-pool) select-build-servers))) (or (any (match-lambda @@ -88,7 +90,7 @@ '())) (let ((system (assq-ref parsed-query-parameters 'system)) (target (assq-ref parsed-query-parameters 'target))) - (letpar& ((build-server-options + (fibers-let ((build-server-options (with-resource-from-pool (connection-pool) conn (map (match-lambda ((id url lookup-all-derivations diff --git a/guix-data-service/web/compare/controller.scm b/guix-data-service/web/compare/controller.scm index e1fab78..dbb4975 100644 --- a/guix-data-service/web/compare/controller.scm +++ b/guix-data-service/web/compare/controller.scm @@ -24,6 +24,8 @@ #:use-module (texinfo) #:use-module (texinfo html) #:use-module (texinfo plain-text) + #:use-module (knots parallelism) + #:use-module (knots resource-pool) #:use-module (guix-data-service utils) #:use-module (guix-data-service database) #:use-module (guix-data-service web sxml) @@ -229,7 +231,7 @@ (define (render-compare mime-types query-parameters) (if (any-invalid-query-parameters? query-parameters) - (letpar& ((base-job + (fibers-let ((base-job (match (assq-ref query-parameters 'base_commit) (($ <invalid-query-parameter> value) (with-resource-from-pool (connection-pool) conn @@ -275,7 +277,7 @@ #f #f #f))))) - (letpar& ((base-revision-id + (fibers-let ((base-revision-id (with-resource-from-pool (connection-pool) conn (commit->revision-id conn @@ -303,7 +305,7 @@ (version-changes (package-data-version-changes base-packages-vhash target-packages-vhash))) - (letpar& ((lint-warnings-data + (fibers-let ((lint-warnings-data (with-resource-from-pool (connection-pool) conn (group-list-by-first-n-fields 2 @@ -396,7 +398,7 @@ lint-warnings-data)))) #:extra-headers http-headers-for-unchanging-content)) (else - (letpar& ((lint-warnings-locale-options + (fibers-let ((lint-warnings-locale-options (map (match-lambda ((locale) @@ -449,7 +451,7 @@ (target-branch (assq-ref query-parameters 'target_branch)) (target-datetime (assq-ref query-parameters 'target_datetime)) (locale (assq-ref query-parameters 'locale))) - (letpar& ((base-revision-details + (fibers-let ((base-revision-details (with-resource-from-pool (connection-pool) conn (select-guix-revision-for-branch-and-datetime conn @@ -624,7 +626,7 @@ '(application/json text/html) mime-types) ((application/json) - (letpar& ((base-job + (fibers-let ((base-job (and=> (match (assq-ref query-parameters 'base_commit) (($ <invalid-query-parameter> value) (and (string? value) value)) @@ -663,7 +665,7 @@ (base_job . ,base-job) (target_job . ,target-job))))) (else - (letpar& ((systems + (fibers-let ((systems (call-with-resource-from-pool (connection-pool) list-systems)) (targets @@ -695,7 +697,7 @@ (limit-results (assq-ref query-parameters 'limit_results))) (let ((data (concatenate! - (par-map& + (fibers-map (lambda (system) (with-resource-from-pool (connection-pool) conn (package-derivation-differences-data @@ -734,7 +736,7 @@ . ,derivation-changes)) #:stream? #t)) (else - (letpar& ((systems + (fibers-let ((systems (call-with-resource-from-pool (connection-pool) list-systems)) (targets @@ -788,7 +790,7 @@ string->symbol)) (after-name (assq-ref query-parameters 'after_name)) (limit-results (assq-ref query-parameters 'limit_results))) - (letpar& + (fibers-let ((base-revision-details (with-resource-from-pool (connection-pool) conn (select-guix-revision-for-branch-and-datetime conn @@ -800,7 +802,7 @@ target-branch target-datetime)))) (let ((data - (par-map& + (fibers-map (lambda (system) (with-resource-from-pool (connection-pool) conn (package-derivation-differences-data @@ -875,7 +877,7 @@ (render-json '((error . "invalid query")))) (else - (letpar& ((base-job + (fibers-let ((base-job (match (assq-ref query-parameters 'base_commit) (($ <invalid-query-parameter> value) (with-resource-from-pool (connection-pool) conn @@ -895,7 +897,7 @@ (let ((base-commit (assq-ref query-parameters 'base_commit)) (target-commit (assq-ref query-parameters 'target_commit))) - (letpar& ((base-revision-id + (fibers-let ((base-revision-id (with-resource-from-pool (connection-pool) conn (commit->revision-id conn @@ -944,7 +946,7 @@ (render-json '((error . "invalid query")))) (else - (letpar& ((systems + (fibers-let ((systems (with-resource-from-pool (connection-pool) conn list-systems)) (build-server-urls @@ -963,7 +965,7 @@ (let ((base-commit (assq-ref query-parameters 'base_commit)) (target-commit (assq-ref query-parameters 'target_commit)) (system (assq-ref query-parameters 'system))) - (letpar& ((data + (fibers-let ((data (with-resource-from-pool (connection-pool) conn (system-test-derivations-differences-data conn @@ -1014,7 +1016,7 @@ (render-json '((error . "invalid query")))) (else - (letpar& ((systems + (fibers-let ((systems (with-resource-from-pool (connection-pool) conn list-systems)) (build-server-urls @@ -1035,7 +1037,7 @@ (target-branch (assq-ref query-parameters 'target_branch)) (target-datetime (assq-ref query-parameters 'target_datetime)) (system (assq-ref query-parameters 'system))) - (letpar& + (fibers-let ((base-revision-details (with-resource-from-pool (connection-pool) conn (select-guix-revision-for-branch-and-datetime conn @@ -1046,7 +1048,7 @@ (select-guix-revision-for-branch-and-datetime conn target-branch target-datetime)))) - (letpar& ((data + (fibers-let ((data (with-resource-from-pool (connection-pool) conn (system-test-derivations-differences-data conn diff --git a/guix-data-service/web/controller.scm b/guix-data-service/web/controller.scm index d23c2f3..cdf2318 100644 --- a/guix-data-service/web/controller.scm +++ b/guix-data-service/web/controller.scm @@ -35,6 +35,8 @@ #:use-module (texinfo html) #:use-module (squee) #:use-module (json) + #:use-module (knots parallelism) + #:use-module (knots resource-pool) #:use-module (prometheus) #:use-module (guix-data-service utils) #:use-module (guix-data-service config) @@ -234,7 +236,7 @@ #:always-rollback? #t)) (lambda () - (letpar& ((metric-values + (fibers-let ((metric-values (with-exception-handler (lambda (exn) (simple-format @@ -456,12 +458,12 @@ (write-metrics registry port)))))))) (define (render-derivation derivation-file-name) - (letpar& ((derivation + (fibers-let ((derivation (with-resource-from-pool (connection-pool) conn (select-derivation-by-file-name conn derivation-file-name)))) (if derivation - (letpar& ((derivation-inputs + (fibers-let ((derivation-inputs (with-resource-from-pool (connection-pool) conn (select-derivation-inputs-by-derivation-id conn @@ -495,7 +497,7 @@ (select-derivation-by-file-name conn derivation-file-name)))) (if derivation - (letpar& ((derivation-inputs + (fibers-let ((derivation-inputs (with-resource-from-pool (connection-pool) conn (select-derivation-inputs-by-derivation-id conn @@ -551,7 +553,7 @@ (select-derivation-by-file-name conn derivation-file-name)))) (if derivation - (letpar& ((derivation-inputs + (fibers-let ((derivation-inputs (with-resource-from-pool (connection-pool) conn (select-derivation-inputs-by-derivation-id conn @@ -596,7 +598,7 @@ #:sxml (view-narinfos narinfos))))) (define (render-store-item filename) - (letpar& ((derivation + (fibers-let ((derivation (with-resource-from-pool (connection-pool) conn (select-derivation-by-output-filename conn filename)))) (match derivation @@ -619,7 +621,7 @@ filename))) #:extra-headers http-headers-for-unchanging-content)))) (derivations - (letpar& ((nars + (fibers-let ((nars (with-resource-from-pool (connection-pool) conn (select-nars-for-output conn filename))) (builds @@ -656,7 +658,7 @@ conn filename)))))))))) (derivations - (letpar& ((nars + (fibers-let ((nars (with-resource-from-pool (connection-pool) conn (select-nars-for-output conn filename)))) (render-json diff --git a/guix-data-service/web/jobs/controller.scm b/guix-data-service/web/jobs/controller.scm index 7e5084f..96621f9 100644 --- a/guix-data-service/web/jobs/controller.scm +++ b/guix-data-service/web/jobs/controller.scm @@ -17,6 +17,8 @@ (define-module (guix-data-service web jobs controller) #:use-module (ice-9 match) + #:use-module (knots parallelism) + #:use-module (knots resource-pool) #:use-module (guix-data-service utils) #:use-module (guix-data-service database) #:use-module (guix-data-service web render) @@ -74,7 +76,7 @@ (define (render-jobs mime-types query-parameters) (define limit-results (assq-ref query-parameters 'limit_results)) - (letpar& ((jobs + (fibers-let ((jobs (with-resource-from-pool (connection-pool) conn (select-jobs-and-events conn diff --git a/guix-data-service/web/nar/controller.scm b/guix-data-service/web/nar/controller.scm index e2ace7a..f7edac6 100644 --- a/guix-data-service/web/nar/controller.scm +++ b/guix-data-service/web/nar/controller.scm @@ -27,6 +27,8 @@ #:use-module (web uri) #:use-module (web request) #:use-module (web response) + #:use-module (knots parallelism) + #:use-module (knots resource-pool) #:use-module (guix pki) #:use-module (guix base32) #:use-module (guix base64) @@ -155,7 +157,7 @@ #:code 200 #:headers '((content-type . (application/x-narinfo)))) (let ((derivation-file-name (second derivation))) - (letpar& + (fibers-let ((derivation-text (with-resource-from-pool (reserved-connection-pool) conn (select-serialized-derivation-by-file-name diff --git a/guix-data-service/web/package/controller.scm b/guix-data-service/web/package/controller.scm index 8dc6b0f..792394c 100644 --- a/guix-data-service/web/package/controller.scm +++ b/guix-data-service/web/package/controller.scm @@ -19,6 +19,8 @@ #:use-module (ice-9 match) #:use-module (web uri) #:use-module (web request) + #:use-module (knots parallelism) + #:use-module (knots resource-pool) #:use-module (guix-data-service utils) #:use-module (guix-data-service database) #:use-module (guix-data-service web render) @@ -40,7 +42,7 @@ request `((system ,parse-system #:default "x86_64-linux") (target ,parse-target #:default ""))))) - (letpar& ((package-versions-with-branches + (fibers-let ((package-versions-with-branches (with-resource-from-pool (connection-pool) conn (branches-by-package-version conn name (assq-ref parsed-query-parameters diff --git a/guix-data-service/web/repository/controller.scm b/guix-data-service/web/repository/controller.scm index 0d9434c..101687c 100644 --- a/guix-data-service/web/repository/controller.scm +++ b/guix-data-service/web/repository/controller.scm @@ -19,6 +19,8 @@ #:use-module (ice-9 match) #:use-module (web uri) #:use-module (web request) + #:use-module (knots parallelism) + #:use-module (knots resource-pool) #:use-module (guix-data-service utils) #:use-module (guix-data-service database) #:use-module (guix-data-service web render) @@ -47,7 +49,7 @@ (match method-and-path-components (('GET "repositories") - (letpar& ((git-repositories + (fibers-let ((git-repositories (call-with-resource-from-pool (connection-pool) all-git-repositories))) (case (most-appropriate-mime-type @@ -71,7 +73,7 @@ (match (with-resource-from-pool (connection-pool) conn (select-git-repository conn id)) ((label url cgit-url-base fetch-with-authentication? poll-interval) - (letpar& ((branches + (fibers-let ((branches (with-resource-from-pool (connection-pool) conn (all-branches-with-most-recent-commit conn @@ -119,7 +121,7 @@ `((after_date ,parse-datetime) (before_date ,parse-datetime) (limit_results ,parse-result-limit #:default 100))))) - (letpar& ((revisions + (fibers-let ((revisions (with-resource-from-pool (connection-pool) conn (most-recent-commits-for-branch conn @@ -160,7 +162,7 @@ parsed-query-parameters revisions))))))))) (('GET "repository" repository-id "branch" branch-name "package" package-name) - (letpar& ((package-versions + (fibers-let ((package-versions (with-resource-from-pool (connection-pool) conn (package-versions-for-branch conn (string->number repository-id) @@ -211,7 +213,7 @@ (parse-query-parameters request `((system ,parse-system #:default "x86_64-linux"))))) - (letpar& ((system-test-history + (fibers-let ((system-test-history (with-resource-from-pool (connection-pool) conn (system-test-derivations-for-branch conn @@ -256,7 +258,7 @@ valid-systems system-test-history))))))) (('GET "repository" repository-id "branch" branch-name "latest-processed-revision") - (letpar& ((commit-hash + (fibers-let ((commit-hash (with-resource-from-pool (connection-pool) conn (latest-processed-commit-for-branch conn repository-id @@ -273,7 +275,7 @@ repository-id branch-name)))) (('GET "repository" repository-id "branch" branch-name "latest-processed-revision" "packages") - (letpar& ((commit-hash + (fibers-let ((commit-hash (with-resource-from-pool (connection-pool) conn (latest-processed-commit-for-branch conn repository-id @@ -313,7 +315,7 @@ repository-id branch-name)))) (('GET "repository" repository-id "branch" branch-name "latest-processed-revision" "package-derivations") - (letpar& ((commit-hash + (fibers-let ((commit-hash (with-resource-from-pool (connection-pool) conn (latest-processed-commit-for-branch conn repository-id @@ -422,7 +424,7 @@ branch-name)))) (('GET "repository" repository-id "branch" branch-name "latest-processed-revision" "system-tests") - (letpar& ((commit-hash + (fibers-let ((commit-hash (with-resource-from-pool (connection-pool) conn (latest-processed-commit-for-branch conn repository-id @@ -440,7 +442,7 @@ repository-id branch-name)))) (('GET "repository" repository-id "branch" branch-name "latest-processed-revision" "package-reproducibility") - (letpar& ((commit-hash + (fibers-let ((commit-hash (with-resource-from-pool (connection-pool) conn (latest-processed-commit-for-branch conn repository-id @@ -462,7 +464,7 @@ repository-id branch-name)))) (('GET "repository" repository-id "branch" branch-name "latest-processed-revision" "package-substitute-availability") - (letpar& ((commit-hash + (fibers-let ((commit-hash (with-resource-from-pool (connection-pool) conn (latest-processed-commit-for-branch conn repository-id @@ -476,7 +478,7 @@ branch-name)))) (('GET "repository" repository-id "branch" branch-name "latest-processed-revision" "lint-warnings") - (letpar& ((commit-hash + (fibers-let ((commit-hash (with-resource-from-pool (connection-pool) conn (latest-processed-commit-for-branch conn repository-id @@ -510,7 +512,7 @@ repository-id branch-name)))) (('GET "repository" repository-id "branch" branch-name "latest-processed-revision" "package" name version) - (letpar& ((commit-hash + (fibers-let ((commit-hash (with-resource-from-pool (connection-pool) conn (latest-processed-commit-for-branch conn repository-id @@ -583,7 +585,7 @@ (assq-ref parsed-query-parameters 'system)) (target (assq-ref parsed-query-parameters 'target))) - (letpar& + (fibers-let ((package-derivations (with-resource-from-pool (connection-pool) conn (package-derivations-for-branch conn @@ -620,7 +622,7 @@ . ,(list->vector builds))))) package-derivations)))))) (else - (letpar& ((systems + (fibers-let ((systems (call-with-resource-from-pool (connection-pool) list-systems)) (targets @@ -657,7 +659,7 @@ (assq-ref parsed-query-parameters 'target)) (output-name (assq-ref parsed-query-parameters 'output))) - (letpar& + (fibers-let ((package-outputs (with-resource-from-pool (connection-pool) conn (package-outputs-for-branch conn @@ -695,7 +697,7 @@ . ,(list->vector builds))))) package-outputs)))))) (else - (letpar& ((systems + (fibers-let ((systems (call-with-resource-from-pool (connection-pool) list-systems)) (targets diff --git a/guix-data-service/web/revision/controller.scm b/guix-data-service/web/revision/controller.scm index 14a721a..c4a25f7 100644 --- a/guix-data-service/web/revision/controller.scm +++ b/guix-data-service/web/revision/controller.scm @@ -24,6 +24,8 @@ #:use-module (texinfo html) #:use-module (texinfo plain-text) #:use-module (json) + #:use-module (knots parallelism) + #:use-module (knots resource-pool) #:use-module (guix-data-service utils) #:use-module (guix-data-service database) #:use-module (guix-data-service web render) @@ -84,7 +86,7 @@ status)))) (define (parse-build-server v) - (letpar& ((build-servers + (fibers-let ((build-servers (call-with-resource-from-pool (connection-pool) select-build-servers))) (or (any (match-lambda @@ -395,7 +397,7 @@ `((unknown_commit . ,commit-hash)) #:code 404)) (else - (letpar& ((job + (fibers-let ((job (with-resource-from-pool (connection-pool) conn (select-job-for-commit conn commit-hash))) (git-repositories-and-branches @@ -423,7 +425,7 @@ `((unknown_commit . ,commit-hash)) #:code 404)) (else - (letpar& ((job + (fibers-let ((job (with-resource-from-pool (connection-pool) conn (select-job-for-commit conn commit-hash))) (git-repositories-and-branches @@ -448,7 +450,7 @@ (header-text `("Revision " (samp ,commit-hash))) (max-age cache-control-default-max-age)) - (letpar& ((packages-count + (fibers-let ((packages-count (with-resource-from-pool (connection-pool) conn (count-packages-in-revision conn commit-hash))) (git-repositories-and-branches @@ -514,7 +516,7 @@ `("Revision " (samp ,commit-hash))) (header-link (string-append "/revision/" commit-hash))) - (letpar& ((system-tests + (fibers-let ((system-tests (with-resource-from-pool (connection-pool) conn (select-system-tests-for-guix-revision conn @@ -542,7 +544,7 @@ (builds . ,(list->vector builds))))) system-tests)))))) (else - (letpar& ((git-repositories + (fibers-let ((git-repositories (with-resource-from-pool (connection-pool) conn (git-repositories-containing-commit conn commit-hash))) @@ -568,7 +570,7 @@ (header-link (string-append "/revision/" commit-hash))) - (letpar& ((channel-instances + (fibers-let ((channel-instances (with-resource-from-pool (connection-pool) conn (select-channel-instances-for-guix-revision conn commit-hash)))) (case (most-appropriate-mime-type @@ -596,7 +598,7 @@ (define* (render-revision-package-substitute-availability mime-types commit-hash #:key path-base) - (letpar& ((substitute-availability + (fibers-let ((substitute-availability (with-resource-from-pool (connection-pool) conn (select-package-output-availability-for-revision conn commit-hash))) @@ -610,7 +612,7 @@ ((application/json) (render-json `((commit . ,commit-hash) - (substitute_servers + (xsubstitute_servers . ,(list->vector (map (match-lambda ((build-server-id . data) @@ -642,7 +644,7 @@ (header-link (string-append "/revision/" commit-hash))) - (letpar& ((output-consistency + (fibers-let ((output-consistency (with-resource-from-pool (connection-pool) conn (select-output-consistency-for-revision conn commit-hash)))) (case (most-appropriate-mime-type @@ -676,7 +678,7 @@ #:sxml (view-revision-news commit-hash query-parameters '())))) - (letpar& ((news-entries + (fibers-let ((news-entries (with-resource-from-pool (connection-pool) conn (select-channel-news-entries-contained-in-guix-revision conn @@ -735,7 +737,7 @@ 99999)) ; TODO There shouldn't be a limit (fields (assq-ref query-parameters 'field)) (locale (assq-ref query-parameters 'locale))) - (letpar& + (fibers-let ((packages (with-resource-from-pool (connection-pool) conn (if search-query @@ -832,7 +834,7 @@ "/revision/" commit-hash)) (header-text `("Revision " (samp ,commit-hash)))) - (letpar& ((package-synopsis-counts + (fibers-let ((package-synopsis-counts (with-resource-from-pool (connection-pool) conn (synopsis-counts-by-locale conn (commit->revision-id @@ -872,7 +874,7 @@ (header-link (string-append "/revision/" commit-hash))) - (letpar& ((package-versions + (fibers-let ((package-versions (with-resource-from-pool (connection-pool) conn (select-package-versions-for-revision conn commit-hash @@ -929,7 +931,7 @@ (define has-replacement? (assq-ref query-parameters 'has_replacement)) - (letpar& ((metadata + (fibers-let ((metadata (with-resource-from-pool (connection-pool) conn (select-package-metadata-by-revision-name-and-version conn @@ -1041,7 +1043,7 @@ (render-json `((error . "invalid query")))) (else - (letpar& ((systems + (fibers-let ((systems (call-with-resource-from-pool (connection-pool) list-systems)) (targets @@ -1067,7 +1069,7 @@ (assq-ref query-parameters 'search_query)) (fields (assq-ref query-parameters 'field))) - (letpar& + (fibers-let ((derivations (if search-query (with-resource-from-pool (connection-pool) conn @@ -1090,7 +1092,7 @@ #:after-name (assq-ref query-parameters 'after_name) #:include-builds? (member "builds" fields))) (concatenate! - (par-map& + (fibers-map (lambda (system) (with-resource-from-pool (connection-pool) conn (select-package-derivations-in-revision @@ -1149,7 +1151,7 @@ derivations)))) #:stream? #t)) (else - (letpar& ((systems + (fibers-let ((systems (call-with-resource-from-pool (connection-pool) list-systems)) (targets @@ -1187,7 +1189,7 @@ (render-json `((error . "invalid query")))) (else - (letpar& ((systems + (fibers-let ((systems (call-with-resource-from-pool (connection-pool) list-systems)) (targets @@ -1213,7 +1215,7 @@ (assq-ref query-parameters 'search_query)) (fields (assq-ref query-parameters 'field))) - (letpar& + (fibers-let ((derivations (with-resource-from-pool (connection-pool) conn (select-fixed-output-package-derivations-in-revision @@ -1242,7 +1244,7 @@ (render-json `((derivations . ,(list->vector derivations))))) (else - (letpar& ((systems + (fibers-let ((systems (call-with-resource-from-pool (connection-pool) list-systems)) (targets @@ -1284,7 +1286,7 @@ (render-json `((error . "invalid query")))) (else - (letpar& ((systems + (fibers-let ((systems (call-with-resource-from-pool (connection-pool) list-systems)) (targets @@ -1308,7 +1310,7 @@ (assq-ref query-parameters 'all_results)) (fields (assq-ref query-parameters 'field))) - (letpar& + (fibers-let ((derivation-outputs (with-resource-from-pool (connection-pool) conn (select-derivation-outputs-in-revision @@ -1390,7 +1392,7 @@ "not-matching"))))))) derivation-outputs)))))) (else - (letpar& ((systems + (fibers-let ((systems (call-with-resource-from-pool (connection-pool) list-systems)) (targets @@ -1419,7 +1421,7 @@ (header-link (string-append "/revision/" commit-hash))) (if (any-invalid-query-parameters? query-parameters) - (letpar& ((systems + (fibers-let ((systems (call-with-resource-from-pool (connection-pool) list-systems)) (targets @@ -1437,7 +1439,7 @@ '()))) (let ((system (assq-ref query-parameters 'system)) (target (assq-ref query-parameters 'target))) - (letpar& ((systems + (fibers-let ((systems (call-with-resource-from-pool (connection-pool) list-systems)) (targets @@ -1492,7 +1494,7 @@ (header-link (string-append "/revision/" commit-hash))) (if (any-invalid-query-parameters? query-parameters) - (letpar& ((systems + (fibers-let ((systems (call-with-resource-from-pool (connection-pool) list-systems)) (targets @@ -1509,7 +1511,7 @@ '()))) (let ((system (assq-ref query-parameters 'system)) (target (assq-ref query-parameters 'target))) - (letpar& ((systems + (fibers-let ((systems (call-with-resource-from-pool (connection-pool) list-systems)) (targets @@ -1592,7 +1594,7 @@ (linters (assq-ref query-parameters 'linter)) (message-query (assq-ref query-parameters 'message_query)) (fields (assq-ref query-parameters 'field))) - (letpar& + (fibers-let ((git-repositories (with-resource-from-pool (connection-pool) conn (git-repositories-containing-commit conn diff --git a/guix-data-service/web/server.scm b/guix-data-service/web/server.scm index 4e08161..a1a888b 100644 --- a/guix-data-service/web/server.scm +++ b/guix-data-service/web/server.scm @@ -30,6 +30,8 @@ #:use-module (fibers channels) #:use-module (fibers scheduler) #:use-module (fibers conditions) + #:use-module (knots web-server) + #:use-module (knots resource-pool) #:use-module ((guix build syscalls) #:select (set-thread-name)) #:use-module (prometheus) @@ -246,7 +248,7 @@ port. Also, the port used can be changed by passing the --port option.\n" (make-counter-metric registry "resource_pool_checkout_timeouts_total" #:labels '(pool_name)))) - (%resource-pool-timeout-handler + (resource-pool-default-timeout-handler (lambda (pool proc timeout) (let ((pool-name (cond @@ -269,11 +271,12 @@ port. Also, the port used can be changed by passing the --port option.\n" request-scheduler) (let ((render-metrics (make-render-metrics registry))) - (run-server/patched - (lambda (request body) + (run-knots-web-server + (lambda (request) (metric-increment requests-metric) - (let ((reply (make-channel))) + (let ((body (read-request-body request)) + (reply (make-channel))) (spawn-fiber (lambda () (call-with-values diff --git a/guix-dev.scm b/guix-dev.scm index 8d33657..eec15ec 100644 --- a/guix-dev.scm +++ b/guix-dev.scm @@ -41,6 +41,38 @@ (gnu packages ruby) (srfi srfi-1)) +(define guile-knots + (let ((commit "0fab93e9ff5b16813ae1356c13d3c974d7277d81") + (revision "1")) + (package + (name "guile-knots") + (version (git-version "0" revision commit)) + (source (origin + (method git-fetch) + (uri (git-reference + (url "https://git.cbaines.net/git/guile/knots") + (commit commit))) + (sha256 + (base32 + "1x0wirq0db2704784ig00kz5kh8j6szp2gwm67wn714m1jbhz9ky")) + (file-name (string-append name "-" version "-checkout")))) + (build-system gnu-build-system) + (native-inputs + (list pkg-config + autoconf + automake + guile-3.0 + guile-fibers)) + (inputs + (list guile-3.0)) + (propagated-inputs + (list guile-fibers)) + (home-page "https://git.cbaines.net/guile/knots") + (synopsis "Patterns and functionality to use with Guile Fibers") + (description + "") + (license license:gpl3+)))) + (package (name "guix-data-service") (version "0.0.0") @@ -52,6 +84,7 @@ guile-json-4 guile-squee guile-fibers + guile-knots guile-gcrypt guile-lzlib guile-readline |