From b67f38a7b91c8605a3ae9eba1e2bd3da4b579622 Mon Sep 17 00:00:00 2001 From: Mathieu Othacehe Date: Wed, 14 Oct 2020 13:49:03 +0200 Subject: Queue write operations. SQLite only allows one concurrent write query operation. Having multiple database workers calling "db-update-build-status!" will thus increase worker starvation. Every write operation will also be done in a single transaction. For those reasons, create a database worker dedicated to write queries. Have this worker queue work and issue all the queued work queries in a single transaction. * .dir-locals.el: Add with-db-writer-worker-thread. * src/cuirass/database.scm (with-queue-writer-worker): Rename "with-registration-workers" macro. (%db-writer-channel): Rename "%db-registration-channel" variable. (with-db-writer-worker-thread): New macro replacing "with-db-registration-worker-thread". (with-queue-writer-worker): Rename "with-registration-workers". (db-register-builds): Use "with-db-writer-worker-thread" instead of "with-db-registration-worker-thread". (db-update-build-status!): Use "with-db-writer-worker-thread" instead of "with-db-worker-thread". * src/cuirass/utils.scm (make-worker-thread-channel): Add "queue-size" and "queue-proc" arguments. (call-with-worker-thread): Add "options" argument. * bin/cuirass.in (main): Use "with-queue-writer-worker" instead of "with-registration-workers". Modify the macro scope to include all the possible write operations. 
--- .dir-locals.el | 3 +- bin/cuirass.in | 92 +++++++++++++++++++++++------------------------- src/cuirass/database.scm | 58 ++++++++++++++++++------------ src/cuirass/utils.scm | 61 +++++++++++++++++++++++++------- 4 files changed, 131 insertions(+), 83 deletions(-) diff --git a/.dir-locals.el b/.dir-locals.el index a62798b..0e5705d 100644 --- a/.dir-locals.el +++ b/.dir-locals.el @@ -13,7 +13,8 @@ (eval put 'test-error 'scheme-indent-function 1) (eval put 'make-parameter 'scheme-indent-function 1) (eval put 'with-database 'scheme-indent-function 0) - (eval put 'with-db-worker-thread 'scheme-indent-function 1)) + (eval put 'with-db-worker-thread 'scheme-indent-function 1) + (eval put 'with-db-writer-worker-thread 'scheme-indent-function 1)) (texinfo-mode (indent-tabs-mode) (fill-column . 72) diff --git a/bin/cuirass.in b/bin/cuirass.in index 8da9369..23d8c68 100644 --- a/bin/cuirass.in +++ b/bin/cuirass.in @@ -171,53 +171,51 @@ exec ${GUILE:-@GUILE@} --no-auto-compile -e main -s "$0" "$@" (while #t (log-monitoring-stats) (sleep 600)))))) - (begin - - (clear-build-queue) - - ;; If Cuirass was stopped during an evaluation, - ;; abort it. Builds that were not registered - ;; during this evaluation will be registered - ;; during the next evaluation. - (db-abort-pending-evaluations) - - ;; First off, restart builds that had not - ;; completed or were not even started on a - ;; previous run. 
- (spawn-fiber - (essential-task - 'restart-builds exit-channel - (lambda () - (restart-builds)))) - - (spawn-fiber - (essential-task - 'build exit-channel - (lambda () - (with-registration-workers - (while #t - (process-specs (db-get-specifications)) - (log-message - "next evaluation in ~a seconds" interval) - (sleep interval)))))) - - (spawn-fiber - (essential-task - 'metrics exit-channel - (lambda () - (while #t - (with-time-logging - "Metrics update" - (db-update-metrics)) - (sleep 3600))))) - - (spawn-fiber - (essential-task - 'monitor exit-channel - (lambda () - (while #t - (log-monitoring-stats) - (sleep 600))))))) + (with-queue-writer-worker + (clear-build-queue) + + ;; If Cuirass was stopped during an evaluation, + ;; abort it. Builds that were not registered + ;; during this evaluation will be registered + ;; during the next evaluation. + (db-abort-pending-evaluations) + + ;; First off, restart builds that had not + ;; completed or were not even started on a + ;; previous run. + (spawn-fiber + (essential-task + 'restart-builds exit-channel + (lambda () + (restart-builds)))) + + (spawn-fiber + (essential-task + 'build exit-channel + (lambda () + (while #t + (process-specs (db-get-specifications)) + (log-message + "next evaluation in ~a seconds" interval) + (sleep interval))))) + + (spawn-fiber + (essential-task + 'metrics exit-channel + (lambda () + (while #t + (with-time-logging + "Metrics update" + (db-update-metrics)) + (sleep 3600))))) + + (spawn-fiber + (essential-task + 'monitor exit-channel + (lambda () + (while #t + (log-monitoring-stats) + (sleep 600))))))) (primitive-exit (get-message exit-channel)))))) ;; Most of our code is I/O so preemption doesn't matter much (it diff --git a/src/cuirass/database.scm b/src/cuirass/database.scm index 43d24a9..9c5317e 100644 --- a/src/cuirass/database.scm +++ b/src/cuirass/database.scm @@ -93,7 +93,7 @@ ;; Macros. 
with-db-worker-thread with-database - with-registration-workers)) + with-queue-writer-worker)) (define (%sqlite-exec db sql . args) "Evaluate the given SQL query with the given ARGS. Return the list of @@ -188,7 +188,7 @@ specified." (define %db-channel (make-parameter #f)) -(define %db-registration-channel +(define %db-writer-channel (make-parameter #f)) (define %record-events? @@ -219,14 +219,17 @@ connection." (number->string receive-timeout) caller-name)))))) -(define-syntax-rule (with-db-registration-worker-thread db exp ...) - "Similar to WITH-DB-WORKER-THREAD but evaluates EXP in database workers -dedicated to evaluation registration. It is expected those workers to be busy -for long durations as registration involves running a large number of SQL -queries. For this reason, do not setup a timeout here." - (call-with-worker-thread - (%db-registration-channel) - (lambda (db) exp ...))) +(define-syntax with-db-writer-worker-thread + (syntax-rules () + "Similar to WITH-DB-WORKER-THREAD but evaluates EXP in a database worker +dedicated to writing. EXP evaluation is queued unless #:force? is set." + ((_ db #:force? force exp ...) + (call-with-worker-thread + (%db-writer-channel) + (lambda (db) exp ...) + #:options `((#:force? . ,force)))) + ((_ db exp ...) + (with-db-writer-worker-thread db #:force? #f exp ...)))) (define (read-sql-file file-name) "Return a list of string containing SQL instructions from FILE-NAME." @@ -553,16 +556,26 @@ fibers." (min (current-processor-count) 4)))) body ...)) -(define-syntax-rule (with-registration-workers body ...) - "Run BODY with %DB-REGISTRATION-CHANNEL being dynamically bound to a channel -providing worker threads that allow registration database operations to run -without interfering with fibers." - (parameterize ((%db-registration-channel +(define-syntax-rule (with-queue-writer-worker body ...) 
+ "Run BODY with %DB-WRITER-CHANNEL being dynamically bound to a channel +providing a worker thread that allows database write operations to run +without interfering with fibers. + +The worker will queue write operations and run them in a single transaction +when the queue is full. As write operations are exclusive in SQLite, do not +allocate more than one worker." + (parameterize ((%db-writer-channel (make-worker-thread-channel (lambda () (list (db-open))) - #:parallelism - (min (current-processor-count) 4)))) + #:parallelism 1 + #:queue-size 100 + #:queue-proc + (lambda (db run-queue) + (log-message "Running writer queue.") + (sqlite-exec db "BEGIN TRANSACTION;") + (run-queue) + (sqlite-exec db "COMMIT;"))))) body ...)) (define* (read-quoted-string #:optional (port (current-input-port))) @@ -693,10 +706,11 @@ path) VALUES (" (#:stoptime . 0)))) (db-add-build build))))) - ;; New builds registration involves running a large number of SQL queries. - ;; To keep database workers available, use specific database workers - ;; dedicated to evaluation registration. - (with-db-registration-worker-thread db + ;; Use the database worker dedicated to write queries. We don't want this + ;; query to be queued as it is already quite a large transaction by itself, + ;; so pass the #:FORCE? option. + (with-db-writer-worker-thread db + #:force? #t (log-message "Registering builds for evaluation ~a." eval-id) (sqlite-exec db "BEGIN TRANSACTION;") (let ((derivations (filter-map register jobs))) @@ -717,7 +731,7 @@ log file for DRV." (,(build-status failed-other) . "failed (other)") (,(build-status canceled) . 
"canceled"))) - (with-db-worker-thread db + (with-db-writer-worker-thread db (if (= status (build-status started)) (begin (sqlite-exec db "UPDATE Builds SET starttime=" now ", status=" diff --git a/src/cuirass/utils.scm b/src/cuirass/utils.scm index 8eb0ed2..f32e3a1 100644 --- a/src/cuirass/utils.scm +++ b/src/cuirass/utils.scm @@ -106,9 +106,19 @@ delimited continuations and fibers." (make-parameter #f)) (define* (make-worker-thread-channel initializer - #:key (parallelism 1)) + #:key + (parallelism 1) + queue-size + (queue-proc (const #t))) "Return a channel used to offload work to a dedicated thread. ARGS are the -arguments of the worker thread procedure." +arguments of the worker thread procedure. This procedure supports deferring +work sent to the worker. If QUEUE-SIZE is set, each work query will be +appended to a queue that will be run once it reaches QUEUE-SIZE elements. + +When that happens, the QUEUE-PROC procedure is called with %WORKER-THREAD-ARGS +and a procedure running the queued work as arguments. The worker thread can +be passed options. When #:FORCE? option is set, the worker runs the sent work +immediately even if QUEUE-SIZE has been set." (parameterize (((@@ (fibers internal) current-fiber) #f)) (let ((channel (make-channel))) (for-each @@ -117,16 +127,37 @@ arguments of the worker thread procedure." (call-with-new-thread (lambda () (parameterize ((%worker-thread-args args)) - (let loop () + (let loop ((queue '())) (match (get-message channel) - (((? channel? reply) . (? procedure? proc)) - (put-message reply - (catch #t - (lambda () - (apply proc args)) - (lambda (key . args) - (cons* 'worker-thread-error key args)))))) - (loop))))))) + (((? channel? reply) options (? procedure? proc)) + (put-message + reply + (catch #t + (lambda () + (cond + ((or (not queue-size) + (assq-ref options #:force?)) + (apply proc args)) + (else + (length queue)))) + (lambda (key . 
args) + (cons* 'worker-thread-error key args)))) + (let ((new-queue + (cond + ((or (not queue-size) + (assq-ref options #:force?)) + '()) + ((= (1+ (length queue)) queue-size) + (let ((run-queue + (lambda () + (for-each (lambda (thunk) + (apply thunk args)) + (append queue (list proc)))))) + (apply queue-proc (append args (list run-queue))) + '())) + (else + (append queue (list proc)))))) + (loop new-queue)))))))))) (iota parallelism)) channel))) @@ -194,6 +225,7 @@ put-operation until it succeeds." (define* (call-with-worker-thread channel proc #:key + options send-timeout send-timeout-proc receive-timeout @@ -207,12 +239,15 @@ to a worker thread. The same goes for RECEIVE-TIMEOUT and RECEIVE-TIMEOUT-PROC, except that the timer expires if there is no response from the database worker PROC was sent -to." +to. + +OPTIONS are forwarded to the worker thread. See MAKE-WORKER-THREAD-CHANNEL +for a description of the supported options." (let ((args (%worker-thread-args))) (if args (apply proc args) (let* ((reply (make-channel)) - (message (cons reply proc))) + (message (list reply options proc))) (if (and send-timeout (current-fiber)) (put-message-with-timeout channel message #:seconds send-timeout -- cgit v1.2.3