aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMathieu Othacehe <othacehe@gnu.org>2020-10-14 13:49:03 +0200
committerMathieu Othacehe <othacehe@gnu.org>2020-10-14 14:15:09 +0200
commitb67f38a7b91c8605a3ae9eba1e2bd3da4b579622 (patch)
tree4671725a8248dcb66e6a843321a85de715077657 /src
parent514f20a9b53ea575078ab9a413d38646bb48aa0b (diff)
downloadcuirass-b67f38a7b91c8605a3ae9eba1e2bd3da4b579622.tar
cuirass-b67f38a7b91c8605a3ae9eba1e2bd3da4b579622.tar.gz
Queue write operations.
SQLite only allows one concurrent write query operation. Having multiple database workers calling "db-update-build-status!", will thus increase worker starvation. Every write operation will also be done is a single transaction. For those reasons, create a database worker dedicated to write queries. Have this worker queue work and issue all the queued work queries in a single transaction. * .dir-locals.el: Add with-db-writer-worker-thread. * src/cuirass/database.scm (with-queue-writer-worker): Rename "with-registration-workers" macro. (%db-writer-channel): Rename "%db-registration-channel" variable. (with-queue-writer-worker): Rename "with-registration-workers". (db-register-builds): Use "with-db-writer-worker-thread" instead of "with-db-registration-worker-thread". (db-update-build-status!): Ditto * src/cuirass/utils.scm (make-worker-thread-channel): Add "queue-size" and "queue-proc" arguments. (call-with-worker-thread): Add "options" argument. * bin/cuirass.in (main): Use "with-queue-writer-worker" instead of "with-registration-workers". Modify the macro scope to include all the possible write operations.
Diffstat (limited to 'src')
-rw-r--r--src/cuirass/database.scm58
-rw-r--r--src/cuirass/utils.scm61
2 files changed, 84 insertions, 35 deletions
diff --git a/src/cuirass/database.scm b/src/cuirass/database.scm
index 43d24a9..9c5317e 100644
--- a/src/cuirass/database.scm
+++ b/src/cuirass/database.scm
@@ -93,7 +93,7 @@
;; Macros.
with-db-worker-thread
with-database
- with-registration-workers))
+ with-queue-writer-worker))
(define (%sqlite-exec db sql . args)
"Evaluate the given SQL query with the given ARGS. Return the list of
@@ -188,7 +188,7 @@ specified."
(define %db-channel
(make-parameter #f))
-(define %db-registration-channel
+(define %db-writer-channel
(make-parameter #f))
(define %record-events?
@@ -219,14 +219,17 @@ connection."
(number->string receive-timeout)
caller-name))))))
-(define-syntax-rule (with-db-registration-worker-thread db exp ...)
- "Similar to WITH-DB-WORKER-THREAD but evaluates EXP in database workers
-dedicated to evaluation registration. It is expected those workers to be busy
-for long durations as registration involves running a large number of SQL
-queries. For this reason, do not setup a timeout here."
- (call-with-worker-thread
- (%db-registration-channel)
- (lambda (db) exp ...)))
+(define-syntax with-db-writer-worker-thread
+ (syntax-rules ()
+ "Similar to WITH-DB-WORKER-THREAD but evaluates EXP in a database worker
+dedicated to writing. EXP evaluation is queued unless #:force? is set."
+ ((_ db #:force? force exp ...)
+ (call-with-worker-thread
+ (%db-writer-channel)
+ (lambda (db) exp ...)
+ #:options `((#:force? . ,force))))
+ ((_ db exp ...)
+ (with-db-writer-worker-thread db #:force? #f exp ...))))
(define (read-sql-file file-name)
"Return a list of string containing SQL instructions from FILE-NAME."
@@ -553,16 +556,26 @@ fibers."
(min (current-processor-count) 4))))
body ...))
-(define-syntax-rule (with-registration-workers body ...)
- "Run BODY with %DB-REGISTRATION-CHANNEL being dynamically bound to a channel
-providing worker threads that allow registration database operations to run
-without interfering with fibers."
- (parameterize ((%db-registration-channel
+(define-syntax-rule (with-queue-writer-worker body ...)
+ "Run BODY with %DB-WRITER-CHANNEL being dynamically bound to a channel
+providing a worker thread that allow database write operations to run
+without interfering with fibers.
+
+The worker will queue write operations and run them in a single transaction
+when the queue is full. As write operations are exclusive in SQLite, do not
+allocate more than one worker."
+ (parameterize ((%db-writer-channel
(make-worker-thread-channel
(lambda ()
(list (db-open)))
- #:parallelism
- (min (current-processor-count) 4))))
+ #:parallelism 1
+ #:queue-size 100
+ #:queue-proc
+ (lambda (db run-queue)
+ (log-message "Running writer queue.")
+ (sqlite-exec db "BEGIN TRANSACTION;")
+ (run-queue)
+ (sqlite-exec db "COMMIT;")))))
body ...))
(define* (read-quoted-string #:optional (port (current-input-port)))
@@ -693,10 +706,11 @@ path) VALUES ("
(#:stoptime . 0))))
(db-add-build build)))))
- ;; New builds registration involves running a large number of SQL queries.
- ;; To keep database workers available, use specific database workers
- ;; dedicated to evaluation registration.
- (with-db-registration-worker-thread db
+ ;; Use the database worker dedicated to write queries. We don't want this
+ ;; query to be queued as it is already a quite large transaction by itself,
+ ;; so pass the #:FORCE? option.
+ (with-db-writer-worker-thread db
+ #:force? #t
(log-message "Registering builds for evaluation ~a." eval-id)
(sqlite-exec db "BEGIN TRANSACTION;")
(let ((derivations (filter-map register jobs)))
@@ -717,7 +731,7 @@ log file for DRV."
(,(build-status failed-other) . "failed (other)")
(,(build-status canceled) . "canceled")))
- (with-db-worker-thread db
+ (with-db-writer-worker-thread db
(if (= status (build-status started))
(begin
(sqlite-exec db "UPDATE Builds SET starttime=" now ", status="
diff --git a/src/cuirass/utils.scm b/src/cuirass/utils.scm
index 8eb0ed2..f32e3a1 100644
--- a/src/cuirass/utils.scm
+++ b/src/cuirass/utils.scm
@@ -106,9 +106,19 @@ delimited continuations and fibers."
(make-parameter #f))
(define* (make-worker-thread-channel initializer
- #:key (parallelism 1))
+ #:key
+ (parallelism 1)
+ queue-size
+ (queue-proc (const #t)))
"Return a channel used to offload work to a dedicated thread. ARGS are the
-arguments of the worker thread procedure."
+arguments of the worker thread procedure. This procedure supports deferring
+work sent to the worker. If QUEUE-SIZE is set, each work query will be
+appended to a queue that will be run once it reaches QUEUE-SIZE elements.
+
+When that happens, the QUEUE-PROC procedure is called with %WORKER-THREAD-ARGS
+and a procedure running the queued work as arguments. The worker thread can
+be passed options. When #:FORCE? option is set, the worker runs the sent work
+immediately even if QUEUE-SIZE has been set."
(parameterize (((@@ (fibers internal) current-fiber) #f))
(let ((channel (make-channel)))
(for-each
@@ -117,16 +127,37 @@ arguments of the worker thread procedure."
(call-with-new-thread
(lambda ()
(parameterize ((%worker-thread-args args))
- (let loop ()
+ (let loop ((queue '()))
(match (get-message channel)
- (((? channel? reply) . (? procedure? proc))
- (put-message reply
- (catch #t
- (lambda ()
- (apply proc args))
- (lambda (key . args)
- (cons* 'worker-thread-error key args))))))
- (loop)))))))
+ (((? channel? reply) options (? procedure? proc))
+ (put-message
+ reply
+ (catch #t
+ (lambda ()
+ (cond
+ ((or (not queue-size)
+ (assq-ref options #:force?))
+ (apply proc args))
+ (else
+ (length queue))))
+ (lambda (key . args)
+ (cons* 'worker-thread-error key args))))
+ (let ((new-queue
+ (cond
+ ((or (not queue-size)
+ (assq-ref options #:force?))
+ '())
+ ((= (1+ (length queue)) queue-size)
+ (let ((run-queue
+ (lambda ()
+ (for-each (lambda (thunk)
+ (apply thunk args))
+ (append queue (list proc))))))
+ (apply queue-proc (append args (list run-queue)))
+ '()))
+ (else
+ (append queue (list proc))))))
+ (loop new-queue))))))))))
(iota parallelism))
channel)))
@@ -194,6 +225,7 @@ put-operation until it succeeds."
(define* (call-with-worker-thread channel proc
#:key
+ options
send-timeout
send-timeout-proc
receive-timeout
@@ -207,12 +239,15 @@ to a worker thread.
The same goes for RECEIVE-TIMEOUT and RECEIVE-TIMEOUT-PROC, except that the
timer expires if there is no response from the database worker PROC was sent
-to."
+to.
+
+OPTIONS are forwarded to the worker thread. See MAKE-WORKER-THREAD-CHANNEL
+for a description of the supported options."
(let ((args (%worker-thread-args)))
(if args
(apply proc args)
(let* ((reply (make-channel))
- (message (cons reply proc)))
+ (message (list reply options proc)))
(if (and send-timeout (current-fiber))
(put-message-with-timeout channel message
#:seconds send-timeout