summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Mathieu Othacehe <othacehe@gnu.org> 2020-10-14 13:49:03 +0200
committer: Mathieu Othacehe <othacehe@gnu.org> 2020-10-14 14:15:09 +0200
commit: b67f38a7b91c8605a3ae9eba1e2bd3da4b579622 (patch)
tree: 4671725a8248dcb66e6a843321a85de715077657
parent: 514f20a9b53ea575078ab9a413d38646bb48aa0b (diff)
downloadcuirass-b67f38a7b91c8605a3ae9eba1e2bd3da4b579622.tar
cuirass-b67f38a7b91c8605a3ae9eba1e2bd3da4b579622.tar.gz
Queue write operations.
SQLite only allows one concurrent write query operation. Having multiple database workers calling "db-update-build-status!" will thus increase worker starvation. Every write operation will also be done in its own transaction. For those reasons, create a database worker dedicated to write queries. Have this worker queue work and issue all the queued work queries in a single transaction. * .dir-locals.el: Add with-db-writer-worker-thread. * src/cuirass/database.scm (with-queue-writer-worker): Rename "with-registration-workers" macro. (%db-writer-channel): Rename "%db-registration-channel" variable. (with-queue-writer-worker): Rename "with-registration-workers". (db-register-builds): Use "with-db-writer-worker-thread" instead of "with-db-registration-worker-thread". (db-update-build-status!): Ditto. * src/cuirass/utils.scm (make-worker-thread-channel): Add "queue-size" and "queue-proc" arguments. (call-with-worker-thread): Add "options" argument. * bin/cuirass.in (main): Use "with-queue-writer-worker" instead of "with-registration-workers". Modify the macro scope to include all the possible write operations.
-rw-r--r--.dir-locals.el3
-rw-r--r--bin/cuirass.in92
-rw-r--r--src/cuirass/database.scm58
-rw-r--r--src/cuirass/utils.scm61
4 files changed, 131 insertions, 83 deletions
diff --git a/.dir-locals.el b/.dir-locals.el
index a62798b..0e5705d 100644
--- a/.dir-locals.el
+++ b/.dir-locals.el
@@ -13,7 +13,8 @@
(eval put 'test-error 'scheme-indent-function 1)
(eval put 'make-parameter 'scheme-indent-function 1)
(eval put 'with-database 'scheme-indent-function 0)
- (eval put 'with-db-worker-thread 'scheme-indent-function 1))
+ (eval put 'with-db-worker-thread 'scheme-indent-function 1)
+ (eval put 'with-db-writer-worker-thread 'scheme-indent-function 1))
(texinfo-mode
(indent-tabs-mode)
(fill-column . 72)
diff --git a/bin/cuirass.in b/bin/cuirass.in
index 8da9369..23d8c68 100644
--- a/bin/cuirass.in
+++ b/bin/cuirass.in
@@ -171,53 +171,51 @@ exec ${GUILE:-@GUILE@} --no-auto-compile -e main -s "$0" "$@"
(while #t
(log-monitoring-stats)
(sleep 600))))))
- (begin
-
- (clear-build-queue)
-
- ;; If Cuirass was stopped during an evaluation,
- ;; abort it. Builds that were not registered
- ;; during this evaluation will be registered
- ;; during the next evaluation.
- (db-abort-pending-evaluations)
-
- ;; First off, restart builds that had not
- ;; completed or were not even started on a
- ;; previous run.
- (spawn-fiber
- (essential-task
- 'restart-builds exit-channel
- (lambda ()
- (restart-builds))))
-
- (spawn-fiber
- (essential-task
- 'build exit-channel
- (lambda ()
- (with-registration-workers
- (while #t
- (process-specs (db-get-specifications))
- (log-message
- "next evaluation in ~a seconds" interval)
- (sleep interval))))))
-
- (spawn-fiber
- (essential-task
- 'metrics exit-channel
- (lambda ()
- (while #t
- (with-time-logging
- "Metrics update"
- (db-update-metrics))
- (sleep 3600)))))
-
- (spawn-fiber
- (essential-task
- 'monitor exit-channel
- (lambda ()
- (while #t
- (log-monitoring-stats)
- (sleep 600)))))))
+ (with-queue-writer-worker
+ (clear-build-queue)
+
+ ;; If Cuirass was stopped during an evaluation,
+ ;; abort it. Builds that were not registered
+ ;; during this evaluation will be registered
+ ;; during the next evaluation.
+ (db-abort-pending-evaluations)
+
+ ;; First off, restart builds that had not
+ ;; completed or were not even started on a
+ ;; previous run.
+ (spawn-fiber
+ (essential-task
+ 'restart-builds exit-channel
+ (lambda ()
+ (restart-builds))))
+
+ (spawn-fiber
+ (essential-task
+ 'build exit-channel
+ (lambda ()
+ (while #t
+ (process-specs (db-get-specifications))
+ (log-message
+ "next evaluation in ~a seconds" interval)
+ (sleep interval)))))
+
+ (spawn-fiber
+ (essential-task
+ 'metrics exit-channel
+ (lambda ()
+ (while #t
+ (with-time-logging
+ "Metrics update"
+ (db-update-metrics))
+ (sleep 3600)))))
+
+ (spawn-fiber
+ (essential-task
+ 'monitor exit-channel
+ (lambda ()
+ (while #t
+ (log-monitoring-stats)
+ (sleep 600)))))))
(primitive-exit (get-message exit-channel))))))
;; Most of our code is I/O so preemption doesn't matter much (it
diff --git a/src/cuirass/database.scm b/src/cuirass/database.scm
index 43d24a9..9c5317e 100644
--- a/src/cuirass/database.scm
+++ b/src/cuirass/database.scm
@@ -93,7 +93,7 @@
;; Macros.
with-db-worker-thread
with-database
- with-registration-workers))
+ with-queue-writer-worker))
(define (%sqlite-exec db sql . args)
"Evaluate the given SQL query with the given ARGS. Return the list of
@@ -188,7 +188,7 @@ specified."
(define %db-channel
(make-parameter #f))
-(define %db-registration-channel
+(define %db-writer-channel
(make-parameter #f))
(define %record-events?
@@ -219,14 +219,17 @@ connection."
(number->string receive-timeout)
caller-name))))))
-(define-syntax-rule (with-db-registration-worker-thread db exp ...)
- "Similar to WITH-DB-WORKER-THREAD but evaluates EXP in database workers
-dedicated to evaluation registration. It is expected those workers to be busy
-for long durations as registration involves running a large number of SQL
-queries. For this reason, do not setup a timeout here."
- (call-with-worker-thread
- (%db-registration-channel)
- (lambda (db) exp ...)))
+(define-syntax with-db-writer-worker-thread
+ (syntax-rules ()
+ "Similar to WITH-DB-WORKER-THREAD but evaluates EXP in a database worker
+dedicated to writing. EXP evaluation is queued unless #:force? is set."
+ ((_ db #:force? force exp ...)
+ (call-with-worker-thread
+ (%db-writer-channel)
+ (lambda (db) exp ...)
+ #:options `((#:force? . ,force))))
+ ((_ db exp ...)
+ (with-db-writer-worker-thread db #:force? #f exp ...))))
(define (read-sql-file file-name)
"Return a list of string containing SQL instructions from FILE-NAME."
@@ -553,16 +556,26 @@ fibers."
(min (current-processor-count) 4))))
body ...))
-(define-syntax-rule (with-registration-workers body ...)
- "Run BODY with %DB-REGISTRATION-CHANNEL being dynamically bound to a channel
-providing worker threads that allow registration database operations to run
-without interfering with fibers."
- (parameterize ((%db-registration-channel
+(define-syntax-rule (with-queue-writer-worker body ...)
+ "Run BODY with %DB-WRITER-CHANNEL being dynamically bound to a channel
+providing a worker thread that allow database write operations to run
+without interfering with fibers.
+
+The worker will queue write operations and run them in a single transaction
+when the queue is full. As write operations are exclusive in SQLite, do not
+allocate more than one worker."
+ (parameterize ((%db-writer-channel
(make-worker-thread-channel
(lambda ()
(list (db-open)))
- #:parallelism
- (min (current-processor-count) 4))))
+ #:parallelism 1
+ #:queue-size 100
+ #:queue-proc
+ (lambda (db run-queue)
+ (log-message "Running writer queue.")
+ (sqlite-exec db "BEGIN TRANSACTION;")
+ (run-queue)
+ (sqlite-exec db "COMMIT;")))))
body ...))
(define* (read-quoted-string #:optional (port (current-input-port)))
@@ -693,10 +706,11 @@ path) VALUES ("
(#:stoptime . 0))))
(db-add-build build)))))
- ;; New builds registration involves running a large number of SQL queries.
- ;; To keep database workers available, use specific database workers
- ;; dedicated to evaluation registration.
- (with-db-registration-worker-thread db
+ ;; Use the database worker dedicated to write queries. We don't want this
+ ;; query to be queued as it is already a quite large transaction by itself,
+ ;; so pass the #:FORCE? option.
+ (with-db-writer-worker-thread db
+ #:force? #t
(log-message "Registering builds for evaluation ~a." eval-id)
(sqlite-exec db "BEGIN TRANSACTION;")
(let ((derivations (filter-map register jobs)))
@@ -717,7 +731,7 @@ log file for DRV."
(,(build-status failed-other) . "failed (other)")
(,(build-status canceled) . "canceled")))
- (with-db-worker-thread db
+ (with-db-writer-worker-thread db
(if (= status (build-status started))
(begin
(sqlite-exec db "UPDATE Builds SET starttime=" now ", status="
diff --git a/src/cuirass/utils.scm b/src/cuirass/utils.scm
index 8eb0ed2..f32e3a1 100644
--- a/src/cuirass/utils.scm
+++ b/src/cuirass/utils.scm
@@ -106,9 +106,19 @@ delimited continuations and fibers."
(make-parameter #f))
(define* (make-worker-thread-channel initializer
- #:key (parallelism 1))
+ #:key
+ (parallelism 1)
+ queue-size
+ (queue-proc (const #t)))
"Return a channel used to offload work to a dedicated thread. ARGS are the
-arguments of the worker thread procedure."
+arguments of the worker thread procedure. This procedure supports deferring
+work sent to the worker. If QUEUE-SIZE is set, each work query will be
+appended to a queue that will be run once it reaches QUEUE-SIZE elements.
+
+When that happens, the QUEUE-PROC procedure is called with %WORKER-THREAD-ARGS
+and a procedure running the queued work as arguments. The worker thread can
+be passed options. When #:FORCE? option is set, the worker runs the sent work
+immediately even if QUEUE-SIZE has been set."
(parameterize (((@@ (fibers internal) current-fiber) #f))
(let ((channel (make-channel)))
(for-each
@@ -117,16 +127,37 @@ arguments of the worker thread procedure."
(call-with-new-thread
(lambda ()
(parameterize ((%worker-thread-args args))
- (let loop ()
+ (let loop ((queue '()))
(match (get-message channel)
- (((? channel? reply) . (? procedure? proc))
- (put-message reply
- (catch #t
- (lambda ()
- (apply proc args))
- (lambda (key . args)
- (cons* 'worker-thread-error key args))))))
- (loop)))))))
+ (((? channel? reply) options (? procedure? proc))
+ (put-message
+ reply
+ (catch #t
+ (lambda ()
+ (cond
+ ((or (not queue-size)
+ (assq-ref options #:force?))
+ (apply proc args))
+ (else
+ (length queue))))
+ (lambda (key . args)
+ (cons* 'worker-thread-error key args))))
+ (let ((new-queue
+ (cond
+ ((or (not queue-size)
+ (assq-ref options #:force?))
+ '())
+ ((= (1+ (length queue)) queue-size)
+ (let ((run-queue
+ (lambda ()
+ (for-each (lambda (thunk)
+ (apply thunk args))
+ (append queue (list proc))))))
+ (apply queue-proc (append args (list run-queue)))
+ '()))
+ (else
+ (append queue (list proc))))))
+ (loop new-queue))))))))))
(iota parallelism))
channel)))
@@ -194,6 +225,7 @@ put-operation until it succeeds."
(define* (call-with-worker-thread channel proc
#:key
+ options
send-timeout
send-timeout-proc
receive-timeout
@@ -207,12 +239,15 @@ to a worker thread.
The same goes for RECEIVE-TIMEOUT and RECEIVE-TIMEOUT-PROC, except that the
timer expires if there is no response from the database worker PROC was sent
-to."
+to.
+
+OPTIONS are forwarded to the worker thread. See MAKE-WORKER-THREAD-CHANNEL
+for a description of the supported options."
(let ((args (%worker-thread-args)))
(if args
(apply proc args)
(let* ((reply (make-channel))
- (message (cons reply proc)))
+ (message (list reply options proc)))
(if (and send-timeout (current-fiber))
(put-message-with-timeout channel message
#:seconds send-timeout