aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.dir-locals.el4
-rw-r--r--guix-data-service/data-deletion.scm17
-rw-r--r--guix-data-service/database.scm95
-rw-r--r--guix-data-service/utils.scm102
4 files changed, 112 insertions, 106 deletions
diff --git a/.dir-locals.el b/.dir-locals.el
index 5d052d8..f7cbfb5 100644
--- a/.dir-locals.el
+++ b/.dir-locals.el
@@ -7,7 +7,9 @@
(scheme-mode
(indent-tabs-mode)
(eval put 'with-time-logging 'scheme-indent-function 1)
- (eval put 'make-parameter 'scheme-indent-function 1))
+ (eval put 'make-parameter 'scheme-indent-function 1)
+ (eval put 'letpar 'scheme-indent-function 1)
+ (eval put 'letpar& 'scheme-indent-function 1))
(texinfo-mode
(indent-tabs-mode)
(fill-column . 72)))
diff --git a/guix-data-service/data-deletion.scm b/guix-data-service/data-deletion.scm
index 6c4e0b9..197cef1 100644
--- a/guix-data-service/data-deletion.scm
+++ b/guix-data-service/data-deletion.scm
@@ -448,17 +448,16 @@ WHERE NOT EXISTS (
(lambda (count result)
(+ result count))
0
- (par-map (lambda (derivation-id)
- (with-postgresql-transaction/through-channel
- conn-channel
- (lambda (conn)
- (exec-query
- conn
- "
+ (par-map& (lambda (derivation-id)
+ (with-thread-postgresql-connection
+ (lambda (conn)
+ (exec-query
+ conn
+ "
SET CONSTRAINTS derivations_by_output_details_set_derivation_id_fkey DEFERRED")
- (maybe-delete-derivation conn derivation-id))))
- derivations))))
+ (maybe-delete-derivation conn derivation-id))))
+ derivations))))
(simple-format (current-error-port)
"Deleted ~A derivations\n"
deleted-count)
diff --git a/guix-data-service/database.scm b/guix-data-service/database.scm
index 89b1a09..4d1001b 100644
--- a/guix-data-service/database.scm
+++ b/guix-data-service/database.scm
@@ -20,9 +20,6 @@
#:use-module (ice-9 match)
#:use-module (ice-9 threads)
#:use-module (squee)
- #:use-module (fibers)
- #:use-module (fibers channels)
- #:use-module (fibers conditions)
#:use-module (guix-data-service config)
#:export (with-postgresql-connection
@@ -136,98 +133,6 @@
(f conn)))))
-(define* (make-postgresql-connection-channel name
- #:key
- (statement-timeout #f)
- (threads 2))
- (parameterize (((@@ (fibers internal) current-fiber) #f))
- (let ((channel (make-channel)))
- (for-each
- (lambda _
- (call-with-new-thread
- (lambda ()
- (with-postgresql-connection
- name
- (lambda (conn)
- (let loop ()
- (match (get-message channel)
- (((? channel? reply) f (? boolean? allways-rollback?))
- (put-message
- reply
- (with-exception-handler
- (lambda (exn)
- (cons 'worker-thread-error exn))
- (lambda ()
- (with-exception-handler
- (lambda (exn)
- (simple-format
- (current-error-port)
- "postgresql connection thread: exception: ~A\n"
- exn)
- (backtrace)
- (raise-exception exn))
- (lambda ()
- (call-with-values
- (lambda ()
- (with-postgresql-transaction
- conn
- (lambda (conn)
- (f conn))))
- (lambda vals vals)))))
- #:unwind? #t))
- (loop))
- (((? channel? reply) . (? list? args))
- (put-message
- reply
- (with-exception-handler
- (lambda (exn)
- (cons 'worker-thread-error exn))
- (lambda ()
- (with-exception-handler
- (lambda (exn)
- (simple-format
- (current-error-port)
- "postgresql connection thread: exception: ~A\n"
- exn)
- (backtrace)
- (raise-exception exn))
- (lambda ()
- (call-with-values
- (lambda ()
- (apply exec-query
- conn
- args))
- (lambda vals vals)))))
- #:unwind? #t))
- (loop))
- (_ #f))))
- #:statement-timeout statement-timeout))))
- (iota threads))
- channel)))
-
-(define (close-postgresql-connection-channel channel)
- (put-message channel #f))
-
-(define (exec-query/through-channel channel . args)
- (let ((reply (make-channel)))
- (put-message channel (cons reply args))
- (match (get-message reply)
- (('worker-thread-error . exn)
- (raise-exception exn))
- (result
- (apply values result)))))
-
-(define* (with-postgresql-transaction/through-channel channel
- f
- #:key always-rollback?)
- (let ((reply (make-channel)))
- (put-message channel (list reply f always-rollback?))
- (match (get-message reply)
- (('worker-thread-error . exn)
- (raise-exception exn))
- (result
- (apply values result)))))
-
(define* (with-postgresql-transaction conn f
#:key always-rollback?)
(exec-query conn "BEGIN;")
diff --git a/guix-data-service/utils.scm b/guix-data-service/utils.scm
index 738f839..855c819 100644
--- a/guix-data-service/utils.scm
+++ b/guix-data-service/utils.scm
@@ -17,9 +17,18 @@
(define-module (guix-data-service utils)
#:use-module (srfi srfi-11)
+ #:use-module (ice-9 match)
+ #:use-module (ice-9 threads)
+ #:use-module (fibers)
+ #:use-module (fibers channels)
+ #:use-module (fibers conditions)
#:export (call-with-time-logging
with-time-logging
- prevent-inlining-for-tests))
+ prevent-inlining-for-tests
+
+ parallel-via-thread-pool-channel
+ par-map&
+ letpar&))
(define (call-with-time-logging action thunk)
(simple-format #t "debug: Starting ~A\n" action)
@@ -38,3 +47,94 @@
(define-syntax-rule (prevent-inlining-for-tests var)
(set! var var))
+
+
+(define* (make-thread-pool-channel #:key (threads 8))
+ (parameterize (((@@ (fibers internal) current-fiber) #f))
+ (let ((channel (make-channel)))
+ (for-each
+ (lambda _
+ (call-with-new-thread
+ (lambda ()
+ (let loop ()
+ (match (get-message channel)
+ (((? channel? reply) . (? procedure? proc))
+ (put-message
+ reply
+ (with-exception-handler
+ (lambda (exn)
+ (cons 'worker-thread-error exn))
+ (lambda ()
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "worker thread: exception: ~A\n"
+ exn)
+ (backtrace)
+ (raise-exception exn))
+ (lambda ()
+ (call-with-values
+ proc
+ (lambda vals
+ vals)))))
+ #:unwind? #t))
+ (loop))
+ (_ #f))))))
+ (iota threads))
+ channel)))
+
+(define %thread-pool-mutex (make-mutex))
+(define %thread-pool-channel #f)
+
+(define (make-thread-pool-channel!')
+ (with-mutex %thread-pool-mutex
+ (unless %thread-pool-channel
+ (set! %thread-pool-channel (make-thread-pool-channel))
+ (set! make-thread-pool-channel! (lambda () #t)))))
+
+(define make-thread-pool-channel!
+ (lambda () (make-thread-pool-channel!')))
+
+(define (defer-to-thread-pool-channel thunk)
+ (make-thread-pool-channel!)
+ (let ((reply (make-channel)))
+ (put-message %thread-pool-channel (cons reply thunk))
+ reply))
+
+(define (fetch-result-of-defered-thunk reply-channel)
+ (match (get-message reply-channel)
+ (('worker-thread-error . exn)
+ (raise-exception exn))
+ (result
+ (apply values result))))
+
+(define-syntax parallel-via-thread-pool-channel
+ (lambda (x)
+ (syntax-case x ()
+ ((_ e0 ...)
+ (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
+ #'(let ((tmp0 (defer-to-thread-pool-channel
+ (lambda ()
+ e0)))
+ ...)
+ (values (fetch-result-of-defered-thunk tmp0) ...)))))))
+
+(define-syntax-rule (letpar& ((v e) ...) b0 b1 ...)
+ (call-with-values
+ (lambda () (parallel-via-thread-pool-channel e ...))
+ (lambda (v ...)
+ b0 b1 ...)))
+
+(define (par-mapper' mapper cons)
+ (lambda (proc . lists)
+ (let loop ((lists lists))
+ (match lists
+ (((heads tails ...) ...)
+ (let ((tail (defer-to-thread-pool-channel (loop tails)))
+ (head (apply proc heads)))
+ (cons head (fetch-result-of-defered-thunk tail))))
+ (_
+ '())))))
+
+(define par-map& (par-mapper' map cons))