From 0d23a6d374340f3f9a3b035f960abd39447e11c3 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Fri, 24 Jan 2020 18:25:47 +0000 Subject: utils: Change critical section terminology to worker threads. As far as I'm aware, it's necessary to use a separate thread for interacting with SQLite as one of the threads used for fibers will be blocked while the SQLite query is running. This doesn't mean all queries have to be executed one at a time though, providing the queries are executed outside the threads used by fibers, and a single connection isn't used in multiple threads. These changes start to move in this direction, first by just changing the terminology. * src/cuirass/base.scm (clear-build-queue, cancel-old-builds): Change with-db-critical-section to with-db-worker-thread. * src/cuirass/database.scm (with-db-critical-section): Rename syntax rule to with-db-worker-thread. (db-add-input, db-add-checkout, db-add-specification, db-remove-specification, db-get-inputs, db-get-specification, db-add-evaluation, db-set-evaluations-done, db-set-evaluation-done, db-add-derivation-output, db-add-build, db-update-build-status!, db-get-output, db-get-outputs, db-get-builds-by-search, db-get-builds, db-get-build derivation-or-id, db-add-event, db-get-events, db-delete-events-with-ids-<=-to, db-get-pending-derivations, db-get-checkouts, db-get-evaluations, db-get-evaluations-build-summary, db-get-evaluations-id-max, db-get-evaluation-summary, db-get-builds-query-min, db-get-builds-query-max, db-get-builds-min, db-get-builds-max, db-get-evaluation-specification): Change from using with-db-critical-section to with-db-worker-thread. (with-database): Change syntax rule to use make-worker-thread-channel, renaming from make-critical-section. * src/cuirass/utils.scm (%critical-section-args): Rename parameter to %worker-thread-args. (make-critical-section): Rename to make-worker-thread-channel, and adjust parameter and docstring. (call-with-critical-section): Rename to call-with-worker-thread and adjust parameter. (with-critical-section): Rename to with-worker-thread, and adjust to call call-with-worker-thread. * tests/database.scm (db-init): Use make-worker-thread-channel rather than make-critical-section. * tests/http.scm (db-init): Use make-worker-thread-channel rather than make-critical-section. --- src/cuirass/base.scm | 4 +-- src/cuirass/database.scm | 74 ++++++++++++++++++++++++------------------------ src/cuirass/utils.scm | 38 +++++++++++-------------- 3 files changed, 56 insertions(+), 60 deletions(-) (limited to 'src') diff --git a/src/cuirass/base.scm b/src/cuirass/base.scm index 143bc2e..2b18dc6 100644 --- a/src/cuirass/base.scm +++ b/src/cuirass/base.scm @@ -607,13 +607,13 @@ updating the database accordingly." "Reset the status of builds in the database that are marked as \"started\". This procedure is meant to be called at startup." (log-message "marking stale builds as \"scheduled\"...") - (with-db-critical-section db + (with-db-worker-thread db (sqlite-exec db "UPDATE Builds SET status = -2 WHERE status = -1;"))) (define (cancel-old-builds age) "Cancel builds older than AGE seconds." (log-message "canceling builds older than ~a seconds..." age) - (with-db-critical-section db + (with-db-worker-thread db (sqlite-exec db "UPDATE Builds SET status = 4 WHERE status = -2 AND timestamp < " (- (time-second (current-time time-utc)) age) ";"))) diff --git a/src/cuirass/database.scm b/src/cuirass/database.scm index 14cbbda..2468804 100644 --- a/src/cuirass/database.scm +++ b/src/cuirass/database.scm @@ -73,7 +73,7 @@ %db-channel %record-events? ;; Macros. - with-db-critical-section + with-db-worker-thread with-database)) (define (%sqlite-exec db sql . args) @@ -172,12 +172,12 @@ specified." (define %record-events? (make-parameter #f)) -(define-syntax-rule (with-db-critical-section db exp ...) +(define-syntax-rule (with-db-worker-thread db exp ...) "Evaluate EXP... in the critical section corresponding to %DB-CHANNEL. DB is bound to the argument of that critical section: the database connection." - (call-with-critical-section (%db-channel) - (lambda (db) exp ...))) + (call-with-worker-thread (%db-channel) + (lambda (db) exp ...))) (define (read-sql-file file-name) "Return a list of string containing SQL instructions from FILE-NAME." @@ -292,7 +292,7 @@ of the list, and returns #f when there is no result." (() #f))) (define (db-add-input spec-name input) - (with-db-critical-section db + (with-db-worker-thread db (sqlite-exec db "\ INSERT OR IGNORE INTO Inputs (specification, name, url, load_path, branch, \ tag, revision, no_compile_p) VALUES (" @@ -308,7 +308,7 @@ tag, revision, no_compile_p) VALUES (" (define (db-add-checkout spec-name eval-id checkout) "Insert CHECKOUT associated with SPEC-NAME and EVAL-ID. If a checkout with the same revision already exists for SPEC-NAME, return #f." - (with-db-critical-section db + (with-db-worker-thread db (catch-sqlite-error (sqlite-exec db "\ INSERT INTO Checkouts (specification, revision, evaluation, input, @@ -328,7 +328,7 @@ directory) VALUES (" (define (db-add-specification spec) "Store SPEC in database the database. SPEC inputs are stored in the INPUTS table." - (with-db-critical-section db + (with-db-worker-thread db (sqlite-exec db "\ INSERT OR IGNORE INTO Specifications (name, load_path_inputs, \ package_path_inputs, proc_input, proc_file, proc, proc_args) \ @@ -348,7 +348,7 @@ package_path_inputs, proc_input, proc_file, proc, proc_args) \ (define (db-remove-specification name) "Remove the specification matching NAME from the database and its inputs." - (with-db-critical-section db + (with-db-worker-thread db (sqlite-exec db "BEGIN TRANSACTION;") (sqlite-exec db "\ DELETE FROM Inputs WHERE specification=" name ";") @@ -357,7 +357,7 @@ DELETE FROM Specifications WHERE name=" name ";") (sqlite-exec db "COMMIT;"))) (define (db-get-inputs spec-name) - (with-db-critical-section db + (with-db-worker-thread db (let loop ((rows (sqlite-exec db "SELECT * FROM Inputs WHERE specification=" spec-name ";")) @@ -377,7 +377,7 @@ DELETE FROM Specifications WHERE name=" name ";") inputs))))))) (define (db-get-specifications) - (with-db-critical-section db + (with-db-worker-thread db (let loop ((rows (sqlite-exec db "SELECT * FROM Specifications ORDER BY name DESC;")) (specs '())) (match rows @@ -401,7 +401,7 @@ DELETE FROM Specifications WHERE name=" name ";") (define (db-add-evaluation spec-name checkouts) "Add a new evaluation for SPEC-NAME only if one of the CHECKOUTS is new. Otherwise, return #f." - (with-db-critical-section db + (with-db-worker-thread db (sqlite-exec db "BEGIN TRANSACTION;") (sqlite-exec db "INSERT INTO Evaluations (specification, in_progress) VALUES (" spec-name ", true);") @@ -421,11 +421,11 @@ VALUES (" spec-name ", true);") eval-id))))) (define (db-set-evaluations-done) - (with-db-critical-section db + (with-db-worker-thread db (sqlite-exec db "UPDATE Evaluations SET in_progress = false;"))) (define (db-set-evaluation-done eval-id) - (with-db-critical-section db + (with-db-worker-thread db (sqlite-exec db "UPDATE Evaluations SET in_progress = false WHERE id = " eval-id ";") (db-add-event 'evaluation @@ -449,7 +449,7 @@ a critical section that allows database operations to be serialized." ;; access blocks on PUT-MESSAGE, which allows the scheduler to schedule ;; another fiber. Also, creating one new handle for each request would ;; be costly and may defeat statement caching. - (parameterize ((%db-channel (make-critical-section db))) + (parameterize ((%db-channel (make-worker-thread-channel db))) body ...) (db-close db)))) @@ -484,7 +484,7 @@ string." (define (db-add-output derivation output) "Insert OUTPUT associated with DERIVATION. If an output with the same path already exists, return #f." - (with-db-critical-section db + (with-db-worker-thread db (catch-sqlite-error (match output ((name . path) @@ -501,7 +501,7 @@ INSERT INTO Outputs (derivation, name, path) VALUES (" (define (db-add-build build) "Store BUILD in database the database only if one of its outputs is new. Return #f otherwise. BUILD outputs are stored in the OUTPUTS table." - (with-db-critical-section db + (with-db-worker-thread db (catch-sqlite-error (sqlite-exec db "BEGIN TRANSACTION;") (sqlite-exec db " @@ -558,7 +558,7 @@ log file for DRV." (,(build-status failed-other) . "failed (other)") (,(build-status canceled) . "canceled"))) - (with-db-critical-section db + (with-db-worker-thread db (if (= status (build-status started)) (begin (sqlite-exec db "UPDATE Builds SET starttime=" now ", status=" @@ -590,7 +590,7 @@ log file for DRV." (define (db-get-output path) "Retrieve the OUTPUT for PATH." - (with-db-critical-section db + (with-db-worker-thread db ;; There isn't a unique index on path, but because Cuirass avoids adding ;; derivations which introduce the same outputs, there should only be one ;; result. @@ -605,7 +605,7 @@ LIMIT 1;") (define (db-get-outputs derivation) "Retrieve the OUTPUTS of the build identified by DERIVATION in the database." - (with-db-critical-section db + (with-db-worker-thread db (let loop ((rows (sqlite-exec db "SELECT name, path FROM Outputs WHERE derivation =" derivation ";")) @@ -668,7 +668,7 @@ WHERE derivation =" derivation ";")) "Retrieve all builds in the database which are matched by given FILTERS. FILTERS is an assoc list whose possible keys are the symbols query, border-low-id, border-high-id, and nr." - (with-db-critical-section db + (with-db-worker-thread db (let* ((stmt-text (format #f "SELECT * FROM ( SELECT Builds.rowid, Builds.timestamp, Builds.starttime, Builds.stoptime, Builds.log, Builds.status, Builds.job_name, Builds.system, @@ -725,7 +725,7 @@ ORDER BY rowid DESC;")) "Retrieve all builds in the database which are matched by given FILTERS. FILTERS is an assoc list whose possible keys are 'derivation | 'id | 'jobset | 'job | 'system | 'nr | 'order | 'status | 'evaluation." - (with-db-critical-section db + (with-db-worker-thread db (let* ((order (filters->order filters)) (stmt-text (format #f "SELECT * FROM ( SELECT Builds.derivation, Builds.rowid, Builds.timestamp, Builds.starttime, @@ -800,13 +800,13 @@ ORDER BY ~a, rowid ASC;" order)) (define (db-get-build derivation-or-id) "Retrieve a build in the database which corresponds to DERIVATION-OR-ID." - (with-db-critical-section db + (with-db-worker-thread db (let ((key (if (number? derivation-or-id) 'id 'derivation))) (expect-one-row (db-get-builds `((,key . ,derivation-or-id))))))) (define (db-add-event type timestamp details) (when (%record-events?) - (with-db-critical-section db + (with-db-worker-thread db (sqlite-exec db "\ INSERT INTO Events (type, timestamp, event_json) VALUES (" (symbol->string type) ", " @@ -816,7 +816,7 @@ INSERT INTO Events (type, timestamp, event_json) VALUES (" #t))) (define (db-get-events filters) - (with-db-critical-section db + (with-db-worker-thread db (let* ((stmt-text "\ SELECT Events.id, Events.type, @@ -856,7 +856,7 @@ LIMIT :nr;") events)))))))) (define (db-delete-events-with-ids-<=-to id) - (with-db-critical-section db + (with-db-worker-thread db (sqlite-exec db "DELETE FROM Events WHERE id <= " id ";"))) @@ -864,13 +864,13 @@ LIMIT :nr;") (define (db-get-pending-derivations) "Return the list of derivation file names corresponding to pending builds in the database. The returned list is guaranteed to not have any duplicates." - (with-db-critical-section db + (with-db-worker-thread db (map (match-lambda (#(drv) drv)) (sqlite-exec db " SELECT derivation FROM Builds WHERE Builds.status < 0;")))) (define (db-get-checkouts eval-id) - (with-db-critical-section db + (with-db-worker-thread db (let loop ((rows (sqlite-exec db "SELECT revision, input, directory FROM Checkouts WHERE evaluation =" eval-id ";")) @@ -886,7 +886,7 @@ WHERE evaluation =" eval-id ";")) checkouts))))))) (define (db-get-evaluations limit) - (with-db-critical-section db + (with-db-worker-thread db (let loop ((rows (sqlite-exec db "SELECT id, specification, in_progress FROM Evaluations ORDER BY id DESC LIMIT " limit ";")) (evaluations '())) @@ -902,7 +902,7 @@ FROM Evaluations ORDER BY id DESC LIMIT " limit ";")) evaluations))))))) (define (db-get-evaluations-build-summary spec limit border-low border-high) - (with-db-critical-section db + (with-db-worker-thread db (let loop ((rows (sqlite-exec db " SELECT E.id, E.in_progress, B.succeeded, B.failed, B.scheduled FROM @@ -935,7 +935,7 @@ ORDER BY E.id ASC;")) (define (db-get-evaluations-id-min spec) "Return the min id of evaluations for the given specification SPEC." - (with-db-critical-section db + (with-db-worker-thread db (let ((rows (sqlite-exec db " SELECT MIN(id) FROM Evaluations WHERE specification=" spec))) @@ -943,14 +943,14 @@ WHERE specification=" spec))) (define (db-get-evaluations-id-max spec) "Return the max id of evaluations for the given specification SPEC." - (with-db-critical-section db + (with-db-worker-thread db (let ((rows (sqlite-exec db " SELECT MAX(id) FROM Evaluations WHERE specification=" spec))) (and=> (expect-one-row rows) (cut vector-ref <> 0))))) (define (db-get-evaluation-summary id) - (with-db-critical-section db + (with-db-worker-thread db (let ((rows (sqlite-exec db " SELECT E.id, E.in_progress, B.total, B.succeeded, B.failed, B.scheduled FROM @@ -976,7 +976,7 @@ ORDER BY E.id ASC;"))) (define (db-get-builds-query-min query) "Return the smallest build row identifier matching QUERY." - (with-db-critical-section db + (with-db-worker-thread db (let* ((stmt-text "SELECT MIN(Builds.rowid) FROM Builds INNER JOIN Evaluations ON Builds.evaluation = Evaluations.id INNER JOIN Specifications ON Evaluations.specification = Specifications.name @@ -995,7 +995,7 @@ AND (:system IS NULL (define (db-get-builds-query-max query) "Return the largest build row identifier matching QUERY." - (with-db-critical-section db + (with-db-worker-thread db (let* ((stmt-text "SELECT MAX(Builds.rowid) FROM Builds INNER JOIN Evaluations ON Builds.evaluation = Evaluations.id INNER JOIN Specifications ON Evaluations.specification = Specifications.name @@ -1015,7 +1015,7 @@ AND (:system IS NULL (define (db-get-builds-min eval status) "Return the min build (stoptime, rowid) pair for the given evaluation EVAL and STATUS." - (with-db-critical-section db + (with-db-worker-thread db (let ((rows (sqlite-exec db " SELECT stoptime, MIN(rowid) FROM (SELECT rowid, stoptime FROM Builds @@ -1034,7 +1034,7 @@ AND (" status " IS NULL OR (" status " = 'pending' (define (db-get-builds-max eval status) "Return the max build (stoptime, rowid) pair for the given evaluation EVAL and STATUS." - (with-db-critical-section db + (with-db-worker-thread db (let ((rows (sqlite-exec db " SELECT stoptime, MAX(rowid) FROM (SELECT rowid, stoptime FROM Builds @@ -1052,7 +1052,7 @@ AND (" status " IS NULL OR (" status " = 'pending' (define (db-get-evaluation-specification eval) "Return specification of evaluation with id EVAL." - (with-db-critical-section db + (with-db-worker-thread db (let ((rows (sqlite-exec db " SELECT specification FROM Evaluations WHERE id = " eval))) diff --git a/src/cuirass/utils.scm b/src/cuirass/utils.scm index fe74b69..514899e 100644 --- a/src/cuirass/utils.scm +++ b/src/cuirass/utils.scm @@ -35,9 +35,9 @@ define-enumeration unwind-protect - make-critical-section - call-with-critical-section - with-critical-section + make-worker-thread-channel + call-with-worker-thread + with-worker-thread %non-blocking non-blocking @@ -96,21 +96,17 @@ delimited continuations and fibers." (conclusion) (apply throw args))))) -(define %critical-section-args +(define %worker-thread-args (make-parameter #f)) -(define (make-critical-section . args) - "Return a channel used to implement a critical section. That channel can -then be passed to 'join-critical-section', which will ensure sequential -ordering. ARGS are the arguments of the critical section. - -Critical sections are implemented by passing the procedure to execute to a -dedicated thread." +(define (make-worker-thread-channel . args) + "Return a channel used to offload work to a dedicated thread. ARGS are the +arguments of the worker thread procedure." (parameterize (((@@ (fibers internal) current-fiber) #f)) (let ((channel (make-channel))) (call-with-new-thread (lambda () - (parameterize ((%critical-section-args args)) + (parameterize ((%worker-thread-args args)) (let loop () (match (get-message channel) (((? channel? reply) . (? procedure? proc)) @@ -118,21 +114,21 @@ dedicated thread." (loop))))) channel))) -(define (call-with-critical-section channel proc) - "Send PROC to the critical section through CHANNEL. Return the result of -PROC. If already in the critical section, call PROC immediately." - (let ((args (%critical-section-args))) +(define (call-with-worker-thread channel proc) + "Send PROC to the worker thread through CHANNEL. Return the result of PROC. +If already in the worker thread, call PROC immediately." + (let ((args (%worker-thread-args))) (if args (apply proc args) (let ((reply (make-channel))) (put-message channel (cons reply proc)) (get-message reply))))) -(define-syntax-rule (with-critical-section channel (vars ...) exp ...) - "Evaluate EXP... in the critical section corresponding to CHANNEL. -VARS... are bound to the arguments of the critical section." - (call-with-critical-section channel - (lambda (vars ...) exp ...))) +(define-syntax-rule (with-worker-thread channel (vars ...) exp ...) + "Evaluate EXP... in the worker thread corresponding to CHANNEL. +VARS... are bound to the arguments of the worker thread." + (call-with-worker-thread channel + (lambda (vars ...) exp ...))) (define (%non-blocking thunk) (parameterize (((@@ (fibers internal) current-fiber) #f)) -- cgit v1.2.3