aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--nar-herder/cached-compression.scm3
-rw-r--r--nar-herder/database.scm4
-rw-r--r--nar-herder/utils.scm40
3 files changed, 28 insertions, 19 deletions
diff --git a/nar-herder/cached-compression.scm b/nar-herder/cached-compression.scm
index 1537169..e30469f 100644
--- a/nar-herder/cached-compression.scm
+++ b/nar-herder/cached-compression.scm
@@ -176,7 +176,7 @@
#:key (cached-compression-workers 2))
(let ((consider-nar-request-channel
- (make-worker-thread-channel
+ (make-worker-thread-set
(lambda ()
(let ((cached-bytes-by-compression-box
(make-atomic-box #f)))
@@ -198,7 +198,6 @@
(proc nar-cached-compression-usage-hash-table)))))
(define (proc narinfo-id cached-bytes-by-compression-box)
- (peek "NARINFO ID" narinfo-id)
(let* ((existing-cached-files
;; This is important both to avoid trying to create
;; files twice, but also in the case where additional
diff --git a/nar-herder/database.scm b/nar-herder/database.scm
index 028fb50..49c0392 100644
--- a/nar-herder/database.scm
+++ b/nar-herder/database.scm
@@ -196,7 +196,7 @@ CREATE INDEX cached_narinfo_files_narinfo_id
(sqlite-close db))
(let ((reader-thread-channel
- (make-worker-thread-channel
+ (make-worker-thread-set
(lambda ()
(let ((db
(db-open database-file #:write? #f)))
@@ -225,7 +225,7 @@ CREATE INDEX cached_narinfo_files_narinfo_id
seconds-delayed))))))
(writer-thread-channel
- (make-worker-thread-channel
+ (make-worker-thread-set
(lambda ()
(let ((db
(db-open database-file)))
diff --git a/nar-herder/utils.scm b/nar-herder/utils.scm
index 2d62360..f4c39f8 100644
--- a/nar-herder/utils.scm
+++ b/nar-herder/utils.scm
@@ -18,6 +18,7 @@
(define-module (nar-herder utils)
#:use-module (srfi srfi-1)
+ #:use-module (srfi srfi-9)
#:use-module (srfi srfi-19) ; time
#:use-module (ice-9 q)
;; #:use-module (ice-9 ftw)
@@ -55,7 +56,7 @@
chunked-input-ended-prematurely-error?
make-chunked-input-port*
- make-worker-thread-channel
+ make-worker-thread-set
call-with-worker-thread
call-with-time-logging
@@ -565,24 +566,30 @@ falling back to en_US.utf8\n"
(setlocale LC_ALL ""))
#:unwind? #t))
-(define %worker-thread-args
- (make-parameter #f))
-
-(define* (make-worker-thread-channel initializer
- #:key (parallelism 1)
- (delay-logger (lambda _ #f))
- destructor
- lifetime
- (log-exception? (const #t)))
+(define-record-type <worker-thread-set>
+ (worker-thread-set channel arguments-parameter)
+ worker-thread-set?
+ (channel worker-thread-set-channel)
+ (arguments-parameter worker-thread-set-arguments-parameter))
+
+(define* (make-worker-thread-set initializer
+ #:key (parallelism 1)
+ (delay-logger (lambda _ #f))
+ destructor
+ lifetime
+ (log-exception? (const #t)))
"Return a channel used to offload work to a dedicated thread. ARGS are the
arguments of the worker thread procedure."
+ (define param
+ (make-parameter #f))
+
(let ((channel (make-channel)))
(for-each
(lambda _
(call-with-new-thread
(lambda ()
(let init ((args (initializer)))
- (parameterize ((%worker-thread-args args))
+ (parameterize ((param args))
(let loop ((current-lifetime lifetime))
(match (get-message channel)
(((? channel? reply) sent-time (? procedure? proc))
@@ -632,16 +639,19 @@ arguments of the worker thread procedure."
(apply destructor args))
(init (initializer))))))
(iota parallelism))
- channel))
-(define* (call-with-worker-thread channel proc #:key duration-logger)
+ (worker-thread-set channel
+ param)))
+
+(define* (call-with-worker-thread record proc #:key duration-logger)
"Send PROC to the worker thread through CHANNEL. Return the result of PROC.
If already in the worker thread, call PROC immediately."
- (let ((args (%worker-thread-args)))
+ (let ((args ((worker-thread-set-arguments-parameter record))))
(if args
(apply proc args)
(let ((reply (make-channel)))
- (put-message channel (list reply (get-internal-real-time) proc))
+ (put-message (worker-thread-set-channel record)
+ (list reply (get-internal-real-time) proc))
(match (get-message reply)
(('worker-thread-error duration exn)
(when duration-logger