diff options
-rw-r--r-- | nar-herder/cached-compression.scm | 3 | ||||
-rw-r--r-- | nar-herder/database.scm | 4 | ||||
-rw-r--r-- | nar-herder/utils.scm | 40 |
3 files changed, 28 insertions, 19 deletions
diff --git a/nar-herder/cached-compression.scm b/nar-herder/cached-compression.scm index 1537169..e30469f 100644 --- a/nar-herder/cached-compression.scm +++ b/nar-herder/cached-compression.scm @@ -176,7 +176,7 @@ #:key (cached-compression-workers 2)) (let ((consider-nar-request-channel - (make-worker-thread-channel + (make-worker-thread-set (lambda () (let ((cached-bytes-by-compression-box (make-atomic-box #f))) @@ -198,7 +198,6 @@ (proc nar-cached-compression-usage-hash-table))))) (define (proc narinfo-id cached-bytes-by-compression-box) - (peek "NARINFO ID" narinfo-id) (let* ((existing-cached-files ;; This is important both to avoid trying to create ;; files twice, but also in the case where additional diff --git a/nar-herder/database.scm b/nar-herder/database.scm index 028fb50..49c0392 100644 --- a/nar-herder/database.scm +++ b/nar-herder/database.scm @@ -196,7 +196,7 @@ CREATE INDEX cached_narinfo_files_narinfo_id (sqlite-close db)) (let ((reader-thread-channel - (make-worker-thread-channel + (make-worker-thread-set (lambda () (let ((db (db-open database-file #:write? #f))) @@ -225,7 +225,7 @@ CREATE INDEX cached_narinfo_files_narinfo_id seconds-delayed)))))) (writer-thread-channel - (make-worker-thread-channel + (make-worker-thread-set (lambda () (let ((db (db-open database-file))) diff --git a/nar-herder/utils.scm b/nar-herder/utils.scm index 2d62360..f4c39f8 100644 --- a/nar-herder/utils.scm +++ b/nar-herder/utils.scm @@ -18,6 +18,7 @@ (define-module (nar-herder utils) #:use-module (srfi srfi-1) + #:use-module (srfi srfi-9) #:use-module (srfi srfi-19) ; time #:use-module (ice-9 q) ;; #:use-module (ice-9 ftw) @@ -55,7 +56,7 @@ chunked-input-ended-prematurely-error? make-chunked-input-port* - make-worker-thread-channel + make-worker-thread-set call-with-worker-thread call-with-time-logging @@ -565,24 +566,30 @@ falling back to en_US.utf8\n" (setlocale LC_ALL "")) #:unwind? #t)) -(define %worker-thread-args - (make-parameter #f)) - -(define* (make-worker-thread-channel initializer - #:key (parallelism 1) - (delay-logger (lambda _ #f)) - destructor - lifetime - (log-exception? (const #t))) +(define-record-type <worker-thread-set> + (worker-thread-set channel arguments-parameter) + worker-thread-set? + (channel worker-thread-set-channel) + (arguments-parameter worker-thread-set-arguments-parameter)) + +(define* (make-worker-thread-set initializer + #:key (parallelism 1) + (delay-logger (lambda _ #f)) + destructor + lifetime + (log-exception? (const #t))) "Return a channel used to offload work to a dedicated thread. ARGS are the arguments of the worker thread procedure." + (define param + (make-parameter #f)) + (let ((channel (make-channel))) (for-each (lambda _ (call-with-new-thread (lambda () (let init ((args (initializer))) - (parameterize ((%worker-thread-args args)) + (parameterize ((param args)) (let loop ((current-lifetime lifetime)) (match (get-message channel) (((? channel? reply) sent-time (? procedure? proc)) @@ -632,16 +639,19 @@ arguments of the worker thread procedure." (apply destructor args)) (init (initializer)))))) (iota parallelism)) - channel)) -(define* (call-with-worker-thread channel proc #:key duration-logger) + (worker-thread-set channel + param))) + +(define* (call-with-worker-thread record proc #:key duration-logger) "Send PROC to the worker thread through CHANNEL. Return the result of PROC. If already in the worker thread, call PROC immediately." - (let ((args (%worker-thread-args))) + (let ((args ((worker-thread-set-arguments-parameter record)))) (if args (apply proc args) (let ((reply (make-channel))) - (put-message channel (list reply (get-internal-real-time) proc)) + (put-message (worker-thread-set-channel record) + (list reply (get-internal-real-time) proc)) (match (get-message reply) (('worker-thread-error duration exn) (when duration-logger |