aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--guix-data-service/jobs/load-new-guix-revision.scm22
-rw-r--r--guix-data-service/utils.scm1120
-rw-r--r--guix-data-service/web/build-server/controller.scm1
-rw-r--r--guix-data-service/web/build/controller.scm6
-rw-r--r--guix-data-service/web/compare/controller.scm38
-rw-r--r--guix-data-service/web/controller.scm18
-rw-r--r--guix-data-service/web/jobs/controller.scm4
-rw-r--r--guix-data-service/web/nar/controller.scm4
-rw-r--r--guix-data-service/web/package/controller.scm4
-rw-r--r--guix-data-service/web/repository/controller.scm36
-rw-r--r--guix-data-service/web/revision/controller.scm62
-rw-r--r--guix-data-service/web/server.scm11
-rw-r--r--guix-dev.scm33
13 files changed, 154 insertions, 1205 deletions
diff --git a/guix-data-service/jobs/load-new-guix-revision.scm b/guix-data-service/jobs/load-new-guix-revision.scm
index dfa41ec..e18528a 100644
--- a/guix-data-service/jobs/load-new-guix-revision.scm
+++ b/guix-data-service/jobs/load-new-guix-revision.scm
@@ -38,6 +38,12 @@
#:use-module (fibers timers)
#:use-module (fibers channels)
#:use-module (fibers operations)
+ #:use-module (knots)
+ #:use-module (knots queue)
+ #:use-module (knots promise)
+ #:use-module (knots parallelism)
+ #:use-module (knots resource-pool)
+ #:use-module (knots worker-threads)
#:use-module (guix monads)
#:use-module (guix base32)
#:use-module (guix store)
@@ -1127,7 +1133,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
(insert-derivations)))
(unless (null? derivations)
- (parallel-via-fibers
+ (fibers-parallel
(insert-sources derivations
derivation-ids)
(with-time-logging
@@ -1906,7 +1912,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
(inferior-lint-checkers inferior)))))
(when inferior-lint-checkers-data
- (letpar& ((lint-checker-ids
+ (fibers-let ((lint-checker-ids
(with-resource-from-pool postgresql-connection-pool conn
(lint-checkers->lint-checker-ids
conn
@@ -2181,7 +2187,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
(with-time-logging
(simple-format #f "extract-information-from: ~A\n" store-item)
- (parallel-via-fibers
+ (fibers-parallel
(begin
(fibers-force package-ids-promise)
#f)
@@ -2267,7 +2273,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
extra-inferior-environment-variables)
(define utility-thread-channel
;; There might be high demand for this, so order the requests
- (make-queueing-channel
+ (spawn-queueing-fiber
(call-with-default-io-waiters
(lambda ()
(make-worker-thread-channel
@@ -2791,6 +2797,12 @@ SKIP LOCKED")
(exec-query conn "BEGIN")
+ ;; (spawn-fiber
+ ;; (lambda ()
+ ;; (while #t
+ ;; (sleep (* 60 5))
+ ;; (profile-heap))))
+
(spawn-fiber
(lambda ()
(while (perform-operation
@@ -2864,7 +2876,7 @@ SKIP LOCKED")
id))))))
(when result
- (parallel-via-fibers
+ (fibers-parallel
(with-postgresql-connection
(simple-format #f "post load-new-guix-revision ~A counts" id)
(lambda (conn)
diff --git a/guix-data-service/utils.scm b/guix-data-service/utils.scm
index b53f33f..a447a9c 100644
--- a/guix-data-service/utils.scm
+++ b/guix-data-service/utils.scm
@@ -38,43 +38,14 @@
#:use-module (fibers timers)
#:use-module (fibers conditions)
#:use-module (fibers scheduler)
+ #:use-module (knots timeout)
#:use-module (prometheus)
#:export (call-with-time-logging
with-time-logging
prevent-inlining-for-tests
- resource-pool-default-timeout
- resource-pool-retry-checkout-timeout
- %resource-pool-timeout-handler
- resource-pool-timeout-error?
- make-resource-pool
- destroy-resource-pool
- call-with-resource-from-pool
- with-resource-from-pool
- resource-pool-stats
-
- call-with-default-io-waiters
- make-worker-thread-channel
- %worker-thread-default-timeout
- call-with-worker-thread
- worker-thread-timeout-error?
-
fiberize
- fibers-delay
- fibers-force
- fibers-promise-reset
-
- fibers-batch-for-each
- fibers-for-each
- fibers-batch-map
- fibers-map
-
- parallel-via-fibers
- par-map&
- letpar&
- fibers-map-with-progress
-
chunk
chunk!
chunk-for-each!
@@ -84,7 +55,6 @@
get-guix-metrics-updater
call-with-sigint
- run-server/patched
spawn-port-monitoring-fiber
@@ -107,674 +77,6 @@
(define-syntax-rule (prevent-inlining-for-tests var)
(set! var var))
-(define-record-type <resource-pool>
- (make-resource-pool-record name channel)
- resource-pool?
- (name resource-pool-name)
- (channel resource-pool-channel))
-
-(define* (make-resource-pool initializer max-size
- #:key (min-size max-size)
- (idle-seconds #f)
- (delay-logger (const #f))
- (duration-logger (const #f))
- destructor
- lifetime
- scheduler
- (name "unnamed"))
- (define (initializer/safe)
- (with-exception-handler
- (lambda (exn)
- (simple-format
- (current-error-port)
- "exception running ~A resource pool initializer: ~A:\n ~A\n"
- name
- initializer
- exn)
- #f)
- (lambda ()
- (with-throw-handler #t
- initializer
- (lambda args
- (backtrace))))
- #:unwind? #t))
-
- (define (destructor/safe args)
- (let ((success?
- (with-exception-handler
- (lambda (exn)
- (simple-format
- (current-error-port)
- "exception running resource pool destructor (~A): ~A:\n ~A\n"
- name
- destructor
- exn)
- #f)
- (lambda ()
- (with-throw-handler #t
- (lambda ()
- (destructor args)
- #t)
- (lambda _
- (backtrace))))
- #:unwind? #t)))
-
- (or success?
- #t
- (begin
- (sleep 5)
- (destructor/safe args)))))
-
- (let ((channel (make-channel))
- (checkout-failure-count 0))
- (spawn-fiber
- (lambda ()
- (when idle-seconds
- (spawn-fiber
- (lambda ()
- (while #t
- (sleep idle-seconds)
- (put-message channel '(check-for-idle-resources))))))
-
- (while #t
- (with-exception-handler
- (lambda (exn)
- (simple-format
- (current-error-port)
- "exception in the ~A pool fiber: ~A\n"
- name
- exn))
- (lambda ()
- (let loop ((resources '())
- (available '())
- (waiters '())
- (resources-last-used '()))
-
- (match (get-message channel)
- (('checkout reply)
- (if (null? available)
- (if (= (length resources) max-size)
- (loop resources
- available
- (cons reply waiters)
- resources-last-used)
- (let ((new-resource (initializer/safe)))
- (if new-resource
- (let ((checkout-success?
- (perform-operation
- (choice-operation
- (wrap-operation
- (put-operation reply new-resource)
- (const #t))
- (wrap-operation (sleep-operation 1)
- (const #f))))))
- (unless checkout-success?
- (set! checkout-failure-count
- (+ 1 checkout-failure-count)))
-
- (loop (cons new-resource resources)
- (if checkout-success?
- available
- (cons new-resource available))
- waiters
- (cons (get-internal-real-time)
- resources-last-used)))
- (loop resources
- available
- (cons reply waiters)
- resources-last-used))))
- (let ((checkout-success?
- (perform-operation
- (choice-operation
- (wrap-operation
- (put-operation reply (car available))
- (const #t))
- (wrap-operation (sleep-operation 1)
- (const #f))))))
- (unless checkout-success?
- (set! checkout-failure-count
- (+ 1 checkout-failure-count)))
-
- (if checkout-success?
- (loop resources
- (cdr available)
- waiters
- resources-last-used)
- (loop resources
- available
- waiters
- resources-last-used)))))
- (('return resource)
- (if (null? waiters)
- (loop resources
- (cons resource available)
- waiters
- (begin
- (list-set!
- resources-last-used
- (list-index (lambda (x)
- (eq? x resource))
- resources)
- (get-internal-real-time))
- resources-last-used))
- (let ((checkout-success?
- (perform-operation
- (choice-operation
- (wrap-operation
- (put-operation (last waiters)
- resource)
- (const #t))
- (wrap-operation (sleep-operation 1)
- (const #f))))))
- (unless checkout-success?
- (set! checkout-failure-count
- (+ 1 checkout-failure-count)))
-
- (if checkout-success?
- (loop resources
- available
- (drop-right! waiters 1)
- (begin
- (list-set!
- resources-last-used
- (list-index (lambda (x)
- (eq? x resource))
- resources)
- (get-internal-real-time))
- resources-last-used))
- (begin
- (for-each
- (lambda (waiter)
- (spawn-fiber
- (lambda ()
- (perform-operation
- (choice-operation
- (put-operation waiter 'resource-pool-retry-checkout)
- (sleep-operation 10))))))
- waiters)
-
- (loop resources
- (cons resource available)
- '()
- (begin
- (list-set!
- resources-last-used
- (list-index (lambda (x)
- (eq? x resource))
- resources)
- (get-internal-real-time))
- resources-last-used)))))))
- (('stats reply)
- (let ((stats
- `((resources . ,(length resources))
- (available . ,(length available))
- (waiters . ,(length waiters))
- (checkout-failure-count . ,checkout-failure-count))))
-
- (spawn-fiber
- (lambda ()
- (perform-operation
- (choice-operation
- (wrap-operation
- (put-operation reply stats)
- (const #t))
- (wrap-operation (sleep-operation 1)
- (const #f)))))))
-
- (loop resources
- available
- waiters
- resources-last-used))
- (('check-for-idle-resources)
- (let* ((resources-last-used-seconds
- (map
- (lambda (internal-time)
- (/ (- (get-internal-real-time) internal-time)
- internal-time-units-per-second))
- resources-last-used))
- (resources-to-destroy
- (filter-map
- (lambda (resource last-used-seconds)
- (if (and (member resource available)
- (> last-used-seconds idle-seconds))
- resource
- #f))
- resources
- resources-last-used-seconds)))
-
- (for-each
- (lambda (resource)
- (destructor/safe resource))
- resources-to-destroy)
-
- (loop (lset-difference eq? resources resources-to-destroy)
- (lset-difference eq? available resources-to-destroy)
- waiters
- (filter-map
- (lambda (resource last-used)
- (if (memq resource resources-to-destroy)
- #f
- last-used))
- resources
- resources-last-used))))
- (('destroy reply)
- (if (= (length resources) (length available))
- (begin
- (for-each
- (lambda (resource)
- (destructor/safe resource))
- resources)
- (put-message reply 'destroy-success))
- (begin
- (spawn-fiber
- (lambda ()
- (perform-operation
- (choice-operation
- (put-operation reply 'resource-pool-destroy-failed)
- (sleep-operation 10)))))
- (loop resources
- available
- waiters
- resources-last-used))))
- (unknown
- (simple-format
- (current-error-port)
- "unrecognised message to ~A resource pool channel: ~A\n"
- name
- unknown)
- (loop resources
- available
- waiters
- resources-last-used)))))
- #:unwind? #t)))
- (or scheduler
- (current-scheduler)))
-
- (make-resource-pool-record name channel)))
-
-(define (destroy-resource-pool pool)
- (let ((reply (make-channel)))
- (put-message (resource-pool-channel pool)
- (list 'destroy reply))
- (let ((msg (get-message reply)))
- (unless (eq? msg 'destroy-success)
- (error msg)))))
-
-(define resource-pool-default-timeout
- (make-parameter #f))
-
-(define resource-pool-retry-checkout-timeout
- (make-parameter 5))
-
-(define &resource-pool-timeout
- (make-exception-type '&recource-pool-timeout
- &error
- '(name)))
-
-(define make-resource-pool-timeout-error
- (record-constructor &resource-pool-timeout))
-
-(define resource-pool-timeout-error?
- (record-predicate &resource-pool-timeout))
-
-(define %resource-pool-timeout-handler
- (make-parameter #f))
-
-(define* (call-with-resource-from-pool pool proc #:key (timeout 'default)
- (timeout-handler (%resource-pool-timeout-handler)))
- "Call PROC with a resource from POOL, blocking until a resource becomes
-available. Return the resource once PROC has returned."
-
- (define retry-timeout
- (resource-pool-retry-checkout-timeout))
-
- (define timeout-or-default
- (if (eq? timeout 'default)
- (resource-pool-default-timeout)
- timeout))
-
- (let ((resource
- (let ((reply (make-channel)))
- (let loop ((start-time (get-internal-real-time)))
- (let ((request-success?
- (perform-operation
- (choice-operation
- (wrap-operation
- (put-operation (resource-pool-channel pool)
- `(checkout ,reply))
- (const #t))
- (wrap-operation (sleep-operation (or timeout-or-default
- retry-timeout))
- (const #f))))))
- (if request-success?
- (let ((time-remaining
- (- (or timeout-or-default
- retry-timeout)
- (/ (- (get-internal-real-time)
- start-time)
- internal-time-units-per-second))))
- (if (> time-remaining 0)
- (let ((response
- (perform-operation
- (choice-operation
- (get-operation reply)
- (wrap-operation (sleep-operation time-remaining)
- (const #f))))))
- (if (or (not response)
- (eq? response 'resource-pool-retry-checkout))
- (if (> (- (or timeout-or-default
- retry-timeout)
- (/ (- (get-internal-real-time)
- start-time)
- internal-time-units-per-second))
- 0)
- (loop start-time)
- (if (eq? timeout-or-default #f)
- (loop (get-internal-real-time))
- #f))
- response))
- (if (eq? timeout-or-default #f)
- (loop (get-internal-real-time))
- #f)))
- (if (eq? timeout-or-default #f)
- (loop (get-internal-real-time))
- #f)))))))
-
- (when (or (not resource)
- (eq? resource 'resource-pool-retry-checkout))
- (when timeout-handler
- (timeout-handler pool proc timeout))
-
- (raise-exception
- (make-resource-pool-timeout-error (resource-pool-name pool))))
-
- (with-exception-handler
- (lambda (exception)
- (put-message (resource-pool-channel pool)
- `(return ,resource))
- (raise-exception exception))
- (lambda ()
- (call-with-values
- (lambda ()
- (with-throw-handler #t
- (lambda ()
- (proc resource))
- (lambda _
- (backtrace))))
- (lambda vals
- (put-message (resource-pool-channel pool)
- `(return ,resource))
- (apply values vals))))
- #:unwind? #t)))
-
-(define-syntax-rule (with-resource-from-pool pool resource exp ...)
- (call-with-resource-from-pool
- pool
- (lambda (resource) exp ...)))
-
-(define* (resource-pool-stats pool #:key (timeout 5))
- (let ((reply (make-channel))
- (start-time (get-internal-real-time)))
- (perform-operation
- (choice-operation
- (wrap-operation
- (put-operation (resource-pool-channel pool)
- `(stats ,reply))
- (const #t))
- (wrap-operation (sleep-operation timeout)
- (lambda _
- (raise-exception
- (make-resource-pool-timeout-error))))))
-
- (let ((time-remaining
- (- timeout
- (/ (- (get-internal-real-time)
- start-time)
- internal-time-units-per-second))))
- (if (> time-remaining 0)
- (perform-operation
- (choice-operation
- (get-operation reply)
- (wrap-operation (sleep-operation time-remaining)
- (lambda _
- (raise-exception
- (make-resource-pool-timeout-error))))))
- (raise-exception
- (make-resource-pool-timeout-error))))))
-
-(define (call-with-default-io-waiters thunk)
- (parameterize
- ((current-read-waiter (@@ (ice-9 suspendable-ports)
- default-read-waiter))
- (current-write-waiter (@@ (ice-9 suspendable-ports)
- default-write-waiter)))
- (thunk)))
-
-(define %worker-thread-args
- (make-parameter #f))
-
-(define* (make-worker-thread-channel initializer
- #:key (parallelism 1)
- (delay-logger (lambda _ #f))
- (duration-logger (const #f))
- destructor
- lifetime
- (log-exception? (const #t))
- (expire-on-exception? #f)
- (name "unnamed"))
- "Return a channel used to offload work to a dedicated thread. ARGS are the
-arguments of the worker thread procedure."
- (define thread-proc-vector
- (make-vector parallelism #f))
-
- (define (initializer/safe)
- (let ((args
- (with-exception-handler
- (lambda (exn)
- (simple-format
- (current-error-port)
- "exception running initializer in worker thread (~A): ~A:\n ~A\n"
- name
- initializer
- exn)
- #f)
- (lambda ()
- (with-throw-handler #t
- initializer
- (lambda args
- (backtrace))))
- #:unwind? #t)))
-
- (if args
- args
- ;; never give up, just keep retrying
- (begin
- (sleep 1)
- (initializer/safe)))))
-
- (define (destructor/safe args)
- (let ((success?
- (with-exception-handler
- (lambda (exn)
- (simple-format
- (current-error-port)
- "exception running destructor in worker thread (~A): ~A:\n ~A\n"
- name
- destructor
- exn)
- #f)
- (lambda ()
- (with-throw-handler #t
- (lambda ()
- (apply destructor args)
- #t)
- (lambda _
- (backtrace))))
- #:unwind? #t)))
-
- (or success?
- #t
- (begin
- (sleep 1)
- (destructor/safe args)))))
-
- (define (process thread-index channel args)
- (let loop ((current-lifetime lifetime))
- (let ((exception?
- (match (get-message channel)
- (((? channel? reply) sent-time (? procedure? proc))
- (let ((time-delay
- (- (get-internal-real-time)
- sent-time)))
- (delay-logger (/ time-delay
- internal-time-units-per-second))
-
- (let* ((start-time (get-internal-real-time))
- (response
- (with-exception-handler
- (lambda (exn)
- (list 'worker-thread-error
- (/ (- (get-internal-real-time)
- start-time)
- internal-time-units-per-second)
- exn))
- (lambda ()
- (vector-set! thread-proc-vector
- thread-index
- proc)
- (with-throw-handler #t
- (lambda ()
- (call-with-values
- (lambda ()
- (start-stack
- 'worker-thread
- (apply proc args)))
- (lambda vals
- (cons (/ (- (get-internal-real-time)
- start-time)
- internal-time-units-per-second)
- vals))))
- (lambda args
- (when (match args
- (('%exception exn)
- (log-exception? exn))
- (_ #t))
- (simple-format
- (current-error-port)
- "worker-thread: exception: ~A\n" args)
- (backtrace)))))
- #:unwind? #t)))
- (put-message reply
- response)
-
- (vector-set! thread-proc-vector
- thread-index
- #f)
-
- (match response
- (('worker-thread-error duration _)
- (when duration-logger
- (duration-logger duration proc))
- #t)
- ((duration . _)
- (when duration-logger
- (duration-logger duration proc))
- #f))))))))
- (unless (and expire-on-exception?
- exception?)
- (if (number? current-lifetime)
- (unless (< current-lifetime 0)
- (loop (if current-lifetime
- (- current-lifetime 1)
- #f)))
- (loop #f))))))
-
- (let ((channel (make-channel)))
- (for-each
- (lambda (thread-index)
- (call-with-new-thread
- (lambda ()
- (catch 'system-error
- (lambda ()
- (set-thread-name
- (string-append
- name " w t "
- (number->string thread-index))))
- (const #t))
-
- (let init ((args (initializer/safe)))
- (with-exception-handler
- (lambda (exn)
- (simple-format
- (current-error-port)
- "worker-thread-channel: exception: ~A\n" exn))
- (lambda ()
- (parameterize ((%worker-thread-args args))
- (process thread-index channel args)))
- #:unwind? #t)
-
- (when destructor
- (destructor/safe args))
-
- (init (initializer/safe))))))
- (iota parallelism))
-
- (values channel
- thread-proc-vector)))
-
-(define &worker-thread-timeout
- (make-exception-type '&worker-thread-timeout
- &error
- '()))
-
-(define make-worker-thread-timeout-error
- (record-constructor &worker-thread-timeout))
-
-(define worker-thread-timeout-error?
- (record-predicate &worker-thread-timeout))
-
-(define %worker-thread-default-timeout
- (make-parameter 30))
-
-(define* (call-with-worker-thread channel proc #:key duration-logger
- (timeout (%worker-thread-default-timeout)))
- "Send PROC to the worker thread through CHANNEL. Return the result of PROC.
-If already in the worker thread, call PROC immediately."
- (let ((args (%worker-thread-args)))
- (if args
- (apply proc args)
- (let* ((reply (make-channel))
- (operation-success?
- (perform-operation
- (let ((put
- (wrap-operation
- (put-operation channel
- (list reply
- (get-internal-real-time)
- proc))
- (const #t))))
-
- (if timeout
- (choice-operation
- put
- (wrap-operation (sleep-operation timeout)
- (const #f)))
- put)))))
-
- (unless operation-success?
- (raise-exception
- (make-worker-thread-timeout-error)))
-
- (match (get-message reply)
- (('worker-thread-error duration exn)
- (when duration-logger
- (duration-logger duration))
- (raise-exception exn))
- ((duration . result)
- (when duration-logger
- (duration-logger duration))
- (apply values result)))))))
-
(define* (fiberize proc #:key (parallelism 1))
(let ((channel (make-channel)))
(for-each
@@ -810,235 +112,6 @@ If already in the worker thread, call PROC immediately."
(('result . vals) (apply values vals))
(('exception . exn) (raise-exception exn)))))))
-(define-record-type <fibers-promise>
- (make-fibers-promise thunk values-box evaluated-condition)
- fibers-promise?
- (thunk fibers-promise-thunk)
- (values-box fibers-promise-values-box)
- (evaluated-condition fibers-promise-evaluated-condition))
-
-(define (fibers-delay thunk)
- (make-fibers-promise
- thunk
- (make-atomic-box #f)
- (make-condition)))
-
-(define (fibers-force fp)
- (let ((res (atomic-box-compare-and-swap!
- (fibers-promise-values-box fp)
- #f
- 'started)))
- (if (eq? #f res)
- (call-with-values
- (lambda ()
- (with-exception-handler
- (lambda (exn)
- (atomic-box-set! (fibers-promise-values-box fp)
- exn)
- (signal-condition!
- (fibers-promise-evaluated-condition fp))
- (raise-exception exn))
- (fibers-promise-thunk fp)
- #:unwind? #t))
- (lambda vals
- (atomic-box-set! (fibers-promise-values-box fp)
- vals)
- (signal-condition!
- (fibers-promise-evaluated-condition fp))
- (apply values vals)))
- (if (eq? res 'started)
- (begin
- (wait (fibers-promise-evaluated-condition fp))
- (let ((result (atomic-box-ref (fibers-promise-values-box fp))))
- (if (exception? result)
- (raise-exception result)
- (apply values result))))
- (if (exception? res)
- (raise-exception res)
- (apply values res))))))
-
-(define (fibers-promise-reset fp)
- (atomic-box-set! (fibers-promise-values-box fp)
- #f))
-
-;; Like split-at, but don't care about the order of the resulting lists, and
-;; don't error if the list is shorter than i elements
-(define (split-at* lst i)
- (let lp ((l lst) (n i) (acc '()))
- (if (or (<= n 0) (null? l))
- (values (reverse! acc) l)
- (lp (cdr l) (- n 1) (cons (car l) acc)))))
-
-;; As this can be called with lists with tens of thousands of items in them,
-;; batch the
-(define (get-batch batch-size lists)
- (let ((split-lists
- (map (lambda (lst)
- (let ((batch rest (split-at* lst batch-size)))
- (cons batch rest)))
- lists)))
- (values (map car split-lists)
- (map cdr split-lists))))
-
-(define (defer-to-parallel-fiber thunk)
- (let ((reply (make-channel)))
- (spawn-fiber
- (lambda ()
- (with-exception-handler
- (lambda (exn)
- (put-message reply (cons 'exception exn)))
- (lambda ()
- (call-with-values
- (lambda ()
- (with-throw-handler #t
- thunk
- (lambda _
- (backtrace))))
- (lambda vals
- (put-message reply vals))))
- #:unwind? #t))
- #:parallel? #t)
- reply))
-
-(define (fetch-result-of-defered-thunks . reply-channels)
- (let ((responses (map get-message
- reply-channels)))
- (map
- (match-lambda
- (('exception . exn)
- (raise-exception exn))
- (result
- (apply values result)))
- responses)))
-
-(define (fibers-batch-map proc batch-size . lists)
- (let loop ((lists lists)
- (result '()))
- (let ((batch
- rest
- (get-batch batch-size lists)))
- (if (any null? batch)
- result
- (let ((response-channels
- (apply map
- (lambda args
- (defer-to-parallel-fiber
- (lambda ()
- (apply proc args))))
- batch)))
- (loop rest
- (append! result
- (apply fetch-result-of-defered-thunks
- response-channels))))))))
-
-(define (fibers-map proc . lists)
- (apply fibers-batch-map proc 20 lists))
-
-(define (fibers-batch-for-each proc batch-size . lists)
- (let loop ((lists lists))
- (let ((batch
- rest
- (get-batch batch-size lists)))
- (if (any null? batch)
- *unspecified*
- (let ((response-channels
- (apply map
- (lambda args
- (defer-to-parallel-fiber
- (lambda ()
- (apply proc args))))
- batch)))
- (apply fetch-result-of-defered-thunks
- response-channels)
- (loop rest))))))
-
-(define (fibers-for-each proc . lists)
- (apply fibers-batch-for-each proc 20 lists))
-
-(define-syntax parallel-via-fibers
- (lambda (x)
- (syntax-case x ()
- ((_ e0 ...)
- (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
- #'(let ((tmp0 (defer-to-parallel-fiber
- (lambda ()
- e0)))
- ...)
- (apply values (fetch-result-of-defered-thunks tmp0 ...))))))))
-
-(define-syntax-rule (letpar& ((v e) ...) b0 b1 ...)
- (call-with-values
- (lambda () (parallel-via-fibers e ...))
- (lambda (v ...)
- b0 b1 ...)))
-
-(define (par-mapper' mapper cons)
- (lambda (proc . lists)
- (apply
- fetch-result-of-defered-thunks
- (let loop ((lists lists))
- (match lists
- (((heads tails ...) ...)
- (let ((tail (loop tails))
- (head (defer-to-parallel-fiber
- (lambda ()
- (apply proc heads)))))
- (cons head tail)))
- (_
- '()))))))
-
-(define par-map& (par-mapper' map cons))
-
-(define* (fibers-map-with-progress proc lists #:key report)
- (let loop ((channels-to-results
- (apply map
- (lambda args
- (cons (defer-to-parallel-fiber
- (lambda ()
- (apply proc args)))
- #f))
- lists)))
- (let ((active-channels
- (filter-map car channels-to-results)))
- (when report
- (report (apply map
- list
- (map cdr channels-to-results)
- lists)))
- (if (null? active-channels)
- (map
- (match-lambda
- ((#f . ('exception . exn))
- (raise-exception exn))
- ((#f . ('result . val))
- val))
- channels-to-results)
- (loop
- (perform-operation
- (apply
- choice-operation
- (filter-map
- (lambda (p)
- (match p
- ((channel . _)
- (if channel
- (wrap-operation
- (get-operation channel)
- (lambda (result)
- (map (match-lambda
- ((c . r)
- (if (eq? channel c)
- (cons #f
- (match result
- (('exception . exn)
- result)
- (_
- (cons 'result result))))
- (cons c r))))
- channels-to-results)))
- #f))))
- channels-to-results))))))))
-
(define (chunk lst max-length)
(if (> (length lst)
max-length)
@@ -1126,173 +199,6 @@ If already in the worker thread, call PROC immediately."
0)))
#:unwind? #t))))
-;; This variant of run-server from the fibers library supports running
-;; multiple servers within one process.
-(define run-server/patched
- (let ((fibers-web-server-module
- (resolve-module '(fibers web server))))
-
- (define set-nonblocking!
- (module-ref fibers-web-server-module 'set-nonblocking!))
-
- (define make-default-socket
- (module-ref fibers-web-server-module 'make-default-socket))
-
- (define socket-loop
- (module-ref fibers-web-server-module 'socket-loop))
-
- (lambda* (handler
- #:key
- (host #f)
- (family AF_INET)
- (addr (if host
- (inet-pton family host)
- INADDR_LOOPBACK))
- (port 8080)
- (socket (make-default-socket family addr port)))
- ;; We use a large backlog by default. If the server is suddenly hit
- ;; with a number of connections on a small backlog, clients won't
- ;; receive confirmation for their SYN, leading them to retry --
- ;; probably successfully, but with a large latency.
- (listen socket 1024)
- (set-nonblocking! socket)
- (sigaction SIGPIPE SIG_IGN)
- (spawn-fiber (lambda () (socket-loop socket handler))))))
-
-(define &port-timeout
- (make-exception-type '&port-timeout
- &external-error
- '(port)))
-
-(define make-port-timeout-error
- (record-constructor &port-timeout))
-
-(define port-timeout-error?
- (record-predicate &port-timeout))
-
-(define &port-read-timeout
- (make-exception-type '&port-read-timeout
- &port-timeout
- '()))
-
-(define make-port-read-timeout-error
- (record-constructor &port-read-timeout))
-
-(define port-read-timeout-error?
- (record-predicate &port-read-timeout))
-
-(define &port-write-timeout
- (make-exception-type '&port-write-timeout
- &port-timeout
- '()))
-
-(define make-port-write-timeout-error
- (record-constructor &port-write-timeout))
-
-(define port-write-timeout-error?
- (record-predicate &port-write-timeout))
-
-;; These procedure are subject to spurious wakeups.
-
-(define (readable? port)
- "Test if PORT is writable."
- (match (select (vector port) #() #() 0)
- ((#() #() #()) #f)
- ((#(_) #() #()) #t)))
-
-(define (writable? port)
- "Test if PORT is writable."
- (match (select #() (vector port) #() 0)
- ((#() #() #()) #f)
- ((#() #(_) #()) #t)))
-
-(define (make-wait-operation ready? schedule-when-ready port
- port-ready-fd this-procedure)
- (make-base-operation
- #f
- (lambda _
- (and (ready? (port-ready-fd port)) values))
- (lambda (flag sched resume)
- (define (commit)
- (match (atomic-box-compare-and-swap! flag 'W 'S)
- ('W (resume values))
- ('C (commit))
- ('S #f)))
- (schedule-when-ready
- sched (port-ready-fd port) commit))))
-
-(define (wait-until-port-readable-operation port)
- "Make an operation that will succeed when PORT is readable."
- (unless (input-port? port)
- (error "refusing to wait forever for input on non-input port"))
- (make-wait-operation readable? schedule-task-when-fd-readable port
- port-read-wait-fd
- wait-until-port-readable-operation))
-
-(define (wait-until-port-writable-operation port)
- "Make an operation that will succeed when PORT is writable."
- (unless (output-port? port)
- (error "refusing to wait forever for output on non-output port"))
- (make-wait-operation writable? schedule-task-when-fd-writable port
- port-write-wait-fd
- wait-until-port-writable-operation))
-
-(define* (with-fibers-port-timeouts thunk
- #:key timeout
- (read-timeout timeout)
- (write-timeout timeout))
- (define (no-fibers-wait port mode timeout)
- (define poll-timeout-ms 200)
-
- ;; When the GC runs, it restarts the poll syscall, but the timeout
- ;; remains unchanged! When the timeout is longer than the time
- ;; between the syscall restarting, I think this renders the
- ;; timeout useless. Therefore, this code uses a short timeout, and
- ;; repeatedly calls poll while watching the clock to see if it has
- ;; timed out overall.
- (let ((timeout-internal
- (+ (get-internal-real-time)
- (* internal-time-units-per-second
- (/ timeout 1000)))))
- (let loop ((poll-value
- (port-poll port mode poll-timeout-ms)))
- (if (= poll-value 0)
- (if (> (get-internal-real-time)
- timeout-internal)
- (raise-exception
- (if (string=? mode "r")
- (make-port-read-timeout-error port)
- (make-port-write-timeout-error port)))
- (loop (port-poll port mode poll-timeout-ms)))
- poll-value))))
-
- (parameterize
- ((current-read-waiter
- (lambda (port)
- (if (current-scheduler)
- (perform-operation
- (choice-operation
- (wait-until-port-readable-operation port)
- (wrap-operation
- (sleep-operation read-timeout)
- (lambda ()
- (raise-exception
- (make-port-read-timeout-error thunk port))))))
- (no-fibers-wait port "r" read-timeout))))
- (current-write-waiter
- (lambda (port)
- (if (current-scheduler)
- (perform-operation
- (choice-operation
- (wait-until-port-writable-operation port)
- (wrap-operation
- (sleep-operation write-timeout)
- (lambda ()
- (raise-exception
- (make-port-write-timeout-error thunk port))))))
- (no-fibers-wait port "w" write-timeout)))))
- (thunk)))
-
(define (spawn-port-monitoring-fiber port error-condition)
(spawn-fiber
(lambda ()
@@ -1305,7 +211,7 @@ If already in the worker thread, call PROC immediately."
port exn)
(signal-condition! error-condition))
(lambda ()
- (with-fibers-port-timeouts
+ (with-port-timeouts
(lambda ()
(let ((sock (socket PF_INET SOCK_STREAM 0)))
(connect sock AF_INET INADDR_LOOPBACK port)
@@ -1327,25 +233,3 @@ If already in the worker thread, call PROC immediately."
(sigaction SIGINT (car handler) (cdr handler))
;; restore original C handler.
(sigaction SIGINT #f))))))
-
-(define (make-queueing-channel channel)
- (define queue (make-q))
-
- (let ((queue-channel (make-channel)))
- (spawn-fiber
- (lambda ()
- (while #t
- (if (q-empty? queue)
- (enq! queue
- (perform-operation
- (get-operation queue-channel)))
- (let ((front (q-front queue)))
- (perform-operation
- (choice-operation
- (wrap-operation (get-operation queue-channel)
- (lambda (val)
- (enq! queue val)))
- (wrap-operation (put-operation channel front)
- (lambda _
- (q-pop! queue))))))))))
- queue-channel))
diff --git a/guix-data-service/web/build-server/controller.scm b/guix-data-service/web/build-server/controller.scm
index 22088b1..7d2bd24 100644
--- a/guix-data-service/web/build-server/controller.scm
+++ b/guix-data-service/web/build-server/controller.scm
@@ -22,6 +22,7 @@
#:use-module (json)
#:use-module (squee)
#:use-module (fibers)
+ #:use-module (knots resource-pool)
#:use-module (prometheus)
#:use-module (guix-data-service utils)
#:use-module (guix-data-service database)
diff --git a/guix-data-service/web/build/controller.scm b/guix-data-service/web/build/controller.scm
index bf77e03..7924dbb 100644
--- a/guix-data-service/web/build/controller.scm
+++ b/guix-data-service/web/build/controller.scm
@@ -18,6 +18,8 @@
(define-module (guix-data-service web build controller)
#:use-module (srfi srfi-1)
#:use-module (ice-9 match)
+ #:use-module (knots parallelism)
+ #:use-module (knots resource-pool)
#:use-module (guix-data-service utils)
#:use-module (guix-data-service database)
#:use-module (guix-data-service web render)
@@ -41,7 +43,7 @@
(define parse-build-server
(lambda (v)
- (letpar& ((build-servers
+ (fibers-let ((build-servers
(call-with-resource-from-pool (connection-pool)
select-build-servers)))
(or (any (match-lambda
@@ -88,7 +90,7 @@
'()))
(let ((system (assq-ref parsed-query-parameters 'system))
(target (assq-ref parsed-query-parameters 'target)))
- (letpar& ((build-server-options
+ (fibers-let ((build-server-options
(with-resource-from-pool (connection-pool) conn
(map (match-lambda
((id url lookup-all-derivations
diff --git a/guix-data-service/web/compare/controller.scm b/guix-data-service/web/compare/controller.scm
index e1fab78..dbb4975 100644
--- a/guix-data-service/web/compare/controller.scm
+++ b/guix-data-service/web/compare/controller.scm
@@ -24,6 +24,8 @@
#:use-module (texinfo)
#:use-module (texinfo html)
#:use-module (texinfo plain-text)
+ #:use-module (knots parallelism)
+ #:use-module (knots resource-pool)
#:use-module (guix-data-service utils)
#:use-module (guix-data-service database)
#:use-module (guix-data-service web sxml)
@@ -229,7 +231,7 @@
(define (render-compare mime-types
query-parameters)
(if (any-invalid-query-parameters? query-parameters)
- (letpar& ((base-job
+ (fibers-let ((base-job
(match (assq-ref query-parameters 'base_commit)
(($ <invalid-query-parameter> value)
(with-resource-from-pool (connection-pool) conn
@@ -275,7 +277,7 @@
#f
#f
#f)))))
- (letpar& ((base-revision-id
+ (fibers-let ((base-revision-id
(with-resource-from-pool (connection-pool) conn
(commit->revision-id
conn
@@ -303,7 +305,7 @@
(version-changes
(package-data-version-changes base-packages-vhash
target-packages-vhash)))
- (letpar& ((lint-warnings-data
+ (fibers-let ((lint-warnings-data
(with-resource-from-pool (connection-pool) conn
(group-list-by-first-n-fields
2
@@ -396,7 +398,7 @@
lint-warnings-data))))
#:extra-headers http-headers-for-unchanging-content))
(else
- (letpar& ((lint-warnings-locale-options
+ (fibers-let ((lint-warnings-locale-options
(map
(match-lambda
((locale)
@@ -449,7 +451,7 @@
(target-branch (assq-ref query-parameters 'target_branch))
(target-datetime (assq-ref query-parameters 'target_datetime))
(locale (assq-ref query-parameters 'locale)))
- (letpar& ((base-revision-details
+ (fibers-let ((base-revision-details
(with-resource-from-pool (connection-pool) conn
(select-guix-revision-for-branch-and-datetime
conn
@@ -624,7 +626,7 @@
'(application/json text/html)
mime-types)
((application/json)
- (letpar& ((base-job
+ (fibers-let ((base-job
(and=> (match (assq-ref query-parameters 'base_commit)
(($ <invalid-query-parameter> value)
(and (string? value) value))
@@ -663,7 +665,7 @@
(base_job . ,base-job)
(target_job . ,target-job)))))
(else
- (letpar& ((systems
+ (fibers-let ((systems
(call-with-resource-from-pool (connection-pool)
list-systems))
(targets
@@ -695,7 +697,7 @@
(limit-results (assq-ref query-parameters 'limit_results)))
(let ((data
(concatenate!
- (par-map&
+ (fibers-map
(lambda (system)
(with-resource-from-pool (connection-pool) conn
(package-derivation-differences-data
@@ -734,7 +736,7 @@
. ,derivation-changes))
#:stream? #t))
(else
- (letpar& ((systems
+ (fibers-let ((systems
(call-with-resource-from-pool (connection-pool)
list-systems))
(targets
@@ -788,7 +790,7 @@
string->symbol))
(after-name (assq-ref query-parameters 'after_name))
(limit-results (assq-ref query-parameters 'limit_results)))
- (letpar&
+ (fibers-let
((base-revision-details
(with-resource-from-pool (connection-pool) conn
(select-guix-revision-for-branch-and-datetime conn
@@ -800,7 +802,7 @@
target-branch
target-datetime))))
(let ((data
- (par-map&
+ (fibers-map
(lambda (system)
(with-resource-from-pool (connection-pool) conn
(package-derivation-differences-data
@@ -875,7 +877,7 @@
(render-json
'((error . "invalid query"))))
(else
- (letpar& ((base-job
+ (fibers-let ((base-job
(match (assq-ref query-parameters 'base_commit)
(($ <invalid-query-parameter> value)
(with-resource-from-pool (connection-pool) conn
@@ -895,7 +897,7 @@
(let ((base-commit (assq-ref query-parameters 'base_commit))
(target-commit (assq-ref query-parameters 'target_commit)))
- (letpar& ((base-revision-id
+ (fibers-let ((base-revision-id
(with-resource-from-pool (connection-pool) conn
(commit->revision-id
conn
@@ -944,7 +946,7 @@
(render-json
'((error . "invalid query"))))
(else
- (letpar& ((systems
+ (fibers-let ((systems
(with-resource-from-pool (connection-pool) conn
list-systems))
(build-server-urls
@@ -963,7 +965,7 @@
(let ((base-commit (assq-ref query-parameters 'base_commit))
(target-commit (assq-ref query-parameters 'target_commit))
(system (assq-ref query-parameters 'system)))
- (letpar& ((data
+ (fibers-let ((data
(with-resource-from-pool (connection-pool) conn
(system-test-derivations-differences-data
conn
@@ -1014,7 +1016,7 @@
(render-json
'((error . "invalid query"))))
(else
- (letpar& ((systems
+ (fibers-let ((systems
(with-resource-from-pool (connection-pool) conn
list-systems))
(build-server-urls
@@ -1035,7 +1037,7 @@
(target-branch (assq-ref query-parameters 'target_branch))
(target-datetime (assq-ref query-parameters 'target_datetime))
(system (assq-ref query-parameters 'system)))
- (letpar&
+ (fibers-let
((base-revision-details
(with-resource-from-pool (connection-pool) conn
(select-guix-revision-for-branch-and-datetime conn
@@ -1046,7 +1048,7 @@
(select-guix-revision-for-branch-and-datetime conn
target-branch
target-datetime))))
- (letpar& ((data
+ (fibers-let ((data
(with-resource-from-pool (connection-pool) conn
(system-test-derivations-differences-data
conn
diff --git a/guix-data-service/web/controller.scm b/guix-data-service/web/controller.scm
index d23c2f3..cdf2318 100644
--- a/guix-data-service/web/controller.scm
+++ b/guix-data-service/web/controller.scm
@@ -35,6 +35,8 @@
#:use-module (texinfo html)
#:use-module (squee)
#:use-module (json)
+ #:use-module (knots parallelism)
+ #:use-module (knots resource-pool)
#:use-module (prometheus)
#:use-module (guix-data-service utils)
#:use-module (guix-data-service config)
@@ -234,7 +236,7 @@
#:always-rollback? #t))
(lambda ()
- (letpar& ((metric-values
+ (fibers-let ((metric-values
(with-exception-handler
(lambda (exn)
(simple-format
@@ -456,12 +458,12 @@
(write-metrics registry port))))))))
(define (render-derivation derivation-file-name)
- (letpar& ((derivation
+ (fibers-let ((derivation
(with-resource-from-pool (connection-pool) conn
(select-derivation-by-file-name conn derivation-file-name))))
(if derivation
- (letpar& ((derivation-inputs
+ (fibers-let ((derivation-inputs
(with-resource-from-pool (connection-pool) conn
(select-derivation-inputs-by-derivation-id
conn
@@ -495,7 +497,7 @@
(select-derivation-by-file-name conn
derivation-file-name))))
(if derivation
- (letpar& ((derivation-inputs
+ (fibers-let ((derivation-inputs
(with-resource-from-pool (connection-pool) conn
(select-derivation-inputs-by-derivation-id
conn
@@ -551,7 +553,7 @@
(select-derivation-by-file-name conn
derivation-file-name))))
(if derivation
- (letpar& ((derivation-inputs
+ (fibers-let ((derivation-inputs
(with-resource-from-pool (connection-pool) conn
(select-derivation-inputs-by-derivation-id
conn
@@ -596,7 +598,7 @@
#:sxml (view-narinfos narinfos)))))
(define (render-store-item filename)
- (letpar& ((derivation
+ (fibers-let ((derivation
(with-resource-from-pool (connection-pool) conn
(select-derivation-by-output-filename conn filename))))
(match derivation
@@ -619,7 +621,7 @@
filename)))
#:extra-headers http-headers-for-unchanging-content))))
(derivations
- (letpar& ((nars
+ (fibers-let ((nars
(with-resource-from-pool (connection-pool) conn
(select-nars-for-output conn filename)))
(builds
@@ -656,7 +658,7 @@
conn
filename))))))))))
(derivations
- (letpar& ((nars
+ (fibers-let ((nars
(with-resource-from-pool (connection-pool) conn
(select-nars-for-output conn filename))))
(render-json
diff --git a/guix-data-service/web/jobs/controller.scm b/guix-data-service/web/jobs/controller.scm
index 7e5084f..96621f9 100644
--- a/guix-data-service/web/jobs/controller.scm
+++ b/guix-data-service/web/jobs/controller.scm
@@ -17,6 +17,8 @@
(define-module (guix-data-service web jobs controller)
#:use-module (ice-9 match)
+ #:use-module (knots parallelism)
+ #:use-module (knots resource-pool)
#:use-module (guix-data-service utils)
#:use-module (guix-data-service database)
#:use-module (guix-data-service web render)
@@ -74,7 +76,7 @@
(define (render-jobs mime-types query-parameters)
(define limit-results (assq-ref query-parameters 'limit_results))
- (letpar& ((jobs
+ (fibers-let ((jobs
(with-resource-from-pool (connection-pool) conn
(select-jobs-and-events
conn
diff --git a/guix-data-service/web/nar/controller.scm b/guix-data-service/web/nar/controller.scm
index e2ace7a..f7edac6 100644
--- a/guix-data-service/web/nar/controller.scm
+++ b/guix-data-service/web/nar/controller.scm
@@ -27,6 +27,8 @@
#:use-module (web uri)
#:use-module (web request)
#:use-module (web response)
+ #:use-module (knots parallelism)
+ #:use-module (knots resource-pool)
#:use-module (guix pki)
#:use-module (guix base32)
#:use-module (guix base64)
@@ -155,7 +157,7 @@
#:code 200
#:headers '((content-type . (application/x-narinfo))))
(let ((derivation-file-name (second derivation)))
- (letpar&
+ (fibers-let
((derivation-text
(with-resource-from-pool (reserved-connection-pool) conn
(select-serialized-derivation-by-file-name
diff --git a/guix-data-service/web/package/controller.scm b/guix-data-service/web/package/controller.scm
index 8dc6b0f..792394c 100644
--- a/guix-data-service/web/package/controller.scm
+++ b/guix-data-service/web/package/controller.scm
@@ -19,6 +19,8 @@
#:use-module (ice-9 match)
#:use-module (web uri)
#:use-module (web request)
+ #:use-module (knots parallelism)
+ #:use-module (knots resource-pool)
#:use-module (guix-data-service utils)
#:use-module (guix-data-service database)
#:use-module (guix-data-service web render)
@@ -40,7 +42,7 @@
request
`((system ,parse-system #:default "x86_64-linux")
(target ,parse-target #:default "")))))
- (letpar& ((package-versions-with-branches
+ (fibers-let ((package-versions-with-branches
(with-resource-from-pool (connection-pool) conn
(branches-by-package-version conn name
(assq-ref parsed-query-parameters
diff --git a/guix-data-service/web/repository/controller.scm b/guix-data-service/web/repository/controller.scm
index 0d9434c..101687c 100644
--- a/guix-data-service/web/repository/controller.scm
+++ b/guix-data-service/web/repository/controller.scm
@@ -19,6 +19,8 @@
#:use-module (ice-9 match)
#:use-module (web uri)
#:use-module (web request)
+ #:use-module (knots parallelism)
+ #:use-module (knots resource-pool)
#:use-module (guix-data-service utils)
#:use-module (guix-data-service database)
#:use-module (guix-data-service web render)
@@ -47,7 +49,7 @@
(match method-and-path-components
(('GET "repositories")
- (letpar& ((git-repositories
+ (fibers-let ((git-repositories
(call-with-resource-from-pool (connection-pool)
all-git-repositories)))
(case (most-appropriate-mime-type
@@ -71,7 +73,7 @@
(match (with-resource-from-pool (connection-pool) conn
(select-git-repository conn id))
((label url cgit-url-base fetch-with-authentication? poll-interval)
- (letpar& ((branches
+ (fibers-let ((branches
(with-resource-from-pool (connection-pool) conn
(all-branches-with-most-recent-commit
conn
@@ -119,7 +121,7 @@
`((after_date ,parse-datetime)
(before_date ,parse-datetime)
(limit_results ,parse-result-limit #:default 100)))))
- (letpar& ((revisions
+ (fibers-let ((revisions
(with-resource-from-pool (connection-pool) conn
(most-recent-commits-for-branch
conn
@@ -160,7 +162,7 @@
parsed-query-parameters
revisions)))))))))
(('GET "repository" repository-id "branch" branch-name "package" package-name)
- (letpar& ((package-versions
+ (fibers-let ((package-versions
(with-resource-from-pool (connection-pool) conn
(package-versions-for-branch conn
(string->number repository-id)
@@ -211,7 +213,7 @@
(parse-query-parameters
request
`((system ,parse-system #:default "x86_64-linux")))))
- (letpar& ((system-test-history
+ (fibers-let ((system-test-history
(with-resource-from-pool (connection-pool) conn
(system-test-derivations-for-branch
conn
@@ -256,7 +258,7 @@
valid-systems
system-test-history)))))))
(('GET "repository" repository-id "branch" branch-name "latest-processed-revision")
- (letpar& ((commit-hash
+ (fibers-let ((commit-hash
(with-resource-from-pool (connection-pool) conn
(latest-processed-commit-for-branch conn
repository-id
@@ -273,7 +275,7 @@
repository-id
branch-name))))
(('GET "repository" repository-id "branch" branch-name "latest-processed-revision" "packages")
- (letpar& ((commit-hash
+ (fibers-let ((commit-hash
(with-resource-from-pool (connection-pool) conn
(latest-processed-commit-for-branch conn
repository-id
@@ -313,7 +315,7 @@
repository-id
branch-name))))
(('GET "repository" repository-id "branch" branch-name "latest-processed-revision" "package-derivations")
- (letpar& ((commit-hash
+ (fibers-let ((commit-hash
(with-resource-from-pool (connection-pool) conn
(latest-processed-commit-for-branch conn
repository-id
@@ -422,7 +424,7 @@
branch-name))))
(('GET "repository" repository-id "branch" branch-name
"latest-processed-revision" "system-tests")
- (letpar& ((commit-hash
+ (fibers-let ((commit-hash
(with-resource-from-pool (connection-pool) conn
(latest-processed-commit-for-branch conn
repository-id
@@ -440,7 +442,7 @@
repository-id
branch-name))))
(('GET "repository" repository-id "branch" branch-name "latest-processed-revision" "package-reproducibility")
- (letpar& ((commit-hash
+ (fibers-let ((commit-hash
(with-resource-from-pool (connection-pool) conn
(latest-processed-commit-for-branch conn
repository-id
@@ -462,7 +464,7 @@
repository-id
branch-name))))
(('GET "repository" repository-id "branch" branch-name "latest-processed-revision" "package-substitute-availability")
- (letpar& ((commit-hash
+ (fibers-let ((commit-hash
(with-resource-from-pool (connection-pool) conn
(latest-processed-commit-for-branch conn
repository-id
@@ -476,7 +478,7 @@
branch-name))))
(('GET "repository" repository-id "branch" branch-name "latest-processed-revision"
"lint-warnings")
- (letpar& ((commit-hash
+ (fibers-let ((commit-hash
(with-resource-from-pool (connection-pool) conn
(latest-processed-commit-for-branch conn
repository-id
@@ -510,7 +512,7 @@
repository-id
branch-name))))
(('GET "repository" repository-id "branch" branch-name "latest-processed-revision" "package" name version)
- (letpar& ((commit-hash
+ (fibers-let ((commit-hash
(with-resource-from-pool (connection-pool) conn
(latest-processed-commit-for-branch conn
repository-id
@@ -583,7 +585,7 @@
(assq-ref parsed-query-parameters 'system))
(target
(assq-ref parsed-query-parameters 'target)))
- (letpar&
+ (fibers-let
((package-derivations
(with-resource-from-pool (connection-pool) conn
(package-derivations-for-branch conn
@@ -620,7 +622,7 @@
. ,(list->vector builds)))))
package-derivations))))))
(else
- (letpar& ((systems
+ (fibers-let ((systems
(call-with-resource-from-pool (connection-pool)
list-systems))
(targets
@@ -657,7 +659,7 @@
(assq-ref parsed-query-parameters 'target))
(output-name
(assq-ref parsed-query-parameters 'output)))
- (letpar&
+ (fibers-let
((package-outputs
(with-resource-from-pool (connection-pool) conn
(package-outputs-for-branch conn
@@ -695,7 +697,7 @@
. ,(list->vector builds)))))
package-outputs))))))
(else
- (letpar& ((systems
+ (fibers-let ((systems
(call-with-resource-from-pool (connection-pool)
list-systems))
(targets
diff --git a/guix-data-service/web/revision/controller.scm b/guix-data-service/web/revision/controller.scm
index 14a721a..c4a25f7 100644
--- a/guix-data-service/web/revision/controller.scm
+++ b/guix-data-service/web/revision/controller.scm
@@ -24,6 +24,8 @@
#:use-module (texinfo html)
#:use-module (texinfo plain-text)
#:use-module (json)
+ #:use-module (knots parallelism)
+ #:use-module (knots resource-pool)
#:use-module (guix-data-service utils)
#:use-module (guix-data-service database)
#:use-module (guix-data-service web render)
@@ -84,7 +86,7 @@
status))))
(define (parse-build-server v)
- (letpar& ((build-servers
+ (fibers-let ((build-servers
(call-with-resource-from-pool (connection-pool)
select-build-servers)))
(or (any (match-lambda
@@ -395,7 +397,7 @@
`((unknown_commit . ,commit-hash))
#:code 404))
(else
- (letpar& ((job
+ (fibers-let ((job
(with-resource-from-pool (connection-pool) conn
(select-job-for-commit conn commit-hash)))
(git-repositories-and-branches
@@ -423,7 +425,7 @@
`((unknown_commit . ,commit-hash))
#:code 404))
(else
- (letpar& ((job
+ (fibers-let ((job
(with-resource-from-pool (connection-pool) conn
(select-job-for-commit conn commit-hash)))
(git-repositories-and-branches
@@ -448,7 +450,7 @@
(header-text
`("Revision " (samp ,commit-hash)))
(max-age cache-control-default-max-age))
- (letpar& ((packages-count
+ (fibers-let ((packages-count
(with-resource-from-pool (connection-pool) conn
(count-packages-in-revision conn commit-hash)))
(git-repositories-and-branches
@@ -514,7 +516,7 @@
`("Revision " (samp ,commit-hash)))
(header-link
(string-append "/revision/" commit-hash)))
- (letpar& ((system-tests
+ (fibers-let ((system-tests
(with-resource-from-pool (connection-pool) conn
(select-system-tests-for-guix-revision
conn
@@ -542,7 +544,7 @@
(builds . ,(list->vector builds)))))
system-tests))))))
(else
- (letpar& ((git-repositories
+ (fibers-let ((git-repositories
(with-resource-from-pool (connection-pool) conn
(git-repositories-containing-commit conn
commit-hash)))
@@ -568,7 +570,7 @@
(header-link
(string-append "/revision/"
commit-hash)))
- (letpar& ((channel-instances
+ (fibers-let ((channel-instances
(with-resource-from-pool (connection-pool) conn
(select-channel-instances-for-guix-revision conn commit-hash))))
(case (most-appropriate-mime-type
@@ -596,7 +598,7 @@
(define* (render-revision-package-substitute-availability mime-types
commit-hash
#:key path-base)
- (letpar& ((substitute-availability
+ (fibers-let ((substitute-availability
(with-resource-from-pool (connection-pool) conn
(select-package-output-availability-for-revision conn
commit-hash)))
@@ -610,7 +612,7 @@
((application/json)
(render-json
`((commit . ,commit-hash)
- (substitute_servers
+ (xsubstitute_servers
. ,(list->vector
(map (match-lambda
((build-server-id . data)
@@ -642,7 +644,7 @@
(header-link
(string-append "/revision/"
commit-hash)))
- (letpar& ((output-consistency
+ (fibers-let ((output-consistency
(with-resource-from-pool (connection-pool) conn
(select-output-consistency-for-revision conn commit-hash))))
(case (most-appropriate-mime-type
@@ -676,7 +678,7 @@
#:sxml (view-revision-news commit-hash
query-parameters
'()))))
- (letpar& ((news-entries
+ (fibers-let ((news-entries
(with-resource-from-pool (connection-pool) conn
(select-channel-news-entries-contained-in-guix-revision
conn
@@ -735,7 +737,7 @@
99999)) ; TODO There shouldn't be a limit
(fields (assq-ref query-parameters 'field))
(locale (assq-ref query-parameters 'locale)))
- (letpar&
+ (fibers-let
((packages
(with-resource-from-pool (connection-pool) conn
(if search-query
@@ -832,7 +834,7 @@
"/revision/" commit-hash))
(header-text
`("Revision " (samp ,commit-hash))))
- (letpar& ((package-synopsis-counts
+ (fibers-let ((package-synopsis-counts
(with-resource-from-pool (connection-pool) conn
(synopsis-counts-by-locale conn
(commit->revision-id
@@ -872,7 +874,7 @@
(header-link
(string-append
"/revision/" commit-hash)))
- (letpar& ((package-versions
+ (fibers-let ((package-versions
(with-resource-from-pool (connection-pool) conn
(select-package-versions-for-revision conn
commit-hash
@@ -929,7 +931,7 @@
(define has-replacement? (assq-ref query-parameters 'has_replacement))
- (letpar& ((metadata
+ (fibers-let ((metadata
(with-resource-from-pool (connection-pool) conn
(select-package-metadata-by-revision-name-and-version
conn
@@ -1041,7 +1043,7 @@
(render-json
`((error . "invalid query"))))
(else
- (letpar& ((systems
+ (fibers-let ((systems
(call-with-resource-from-pool (connection-pool)
list-systems))
(targets
@@ -1067,7 +1069,7 @@
(assq-ref query-parameters 'search_query))
(fields
(assq-ref query-parameters 'field)))
- (letpar&
+ (fibers-let
((derivations
(if search-query
(with-resource-from-pool (connection-pool) conn
@@ -1090,7 +1092,7 @@
#:after-name (assq-ref query-parameters 'after_name)
#:include-builds? (member "builds" fields)))
(concatenate!
- (par-map&
+ (fibers-map
(lambda (system)
(with-resource-from-pool (connection-pool) conn
(select-package-derivations-in-revision
@@ -1149,7 +1151,7 @@
derivations))))
#:stream? #t))
(else
- (letpar& ((systems
+ (fibers-let ((systems
(call-with-resource-from-pool (connection-pool)
list-systems))
(targets
@@ -1187,7 +1189,7 @@
(render-json
`((error . "invalid query"))))
(else
- (letpar& ((systems
+ (fibers-let ((systems
(call-with-resource-from-pool (connection-pool)
list-systems))
(targets
@@ -1213,7 +1215,7 @@
(assq-ref query-parameters 'search_query))
(fields
(assq-ref query-parameters 'field)))
- (letpar&
+ (fibers-let
((derivations
(with-resource-from-pool (connection-pool) conn
(select-fixed-output-package-derivations-in-revision
@@ -1242,7 +1244,7 @@
(render-json
`((derivations . ,(list->vector derivations)))))
(else
- (letpar& ((systems
+ (fibers-let ((systems
(call-with-resource-from-pool (connection-pool)
list-systems))
(targets
@@ -1284,7 +1286,7 @@
(render-json
`((error . "invalid query"))))
(else
- (letpar& ((systems
+ (fibers-let ((systems
(call-with-resource-from-pool (connection-pool)
list-systems))
(targets
@@ -1308,7 +1310,7 @@
(assq-ref query-parameters 'all_results))
(fields
(assq-ref query-parameters 'field)))
- (letpar&
+ (fibers-let
((derivation-outputs
(with-resource-from-pool (connection-pool) conn
(select-derivation-outputs-in-revision
@@ -1390,7 +1392,7 @@
"not-matching")))))))
derivation-outputs))))))
(else
- (letpar& ((systems
+ (fibers-let ((systems
(call-with-resource-from-pool (connection-pool)
list-systems))
(targets
@@ -1419,7 +1421,7 @@
(header-link
(string-append "/revision/" commit-hash)))
(if (any-invalid-query-parameters? query-parameters)
- (letpar& ((systems
+ (fibers-let ((systems
(call-with-resource-from-pool (connection-pool)
list-systems))
(targets
@@ -1437,7 +1439,7 @@
'())))
(let ((system (assq-ref query-parameters 'system))
(target (assq-ref query-parameters 'target)))
- (letpar& ((systems
+ (fibers-let ((systems
(call-with-resource-from-pool (connection-pool)
list-systems))
(targets
@@ -1492,7 +1494,7 @@
(header-link
(string-append "/revision/" commit-hash)))
(if (any-invalid-query-parameters? query-parameters)
- (letpar& ((systems
+ (fibers-let ((systems
(call-with-resource-from-pool (connection-pool)
list-systems))
(targets
@@ -1509,7 +1511,7 @@
'())))
(let ((system (assq-ref query-parameters 'system))
(target (assq-ref query-parameters 'target)))
- (letpar& ((systems
+ (fibers-let ((systems
(call-with-resource-from-pool (connection-pool)
list-systems))
(targets
@@ -1592,7 +1594,7 @@
(linters (assq-ref query-parameters 'linter))
(message-query (assq-ref query-parameters 'message_query))
(fields (assq-ref query-parameters 'field)))
- (letpar&
+ (fibers-let
((git-repositories
(with-resource-from-pool (connection-pool) conn
(git-repositories-containing-commit conn
diff --git a/guix-data-service/web/server.scm b/guix-data-service/web/server.scm
index 4e08161..a1a888b 100644
--- a/guix-data-service/web/server.scm
+++ b/guix-data-service/web/server.scm
@@ -30,6 +30,8 @@
#:use-module (fibers channels)
#:use-module (fibers scheduler)
#:use-module (fibers conditions)
+ #:use-module (knots web-server)
+ #:use-module (knots resource-pool)
#:use-module ((guix build syscalls)
#:select (set-thread-name))
#:use-module (prometheus)
@@ -246,7 +248,7 @@ port. Also, the port used can be changed by passing the --port option.\n"
(make-counter-metric registry
"resource_pool_checkout_timeouts_total"
#:labels '(pool_name))))
- (%resource-pool-timeout-handler
+ (resource-pool-default-timeout-handler
(lambda (pool proc timeout)
(let ((pool-name
(cond
@@ -269,11 +271,12 @@ port. Also, the port used can be changed by passing the --port option.\n"
request-scheduler)
(let ((render-metrics (make-render-metrics registry)))
- (run-server/patched
- (lambda (request body)
+ (run-knots-web-server
+ (lambda (request)
(metric-increment requests-metric)
- (let ((reply (make-channel)))
+ (let ((body (read-request-body request))
+ (reply (make-channel)))
(spawn-fiber
(lambda ()
(call-with-values
diff --git a/guix-dev.scm b/guix-dev.scm
index 8d33657..eec15ec 100644
--- a/guix-dev.scm
+++ b/guix-dev.scm
@@ -41,6 +41,38 @@
(gnu packages ruby)
(srfi srfi-1))
+(define guile-knots
+ (let ((commit "0fab93e9ff5b16813ae1356c13d3c974d7277d81")
+ (revision "1"))
+ (package
+ (name "guile-knots")
+ (version (git-version "0" revision commit))
+ (source (origin
+ (method git-fetch)
+ (uri (git-reference
+ (url "https://git.cbaines.net/git/guile/knots")
+ (commit commit)))
+ (sha256
+ (base32
+ "1x0wirq0db2704784ig00kz5kh8j6szp2gwm67wn714m1jbhz9ky"))
+ (file-name (string-append name "-" version "-checkout"))))
+ (build-system gnu-build-system)
+ (native-inputs
+ (list pkg-config
+ autoconf
+ automake
+ guile-3.0
+ guile-fibers))
+ (inputs
+ (list guile-3.0))
+ (propagated-inputs
+ (list guile-fibers))
+ (home-page "https://git.cbaines.net/guile/knots")
+ (synopsis "Patterns and functionality to use with Guile Fibers")
+ (description
+ "")
+ (license license:gpl3+))))
+
(package
(name "guix-data-service")
(version "0.0.0")
@@ -52,6 +84,7 @@
guile-json-4
guile-squee
guile-fibers
+ guile-knots
guile-gcrypt
guile-lzlib
guile-readline