From 614f9888a58fbd7b2e708cbbf0262f3eb42d2d49 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Thu, 1 Oct 2020 19:13:30 +0100 Subject: Add some utilities to use PostgreSQL/Squee through a channel To allow for some concurrency. --- guix-data-service/database.scm | 102 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/guix-data-service/database.scm b/guix-data-service/database.scm index df4daac..8298b93 100644 --- a/guix-data-service/database.scm +++ b/guix-data-service/database.scm @@ -18,9 +18,19 @@ (define-module (guix-data-service database) #:use-module (system foreign) #:use-module (ice-9 match) + #:use-module (ice-9 threads) #:use-module (squee) + #:use-module (fibers) + #:use-module (fibers channels) + #:use-module (fibers conditions) #:use-module (guix-data-service config) #:export (with-postgresql-connection + + make-postgresql-connection-channel + close-postgresql-connection-channel + exec-query/through-channel + with-postgresql-transaction/through-channel + with-postgresql-transaction check-test-database! @@ -61,6 +71,98 @@ (lambda (key . args) (pg-conn-finish conn))))) +(define* (make-postgresql-connection-channel name + #:key + (statement-timeout #f) + (threads 4)) + (parameterize (((@@ (fibers internal) current-fiber) #f)) + (let ((channel (make-channel))) + (for-each + (lambda _ + (call-with-new-thread + (lambda () + (with-postgresql-connection + name + (lambda (conn) + (let loop () + (match (get-message channel) + (((? channel? reply) f (? boolean? allways-rollback?)) + (put-message + reply + (with-exception-handler + (lambda (exn) + (cons 'worker-thread-error exn)) + (lambda () + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "postgresql connection thread: exception: ~A\n" + exn) + (backtrace) + (raise-exception exn)) + (lambda () + (call-with-values + (lambda () + (with-postgresql-transaction + conn + (lambda (conn) + (f conn)))) + (lambda vals vals))))) + #:unwind? #t)) + (loop)) + (((? channel? reply) . (? list? args)) + (put-message + reply + (with-exception-handler + (lambda (exn) + (cons 'worker-thread-error exn)) + (lambda () + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "postgresql connection thread: exception: ~A\n" + exn) + (backtrace) + (raise-exception exn)) + (lambda () + (call-with-values + (lambda () + (apply exec-query + conn + args)) + (lambda vals vals))))) + #:unwind? #t)) + (loop)) + (_ #f)))) + #:statement-timeout statement-timeout)))) + (iota threads)) + channel))) + +(define (close-postgresql-connection-channel channel) + (put-message channel #f)) + +(define (exec-query/through-channel channel . args) + (let ((reply (make-channel))) + (put-message channel (cons reply args)) + (match (get-message reply) + (('worker-thread-error . exn) + (raise-exception exn)) + (result + (apply values result))))) + +(define* (with-postgresql-transaction/through-channel channel + f + #:key always-rollback?) + (let ((reply (make-channel))) + (put-message channel (list reply f always-rollback?)) + (match (get-message reply) + (('worker-thread-error . exn) + (raise-exception exn)) + (result + (apply values result))))) + (define* (with-postgresql-transaction conn f #:key always-rollback?) (exec-query conn "BEGIN;") -- cgit v1.2.3