aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2020-10-01 19:13:30 +0100
committerChristopher Baines <mail@cbaines.net>2020-10-01 19:13:30 +0100
commit614f9888a58fbd7b2e708cbbf0262f3eb42d2d49 (patch)
treef43ca73330755302e2c45f68e65a8947b851c591
parent3330f034a486a05ab1037c5ecc6814a0939ef441 (diff)
downloaddata-service-614f9888a58fbd7b2e708cbbf0262f3eb42d2d49.tar
data-service-614f9888a58fbd7b2e708cbbf0262f3eb42d2d49.tar.gz
Add some utilities to use PostgreSQL/Squee through a channel
To allow for some concurrency.
-rw-r--r--guix-data-service/database.scm102
1 files changed, 102 insertions, 0 deletions
diff --git a/guix-data-service/database.scm b/guix-data-service/database.scm
index df4daac..8298b93 100644
--- a/guix-data-service/database.scm
+++ b/guix-data-service/database.scm
@@ -18,9 +18,19 @@
(define-module (guix-data-service database)
#:use-module (system foreign)
#:use-module (ice-9 match)
+ #:use-module (ice-9 threads)
#:use-module (squee)
+ #:use-module (fibers)
+ #:use-module (fibers channels)
+ #:use-module (fibers conditions)
#:use-module (guix-data-service config)
#:export (with-postgresql-connection
+
+ make-postgresql-connection-channel
+ close-postgresql-connection-channel
+ exec-query/through-channel
+ with-postgresql-transaction/through-channel
+
with-postgresql-transaction
check-test-database!
@@ -61,6 +71,98 @@
(lambda (key . args)
(pg-conn-finish conn)))))
+(define* (make-postgresql-connection-channel name
+ #:key
+ (statement-timeout #f)
+ (threads 4))
+ (parameterize (((@@ (fibers internal) current-fiber) #f))
+ (let ((channel (make-channel)))
+ (for-each
+ (lambda _
+ (call-with-new-thread
+ (lambda ()
+ (with-postgresql-connection
+ name
+ (lambda (conn)
+ (let loop ()
+ (match (get-message channel)
+ (((? channel? reply) f (? boolean? allways-rollback?))
+ (put-message
+ reply
+ (with-exception-handler
+ (lambda (exn)
+ (cons 'worker-thread-error exn))
+ (lambda ()
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "postgresql connection thread: exception: ~A\n"
+ exn)
+ (backtrace)
+ (raise-exception exn))
+ (lambda ()
+ (call-with-values
+ (lambda ()
+ (with-postgresql-transaction
+ conn
+ (lambda (conn)
+ (f conn))))
+ (lambda vals vals)))))
+ #:unwind? #t))
+ (loop))
+ (((? channel? reply) . (? list? args))
+ (put-message
+ reply
+ (with-exception-handler
+ (lambda (exn)
+ (cons 'worker-thread-error exn))
+ (lambda ()
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "postgresql connection thread: exception: ~A\n"
+ exn)
+ (backtrace)
+ (raise-exception exn))
+ (lambda ()
+ (call-with-values
+ (lambda ()
+ (apply exec-query
+ conn
+ args))
+ (lambda vals vals)))))
+ #:unwind? #t))
+ (loop))
+ (_ #f))))
+ #:statement-timeout statement-timeout))))
+ (iota threads))
+ channel)))
+
+(define (close-postgresql-connection-channel channel)
+ (put-message channel #f))
+
+(define (exec-query/through-channel channel . args)
+ (let ((reply (make-channel)))
+ (put-message channel (cons reply args))
+ (match (get-message reply)
+ (('worker-thread-error . exn)
+ (raise-exception exn))
+ (result
+ (apply values result)))))
+
+(define* (with-postgresql-transaction/through-channel channel
+ f
+ #:key always-rollback?)
+ (let ((reply (make-channel)))
+ (put-message channel (list reply f always-rollback?))
+ (match (get-message reply)
+ (('worker-thread-error . exn)
+ (raise-exception exn))
+ (result
+ (apply values result)))))
+
(define* (with-postgresql-transaction conn f
#:key always-rollback?)
(exec-query conn "BEGIN;")