aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--guix-dev.scm34
-rw-r--r--nar-herder/cached-compression.scm8
-rw-r--r--nar-herder/database.scm367
-rw-r--r--nar-herder/mirror.scm42
-rw-r--r--nar-herder/recent-changes.scm59
-rw-r--r--nar-herder/server.scm158
-rw-r--r--nar-herder/storage.scm239
-rw-r--r--nar-herder/utils.scm631
-rw-r--r--scripts/nar-herder.in150
9 files changed, 663 insertions, 1025 deletions
diff --git a/guix-dev.scm b/guix-dev.scm
index 61e282c..b2e6da3 100644
--- a/guix-dev.scm
+++ b/guix-dev.scm
@@ -45,6 +45,39 @@
(gnu packages web)
(srfi srfi-1))
+(define guile-knots
+ (let ((commit "dcb56ee2c5ac3e283cb46841766e7282f3c2c52e")
+ (revision "1"))
+ (package
+ (name "guile-knots")
+ (version (git-version "0" revision commit))
+ (source (origin
+ (method git-fetch)
+ (uri (git-reference
+ (url "https://git.cbaines.net/git/guile/knots")
+ (commit commit)))
+ (sha256
+ (base32
+ "04z48572canx35hl0kfli3pf3g3m6184zvmnpyg1rbwla6g5z1fk"))
+ (file-name (string-append name "-" version "-checkout"))))
+ (build-system gnu-build-system)
+ (native-inputs
+ (list pkg-config
+ autoconf
+ automake
+ guile-3.0
+ guile-lib
+ guile-fibers))
+ (inputs
+ (list guile-3.0))
+ (propagated-inputs
+ (list guile-fibers))
+ (home-page "https://git.cbaines.net/guile/knots")
+ (synopsis "Patterns and functionality to use with Guile Fibers")
+ (description
+ "")
+ (license license:gpl3+))))
+
(package
(name "nar-herder")
(version "0")
@@ -54,6 +87,7 @@
`(("guix" ,guix)
("guile-json" ,guile-json-4)
("guile-fibers" ,guile-fibers-1.3)
+ ("guile-knots" ,guile-knots)
("guile-gcrypt" ,guile-gcrypt)
("guile-readline" ,guile-readline)
("guile-lzlib" ,guile-lzlib)
diff --git a/nar-herder/cached-compression.scm b/nar-herder/cached-compression.scm
index 375fdaa..1ac6d96 100644
--- a/nar-herder/cached-compression.scm
+++ b/nar-herder/cached-compression.scm
@@ -31,6 +31,9 @@
#:use-module (fibers timers)
#:use-module (fibers channels)
#:use-module (fibers operations)
+ #:use-module (knots timeout)
+ #:use-module (knots non-blocking)
+ #:use-module (knots worker-threads)
#:use-module (web uri)
#:use-module (web client)
#:use-module (web response)
@@ -411,7 +414,7 @@
(put-message reply #t))
(loop (alist-cons
- cached-bytes-by-compression
+ compression
updated-bytes
(alist-delete compression
cached-bytes-by-compression)))))))))
@@ -662,8 +665,7 @@
(call-with-values
(lambda ()
(let ((port
- socket
- (open-socket-for-uri* uri)))
+ (non-blocking-open-socket-for-uri uri)))
(http-get uri
#:port port
#:decode-body? #f
diff --git a/nar-herder/database.scm b/nar-herder/database.scm
index 4fa145f..239a7e7 100644
--- a/nar-herder/database.scm
+++ b/nar-herder/database.scm
@@ -27,6 +27,7 @@
#:use-module (web uri)
#:use-module (sqlite3)
#:use-module (fibers)
+ #:use-module (knots worker-threads)
#:use-module (prometheus)
#:use-module (guix store)
#:use-module (guix narinfo)
@@ -77,12 +78,12 @@
database-insert-scheduled-cached-narinfo-removal))
(define-record-type <database>
- (make-database database-file reader-thread-channel writer-thread-channel
+ (make-database database-file reader-thread-set writer-thread-set
metrics-registry)
database?
(database-file database-file)
- (reader-thread-channel database-reader-thread-channel)
- (writer-thread-channel database-writer-thread-channel)
+ (reader-thread-set database-reader-thread-set)
+ (writer-thread-set database-writer-thread-set)
(metrics-registry database-metrics-registry))
(define* (db-open database
@@ -111,8 +112,6 @@ CREATE TABLE narinfos (
added_at TEXT
);
-CREATE UNIQUE INDEX narinfos_store_hash ON narinfos (substr(store_path, 12, 32));
-
CREATE TABLE narinfo_files (
narinfo_id INTEGER NOT NULL REFERENCES narinfos (id),
size INTEGER NOT NULL,
@@ -120,8 +119,6 @@ CREATE TABLE narinfo_files (
url TEXT NOT NULL
);
-CREATE INDEX narinfo_files_narinfo_id ON narinfo_files (narinfo_id);
-
CREATE TABLE narinfo_references (
narinfo_id INTEGER NOT NULL REFERENCES narinfos (id),
store_path TEXT NOT NULL
@@ -133,8 +130,6 @@ CREATE TABLE tags (
value TEXT NOT NULL
);
-CREATE UNIQUE INDEX tags_index ON tags (key, value);
-
CREATE TABLE narinfo_tags (
narinfo_id INTEGER NOT NULL REFERENCES narinfos (id),
tag_id INTEGER NOT NULL REFERENCES tags (id)
@@ -154,9 +149,6 @@ CREATE TABLE cached_narinfo_files (
compression TEXT
);
-CREATE INDEX cached_narinfo_files_narinfo_id
- ON cached_narinfo_files (narinfo_id);
-
CREATE TABLE scheduled_narinfo_removal (
narinfo_id INTEGER PRIMARY KEY ASC REFERENCES narinfos (id),
removal_datetime TEXT NOT NULL
@@ -216,10 +208,7 @@ CREATE TABLE cached_narinfo_files (
narinfo_id INTEGER NOT NULL REFERENCES narinfos (id),
size INTEGER NOT NULL,
compression TEXT NOT NULL
-);
-
-CREATE INDEX cached_narinfo_files_narinfo_id
- ON cached_narinfo_files (narinfo_id);"))
+);"))
(unless (column-exists? db "narinfos" "added_at")
(sqlite-exec
@@ -246,6 +235,22 @@ CREATE TABLE scheduled_cached_narinfo_removal (
(sqlite-exec
db
+ "
+CREATE UNIQUE INDEX IF NOT EXISTS
+ narinfos_store_hash ON narinfos (substr(store_path, 12, 32));")
+
+ (sqlite-exec
+ db
+ "CREATE UNIQUE INDEX IF NOT EXISTS
+ tags_index ON tags (key, value);")
+
+ (sqlite-exec
+ db
+ "CREATE INDEX IF NOT EXISTS cached_narinfo_files_narinfo_id
+ ON cached_narinfo_files (narinfo_id);")
+
+ (sqlite-exec
+ db
"CREATE INDEX IF NOT EXISTS narinfo_tags_narinfo_id
ON narinfo_tags (narinfo_id);")
@@ -257,34 +262,41 @@ CREATE TABLE scheduled_cached_narinfo_removal (
(sqlite-exec
db
"CREATE INDEX IF NOT EXISTS narinfo_files_narinfo_id
- ON narinfo_files (narinfo_id);"))
+ ON narinfo_files (narinfo_id);")
+
+ (sqlite-exec
+ db
+ "CREATE INDEX IF NOT EXISTS narinfo_files_url
+ ON narinfo_files (url);"))
(define* (setup-database database-file metrics-registry
- #:key (reader-threads 1))
+ #:key (reader-threads 1)
+ (readonly? #f))
(define mmap-size #f)
- (let ((db (db-open database-file)))
- (sqlite-exec db "PRAGMA journal_mode=WAL;")
- (sqlite-exec db "PRAGMA optimize;")
- (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);")
-
- (update-schema db)
-
- ;; (let ((requested-mmap-bytes 2147418112)
- ;; (statement
- ;; (sqlite-prepare
- ;; db
- ;; (simple-format #f "PRAGMA mmap_size=~A;"
- ;; 2147418112))))
- ;; (match (sqlite-step statement)
- ;; (#(result-mmap-size)
- ;; (sqlite-finalize statement)
- ;; (set! mmap-size
- ;; result-mmap-size))))
-
- (sqlite-close db))
-
- (let ((reader-thread-channel
+ (unless readonly?
+ (let ((db (db-open database-file)))
+ (sqlite-exec db "PRAGMA journal_mode=WAL;")
+ (sqlite-exec db "PRAGMA optimize;")
+ (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);")
+
+ (update-schema db)
+
+ ;; (let ((requested-mmap-bytes 2147418112)
+ ;; (statement
+ ;; (sqlite-prepare
+ ;; db
+ ;; (simple-format #f "PRAGMA mmap_size=~A;"
+ ;; 2147418112))))
+ ;; (match (sqlite-step statement)
+ ;; (#(result-mmap-size)
+ ;; (sqlite-finalize statement)
+ ;; (set! mmap-size
+ ;; result-mmap-size))))
+
+ (sqlite-close db)))
+
+ (let ((reader-thread-set
(make-worker-thread-set
(lambda ()
(let ((db
@@ -300,6 +312,7 @@ CREATE TABLE scheduled_cached_narinfo_removal (
(lambda (db)
(sqlite-close db))
#:lifetime 50000
+ #:expire-on-exception? #t
#:name "db r"
;; Use a minimum of 2 and a maximum of 8 threads
@@ -329,58 +342,61 @@ CREATE TABLE scheduled_cached_narinfo_removal (
proc)
(current-error-port))))))
- (writer-thread-channel
- (make-worker-thread-set
- (lambda ()
- (let ((db
- (db-open database-file)))
- (sqlite-exec db "PRAGMA busy_timeout = 5000;")
- (sqlite-exec db "PRAGMA foreign_keys = ON;")
- (when mmap-size
- (sqlite-exec
- db
- (simple-format #f "PRAGMA mmap_size=~A;"
- (number->string mmap-size))))
- (list db)))
- #:destructor
- (lambda (db)
- (db-optimize db
- database-file)
-
- (sqlite-close db))
- #:lifetime 500
- #:name "db w"
-
- ;; SQLite doesn't support parallel writes
- #:parallelism 1
- #:delay-logger (let ((delay-metric
- (make-histogram-metric
- metrics-registry
- "database_write_delay_seconds")))
- (lambda (seconds-delayed proc)
- (metric-observe delay-metric seconds-delayed)
- (when (> seconds-delayed 1)
- (display
- (format
- #f
- "warning: database write (~a) delayed by ~1,2f seconds~%"
- proc
- seconds-delayed)
- (current-error-port)))))
- #:duration-logger
- (lambda (duration proc)
- (when (> duration 5)
- (display
- (format
- #f
- "warning: database write took ~1,2f seconds (~a)~%"
- duration
- proc)
- (current-error-port)))))))
+ (writer-thread-set
+ (if readonly?
+ #f
+ (make-worker-thread-set
+ (lambda ()
+ (let ((db
+ (db-open database-file)))
+ (sqlite-exec db "PRAGMA busy_timeout = 5000;")
+ (sqlite-exec db "PRAGMA foreign_keys = ON;")
+ (when mmap-size
+ (sqlite-exec
+ db
+ (simple-format #f "PRAGMA mmap_size=~A;"
+ (number->string mmap-size))))
+ (list db)))
+ #:destructor
+ (lambda (db)
+ (db-optimize db
+ database-file)
+
+ (sqlite-close db))
+ #:lifetime 500
+ #:expire-on-exception? #t
+ #:name "db w"
+
+ ;; SQLite doesn't support parallel writes
+ #:parallelism 1
+ #:delay-logger (let ((delay-metric
+ (make-histogram-metric
+ metrics-registry
+ "database_write_delay_seconds")))
+ (lambda (seconds-delayed proc)
+ (metric-observe delay-metric seconds-delayed)
+ (when (> seconds-delayed 1)
+ (display
+ (format
+ #f
+ "warning: database write (~a) delayed by ~1,2f seconds~%"
+ proc
+ seconds-delayed)
+ (current-error-port)))))
+ #:duration-logger
+ (lambda (duration proc)
+ (when (> duration 5)
+ (display
+ (format
+ #f
+ "warning: database write took ~1,2f seconds (~a)~%"
+ duration
+ proc)
+ (current-error-port))))))))
(make-database database-file
- reader-thread-channel
- writer-thread-channel
+ reader-thread-set
+ writer-thread-set
metrics-registry)))
(define (update-database-metrics! database)
@@ -430,7 +446,7 @@ PRAGMA optimize;")))
(retry-on-error
(lambda ()
(call-with-worker-thread
- (database-writer-thread-channel database)
+ (database-writer-thread-set database)
(lambda (db)
(db-optimize
db
@@ -472,42 +488,82 @@ PRAGMA optimize;")))
readonly?
(immediate? (not readonly?)))
(define (run-proc-within-transaction db)
- (with-exception-handler
- (lambda (exn)
- (match (exception-args exn)
- (('sqlite-exec 5 msg)
- (simple-format (current-error-port) "warning: sqlite error: ~A\n" msg)
- (run-proc-within-transaction db))
- (_
- (simple-format (current-error-port)
- "exception starting transaction\n")
- (raise-exception exn))))
- (lambda ()
- (sqlite-exec db (if immediate?
- "BEGIN IMMEDIATE TRANSACTION;"
- "BEGIN TRANSACTION;"))
- (with-exception-handler
- (lambda (exn)
- (simple-format (current-error-port)
- "error: sqlite rolling back transaction\n")
- (sqlite-exec db "ROLLBACK TRANSACTION;")
- (raise-exception exn))
- (lambda ()
- (call-with-values
+ (define (attempt-begin)
+ (with-exception-handler
+ (lambda (exn)
+ (match (exception-args exn)
+ (('sqlite-exec 5 msg)
+ (simple-format
+ (current-error-port)
+ "warning: issue starting transaction (code: 5, proc: ~A): ~A\n"
+ proc msg)
+ #f)
+ (_
+ (simple-format (current-error-port)
+ "exception starting transaction: ~A\n" exn)
+ (raise-exception exn))))
+ (lambda ()
+ (sqlite-exec db (if immediate?
+ "BEGIN IMMEDIATE TRANSACTION;"
+ "BEGIN TRANSACTION;"))
+ #t)
+ #:unwind? #t))
+
+ (define (attempt-commit)
+ (with-exception-handler
+ (lambda (exn)
+ (match (exception-args exn)
+ (('sqlite-exec 5 msg)
+ (simple-format
+ (current-error-port)
+ "warning: attempt commit (code: 5, proc: ~A): ~A\n"
+ proc msg)
+ #f)
+ (_
+ (simple-format (current-error-port)
+ "exception committing transaction: ~A\n" exn)
+ (raise-exception exn))))
+ (lambda ()
+ (sqlite-exec db "COMMIT TRANSACTION;")
+ #t)
+ #:unwind? #t))
+
+ (if (attempt-begin)
+ (call-with-values
+ (lambda ()
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format (current-error-port)
+ "error: sqlite rolling back transaction (~A)\n"
+ exn)
+ (sqlite-exec db "ROLLBACK TRANSACTION;")
+ (raise-exception exn))
(lambda ()
(parameterize ((%current-transaction-proc proc))
(proc-with-duration-timing db)))
- (lambda vals
- (sqlite-exec db "COMMIT TRANSACTION;")
- (apply values vals))))
- #:unwind? #t))
- #:unwind? #t))
+ #:unwind? #t))
+ (lambda vals
+ (let loop ((success? (attempt-commit)))
+ (if success?
+ (apply values vals)
+ (loop (attempt-commit))))))
+
+ ;; Database is busy, so retry
+ (run-proc-within-transaction db)))
(define (proc-with-duration-timing db)
(let ((start-time (get-internal-real-time)))
(call-with-values
(lambda ()
- (proc db))
+ (with-throw-handler #t
+ (lambda ()
+ (proc db))
+ (lambda (key . args)
+ (simple-format
+ (current-error-port)
+ "exception in transaction: ~A: ~A\n"
+ key args)
+ (backtrace))))
(lambda vals
(let ((duration-seconds
(/ (- (get-internal-real-time) start-time)
@@ -526,8 +582,8 @@ PRAGMA optimize;")))
(match (call-with-worker-thread
((if readonly?
- database-reader-thread-channel
- database-writer-thread-channel)
+ database-reader-thread-set
+ database-writer-thread-set)
database)
(lambda (db)
(if (%current-transaction-proc)
@@ -537,12 +593,25 @@ PRAGMA optimize;")))
(apply values vals))))
(define (dump-database database name)
+ (define (strip-db name)
+ (let ((db (db-open name)))
+ (let ((tables-to-clear
+ '("cached_narinfo_files")))
+ (for-each
+ (lambda (table)
+ (sqlite-exec db (simple-format #f "DELETE FROM ~A;" table)))
+ tables-to-clear))
+
+ (sqlite-close db)))
+
(call-with-worker-thread
- (database-reader-thread-channel database)
+ (database-reader-thread-set database)
(lambda (db)
(sqlite-exec
db
- (string-append "VACUUM INTO '" name "';")))))
+ (string-append "VACUUM INTO '" name "';"))))
+
+ (strip-db name))
(define (last-insert-rowid db)
(let ((statement
@@ -890,7 +959,7 @@ DELETE FROM narinfo_tags WHERE narinfo_id = :narinfo_id"
"select_narinfo"
(lambda ()
(call-with-worker-thread
- (database-reader-thread-channel database)
+ (database-reader-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -922,7 +991,7 @@ WHERE id = :id"
"select_narinfo_by_hash"
(lambda ()
(call-with-worker-thread
- (database-reader-thread-channel database)
+ (database-reader-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -955,7 +1024,7 @@ WHERE substr(store_path, 12, 32) = :hash"
"select_narinfo_contents_by_hash"
(lambda ()
(call-with-worker-thread
- (database-reader-thread-channel database)
+ (database-reader-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -977,7 +1046,7 @@ SELECT id, contents FROM narinfos WHERE substr(store_path, 12, 32) = :hash"
(define (database-count-recent-changes database)
(call-with-worker-thread
- (database-reader-thread-channel database)
+ (database-reader-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -994,7 +1063,7 @@ SELECT COUNT(*) FROM recent_changes"
(define* (database-select-recent-changes database after-date #:key (limit 8192))
(call-with-worker-thread
- (database-reader-thread-channel database)
+ (database-reader-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1024,7 +1093,7 @@ LIMIT :limit"
(define (database-select-latest-recent-change-datetime database)
(call-with-worker-thread
- (database-reader-thread-channel database)
+ (database-reader-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1042,7 +1111,7 @@ SELECT datetime FROM recent_changes ORDER BY datetime DESC LIMIT 1"
(define (database-get-recent-changes-id-for-deletion database limit)
(call-with-worker-thread
- (database-reader-thread-channel database)
+ (database-reader-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1064,7 +1133,7 @@ SELECT id FROM recent_changes ORDER BY datetime DESC LIMIT 1 OFFSET :offset"
(define (database-delete-recent-changes-with-id-below database id)
(call-with-worker-thread
- (database-writer-thread-channel database)
+ (database-writer-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1077,14 +1146,14 @@ DELETE FROM recent_changes WHERE id < :id"
statement
#:id id)
- (sqlite-step statement)
+ (sqlite-fold (const #f) #f statement)
(sqlite-reset statement)
(changes db)))))
(define (database-select-narinfo-for-file database narinfo-file-url)
(call-with-worker-thread
- (database-reader-thread-channel database)
+ (database-reader-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1119,7 +1188,7 @@ WHERE narinfo_files.url = :url"
"select_narinfo_files"
(lambda ()
(call-with-worker-thread
- (database-reader-thread-channel database)
+ (database-reader-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1154,7 +1223,7 @@ WHERE substr(narinfos.store_path, 12, 32) = :hash"
"select_narinfo_files_by_narinfo_id"
(lambda ()
(call-with-worker-thread
- (database-reader-thread-channel database)
+ (database-reader-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1185,7 +1254,7 @@ WHERE narinfos.id = :narinfo_id"
(define (database-fold-all-narinfo-files database proc init)
(call-with-worker-thread
- (database-reader-thread-channel database)
+ (database-reader-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1219,7 +1288,7 @@ FROM narinfo_files"
(define (database-count-narinfo-files database)
(call-with-worker-thread
- (database-reader-thread-channel database)
+ (database-reader-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1240,7 +1309,7 @@ SELECT COUNT(*) FROM narinfo_files"
size
compression)
(call-with-worker-thread
- (database-writer-thread-channel database)
+ (database-writer-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1271,7 +1340,7 @@ INSERT INTO cached_narinfo_files (
"select_cached_narinfo_file_by_hash"
(lambda ()
(call-with-worker-thread
- (database-reader-thread-channel database)
+ (database-reader-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1308,7 +1377,7 @@ WHERE substr(narinfos.store_path, 12, 32) = :hash
"select_cached_narinfo_file_by_narinfo_id"
(lambda ()
(call-with-worker-thread
- (database-reader-thread-channel database)
+ (database-reader-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1350,7 +1419,7 @@ WHERE narinfo_id = :narinfo_id"
"select_cached_narinfo_file_by_narinfo_id_and_compression"
(lambda ()
(call-with-worker-thread
- (database-reader-thread-channel database)
+ (database-reader-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1387,7 +1456,7 @@ WHERE narinfo_id = :narinfo_id
proc
init)
(call-with-worker-thread
- (database-reader-thread-channel database)
+ (database-reader-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1416,7 +1485,7 @@ INNER JOIN narinfos
(define (database-remove-cached-narinfo-file database narinfo-id compression)
(call-with-worker-thread
- (database-writer-thread-channel database)
+ (database-writer-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1441,7 +1510,7 @@ WHERE narinfo_id = :narinfo_id
"select_scheduled_narinfo_removal"
(lambda ()
(call-with-worker-thread
- (database-reader-thread-channel database)
+ (database-reader-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1473,7 +1542,7 @@ WHERE narinfo_id = :narinfo_id"
"select_scheduled_narinfo_removal"
(lambda ()
(call-with-worker-thread
- (database-reader-thread-channel database)
+ (database-reader-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1501,7 +1570,7 @@ WHERE cached_narinfo_file_id = :cached_narinfo_file_id"
(define (database-delete-scheduled-cached-narinfo-removal database
cached-narinfo-file-id)
(call-with-worker-thread
- (database-writer-thread-channel database)
+ (database-writer-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1522,7 +1591,7 @@ RETURNING 1"
(define (database-select-oldest-scheduled-cached-narinfo-removal database)
(call-with-worker-thread
- (database-reader-thread-channel database)
+ (database-reader-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1562,7 +1631,7 @@ LIMIT 1"
(define (database-count-scheduled-cached-narinfo-removal database)
(call-with-worker-thread
- (database-reader-thread-channel database)
+ (database-reader-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare
@@ -1582,7 +1651,7 @@ SELECT COUNT(*) FROM scheduled_cached_narinfo_removal"
cached-narinfo-file-id
removal-datetime)
(call-with-worker-thread
- (database-writer-thread-channel database)
+ (database-writer-thread-set database)
(lambda (db)
(let ((statement
(sqlite-prepare
diff --git a/nar-herder/mirror.scm b/nar-herder/mirror.scm
index a784165..67d4c00 100644
--- a/nar-herder/mirror.scm
+++ b/nar-herder/mirror.scm
@@ -32,6 +32,8 @@
#:use-module (json)
#:use-module (fibers)
#:use-module (fibers channels)
+ #:use-module (knots timeout)
+ #:use-module (knots non-blocking)
#:use-module (guix narinfo)
#:use-module ((guix store) #:select (store-path-hash-part))
#:use-module (nar-herder utils)
@@ -41,7 +43,11 @@
(define (start-fetch-changes-fiber database metrics-registry
storage-root mirror
+ addition-channel
cached-compression-management-channel)
+ (define recent-changes-count-metric
+ (metrics-registry-fetch-metric metrics-registry "recent_changes_count"))
+
(define (request-recent-changes)
(define latest-recent-change
(database-select-latest-recent-change-datetime database))
@@ -73,9 +79,7 @@
latest-recent-change)
(with-port-timeouts
(lambda ()
- (let ((port
- socket
- (open-socket-for-uri* uri)))
+ (let ((port (non-blocking-open-socket-for-uri uri)))
(http-get uri
#:port port
#:streaming? #t)))
@@ -105,6 +109,8 @@
(unless (member (strip-change-datetime change-details)
processed-recent-changes)
(let ((change (assq-ref change-details 'change)))
+ (metric-increment recent-changes-count-metric)
+
(cond
((string=? change "addition")
(let ((narinfo
@@ -122,23 +128,18 @@
(assq-ref change-details
'datetime))
- (and=> (metrics-registry-fetch-metric metrics-registry
- "nar_files_total")
- (lambda (metric)
- ;; Just update this metric if it
- ;; exists, since if it does, it
- ;; should be set to a value
- (let ((new-files-count
- (length (narinfo-uris narinfo))))
- (metric-increment
- metric
- #:by new-files-count
- ;; TODO This should be
- ;; checked, rather than
- ;; assumed to be false
- #:label-values '((stored . "false"))))))))
+ (when addition-channel
+ (for-each
+ (lambda (uri)
+ (spawn-fiber
+ (lambda ()
+ (put-message addition-channel
+ `(addition ,(uri-path uri))))))
+ (narinfo-uris narinfo)))))
+
((string=? change "removal")
(let ((store-path (assq-ref change-details 'data)))
+ ;; TODO Use the nar removal fiber
(log-msg 'INFO "processing removal change for "
store-path
" (" (assq-ref change-details 'datetime) ")")
@@ -191,6 +192,11 @@
(spawn-fiber
(lambda ()
+ (let ((recent-changes-count
+ (database-count-recent-changes database)))
+ (metric-set recent-changes-count-metric recent-changes-count)
+ (log-msg 'DEBUG recent-changes-count " recent changes in the database"))
+
(while #t
(with-exception-handler
(lambda (exn)
diff --git a/nar-herder/recent-changes.scm b/nar-herder/recent-changes.scm
index ccfff93..62bd604 100644
--- a/nar-herder/recent-changes.scm
+++ b/nar-herder/recent-changes.scm
@@ -137,35 +137,38 @@
(log-msg 'ERROR "exception in recent change listener " exn)
#f)
(lambda ()
- (let* ((recent-changes
- (database-select-recent-changes database after))
- (unprocessed-recent-changes
- (remove
+ (with-throw-handler #t
+ (lambda ()
+ (let* ((recent-changes
+ (database-select-recent-changes database after))
+ (unprocessed-recent-changes
+ (remove
+ (lambda (change-details)
+ (member change-details last-processed-recent-changes))
+ recent-changes)))
+
+ (unless (null? unprocessed-recent-changes)
+ (log-msg 'INFO "processing " (length unprocessed-recent-changes)
+ " recent changes")
+
+ (for-each
(lambda (change-details)
- (member change-details last-processed-recent-changes))
- recent-changes)))
-
- (unless (null? unprocessed-recent-changes)
- (log-msg 'INFO "processing " (length unprocessed-recent-changes)
- " recent changes")
-
- (metric-increment recent-changes-count-metric
- #:by (length unprocessed-recent-changes))
-
- (for-each
- (lambda (change-details)
- (let ((change (assq-ref change-details 'change)))
- (cond
- ((string=? change "addition")
- (process-addition-change change-details))
- ((string=? change "removal")
- (process-removal-change change-details))
- (else #f))))
- unprocessed-recent-changes))
-
- ;; Use the unprocessed recent changes here to carry
- ;; forward all processed changes to the next pass
- unprocessed-recent-changes))
+ (let ((change (assq-ref change-details 'change)))
+ (cond
+ ((string=? change "addition")
+ (process-addition-change change-details))
+ ((string=? change "removal")
+ (process-removal-change change-details))
+ (else #f))))
+ unprocessed-recent-changes)
+
+ (metric-increment recent-changes-count-metric
+ #:by (length unprocessed-recent-changes)))
+ ;; Use the unprocessed recent changes here to carry
+ ;; forward all processed changes to the next pass
+ unprocessed-recent-changes))
+ (lambda _
+ (backtrace))))
#:unwind? #t)
(#f (loop after '()))
(recent-changes
diff --git a/nar-herder/server.scm b/nar-herder/server.scm
index fecf166..b904675 100644
--- a/nar-herder/server.scm
+++ b/nar-herder/server.scm
@@ -38,6 +38,10 @@
#:use-module (fibers scheduler)
#:use-module (fibers conditions)
#:use-module (fibers operations)
+ #:use-module (knots)
+ #:use-module (knots timeout)
+ #:use-module (knots web-server)
+ #:use-module (knots non-blocking)
#:use-module (prometheus)
#:use-module (json)
#:use-module ((system foreign)
@@ -217,6 +221,10 @@
(response_code . ,response-code)
,@labels)))
+ (define loop-detections-metric
+ (make-counter-metric metrics-registry
+ "loop_detections_total"))
+
(define %compression-strings
(map symbol->string
%compression-options))
@@ -226,7 +234,12 @@
'DEBUG
(request-method request)
" "
- (uri-path (request-uri request)))
+ (uri-path (request-uri request))
+ (let ((via (request-via request)))
+ (if (null? via)
+ ""
+ (string-append
+ " (Via: " (string-join via ", ") ")"))))
(match (cons (request-method request)
(split-and-decode-uri-path
@@ -324,8 +337,16 @@
(string-take narinfo 32)))))
(values (build-response #:code 404)
"404"))))
- (((or 'HEAD 'GET) "nar" compression filename)
- (let* ((hash (and (>= (string-length filename) 32)
+ ;; TODO The uris in narinfo files can be anything I believe,
+ ;; which doesn't match up with this code
+ (((or 'HEAD 'GET) "nar" rest ...)
+ (let* ((compression
+ (if (= (length rest) 1)
+ "none"
+ (first rest)))
+ (filename
+ (last rest))
+ (hash (and (>= (string-length filename) 32)
(string-take filename 32)))
(narinfo
(and hash
@@ -369,6 +390,7 @@
(assq-ref narinfo 'id)))
(when loop?
+ (metric-increment loop-detections-metric)
(log-msg logger 'WARN
(request-method request)
" "
@@ -395,11 +417,11 @@
(request-via request)))
(values (build-response
#:code 200
- #:headers `((X-Accel-Redirect
- . ,(string-append
- "/internal/nar/"
- compression "/"
- (uri-encode filename)))))
+ #:headers
+ `((X-Accel-Redirect
+ . ,(string-append
+ "/internal"
+ (assq-ref narinfo-file-for-compression 'url)))))
#f)))
(let ((cached-narinfo-file
(and narinfo ; must be a known hash
@@ -437,11 +459,14 @@
(if cached-narinfo-file
(values (build-response
#:code 200
- #:headers `((X-Accel-Redirect
- . ,(string-append
- "/internal/cached-nar/"
- compression "/"
- (uri-encode filename)))))
+ #:headers
+ `((X-Accel-Redirect
+ . ,(string-append
+ "/internal/cached-nar/"
+ ;; This must match up with
+ ;; add-cached-compressions-to-narinfo
+ compression "/"
+ (uri-encode filename)))))
#f)
(values (build-response #:code 404)
"404"))))))
@@ -571,8 +596,7 @@
(simple-format (current-error-port)
"starting downloading the database\n")
(let ((port
- socket
- (open-socket-for-uri* database-uri)))
+ (non-blocking-open-socket-for-uri database-uri)))
(http-get database-uri
#:port port
#:streaming? #t)))
@@ -735,7 +759,16 @@
compression)
'directory)))
(utime (string-append directory "/" filename))))
- maintenance-scheduler)))))
+ maintenance-scheduler))))
+
+ (nar-removal-criteria
+ (filter-map
+ (match-lambda
+ ((key . val)
+ (if (eq? key 'storage-nar-removal-criteria)
+ val
+ #f)))
+ opts)))
(if (string=? (assq-ref opts 'database-dump)
"disabled")
@@ -770,43 +803,32 @@
(assq-ref opts 'recent-changes-limit))
(let ((mirror-channel
- (and=>
- (assq-ref opts 'mirror)
- (lambda (mirror)
- (start-fetch-changes-fiber
- database
- metrics-registry
- canonical-storage
- mirror
- cached-compression-management-channel)
-
- (if (assq-ref opts 'storage)
- (start-mirroring-fiber database
- mirror
- (assq-ref opts 'storage-limit)
- canonical-storage
- metrics-registry)
- #f))))
+ (and (assq-ref opts 'mirror)
+ (assq-ref opts 'storage)
+ (start-mirroring-fiber database
+ (assq-ref opts 'mirror)
+ (assq-ref opts 'storage-limit)
+ (assq-ref opts 'minimum-free-space)
+ canonical-storage
+ metrics-registry)))
(removal-channel
- (let ((nar-removal-criteria
- (filter-map
- (match-lambda
- ((key . val)
- (if (eq? key 'storage-nar-removal-criteria)
- val
- #f)))
- opts)))
- (if (and (assq-ref opts 'storage)
- (number? (assq-ref opts 'storage-limit))
- (not (null? nar-removal-criteria)))
- (start-nar-removal-fiber database
- canonical-storage
- (assq-ref opts 'storage-limit)
- metrics-registry
- nar-removal-criteria)
- #f)))
+ (start-nar-removal-fiber
+ database
+ canonical-storage
+ (assq-ref opts 'storage-limit)
+ metrics-registry
+ nar-removal-criteria))
(addition-channel (make-channel)))
+ (when (assq-ref opts 'mirror)
+ (start-fetch-changes-fiber
+ database
+ metrics-registry
+ canonical-storage ; might be #f, but that's fine here
+ (assq-ref opts 'mirror)
+ addition-channel
+ cached-compression-management-channel))
+
(spawn-fiber
(lambda ()
(while #t
@@ -817,10 +839,25 @@
(lambda ()
(match (get-message addition-channel)
(('addition file)
+ (apply update-nar-files-metric
+ metrics-registry
+ '()
+ (if (and canonical-storage
+ (file-exists?
+ (string-append canonical-storage
+ (uri-decode file))))
+ '(#:stored-addition-count 1)
+ '(#:not-stored-addition-count 1)))
+
(when mirror-channel
- (put-message mirror-channel
- `(fetch ,file)))
- (when removal-channel
+ (spawn-fiber
+ (lambda ()
+ (put-message mirror-channel
+ `(fetch ,file)))))
+
+ (when (and (assq-ref opts 'storage)
+ (number? (assq-ref opts 'storage-limit))
+ (not (null? nar-removal-criteria)))
(spawn-fiber
(lambda ()
(sleep 60)
@@ -837,11 +874,12 @@
file)))))))
#:unwind? #t))))
- (start-recent-change-listener-fiber
- database
- metrics-registry
- addition-channel
- removal-channel))
+ (unless (assq-ref opts 'mirror)
+ (start-recent-change-listener-fiber
+ database
+ metrics-registry
+ addition-channel
+ removal-channel)))
(unless (null? enabled-cached-compressions)
(let ((cached-compression-removal-fiber-wakeup-channel
@@ -885,10 +923,10 @@
(iota (length schedulers))
schedulers))
- (log-msg 'INFO "starting server, listening on "
+ (log-msg 'INFO "starting server (" (getpid) "), listening on "
(assq-ref opts 'host) ":" (assq-ref opts 'port))
- (run-server/patched
+ (run-knots-web-server
(make-request-handler
database
canonical-storage
diff --git a/nar-herder/storage.scm b/nar-herder/storage.scm
index fc49b2d..0e7186d 100644
--- a/nar-herder/storage.scm
+++ b/nar-herder/storage.scm
@@ -28,11 +28,14 @@
#:use-module (web response)
#:use-module (fibers)
#:use-module (fibers channels)
+ #:use-module (knots timeout)
+ #:use-module (knots non-blocking)
#:use-module (logging logger)
#:use-module (logging port-log)
#:use-module (prometheus)
#:use-module (json)
#:use-module ((guix build utils) #:select (dump-port mkdir-p))
+ #:use-module ((guix build syscalls) #:select (free-disk-space))
#:use-module ((guix store) #:select (store-path-hash-part))
#:use-module (guix progress)
#:use-module (nar-herder utils)
@@ -41,6 +44,7 @@
remove-nar-files-by-hash
initialise-storage-metrics
+ update-nar-files-metric
check-storage
removal-channel-remove-nar-from-storage
@@ -76,7 +80,8 @@
(file-exists? filename)))
(when exists?
(remove-nar-from-storage storage-root
- (assq-ref file 'url)))
+ (uri-decode
+ (assq-ref file 'url))))
(and=> (metrics-registry-fetch-metric metrics-registry
"nar_files_total")
@@ -91,6 +96,8 @@
(define (get-storage-size storage-root)
(define enter? (const #t))
(define (leaf name stat result)
+ ;; Allow other fibers to run
+ (sleep 0)
(+ result
(or (and=> (stat:blocks stat)
(lambda (blocks)
@@ -166,8 +173,7 @@
(unrecognised-files . ,(hash-map->list (lambda (key _) key)
files-hash)))))
-;; TODO Maybe remove the metrics-registry argument?
-(define* (fold-nar-files database storage-root metrics-registry
+(define* (fold-nar-files database storage-root
proc init
#:key stored?)
(define stored-files-count 0)
@@ -181,8 +187,10 @@
(uri-decode
(assq-ref nar 'url)))
(nar-stored?
- (file-exists?
- (string-append storage-root url))))
+ (if storage-root
+ (file-exists?
+ (string-append storage-root url))
+ #f)))
(if nar-stored?
(set! stored-files-count (1+ stored-files-count))
@@ -202,7 +210,9 @@
(define* (update-nar-files-metric metrics-registry
nar-file-counts
- #:key fetched-count removed-count)
+ #:key fetched-count removed-count
+ not-stored-addition-count
+ stored-addition-count)
;; Avoid incrementing or decrementing the metric if it hasn't been
;; set yet
@@ -245,7 +255,17 @@
#:label-values '((stored . "true")))
(metric-increment nar-files-metric
#:by removed-count
- #:label-values '((stored . "false")))))))
+ #:label-values '((stored . "false"))))
+
+ (when not-stored-addition-count
+ (metric-increment nar-files-metric
+ #:by not-stored-addition-count
+ #:label-values '((stored . "false"))))
+
+ (when stored-addition-count
+ (metric-increment nar-files-metric
+ #:by stored-addition-count
+ #:label-values '((stored . "true")))))))
(define (initialise-storage-metrics database storage-root metrics-registry)
;; Use a database transaction to block changes
@@ -258,7 +278,6 @@
(fold-nar-files
database
storage-root
- metrics-registry
(const #f)
#f
#:stored? 'both)))
@@ -271,32 +290,36 @@
(define files-count
(database-count-narinfo-files database))
- (call-with-progress-reporter
- (progress-reporter/bar files-count
- (simple-format #f "checking ~A files" files-count)
- (current-error-port))
- (lambda (report)
- (fold-nar-files
- database
- storage-root
- metrics-registry
- (lambda (file _)
- (let* ((full-filename
- (string-append storage-root
- (uri-decode (assq-ref file 'url))))
- (file-size
- (stat:size (stat full-filename)))
- (database-size
- (assq-ref file 'size)))
- (report)
- (unless (= file-size database-size)
- (newline)
- (log-msg 'WARN "file " full-filename
- " has inconsistent size (database: "
- database-size ", file: " file-size ")"))
- #f))
- #f
- #:stored? 'both))))
+ (let ((not-stored-count
+ (call-with-progress-reporter
+ (progress-reporter/bar files-count
+ (simple-format #f "checking ~A files" files-count)
+ (current-error-port))
+ (lambda (report)
+ (fold-nar-files
+ database
+ storage-root
+ (lambda (file not-stored-count)
+ (let* ((full-filename
+ (string-append storage-root
+ (uri-decode (assq-ref file 'url)))))
+ (if (file-exists? full-filename)
+ (let ((file-size
+ (stat:size (stat full-filename)))
+ (database-size
+ (assq-ref file 'size)))
+ (report)
+ (unless (= file-size database-size)
+ (newline)
+ (log-msg 'WARN "file " full-filename
+ " has inconsistent size (database: "
+ database-size ", file: " file-size ")"))
+ not-stored-count)
+ (+ 1 not-stored-count))))
+ 0
+ #:stored? 'both)))))
+
+ (log-msg 'INFO "finished checking, " not-stored-count " files not stored")))
(define (at-most max-length lst)
"If LST is shorter than MAX-LENGTH, return it and the empty list; otherwise
@@ -335,7 +358,7 @@ When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates."
;; Open a new connection to URI and evict old entries from
;; CACHE, if any.
(let ((socket
- (open-socket-for-uri*
+ (non-blocking-open-socket-for-uri
uri
#:verify-certificate? verify-certificate?))
(new-cache evicted
@@ -378,8 +401,10 @@ When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates."
metrics-registry
nar-removal-criteria)
(define storage-size-metric
- (make-gauge-metric metrics-registry
- "storage_size_bytes"))
+ (or (metrics-registry-fetch-metric metrics-registry
+ "storage_size_bytes")
+ (make-gauge-metric metrics-registry
+ "storage_size_bytes")))
(define removal-channel
(make-channel))
@@ -404,31 +429,33 @@ When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates."
".narinfo/info"))))
(with-port-timeouts
(lambda ()
- (call-with-values
- (lambda ()
- (retry-on-error
- (lambda ()
- (call-with-cached-connection uri
- (lambda (port)
+ (retry-on-error
+ (lambda ()
+ (call-with-cached-connection
+ uri
+ (lambda (port)
+ (let ((response
+ body
(http-get uri
#:port port
#:decode-body? #f
#:keep-alive? #t
- #:streaming? #t))))
- #:times 3
- #:delay 5))
- (lambda (response body)
- (and (= (response-code response)
- 200)
-
- (let ((json-body (json->scm body)))
- (eq? (assoc-ref json-body "stored")
- #t))))))
+ #:streaming? #t)))
+ (and (= (response-code response)
+ 200)
+ (let ((json-body (json->scm body)))
+ (eq? (assoc-ref json-body "stored")
+ #t)))))))
+ #:times 3
+ #:delay 5))
#:timeout 30)))))
(define (nar-can-be-removed? nar)
+ (log-msg 'INFO "checking if " (assq-ref nar 'url) " can be removed")
(any (lambda (criteria)
- (check-removal-criteria nar criteria))
+ (let ((result (check-removal-criteria nar criteria)))
+ (log-msg 'INFO "removal criteria (" criteria "): " result)
+ result))
nar-removal-criteria))
(define (run-removal-pass)
@@ -447,7 +474,6 @@ When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates."
(fold-nar-files
database
storage-root
- metrics-registry
(lambda (nar result)
(match result
((storage-size . removed-count)
@@ -477,9 +503,6 @@ When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates."
(log-msg 'INFO "finished looking for nars to remove, removed "
removed-count " files"))))))
- (when (null? nar-removal-criteria)
- (error "must be some removal criteria"))
-
(spawn-fiber
(lambda ()
(while #t
@@ -494,6 +517,9 @@ When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates."
(lambda ()
(with-throw-handler #t
(lambda ()
+ (when (null? nar-removal-criteria)
+ (error "must be some removal criteria"))
+
(cond
((not (file-exists?
(string-append storage-root
@@ -523,35 +549,49 @@ When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates."
(log-msg 'ERROR "failed to remove " file ": " exn))
(lambda ()
;; TODO: Do more checking at this point
- (remove-nar-from-storage
- storage-root
- (uri-decode file))
+ (when storage-root
+ (remove-nar-from-storage
+ storage-root
+ (uri-decode file)))
(update-nar-files-metric metrics-registry
'()
#:removed-count 1))
#:unwind? #t))))))
- (spawn-fiber
- (lambda ()
- (while #t
- (with-exception-handler
- (lambda (exn)
- (log-msg 'ERROR "nar removal pass failed " exn))
- run-removal-pass
- #:unwind? #t)
- (sleep (* 60 60 24)))))
+ (when (and storage-root
+ (not (null? nar-removal-criteria)))
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "nar removal pass failed " exn))
+ run-removal-pass
+ #:unwind? #t)
+ (sleep (* 60 60 24))))))
removal-channel)
-(define (start-mirroring-fiber database mirror storage-limit storage-root
- metrics-registry)
+(define (start-mirroring-fiber database mirror storage-limit minimum-free-space
+ storage-root metrics-registry)
- (define no-storage-limit?
- (not (integer? storage-limit)))
+ (define storage-limit?
+ (integer? storage-limit))
+
+ (define minimum-free-space?
+ (integer? minimum-free-space))
(define storage-size-metric
- (make-gauge-metric metrics-registry
- "storage_size_bytes"))
+ (or (metrics-registry-fetch-metric metrics-registry
+ "storage_size_bytes")
+ (make-gauge-metric metrics-registry
+ "storage_size_bytes")))
+
+ (define storage-free-space-metric
+ (or (metrics-registry-fetch-metric metrics-registry
+ "storage_free_space_bytes")
+ (make-gauge-metric metrics-registry
+ "storage_free_space_bytes")))
(define (fetch-file file)
(let* ((string-url
@@ -582,8 +622,7 @@ When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates."
(call-with-values
(lambda ()
(let ((port
- socket
- (open-socket-for-uri* uri)))
+ (non-blocking-open-socket-for-uri uri)))
(http-get uri
#:port port
#:decode-body? #f
@@ -608,24 +647,32 @@ When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates."
'()
#:fetched-count 1)))
- (define (download-nars initial-storage-size)
- ;; If there's free space, then consider downloading missing nars
- (if (< initial-storage-size storage-limit)
+ (define (download-nars initial-storage-size initial-free-space)
+ (define effective-storage-limit
+ (cond
+ ((and storage-limit? minimum-free-space?)
+ (min storage-limit
+ (+ initial-storage-size
+ (- initial-free-space minimum-free-space))))
+ (storage-limit? storage-limit)
+ (minimum-free-space?
+ (+ initial-storage-size
+ (- initial-free-space minimum-free-space)))))
+
+ (if (< initial-storage-size effective-storage-limit)
(let ((result
nar-file-counts
(fold-nar-files
database
storage-root
- metrics-registry
(lambda (file result)
(log-msg 'DEBUG "considering "
(assq-ref file 'url))
(match result
((storage-size . fetched-count)
(let ((file-bytes (assq-ref file 'size)))
- (if (or no-storage-limit?
- (< (+ storage-size file-bytes)
- storage-limit))
+ (if (< (+ storage-size file-bytes)
+ effective-storage-limit)
(let ((success?
(with-exception-handler
(lambda (exn)
@@ -694,7 +741,6 @@ When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates."
(fold-nar-files
database
storage-root
- metrics-registry
(lambda (nar _)
(put-message channel
(assq-ref nar 'url))
@@ -717,13 +763,18 @@ When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates."
(define (run-mirror-pass)
(log-msg 'DEBUG "running mirror pass")
(let ((initial-storage-size (with-time-logging "getting storage size"
- (get-storage-size storage-root))))
+ (get-storage-size storage-root)))
+ (free-space
+ (free-disk-space storage-root)))
(metric-set storage-size-metric
initial-storage-size)
+ (metric-set storage-free-space-metric
+ free-space)
(let ((fetched-count
- (if no-storage-limit?
- (fast-download-nars)
- (download-nars initial-storage-size))))
+ (if (or storage-limit? minimum-free-space?)
+ (download-nars initial-storage-size
+ free-space)
+ (fast-download-nars))))
(log-msg 'DEBUG "finished mirror pass (fetched " fetched-count " nars)"))))
(let ((channel (make-channel)))
@@ -742,10 +793,10 @@ When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates."
(lambda (exn)
(log-msg 'ERROR "failed to mirror " file ": " exn))
(lambda ()
- (fetch-file file)
- (update-nar-files-metric metrics-registry
- '()
- #:fetched-count 1))
+ (unless (file-exists?
+ (string-append storage-root
+ (uri-decode file)))
+ (fetch-file file)))
#:unwind? #t))))))
(spawn-fiber
diff --git a/nar-herder/utils.scm b/nar-herder/utils.scm
index 4755d33..4155ea0 100644
--- a/nar-herder/utils.scm
+++ b/nar-herder/utils.scm
@@ -31,10 +31,6 @@
#:use-module (ice-9 binary-ports)
#:use-module (ice-9 exceptions)
#:use-module (rnrs bytevectors)
- #:use-module (ice-9 suspendable-ports)
- #:use-module ((ice-9 ports internal) #:select (port-poll
- port-read-wait-fd
- port-write-wait-fd))
#:use-module (web uri)
#:use-module (web http)
#:use-module (web client)
@@ -46,30 +42,12 @@
#:use-module (fibers scheduler)
#:use-module (fibers conditions)
#:use-module (fibers operations)
- #:use-module ((guix build syscalls)
- #:select (set-thread-name))
- #:export (make-worker-thread-set
- call-with-worker-thread
-
- call-with-time-logging
+ #:export (call-with-time-logging
with-time-logging
retry-on-error
- create-work-queue
-
- check-locale!
-
- open-socket-for-uri*
-
- call-with-sigint
- run-server/patched
-
- timeout-error?
-
- port-read-timeout-error?
- port-write-timeout-error?
- with-port-timeouts))
+ check-locale!))
(define* (retry-on-error f #:key times delay ignore error-hook)
(let loop ((attempt 1))
@@ -154,218 +132,6 @@
"Log under NAME the time taken to evaluate EXP."
(call-with-time-logging name (lambda () exp ...)))
-(define* (create-work-queue thread-count-parameter proc
- #:key thread-start-delay
- (thread-stop-delay
- (make-time time-duration 0 0))
- (name "unnamed")
- priority<?)
- (let ((queue (make-q))
- (queue-mutex (make-mutex))
- (job-available (make-condition-variable))
- (running-job-args (make-hash-table)))
-
- (define get-thread-count
- (cond
- ((number? thread-count-parameter)
- (const thread-count-parameter))
- ((eq? thread-count-parameter #f)
- ;; Run one thread per job
- (lambda ()
- (+ (q-length queue)
- (hash-count (lambda (index val)
- (list? val))
- running-job-args))))
- (else
- thread-count-parameter)))
-
- (define process-job
- (if priority<?
- (lambda* (args #:key priority)
- (with-mutex queue-mutex
- (enq! queue (cons priority args))
- (set-car!
- queue
- (stable-sort! (car queue)
- (lambda (a b)
- (priority<?
- (car a)
- (car b)))))
- (sync-q! queue)
- (start-new-threads-if-necessary (get-thread-count))
- (signal-condition-variable job-available)))
- (lambda args
- (with-mutex queue-mutex
- (enq! queue args)
- (start-new-threads-if-necessary (get-thread-count))
- (signal-condition-variable job-available)))))
-
- (define (count-threads)
- (with-mutex queue-mutex
- (hash-count (const #t) running-job-args)))
-
- (define (count-jobs)
- (with-mutex queue-mutex
- (+ (q-length queue)
- (hash-count (lambda (index val)
- (list? val))
- running-job-args))))
-
- (define (list-jobs)
- (with-mutex queue-mutex
- (append (if priority<?
- (map cdr (car queue))
- (list-copy (car queue)))
- (hash-fold (lambda (key val result)
- (if val
- (cons val result)
- result))
- '()
- running-job-args))))
-
- (define (thread-process-job job-args)
- (with-exception-handler
- (lambda (exn)
- (simple-format (current-error-port)
- "~A work queue, job raised exception ~A: ~A\n"
- name job-args exn))
- (lambda ()
- (with-throw-handler #t
- (lambda ()
- (apply proc job-args))
- (lambda (key . args)
- (simple-format
- (current-error-port)
- "~A work queue, exception when handling job: ~A ~A\n"
- name key args)
- (backtrace))))
- #:unwind? #t))
-
- (define (start-thread thread-index)
- (define (too-many-threads?)
- (let ((running-jobs-count
- (hash-count (lambda (index val)
- (list? val))
- running-job-args))
- (desired-thread-count (get-thread-count)))
-
- (>= running-jobs-count
- desired-thread-count)))
-
- (define (thread-idle-for-too-long? last-job-finished-at)
- (time>=?
- (time-difference (current-time time-monotonic)
- last-job-finished-at)
- thread-stop-delay))
-
- (define (stop-thread)
- (hash-remove! running-job-args
- thread-index)
- (unlock-mutex queue-mutex))
-
- (call-with-new-thread
- (lambda ()
- (catch 'system-error
- (lambda ()
- (set-thread-name
- (string-append name " q t "
- (number->string thread-index))))
- (const #t))
-
- (let loop ((last-job-finished-at (current-time time-monotonic)))
- (lock-mutex queue-mutex)
-
- (if (too-many-threads?)
- (stop-thread)
- (let ((job-args
- (if (q-empty? queue)
- ;; #f from wait-condition-variable indicates a timeout
- (if (wait-condition-variable
- job-available
- queue-mutex
- (+ 9 (time-second (current-time))))
- ;; Another thread could have taken
- ;; the job in the mean time
- (if (q-empty? queue)
- #f
- (if priority<?
- (cdr (deq! queue))
- (deq! queue)))
- #f)
- (if priority<?
- (cdr (deq! queue))
- (deq! queue)))))
-
- (if job-args
- (begin
- (hash-set! running-job-args
- thread-index
- job-args)
-
- (unlock-mutex queue-mutex)
- (thread-process-job job-args)
-
- (with-mutex queue-mutex
- (hash-set! running-job-args
- thread-index
- #f))
-
- (loop (current-time time-monotonic)))
- (if (thread-idle-for-too-long? last-job-finished-at)
- (stop-thread)
- (begin
- (unlock-mutex queue-mutex)
-
- (loop last-job-finished-at))))))))))
-
-
- (define start-new-threads-if-necessary
- (let ((previous-thread-started-at (make-time time-monotonic 0 0)))
- (lambda (desired-count)
- (let* ((thread-count
- (hash-count (const #t) running-job-args))
- (threads-to-start
- (- desired-count thread-count)))
- (when (> threads-to-start 0)
- (for-each
- (lambda (thread-index)
- (when (eq? (hash-ref running-job-args
- thread-index
- 'slot-free)
- 'slot-free)
- (let* ((now (current-time time-monotonic))
- (elapsed (time-difference now
- previous-thread-started-at)))
- (when (or (eq? #f thread-start-delay)
- (time>=? elapsed thread-start-delay))
- (set! previous-thread-started-at now)
- (hash-set! running-job-args
- thread-index
- #f)
- (start-thread thread-index)))))
- (iota desired-count)))))))
-
- (if (procedure? thread-count-parameter)
- (call-with-new-thread
- (lambda ()
- (catch 'system-error
- (lambda ()
- (set-thread-name
- (string-append name " q t")))
- (const #t))
-
- (while #t
- (sleep 15)
- (with-mutex queue-mutex
- (let ((idle-threads (hash-count (lambda (index val)
- (eq? #f val))
- running-job-args)))
- (when (= 0 idle-threads)
- (start-new-threads-if-necessary (get-thread-count))))))))
- (start-new-threads-if-necessary (get-thread-count)))
-
- (values process-job count-jobs count-threads list-jobs)))
-
(define (check-locale!)
(with-exception-handler
(lambda (exn)
@@ -393,396 +159,3 @@ falling back to en_US.utf8\n"
(lambda _
(setlocale LC_ALL ""))
#:unwind? #t))
-
-(define-record-type <worker-thread-set>
- (worker-thread-set channel arguments-parameter)
- worker-thread-set?
- (channel worker-thread-set-channel)
- (arguments-parameter worker-thread-set-arguments-parameter))
-
-(define* (make-worker-thread-set initializer
- #:key (parallelism 1)
- (delay-logger (lambda _ #f))
- (duration-logger (const #f))
- destructor
- lifetime
- (log-exception? (const #t))
- (expire-on-exception? #f)
- (name "unnamed"))
- (define param
- (make-parameter #f))
-
- (define (initializer/safe)
- (let ((args
- (with-exception-handler
- (lambda (exn)
- (simple-format
- (current-error-port)
- "exception running initializer in worker thread (~A): ~A:\n ~A\n"
- name
- initializer
- exn)
- #f)
- (lambda ()
- (with-throw-handler #t
- initializer
- (lambda args
- (backtrace))))
- #:unwind? #t)))
-
- (if args
- args
- ;; never give up, just keep retrying
- (begin
- (sleep 5)
- (initializer/safe)))))
-
- (define (destructor/safe args)
- (let ((success?
- (with-exception-handler
- (lambda (exn)
- (simple-format
- (current-error-port)
- "exception running destructor in worker thread (~A): ~A:\n ~A\n"
- name
- initializer
- exn)
- #f)
- (lambda ()
- (with-throw-handler #t
- (lambda ()
- (apply destructor args)
- #t)
- (lambda _
- (backtrace))))
- #:unwind? #t)))
-
- (or success?
- #t
- (begin
- (sleep 5)
- (destructor/safe args)))))
-
- (let ((channel (make-channel)))
- (for-each
- (lambda (thread-index)
- (call-with-new-thread
- (lambda ()
- (catch 'system-error
- (lambda ()
- (set-thread-name
- (string-append
- name " w t "
- (number->string thread-index))))
- (const #t))
-
- (let init ((args (initializer/safe)))
- (parameterize ((param args))
- (let loop ((current-lifetime lifetime))
- (let ((exception?
- (match (get-message channel)
- (((? channel? reply) sent-time (? procedure? proc))
- (let ((time-delay
- (- (get-internal-real-time)
- sent-time)))
- (delay-logger (/ time-delay
- internal-time-units-per-second)
- proc)
-
- (let* ((start-time (get-internal-real-time))
- (response
- (with-exception-handler
- (lambda (exn)
- (list 'worker-thread-error
- (/ (- (get-internal-real-time)
- start-time)
- internal-time-units-per-second)
- exn))
- (lambda ()
- (with-throw-handler #t
- (lambda ()
- (call-with-values
- (lambda ()
- (apply proc args))
- (lambda vals
- (cons (/ (- (get-internal-real-time)
- start-time)
- internal-time-units-per-second)
- vals))))
- (lambda args
- (when (match args
- (('%exception exn)
- (log-exception? exn))
- (_ #t))
- (simple-format
- (current-error-port)
- "worker-thread: exception: ~A\n" args)
- (backtrace)))))
- #:unwind? #t)))
- (put-message reply
- response)
-
- (match response
- (('worker-thread-error duration _)
- (when duration-logger
- (duration-logger duration proc))
- #t)
- ((duration . _)
- (when duration-logger
- (duration-logger duration proc))
- #f))))))))
-
- (unless (and expire-on-exception?
- exception?)
- (if (number? current-lifetime)
- (unless (< current-lifetime 0)
- (loop (if current-lifetime
- (- current-lifetime 1)
- #f)))
- (loop #f))))))
-
- (when destructor
- (destructor/safe args))
-
- (init (initializer/safe))))))
- (iota parallelism))
-
- (worker-thread-set channel
- param)))
-
-(define* (call-with-worker-thread record proc #:key duration-logger)
- "Send PROC to the worker thread through CHANNEL. Return the result of PROC.
-If already in the worker thread, call PROC immediately."
- (let ((args ((worker-thread-set-arguments-parameter record))))
- (if args
- (apply proc args)
- (let ((reply (make-channel)))
- (put-message (worker-thread-set-channel record)
- (list reply (get-internal-real-time) proc))
- (match (get-message reply)
- (('worker-thread-error duration exn)
- (when duration-logger
- (duration-logger duration))
- (raise-exception exn))
- ((duration . result)
- (when duration-logger
- (duration-logger duration))
- (apply values result)))))))
-
-(define* (open-socket-for-uri* uri
- #:key (verify-certificate? #t))
- (define tls-wrap
- (@@ (web client) tls-wrap))
-
- (define https?
- (eq? 'https (uri-scheme uri)))
-
- (define plain-uri
- (if https?
- (build-uri
- 'http
- #:userinfo (uri-userinfo uri)
- #:host (uri-host uri)
- #:port (or (uri-port uri) 443)
- #:path (uri-path uri)
- #:query (uri-query uri)
- #:fragment (uri-fragment uri))
- uri))
-
- (let ((s (open-socket-for-uri plain-uri)))
- (values
- (if https?
- (let ((port
- (tls-wrap s (uri-host uri)
- #:verify-certificate? verify-certificate?)))
- ;; Guile/guile-gnutls don't handle the handshake happening on a non
- ;; blocking socket, so change the behavior here.
- (let ((flags (fcntl s F_GETFL)))
- (fcntl s F_SETFL (logior O_NONBLOCK flags)))
- port)
- (let ((flags (fcntl s F_GETFL)))
- (fcntl s F_SETFL (logior O_NONBLOCK flags))
- s))
- s)))
-
-;; Copied from (fibers web server)
-(define (call-with-sigint thunk cvar)
- (let ((handler #f))
- (dynamic-wind
- (lambda ()
- (set! handler
- (sigaction SIGINT (lambda (sig) (signal-condition! cvar)))))
- thunk
- (lambda ()
- (if handler
- ;; restore Scheme handler, SIG_IGN or SIG_DFL.
- (sigaction SIGINT (car handler) (cdr handler))
- ;; restore original C handler.
- (sigaction SIGINT #f))))))
-
-;; This variant of run-server from the fibers library supports running
-;; multiple servers within one process.
-(define run-server/patched
- (let ((fibers-web-server-module
- (resolve-module '(fibers web server))))
-
- (define set-nonblocking!
- (module-ref fibers-web-server-module 'set-nonblocking!))
-
- (define make-default-socket
- (module-ref fibers-web-server-module 'make-default-socket))
-
- (define socket-loop
- (module-ref fibers-web-server-module 'socket-loop))
-
- (lambda* (handler
- #:key
- (host #f)
- (family AF_INET)
- (addr (if host
- (inet-pton family host)
- INADDR_LOOPBACK))
- (port 8080)
- (socket (make-default-socket family addr port)))
- ;; We use a large backlog by default. If the server is suddenly hit
- ;; with a number of connections on a small backlog, clients won't
- ;; receive confirmation for their SYN, leading them to retry --
- ;; probably successfully, but with a large latency.
- (listen socket 1024)
- (set-nonblocking! socket)
- (sigaction SIGPIPE SIG_IGN)
- (spawn-fiber (lambda () (socket-loop socket handler))))))
-
-;; These procedure are subject to spurious wakeups.
-
-(define (readable? port)
- "Test if PORT is writable."
- (match (select (vector port) #() #() 0)
- ((#() #() #()) #f)
- ((#(_) #() #()) #t)))
-
-(define (writable? port)
- "Test if PORT is writable."
- (match (select #() (vector port) #() 0)
- ((#() #() #()) #f)
- ((#() #(_) #()) #t)))
-
-(define (make-wait-operation ready? schedule-when-ready port port-ready-fd this-procedure)
- (make-base-operation #f
- (lambda _
- (and (ready? (port-ready-fd port)) values))
- (lambda (flag sched resume)
- (define (commit)
- (match (atomic-box-compare-and-swap! flag 'W 'S)
- ('W (resume values))
- ('C (commit))
- ('S #f)))
- (schedule-when-ready
- sched (port-ready-fd port) commit))))
-
-(define (wait-until-port-readable-operation port)
- "Make an operation that will succeed when PORT is readable."
- (unless (input-port? port)
- (error "refusing to wait forever for input on non-input port"))
- (make-wait-operation readable? schedule-task-when-fd-readable port
- port-read-wait-fd
- wait-until-port-readable-operation))
-
-(define (wait-until-port-writable-operation port)
- "Make an operation that will succeed when PORT is writable."
- (unless (output-port? port)
- (error "refusing to wait forever for output on non-output port"))
- (make-wait-operation writable? schedule-task-when-fd-writable port
- port-write-wait-fd
- wait-until-port-writable-operation))
-
-
-
-(define &port-timeout
- (make-exception-type '&port-timeout
- &external-error
- '(port)))
-
-(define make-port-timeout-error
- (record-constructor &port-timeout))
-
-(define port-timeout-error?
- (record-predicate &port-timeout))
-
-(define &port-read-timeout
- (make-exception-type '&port-read-timeout
- &port-timeout
- '()))
-
-(define make-port-read-timeout-error
- (record-constructor &port-read-timeout))
-
-(define port-read-timeout-error?
- (record-predicate &port-read-timeout))
-
-(define &port-write-timeout
- (make-exception-type '&port-write-timeout
- &port-timeout
- '()))
-
-(define make-port-write-timeout-error
- (record-constructor &port-write-timeout))
-
-(define port-write-timeout-error?
- (record-predicate &port-write-timeout))
-
-(define* (with-port-timeouts thunk
- #:key timeout
- (read-timeout timeout)
- (write-timeout timeout))
- (define (no-fibers-wait port mode timeout)
- (define poll-timeout-ms 200)
-
- ;; When the GC runs, it restarts the poll syscall, but the timeout
- ;; remains unchanged! When the timeout is longer than the time
- ;; between the syscall restarting, I think this renders the
- ;; timeout useless. Therefore, this code uses a short timeout, and
- ;; repeatedly calls poll while watching the clock to see if it has
- ;; timed out overall.
- (let ((timeout-internal
- (+ (get-internal-real-time)
- (* internal-time-units-per-second
- (/ timeout 1000)))))
- (let loop ((poll-value
- (port-poll port mode poll-timeout-ms)))
- (if (= poll-value 0)
- (if (> (get-internal-real-time)
- timeout-internal)
- (raise-exception
- (if (string=? mode "r")
- (make-port-read-timeout-error port)
- (make-port-write-timeout-error port)))
- (loop (port-poll port mode poll-timeout-ms)))
- poll-value))))
-
- (parameterize
- ((current-read-waiter
- (lambda (port)
- (if (current-scheduler)
- (perform-operation
- (choice-operation
- (wait-until-port-readable-operation port)
- (wrap-operation
- (sleep-operation read-timeout)
- (lambda ()
- (raise-exception
- (make-port-read-timeout-error thunk port))))))
- (no-fibers-wait port "r" read-timeout))))
- (current-write-waiter
- (lambda (port)
- (if (current-scheduler)
- (perform-operation
- (choice-operation
- (wait-until-port-writable-operation port)
- (wrap-operation
- (sleep-operation write-timeout)
- (lambda ()
- (raise-exception
- (make-port-write-timeout-error thunk port))))))
- (no-fibers-wait port "w" write-timeout)))))
- (thunk)))
diff --git a/scripts/nar-herder.in b/scripts/nar-herder.in
index 67515dc..812ec98 100644
--- a/scripts/nar-herder.in
+++ b/scripts/nar-herder.in
@@ -148,9 +148,16 @@
"none"
(string->number arg))
(alist-delete 'storage-limit result))))
+ (option '("storage-minimum-free-space") #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'storage-minimum-free-space
+ (if (string=? arg "none")
+ "none"
+ (string->number arg))
+ (alist-delete 'storage-minimum-free-space result))))
- ;; stored-on=https://other-nar-herder-server
- ;; stored-on=https://other-nar-herder-server&stored-on=https://different-server
+ ;; (stored-on https://other-nar-herder-server)
+ ;; and=((stored-on https://other-nar-herder-server) (stored-on https://different-server))
(option '("storage-nar-removal-criteria") #t #f
(lambda (opt name arg result)
(alist-cons 'storage-nar-removal-criteria
@@ -391,37 +398,78 @@
(assq-ref opts 'arguments)))
(len (length narinfos))
(progress
- (progress-reporter/bar len
- (format #f "importing ~a narinfos"
- len)
- (current-error-port))))
+ (if (= 1 len)
+ progress-reporter/silent
+ (progress-reporter/bar len
+ (format #f "importing ~a narinfos"
+ len)
+ (current-error-port)))))
(call-with-progress-reporter progress
(lambda (report)
(database-call-with-transaction
database
(lambda (db)
- (let ((read-narinfos
- (map
- (lambda (narinfo-file)
- (let ((narinfo
- (call-with-input-file narinfo-file
- (lambda (port)
- ;; Set url to a dummy value as this doesn't
- ;; matter
- (read-narinfo port
- "https://narherderdummyvalue")))))
-
- (database-insert-narinfo
- database
- narinfo
- #:tags (or (assq-ref opts 'tags)
- '()))
-
- (report)
-
- narinfo))
- narinfos)))
+ (let* ((canonical-storage
+ (and=> (assq-ref opts 'storage)
+ canonicalize-path))
+ (read-narinfos
+ (map
+ (lambda (narinfo-file)
+ (let ((narinfo
+ (call-with-input-file narinfo-file
+ (lambda (port)
+ ;; Set url to a dummy value as this doesn't
+ ;; matter
+ (read-narinfo port
+ "https://narherderdummyvalue")))))
+
+ (define (check-size! file size)
+ (let ((actual-size (stat:size (stat file))))
+ (unless (= size actual-size)
+ (error
+ (simple-format
+ #f
+ "error importing ~A, ~A should be ~A bytes but is ~A"
+ narinfo-file
+ file
+ size
+ actual-size)))))
+
+ (database-insert-narinfo
+ database
+ narinfo
+ #:tags (or (assq-ref opts 'tags)
+ '()))
+
+ (when canonical-storage
+ (for-each
+ (lambda (uri size)
+ (let* ((nar-path
+ (uri-decode (uri-path uri)))
+ (source
+ (string-append
+ (dirname narinfo-file) "/" nar-path)))
+ (if (string=? canonical-storage
+ (dirname narinfo-file))
+ (check-size! source size)
+ (let ((dest
+ (string-append
+ canonical-storage "/" nar-path)))
+ (check-size! source size)
+ (simple-format (current-error-port)
+ "moving ~A to ~A\n"
+ source dest)
+ (rename-file source dest)
+ ;; Re-check file size
+ (check-size! dest size)))))
+ (narinfo-uris narinfo)
+ (narinfo-file-sizes narinfo)))
+
+ (report)
+
+ narinfo))
+ narinfos)))
(when (assq-ref opts 'ensure-references-exist)
(for-each
@@ -443,7 +491,9 @@
"missing reference to ~A\n"
reference))))
(narinfo-references narinfo))))
- read-narinfos))))))))))
+ read-narinfos)))))))
+ (when (= 1 len)
+ (simple-format (current-error-port) "imported narinfo\n")))))
(("remove" rest ...)
(let* ((opts (parse-options %base-options
%base-option-defaults
@@ -458,11 +508,14 @@
(port-log (make <port-log>
#:port (current-output-port)
#:formatter
- (lambda (lvl time str)
+ ;; In guile-lib v0.2.8 onwards, the formatter is
+ ;; called with more arguments
+ (lambda args ; lvl, time, str
(format #f "~a (~5a): ~a~%"
- (strftime "%F %H:%M:%S" (localtime time))
- lvl
- str)))))
+ (strftime "%F %H:%M:%S" (localtime
+ (second args)))
+ (first args)
+ (third args))))))
(add-handler! lgr port-log)
(open-log! lgr)
@@ -507,8 +560,9 @@
;; that'll stop these files appearing in narinfos
(database-remove-cached-narinfo-file
database
- narinfo-id
- (symbol->string compression)))
+ (assq-ref narinfo-details 'id)
+ (symbol->string
+ (assq-ref cached-narinfo-details 'compression))))
cached-narinfo-files)
(database-remove-narinfo database store-path))
@@ -524,11 +578,14 @@
(port-log (make <port-log>
#:port (current-output-port)
#:formatter
- (lambda (lvl time str)
+ ;; In guile-lib v0.2.8 onwards, the formatter is
+ ;; called with more arguments
+ (lambda args ; lvl, time, str
(format #f "~a (~5a): ~a~%"
- (strftime "%F %H:%M:%S" (localtime time))
- lvl
- str))))
+ (strftime "%F %H:%M:%S" (localtime
+ (second args)))
+ (first args)
+ (third args)))))
(metrics-registry (make-metrics-registry
#:namespace
"narherder")))
@@ -545,7 +602,8 @@
(loop (cdr levels)))))
(let* ((database (setup-database (assq-ref opts 'database)
- metrics-registry))
+ metrics-registry
+ #:readonly? #t))
(canonical-storage (and=> (assq-ref opts 'storage)
canonicalize-path)))
@@ -567,11 +625,14 @@
(port-log (make <port-log>
#:port (current-output-port)
#:formatter
- (lambda (lvl time str)
+ ;; In guile-lib v0.2.8 onwards, the formatter is
+ ;; called with more arguments
+ (lambda args ; lvl, time, str
(format #f "~a (~5a): ~a~%"
- (strftime "%F %H:%M:%S" (localtime time))
- lvl
- str)))))
+ (strftime "%F %H:%M:%S" (localtime
+ (second args)))
+ (first args)
+ (third args))))))
(add-handler! lgr port-log)
(open-log! lgr)
@@ -602,4 +663,5 @@
(lambda (port)
(simple-format port "~A\n" (getpid))))))
- (run-nar-herder-service opts lgr))))
+ (with-fluids ((%file-port-name-canonicalization 'none))
+ (run-nar-herder-service opts lgr)))))