diff options
-rw-r--r-- | guix-dev.scm | 34 | ||||
-rw-r--r-- | nar-herder/cached-compression.scm | 8 | ||||
-rw-r--r-- | nar-herder/database.scm | 367 | ||||
-rw-r--r-- | nar-herder/mirror.scm | 42 | ||||
-rw-r--r-- | nar-herder/recent-changes.scm | 59 | ||||
-rw-r--r-- | nar-herder/server.scm | 158 | ||||
-rw-r--r-- | nar-herder/storage.scm | 239 | ||||
-rw-r--r-- | nar-herder/utils.scm | 631 | ||||
-rw-r--r-- | scripts/nar-herder.in | 150 |
9 files changed, 663 insertions, 1025 deletions
diff --git a/guix-dev.scm b/guix-dev.scm index 61e282c..b2e6da3 100644 --- a/guix-dev.scm +++ b/guix-dev.scm @@ -45,6 +45,39 @@ (gnu packages web) (srfi srfi-1)) +(define guile-knots + (let ((commit "dcb56ee2c5ac3e283cb46841766e7282f3c2c52e") + (revision "1")) + (package + (name "guile-knots") + (version (git-version "0" revision commit)) + (source (origin + (method git-fetch) + (uri (git-reference + (url "https://git.cbaines.net/git/guile/knots") + (commit commit))) + (sha256 + (base32 + "04z48572canx35hl0kfli3pf3g3m6184zvmnpyg1rbwla6g5z1fk")) + (file-name (string-append name "-" version "-checkout")))) + (build-system gnu-build-system) + (native-inputs + (list pkg-config + autoconf + automake + guile-3.0 + guile-lib + guile-fibers)) + (inputs + (list guile-3.0)) + (propagated-inputs + (list guile-fibers)) + (home-page "https://git.cbaines.net/guile/knots") + (synopsis "Patterns and functionality to use with Guile Fibers") + (description + "") + (license license:gpl3+)))) + (package (name "nar-herder") (version "0") @@ -54,6 +87,7 @@ `(("guix" ,guix) ("guile-json" ,guile-json-4) ("guile-fibers" ,guile-fibers-1.3) + ("guile-knots" ,guile-knots) ("guile-gcrypt" ,guile-gcrypt) ("guile-readline" ,guile-readline) ("guile-lzlib" ,guile-lzlib) diff --git a/nar-herder/cached-compression.scm b/nar-herder/cached-compression.scm index 375fdaa..1ac6d96 100644 --- a/nar-herder/cached-compression.scm +++ b/nar-herder/cached-compression.scm @@ -31,6 +31,9 @@ #:use-module (fibers timers) #:use-module (fibers channels) #:use-module (fibers operations) + #:use-module (knots timeout) + #:use-module (knots non-blocking) + #:use-module (knots worker-threads) #:use-module (web uri) #:use-module (web client) #:use-module (web response) @@ -411,7 +414,7 @@ (put-message reply #t)) (loop (alist-cons - cached-bytes-by-compression + compression updated-bytes (alist-delete compression cached-bytes-by-compression))))))))) @@ -662,8 +665,7 @@ (call-with-values (lambda () (let ((port - socket - (open-socket-for-uri* uri))) + (non-blocking-open-socket-for-uri uri))) (http-get uri #:port port #:decode-body? #f diff --git a/nar-herder/database.scm b/nar-herder/database.scm index 4fa145f..239a7e7 100644 --- a/nar-herder/database.scm +++ b/nar-herder/database.scm @@ -27,6 +27,7 @@ #:use-module (web uri) #:use-module (sqlite3) #:use-module (fibers) + #:use-module (knots worker-threads) #:use-module (prometheus) #:use-module (guix store) #:use-module (guix narinfo) @@ -77,12 +78,12 @@ database-insert-scheduled-cached-narinfo-removal)) (define-record-type <database> - (make-database database-file reader-thread-channel writer-thread-channel + (make-database database-file reader-thread-set writer-thread-set metrics-registry) database? (database-file database-file) - (reader-thread-channel database-reader-thread-channel) - (writer-thread-channel database-writer-thread-channel) + (reader-thread-set database-reader-thread-set) + (writer-thread-set database-writer-thread-set) (metrics-registry database-metrics-registry)) (define* (db-open database @@ -111,8 +112,6 @@ CREATE TABLE narinfos ( added_at TEXT ); -CREATE UNIQUE INDEX narinfos_store_hash ON narinfos (substr(store_path, 12, 32)); - CREATE TABLE narinfo_files ( narinfo_id INTEGER NOT NULL REFERENCES narinfos (id), size INTEGER NOT NULL, @@ -120,8 +119,6 @@ CREATE TABLE narinfo_files ( url TEXT NOT NULL ); -CREATE INDEX narinfo_files_narinfo_id ON narinfo_files (narinfo_id); - CREATE TABLE narinfo_references ( narinfo_id INTEGER NOT NULL REFERENCES narinfos (id), store_path TEXT NOT NULL @@ -133,8 +130,6 @@ CREATE TABLE tags ( value TEXT NOT NULL ); -CREATE UNIQUE INDEX tags_index ON tags (key, value); - CREATE TABLE narinfo_tags ( narinfo_id INTEGER NOT NULL REFERENCES narinfos (id), tag_id INTEGER NOT NULL REFERENCES tags (id) @@ -154,9 +149,6 @@ CREATE TABLE cached_narinfo_files ( compression TEXT ); -CREATE INDEX cached_narinfo_files_narinfo_id - ON cached_narinfo_files (narinfo_id); - CREATE TABLE scheduled_narinfo_removal ( narinfo_id INTEGER PRIMARY KEY ASC REFERENCES narinfos (id), removal_datetime TEXT NOT NULL @@ -216,10 +208,7 @@ CREATE TABLE cached_narinfo_files ( narinfo_id INTEGER NOT NULL REFERENCES narinfos (id), size INTEGER NOT NULL, compression TEXT NOT NULL -); - -CREATE INDEX cached_narinfo_files_narinfo_id - ON cached_narinfo_files (narinfo_id);")) +);")) (unless (column-exists? db "narinfos" "added_at") (sqlite-exec @@ -246,6 +235,22 @@ CREATE TABLE scheduled_cached_narinfo_removal ( (sqlite-exec db + " +CREATE UNIQUE INDEX IF NOT EXISTS + narinfos_store_hash ON narinfos (substr(store_path, 12, 32));") + + (sqlite-exec + db + "CREATE UNIQUE INDEX IF NOT EXISTS + tags_index ON tags (key, value);") + + (sqlite-exec + db + "CREATE INDEX IF NOT EXISTS cached_narinfo_files_narinfo_id + ON cached_narinfo_files (narinfo_id);") + + (sqlite-exec + db "CREATE INDEX IF NOT EXISTS narinfo_tags_narinfo_id ON narinfo_tags (narinfo_id);") @@ -257,34 +262,41 @@ CREATE TABLE scheduled_cached_narinfo_removal ( (sqlite-exec db "CREATE INDEX IF NOT EXISTS narinfo_files_narinfo_id - ON narinfo_files (narinfo_id);")) + ON narinfo_files (narinfo_id);") + + (sqlite-exec + db + "CREATE INDEX IF NOT EXISTS narinfo_files_url + ON narinfo_files (url);")) (define* (setup-database database-file metrics-registry - #:key (reader-threads 1)) + #:key (reader-threads 1) + (readonly? #f)) (define mmap-size #f) - (let ((db (db-open database-file))) - (sqlite-exec db "PRAGMA journal_mode=WAL;") - (sqlite-exec db "PRAGMA optimize;") - (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);") - - (update-schema db) - - ;; (let ((requested-mmap-bytes 2147418112) - ;; (statement - ;; (sqlite-prepare - ;; db - ;; (simple-format #f "PRAGMA mmap_size=~A;" - ;; 2147418112)))) - ;; (match (sqlite-step statement) - ;; (#(result-mmap-size) - ;; (sqlite-finalize statement) - ;; (set! mmap-size - ;; result-mmap-size)))) - - (sqlite-close db)) - - (let ((reader-thread-channel + (unless readonly? + (let ((db (db-open database-file))) + (sqlite-exec db "PRAGMA journal_mode=WAL;") + (sqlite-exec db "PRAGMA optimize;") + (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);") + + (update-schema db) + + ;; (let ((requested-mmap-bytes 2147418112) + ;; (statement + ;; (sqlite-prepare + ;; db + ;; (simple-format #f "PRAGMA mmap_size=~A;" + ;; 2147418112)))) + ;; (match (sqlite-step statement) + ;; (#(result-mmap-size) + ;; (sqlite-finalize statement) + ;; (set! mmap-size + ;; result-mmap-size)))) + + (sqlite-close db))) + + (let ((reader-thread-set (make-worker-thread-set (lambda () (let ((db @@ -300,6 +312,7 @@ CREATE TABLE scheduled_cached_narinfo_removal ( (lambda (db) (sqlite-close db)) #:lifetime 50000 + #:expire-on-exception? #t #:name "db r" ;; Use a minimum of 2 and a maximum of 8 threads @@ -329,58 +342,61 @@ CREATE TABLE scheduled_cached_narinfo_removal ( proc) (current-error-port)))))) - (writer-thread-channel - (make-worker-thread-set - (lambda () - (let ((db - (db-open database-file))) - (sqlite-exec db "PRAGMA busy_timeout = 5000;") - (sqlite-exec db "PRAGMA foreign_keys = ON;") - (when mmap-size - (sqlite-exec - db - (simple-format #f "PRAGMA mmap_size=~A;" - (number->string mmap-size)))) - (list db))) - #:destructor - (lambda (db) - (db-optimize db - database-file) - - (sqlite-close db)) - #:lifetime 500 - #:name "db w" - - ;; SQLite doesn't support parallel writes - #:parallelism 1 - #:delay-logger (let ((delay-metric - (make-histogram-metric - metrics-registry - "database_write_delay_seconds"))) - (lambda (seconds-delayed proc) - (metric-observe delay-metric seconds-delayed) - (when (> seconds-delayed 1) - (display - (format - #f - "warning: database write (~a) delayed by ~1,2f seconds~%" - proc - seconds-delayed) - (current-error-port))))) - #:duration-logger - (lambda (duration proc) - (when (> duration 5) - (display - (format - #f - "warning: database write took ~1,2f seconds (~a)~%" - duration - proc) - (current-error-port))))))) + (writer-thread-set + (if readonly? + #f + (make-worker-thread-set + (lambda () + (let ((db + (db-open database-file))) + (sqlite-exec db "PRAGMA busy_timeout = 5000;") + (sqlite-exec db "PRAGMA foreign_keys = ON;") + (when mmap-size + (sqlite-exec + db + (simple-format #f "PRAGMA mmap_size=~A;" + (number->string mmap-size)))) + (list db))) + #:destructor + (lambda (db) + (db-optimize db + database-file) + + (sqlite-close db)) + #:lifetime 500 + #:expire-on-exception? #t + #:name "db w" + + ;; SQLite doesn't support parallel writes + #:parallelism 1 + #:delay-logger (let ((delay-metric + (make-histogram-metric + metrics-registry + "database_write_delay_seconds"))) + (lambda (seconds-delayed proc) + (metric-observe delay-metric seconds-delayed) + (when (> seconds-delayed 1) + (display + (format + #f + "warning: database write (~a) delayed by ~1,2f seconds~%" + proc + seconds-delayed) + (current-error-port))))) + #:duration-logger + (lambda (duration proc) + (when (> duration 5) + (display + (format + #f + "warning: database write took ~1,2f seconds (~a)~%" + duration + proc) + (current-error-port)))))))) (make-database database-file - reader-thread-channel - writer-thread-channel + reader-thread-set + writer-thread-set metrics-registry))) (define (update-database-metrics! database) @@ -430,7 +446,7 @@ PRAGMA optimize;"))) (retry-on-error (lambda () (call-with-worker-thread - (database-writer-thread-channel database) + (database-writer-thread-set database) (lambda (db) (db-optimize db @@ -472,42 +488,82 @@ PRAGMA optimize;"))) readonly? (immediate? (not readonly?))) (define (run-proc-within-transaction db) - (with-exception-handler - (lambda (exn) - (match (exception-args exn) - (('sqlite-exec 5 msg) - (simple-format (current-error-port) "warning: sqlite error: ~A\n" msg) - (run-proc-within-transaction db)) - (_ - (simple-format (current-error-port) - "exception starting transaction\n") - (raise-exception exn)))) - (lambda () - (sqlite-exec db (if immediate? - "BEGIN IMMEDIATE TRANSACTION;" - "BEGIN TRANSACTION;")) - (with-exception-handler - (lambda (exn) - (simple-format (current-error-port) - "error: sqlite rolling back transaction\n") - (sqlite-exec db "ROLLBACK TRANSACTION;") - (raise-exception exn)) - (lambda () - (call-with-values + (define (attempt-begin) + (with-exception-handler + (lambda (exn) + (match (exception-args exn) + (('sqlite-exec 5 msg) + (simple-format + (current-error-port) + "warning: issue starting transaction (code: 5, proc: ~A): ~A\n" + proc msg) + #f) + (_ + (simple-format (current-error-port) + "exception starting transaction: ~A\n" exn) + (raise-exception exn)))) + (lambda () + (sqlite-exec db (if immediate? + "BEGIN IMMEDIATE TRANSACTION;" + "BEGIN TRANSACTION;")) + #t) + #:unwind? #t)) + + (define (attempt-commit) + (with-exception-handler + (lambda (exn) + (match (exception-args exn) + (('sqlite-exec 5 msg) + (simple-format + (current-error-port) + "warning: attempt commit (code: 5, proc: ~A): ~A\n" + proc msg) + #f) + (_ + (simple-format (current-error-port) + "exception committing transaction: ~A\n" exn) + (raise-exception exn)))) + (lambda () + (sqlite-exec db "COMMIT TRANSACTION;") + #t) + #:unwind? #t)) + + (if (attempt-begin) + (call-with-values + (lambda () + (with-exception-handler + (lambda (exn) + (simple-format (current-error-port) + "error: sqlite rolling back transaction (~A)\n" + exn) + (sqlite-exec db "ROLLBACK TRANSACTION;") + (raise-exception exn)) (lambda () (parameterize ((%current-transaction-proc proc)) (proc-with-duration-timing db))) - (lambda vals - (sqlite-exec db "COMMIT TRANSACTION;") - (apply values vals)))) - #:unwind? #t)) - #:unwind? #t)) + #:unwind? #t)) + (lambda vals + (let loop ((success? (attempt-commit))) + (if success? + (apply values vals) + (loop (attempt-commit)))))) + + ;; Database is busy, so retry + (run-proc-within-transaction db))) (define (proc-with-duration-timing db) (let ((start-time (get-internal-real-time))) (call-with-values (lambda () - (proc db)) + (with-throw-handler #t + (lambda () + (proc db)) + (lambda (key . args) + (simple-format + (current-error-port) + "exception in transaction: ~A: ~A\n" + key args) + (backtrace)))) (lambda vals (let ((duration-seconds (/ (- (get-internal-real-time) start-time) @@ -526,8 +582,8 @@ PRAGMA optimize;"))) (match (call-with-worker-thread ((if readonly? - database-reader-thread-channel - database-writer-thread-channel) + database-reader-thread-set + database-writer-thread-set) database) (lambda (db) (if (%current-transaction-proc) @@ -537,12 +593,25 @@ PRAGMA optimize;"))) (apply values vals)))) (define (dump-database database name) + (define (strip-db name) + (let ((db (db-open name))) + (let ((tables-to-clear + '("cached_narinfo_files"))) + (for-each + (lambda (table) + (sqlite-exec db (simple-format #f "DELETE FROM ~A;" table))) + tables-to-clear)) + + (sqlite-close db))) + (call-with-worker-thread - (database-reader-thread-channel database) + (database-reader-thread-set database) (lambda (db) (sqlite-exec db - (string-append "VACUUM INTO '" name "';"))))) + (string-append "VACUUM INTO '" name "';")))) + + (strip-db name)) (define (last-insert-rowid db) (let ((statement @@ -890,7 +959,7 @@ DELETE FROM narinfo_tags WHERE narinfo_id = :narinfo_id" "select_narinfo" (lambda () (call-with-worker-thread - (database-reader-thread-channel database) + (database-reader-thread-set database) (lambda (db) (let ((statement (sqlite-prepare @@ -922,7 +991,7 @@ WHERE id = :id" "select_narinfo_by_hash" (lambda () (call-with-worker-thread - (database-reader-thread-channel database) + (database-reader-thread-set database) (lambda (db) (let ((statement (sqlite-prepare @@ -955,7 +1024,7 @@ WHERE substr(store_path, 12, 32) = :hash" "select_narinfo_contents_by_hash" (lambda () (call-with-worker-thread - (database-reader-thread-channel database) + (database-reader-thread-set database) (lambda (db) (let ((statement (sqlite-prepare @@ -977,7 +1046,7 @@ SELECT id, contents FROM narinfos WHERE substr(store_path, 12, 32) = :hash" (define (database-count-recent-changes database) (call-with-worker-thread - (database-reader-thread-channel database) + (database-reader-thread-set database) (lambda (db) (let ((statement (sqlite-prepare @@ -994,7 +1063,7 @@ SELECT COUNT(*) FROM recent_changes" (define* (database-select-recent-changes database after-date #:key (limit 8192)) (call-with-worker-thread - (database-reader-thread-channel database) + (database-reader-thread-set database) (lambda (db) (let ((statement (sqlite-prepare @@ -1024,7 +1093,7 @@ LIMIT :limit" (define (database-select-latest-recent-change-datetime database) (call-with-worker-thread - (database-reader-thread-channel database) + (database-reader-thread-set database) (lambda (db) (let ((statement (sqlite-prepare @@ -1042,7 +1111,7 @@ SELECT datetime FROM recent_changes ORDER BY datetime DESC LIMIT 1" (define (database-get-recent-changes-id-for-deletion database limit) (call-with-worker-thread - (database-reader-thread-channel database) + (database-reader-thread-set database) (lambda (db) (let ((statement (sqlite-prepare @@ -1064,7 +1133,7 @@ SELECT id FROM recent_changes ORDER BY datetime DESC LIMIT 1 OFFSET :offset" (define (database-delete-recent-changes-with-id-below database id) (call-with-worker-thread - (database-writer-thread-channel database) + (database-writer-thread-set database) (lambda (db) (let ((statement (sqlite-prepare @@ -1077,14 +1146,14 @@ DELETE FROM recent_changes WHERE id < :id" statement #:id id) - (sqlite-step statement) + (sqlite-fold (const #f) #f statement) (sqlite-reset statement) (changes db))))) (define (database-select-narinfo-for-file database narinfo-file-url) (call-with-worker-thread - (database-reader-thread-channel database) + (database-reader-thread-set database) (lambda (db) (let ((statement (sqlite-prepare @@ -1119,7 +1188,7 @@ WHERE narinfo_files.url = :url" "select_narinfo_files" (lambda () (call-with-worker-thread - (database-reader-thread-channel database) + (database-reader-thread-set database) (lambda (db) (let ((statement (sqlite-prepare @@ -1154,7 +1223,7 @@ WHERE substr(narinfos.store_path, 12, 32) = :hash" "select_narinfo_files_by_narinfo_id" (lambda () (call-with-worker-thread - (database-reader-thread-channel database) + (database-reader-thread-set database) (lambda (db) (let ((statement (sqlite-prepare @@ -1185,7 +1254,7 @@ WHERE narinfos.id = :narinfo_id" (define (database-fold-all-narinfo-files database proc init) (call-with-worker-thread - (database-reader-thread-channel database) + (database-reader-thread-set database) (lambda (db) (let ((statement (sqlite-prepare @@ -1219,7 +1288,7 @@ FROM narinfo_files" (define (database-count-narinfo-files database) (call-with-worker-thread - (database-reader-thread-channel database) + (database-reader-thread-set database) (lambda (db) (let ((statement (sqlite-prepare @@ -1240,7 +1309,7 @@ SELECT COUNT(*) FROM narinfo_files" size compression) (call-with-worker-thread - (database-writer-thread-channel database) + (database-writer-thread-set database) (lambda (db) (let ((statement (sqlite-prepare @@ -1271,7 +1340,7 @@ INSERT INTO cached_narinfo_files ( "select_cached_narinfo_file_by_hash" (lambda () (call-with-worker-thread - (database-reader-thread-channel database) + (database-reader-thread-set database) (lambda (db) (let ((statement (sqlite-prepare @@ -1308,7 +1377,7 @@ WHERE substr(narinfos.store_path, 12, 32) = :hash "select_cached_narinfo_file_by_narinfo_id" (lambda () (call-with-worker-thread - (database-reader-thread-channel database) + (database-reader-thread-set database) (lambda (db) (let ((statement (sqlite-prepare @@ -1350,7 +1419,7 @@ WHERE narinfo_id = :narinfo_id" "select_cached_narinfo_file_by_narinfo_id_and_compression" (lambda () (call-with-worker-thread - (database-reader-thread-channel database) + (database-reader-thread-set database) (lambda (db) (let ((statement (sqlite-prepare @@ -1387,7 +1456,7 @@ WHERE narinfo_id = :narinfo_id proc init) (call-with-worker-thread - (database-reader-thread-channel database) + (database-reader-thread-set database) (lambda (db) (let ((statement (sqlite-prepare @@ -1416,7 +1485,7 @@ INNER JOIN narinfos (define (database-remove-cached-narinfo-file database narinfo-id compression) (call-with-worker-thread - (database-writer-thread-channel database) + (database-writer-thread-set database) (lambda (db) (let ((statement (sqlite-prepare @@ -1441,7 +1510,7 @@ WHERE narinfo_id = :narinfo_id "select_scheduled_narinfo_removal" (lambda () (call-with-worker-thread - (database-reader-thread-channel database) + (database-reader-thread-set database) (lambda (db) (let ((statement (sqlite-prepare @@ -1473,7 +1542,7 @@ WHERE narinfo_id = :narinfo_id" "select_scheduled_narinfo_removal" (lambda () (call-with-worker-thread - (database-reader-thread-channel database) + (database-reader-thread-set database) (lambda (db) (let ((statement (sqlite-prepare @@ -1501,7 +1570,7 @@ WHERE cached_narinfo_file_id = :cached_narinfo_file_id" (define (database-delete-scheduled-cached-narinfo-removal database cached-narinfo-file-id) (call-with-worker-thread - (database-writer-thread-channel database) + (database-writer-thread-set database) (lambda (db) (let ((statement (sqlite-prepare @@ -1522,7 +1591,7 @@ RETURNING 1" (define (database-select-oldest-scheduled-cached-narinfo-removal database) (call-with-worker-thread - (database-reader-thread-channel database) + (database-reader-thread-set database) (lambda (db) (let ((statement (sqlite-prepare @@ -1562,7 +1631,7 @@ LIMIT 1" (define (database-count-scheduled-cached-narinfo-removal database) (call-with-worker-thread - (database-reader-thread-channel database) + (database-reader-thread-set database) (lambda (db) (let ((statement (sqlite-prepare @@ -1582,7 +1651,7 @@ SELECT COUNT(*) FROM scheduled_cached_narinfo_removal" cached-narinfo-file-id removal-datetime) (call-with-worker-thread - (database-writer-thread-channel database) + (database-writer-thread-set database) (lambda (db) (let ((statement (sqlite-prepare diff --git a/nar-herder/mirror.scm b/nar-herder/mirror.scm index a784165..67d4c00 100644 --- a/nar-herder/mirror.scm +++ b/nar-herder/mirror.scm @@ -32,6 +32,8 @@ #:use-module (json) #:use-module (fibers) #:use-module (fibers channels) + #:use-module (knots timeout) + #:use-module (knots non-blocking) #:use-module (guix narinfo) #:use-module ((guix store) #:select (store-path-hash-part)) #:use-module (nar-herder utils) @@ -41,7 +43,11 @@ (define (start-fetch-changes-fiber database metrics-registry storage-root mirror + addition-channel cached-compression-management-channel) + (define recent-changes-count-metric + (metrics-registry-fetch-metric metrics-registry "recent_changes_count")) + (define (request-recent-changes) (define latest-recent-change (database-select-latest-recent-change-datetime database)) @@ -73,9 +79,7 @@ latest-recent-change) (with-port-timeouts (lambda () - (let ((port - socket - (open-socket-for-uri* uri))) + (let ((port (non-blocking-open-socket-for-uri uri))) (http-get uri #:port port #:streaming? #t))) @@ -105,6 +109,8 @@ (unless (member (strip-change-datetime change-details) processed-recent-changes) (let ((change (assq-ref change-details 'change))) + (metric-increment recent-changes-count-metric) + (cond ((string=? change "addition") (let ((narinfo @@ -122,23 +128,18 @@ (assq-ref change-details 'datetime)) - (and=> (metrics-registry-fetch-metric metrics-registry - "nar_files_total") - (lambda (metric) - ;; Just update this metric if it - ;; exists, since if it does, it - ;; should be set to a value - (let ((new-files-count - (length (narinfo-uris narinfo)))) - (metric-increment - metric - #:by new-files-count - ;; TODO This should be - ;; checked, rather than - ;; assumed to be false - #:label-values '((stored . "false")))))))) + (when addition-channel + (for-each + (lambda (uri) + (spawn-fiber + (lambda () + (put-message addition-channel + `(addition ,(uri-path uri)))))) + (narinfo-uris narinfo))))) + ((string=? change "removal") (let ((store-path (assq-ref change-details 'data))) + ;; TODO Use the nar removal fiber (log-msg 'INFO "processing removal change for " store-path " (" (assq-ref change-details 'datetime) ")") @@ -191,6 +192,11 @@ (spawn-fiber (lambda () + (let ((recent-changes-count + (database-count-recent-changes database))) + (metric-set recent-changes-count-metric recent-changes-count) + (log-msg 'DEBUG recent-changes-count " recent changes in the database")) + (while #t (with-exception-handler (lambda (exn) diff --git a/nar-herder/recent-changes.scm b/nar-herder/recent-changes.scm index ccfff93..62bd604 100644 --- a/nar-herder/recent-changes.scm +++ b/nar-herder/recent-changes.scm @@ -137,35 +137,38 @@ (log-msg 'ERROR "exception in recent change listener " exn) #f) (lambda () - (let* ((recent-changes - (database-select-recent-changes database after)) - (unprocessed-recent-changes - (remove + (with-throw-handler #t + (lambda () + (let* ((recent-changes + (database-select-recent-changes database after)) + (unprocessed-recent-changes + (remove + (lambda (change-details) + (member change-details last-processed-recent-changes)) + recent-changes))) + + (unless (null? unprocessed-recent-changes) + (log-msg 'INFO "processing " (length unprocessed-recent-changes) + " recent changes") + + (for-each (lambda (change-details) - (member change-details last-processed-recent-changes)) - recent-changes))) - - (unless (null? unprocessed-recent-changes) - (log-msg 'INFO "processing " (length unprocessed-recent-changes) - " recent changes") - - (metric-increment recent-changes-count-metric - #:by (length unprocessed-recent-changes)) - - (for-each - (lambda (change-details) - (let ((change (assq-ref change-details 'change))) - (cond - ((string=? change "addition") - (process-addition-change change-details)) - ((string=? change "removal") - (process-removal-change change-details)) - (else #f)))) - unprocessed-recent-changes)) - - ;; Use the unprocessed recent changes here to carry - ;; forward all processed changes to the next pass - unprocessed-recent-changes)) + (let ((change (assq-ref change-details 'change))) + (cond + ((string=? change "addition") + (process-addition-change change-details)) + ((string=? change "removal") + (process-removal-change change-details)) + (else #f)))) + unprocessed-recent-changes) + + (metric-increment recent-changes-count-metric + #:by (length unprocessed-recent-changes))) + ;; Use the unprocessed recent changes here to carry + ;; forward all processed changes to the next pass + unprocessed-recent-changes)) + (lambda _ + (backtrace)))) #:unwind? #t) (#f (loop after '())) (recent-changes diff --git a/nar-herder/server.scm b/nar-herder/server.scm index fecf166..b904675 100644 --- a/nar-herder/server.scm +++ b/nar-herder/server.scm @@ -38,6 +38,10 @@ #:use-module (fibers scheduler) #:use-module (fibers conditions) #:use-module (fibers operations) + #:use-module (knots) + #:use-module (knots timeout) + #:use-module (knots web-server) + #:use-module (knots non-blocking) #:use-module (prometheus) #:use-module (json) #:use-module ((system foreign) @@ -217,6 +221,10 @@ (response_code . ,response-code) ,@labels))) + (define loop-detections-metric + (make-counter-metric metrics-registry + "loop_detections_total")) + (define %compression-strings (map symbol->string %compression-options)) @@ -226,7 +234,12 @@ 'DEBUG (request-method request) " " - (uri-path (request-uri request))) + (uri-path (request-uri request)) + (let ((via (request-via request))) + (if (null? via) + "" + (string-append + " (Via: " (string-join via ", ") ")")))) (match (cons (request-method request) (split-and-decode-uri-path @@ -324,8 +337,16 @@ (string-take narinfo 32))))) (values (build-response #:code 404) "404")))) - (((or 'HEAD 'GET) "nar" compression filename) - (let* ((hash (and (>= (string-length filename) 32) + ;; TODO The uris in narinfo files can be anything I believe, + ;; which doesn't match up with this code + (((or 'HEAD 'GET) "nar" rest ...) + (let* ((compression + (if (= (length rest) 1) + "none" + (first rest))) + (filename + (last rest)) + (hash (and (>= (string-length filename) 32) (string-take filename 32))) (narinfo (and hash @@ -369,6 +390,7 @@ (assq-ref narinfo 'id))) (when loop? + (metric-increment loop-detections-metric) (log-msg logger 'WARN (request-method request) " " @@ -395,11 +417,11 @@ (request-via request))) (values (build-response #:code 200 - #:headers `((X-Accel-Redirect - . ,(string-append - "/internal/nar/" - compression "/" - (uri-encode filename))))) + #:headers + `((X-Accel-Redirect + . ,(string-append + "/internal" + (assq-ref narinfo-file-for-compression 'url))))) #f))) (let ((cached-narinfo-file (and narinfo ; must be a known hash @@ -437,11 +459,14 @@ (if cached-narinfo-file (values (build-response #:code 200 - #:headers `((X-Accel-Redirect - . ,(string-append - "/internal/cached-nar/" - compression "/" - (uri-encode filename))))) + #:headers + `((X-Accel-Redirect + . ,(string-append + "/internal/cached-nar/" + ;; This must match up with + ;; add-cached-compressions-to-narinfo + compression "/" + (uri-encode filename))))) #f) (values (build-response #:code 404) "404")))))) @@ -571,8 +596,7 @@ (simple-format (current-error-port) "starting downloading the database\n") (let ((port - socket - (open-socket-for-uri* database-uri))) + (non-blocking-open-socket-for-uri database-uri))) (http-get database-uri #:port port #:streaming? #t))) @@ -735,7 +759,16 @@ compression) 'directory))) (utime (string-append directory "/" filename)))) - maintenance-scheduler))))) + maintenance-scheduler)))) + + (nar-removal-criteria + (filter-map + (match-lambda + ((key . val) + (if (eq? key 'storage-nar-removal-criteria) + val + #f))) + opts))) (if (string=? (assq-ref opts 'database-dump) "disabled") @@ -770,43 +803,32 @@ (assq-ref opts 'recent-changes-limit)) (let ((mirror-channel - (and=> - (assq-ref opts 'mirror) - (lambda (mirror) - (start-fetch-changes-fiber - database - metrics-registry - canonical-storage - mirror - cached-compression-management-channel) - - (if (assq-ref opts 'storage) - (start-mirroring-fiber database - mirror - (assq-ref opts 'storage-limit) - canonical-storage - metrics-registry) - #f)))) + (and (assq-ref opts 'mirror) + (assq-ref opts 'storage) + (start-mirroring-fiber database + (assq-ref opts 'mirror) + (assq-ref opts 'storage-limit) + (assq-ref opts 'minimum-free-space) + canonical-storage + metrics-registry))) (removal-channel - (let ((nar-removal-criteria - (filter-map - (match-lambda - ((key . val) - (if (eq? key 'storage-nar-removal-criteria) - val - #f))) - opts))) - (if (and (assq-ref opts 'storage) - (number? (assq-ref opts 'storage-limit)) - (not (null? nar-removal-criteria))) - (start-nar-removal-fiber database - canonical-storage - (assq-ref opts 'storage-limit) - metrics-registry - nar-removal-criteria) - #f))) + (start-nar-removal-fiber + database + canonical-storage + (assq-ref opts 'storage-limit) + metrics-registry + nar-removal-criteria)) (addition-channel (make-channel))) + (when (assq-ref opts 'mirror) + (start-fetch-changes-fiber + database + metrics-registry + canonical-storage ; might be #f, but that's fine here + (assq-ref opts 'mirror) + addition-channel + cached-compression-management-channel)) + (spawn-fiber (lambda () (while #t @@ -817,10 +839,25 @@ (lambda () (match (get-message addition-channel) (('addition file) + (apply update-nar-files-metric + metrics-registry + '() + (if (and canonical-storage + (file-exists? + (string-append canonical-storage + (uri-decode file)))) + '(#:stored-addition-count 1) + '(#:not-stored-addition-count 1))) + (when mirror-channel - (put-message mirror-channel - `(fetch ,file))) - (when removal-channel + (spawn-fiber + (lambda () + (put-message mirror-channel + `(fetch ,file))))) + + (when (and (assq-ref opts 'storage) + (number? (assq-ref opts 'storage-limit)) + (not (null? nar-removal-criteria))) (spawn-fiber (lambda () (sleep 60) @@ -837,11 +874,12 @@ file))))))) #:unwind? #t)))) - (start-recent-change-listener-fiber - database - metrics-registry - addition-channel - removal-channel)) + (unless (assq-ref opts 'mirror) + (start-recent-change-listener-fiber + database + metrics-registry + addition-channel + removal-channel))) (unless (null? enabled-cached-compressions) (let ((cached-compression-removal-fiber-wakeup-channel @@ -885,10 +923,10 @@ (iota (length schedulers)) schedulers)) - (log-msg 'INFO "starting server, listening on " + (log-msg 'INFO "starting server (" (getpid) "), listening on " (assq-ref opts 'host) ":" (assq-ref opts 'port)) - (run-server/patched + (run-knots-web-server (make-request-handler database canonical-storage diff --git a/nar-herder/storage.scm b/nar-herder/storage.scm index fc49b2d..0e7186d 100644 --- a/nar-herder/storage.scm +++ b/nar-herder/storage.scm @@ -28,11 +28,14 @@ #:use-module (web response) #:use-module (fibers) #:use-module (fibers channels) + #:use-module (knots timeout) + #:use-module (knots non-blocking) #:use-module (logging logger) #:use-module (logging port-log) #:use-module (prometheus) #:use-module (json) #:use-module ((guix build utils) #:select (dump-port mkdir-p)) + #:use-module ((guix build syscalls) #:select (free-disk-space)) #:use-module ((guix store) #:select (store-path-hash-part)) #:use-module (guix progress) #:use-module (nar-herder utils) @@ -41,6 +44,7 @@ remove-nar-files-by-hash initialise-storage-metrics + update-nar-files-metric check-storage removal-channel-remove-nar-from-storage @@ -76,7 +80,8 @@ (file-exists? filename))) (when exists? (remove-nar-from-storage storage-root - (assq-ref file 'url))) + (uri-decode + (assq-ref file 'url)))) (and=> (metrics-registry-fetch-metric metrics-registry "nar_files_total") @@ -91,6 +96,8 @@ (define (get-storage-size storage-root) (define enter? (const #t)) (define (leaf name stat result) + ;; Allow other fibers to run + (sleep 0) (+ result (or (and=> (stat:blocks stat) (lambda (blocks) @@ -166,8 +173,7 @@ (unrecognised-files . ,(hash-map->list (lambda (key _) key) files-hash))))) -;; TODO Maybe remove the metrics-registry argument? -(define* (fold-nar-files database storage-root metrics-registry +(define* (fold-nar-files database storage-root proc init #:key stored?) (define stored-files-count 0) @@ -181,8 +187,10 @@ (uri-decode (assq-ref nar 'url))) (nar-stored? - (file-exists? - (string-append storage-root url)))) + (if storage-root + (file-exists? + (string-append storage-root url)) + #f))) (if nar-stored? (set! stored-files-count (1+ stored-files-count)) @@ -202,7 +210,9 @@ (define* (update-nar-files-metric metrics-registry nar-file-counts - #:key fetched-count removed-count) + #:key fetched-count removed-count + not-stored-addition-count + stored-addition-count) ;; Avoid incrementing or decrementing the metric if it hasn't been ;; set yet @@ -245,7 +255,17 @@ #:label-values '((stored . "true"))) (metric-increment nar-files-metric #:by removed-count - #:label-values '((stored . "false"))))))) + #:label-values '((stored . "false")))) + + (when not-stored-addition-count + (metric-increment nar-files-metric + #:by not-stored-addition-count + #:label-values '((stored . "false")))) + + (when stored-addition-count + (metric-increment nar-files-metric + #:by stored-addition-count + #:label-values '((stored . "true"))))))) (define (initialise-storage-metrics database storage-root metrics-registry) ;; Use a database transaction to block changes @@ -258,7 +278,6 @@ (fold-nar-files database storage-root - metrics-registry (const #f) #f #:stored? 'both))) @@ -271,32 +290,36 @@ (define files-count (database-count-narinfo-files database)) - (call-with-progress-reporter - (progress-reporter/bar files-count - (simple-format #f "checking ~A files" files-count) - (current-error-port)) - (lambda (report) - (fold-nar-files - database - storage-root - metrics-registry - (lambda (file _) - (let* ((full-filename - (string-append storage-root - (uri-decode (assq-ref file 'url)))) - (file-size - (stat:size (stat full-filename))) - (database-size - (assq-ref file 'size))) - (report) - (unless (= file-size database-size) - (newline) - (log-msg 'WARN "file " full-filename - " has inconsistent size (database: " - database-size ", file: " file-size ")")) - #f)) - #f - #:stored? 'both)))) + (let ((not-stored-count + (call-with-progress-reporter + (progress-reporter/bar files-count + (simple-format #f "checking ~A files" files-count) + (current-error-port)) + (lambda (report) + (fold-nar-files + database + storage-root + (lambda (file not-stored-count) + (let* ((full-filename + (string-append storage-root + (uri-decode (assq-ref file 'url))))) + (if (file-exists? full-filename) + (let ((file-size + (stat:size (stat full-filename))) + (database-size + (assq-ref file 'size))) + (report) + (unless (= file-size database-size) + (newline) + (log-msg 'WARN "file " full-filename + " has inconsistent size (database: " + database-size ", file: " file-size ")")) + not-stored-count) + (+ 1 not-stored-count)))) + 0 + #:stored? 'both))))) + + (log-msg 'INFO "finished checking, " not-stored-count " files not stored"))) (define (at-most max-length lst) "If LST is shorter than MAX-LENGTH, return it and the empty list; otherwise @@ -335,7 +358,7 @@ When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates." ;; Open a new connection to URI and evict old entries from ;; CACHE, if any. (let ((socket - (open-socket-for-uri* + (non-blocking-open-socket-for-uri uri #:verify-certificate? verify-certificate?)) (new-cache evicted @@ -378,8 +401,10 @@ When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates." metrics-registry nar-removal-criteria) (define storage-size-metric - (make-gauge-metric metrics-registry - "storage_size_bytes")) + (or (metrics-registry-fetch-metric metrics-registry + "storage_size_bytes") + (make-gauge-metric metrics-registry + "storage_size_bytes"))) (define removal-channel (make-channel)) @@ -404,31 +429,33 @@ When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates." ".narinfo/info")))) (with-port-timeouts (lambda () - (call-with-values - (lambda () - (retry-on-error - (lambda () - (call-with-cached-connection uri - (lambda (port) + (retry-on-error + (lambda () + (call-with-cached-connection + uri + (lambda (port) + (let ((response + body (http-get uri #:port port #:decode-body? #f #:keep-alive? #t - #:streaming? #t)))) - #:times 3 - #:delay 5)) - (lambda (response body) - (and (= (response-code response) - 200) - - (let ((json-body (json->scm body))) - (eq? (assoc-ref json-body "stored") - #t)))))) + #:streaming? #t))) + (and (= (response-code response) + 200) + (let ((json-body (json->scm body))) + (eq? (assoc-ref json-body "stored") + #t))))))) + #:times 3 + #:delay 5)) #:timeout 30))))) (define (nar-can-be-removed? nar) + (log-msg 'INFO "checking if " (assq-ref nar 'url) " can be removed") (any (lambda (criteria) - (check-removal-criteria nar criteria)) + (let ((result (check-removal-criteria nar criteria))) + (log-msg 'INFO "removal criteria (" criteria "): " result) + result)) nar-removal-criteria)) (define (run-removal-pass) @@ -447,7 +474,6 @@ When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates." (fold-nar-files database storage-root - metrics-registry (lambda (nar result) (match result ((storage-size . removed-count) @@ -477,9 +503,6 @@ When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates." (log-msg 'INFO "finished looking for nars to remove, removed " removed-count " files")))))) - (when (null? nar-removal-criteria) - (error "must be some removal criteria")) - (spawn-fiber (lambda () (while #t @@ -494,6 +517,9 @@ When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates." (lambda () (with-throw-handler #t (lambda () + (when (null? nar-removal-criteria) + (error "must be some removal criteria")) + (cond ((not (file-exists? (string-append storage-root @@ -523,35 +549,49 @@ When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates." (log-msg 'ERROR "failed to remove " file ": " exn)) (lambda () ;; TODO: Do more checking at this point - (remove-nar-from-storage - storage-root - (uri-decode file)) + (when storage-root + (remove-nar-from-storage + storage-root + (uri-decode file))) (update-nar-files-metric metrics-registry '() #:removed-count 1)) #:unwind? #t)))))) - (spawn-fiber - (lambda () - (while #t - (with-exception-handler - (lambda (exn) - (log-msg 'ERROR "nar removal pass failed " exn)) - run-removal-pass - #:unwind? #t) - (sleep (* 60 60 24))))) + (when (and storage-root + (not (null? nar-removal-criteria))) + (spawn-fiber + (lambda () + (while #t + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "nar removal pass failed " exn)) + run-removal-pass + #:unwind? #t) + (sleep (* 60 60 24)))))) removal-channel) -(define (start-mirroring-fiber database mirror storage-limit storage-root - metrics-registry) +(define (start-mirroring-fiber database mirror storage-limit minimum-free-space + storage-root metrics-registry) - (define no-storage-limit? - (not (integer? storage-limit))) + (define storage-limit? + (integer? storage-limit)) + + (define minimum-free-space? + (integer? minimum-free-space)) (define storage-size-metric - (make-gauge-metric metrics-registry - "storage_size_bytes")) + (or (metrics-registry-fetch-metric metrics-registry + "storage_size_bytes") + (make-gauge-metric metrics-registry + "storage_size_bytes"))) + + (define storage-free-space-metric + (or (metrics-registry-fetch-metric metrics-registry + "storage_free_space_bytes") + (make-gauge-metric metrics-registry + "storage_free_space_bytes"))) (define (fetch-file file) (let* ((string-url @@ -582,8 +622,7 @@ When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates." (call-with-values (lambda () (let ((port - socket - (open-socket-for-uri* uri))) + (non-blocking-open-socket-for-uri uri))) (http-get uri #:port port #:decode-body? #f @@ -608,24 +647,32 @@ When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates." '() #:fetched-count 1))) - (define (download-nars initial-storage-size) - ;; If there's free space, then consider downloading missing nars - (if (< initial-storage-size storage-limit) + (define (download-nars initial-storage-size initial-free-space) + (define effective-storage-limit + (cond + ((and storage-limit? minimum-free-space?) + (min storage-limit + (+ initial-storage-size + (- initial-free-space minimum-free-space)))) + (storage-limit? storage-limit) + (minimum-free-space? + (+ initial-storage-size + (- initial-free-space minimum-free-space))))) + + (if (< initial-storage-size effective-storage-limit) (let ((result nar-file-counts (fold-nar-files database storage-root - metrics-registry (lambda (file result) (log-msg 'DEBUG "considering " (assq-ref file 'url)) (match result ((storage-size . fetched-count) (let ((file-bytes (assq-ref file 'size))) - (if (or no-storage-limit? - (< (+ storage-size file-bytes) - storage-limit)) + (if (< (+ storage-size file-bytes) + effective-storage-limit) (let ((success? (with-exception-handler (lambda (exn) @@ -694,7 +741,6 @@ When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates." (fold-nar-files database storage-root - metrics-registry (lambda (nar _) (put-message channel (assq-ref nar 'url)) @@ -717,13 +763,18 @@ When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates." (define (run-mirror-pass) (log-msg 'DEBUG "running mirror pass") (let ((initial-storage-size (with-time-logging "getting storage size" - (get-storage-size storage-root)))) + (get-storage-size storage-root))) + (free-space + (free-disk-space storage-root))) (metric-set storage-size-metric initial-storage-size) + (metric-set storage-free-space-metric + free-space) (let ((fetched-count - (if no-storage-limit? - (fast-download-nars) - (download-nars initial-storage-size)))) + (if (or storage-limit? minimum-free-space?) + (download-nars initial-storage-size + free-space) + (fast-download-nars)))) (log-msg 'DEBUG "finished mirror pass (fetched " fetched-count " nars)")))) (let ((channel (make-channel))) @@ -742,10 +793,10 @@ When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates." (lambda (exn) (log-msg 'ERROR "failed to mirror " file ": " exn)) (lambda () - (fetch-file file) - (update-nar-files-metric metrics-registry - '() - #:fetched-count 1)) + (unless (file-exists? + (string-append storage-root + (uri-decode file))) + (fetch-file file))) #:unwind? #t)))))) (spawn-fiber diff --git a/nar-herder/utils.scm b/nar-herder/utils.scm index 4755d33..4155ea0 100644 --- a/nar-herder/utils.scm +++ b/nar-herder/utils.scm @@ -31,10 +31,6 @@ #:use-module (ice-9 binary-ports) #:use-module (ice-9 exceptions) #:use-module (rnrs bytevectors) - #:use-module (ice-9 suspendable-ports) - #:use-module ((ice-9 ports internal) #:select (port-poll - port-read-wait-fd - port-write-wait-fd)) #:use-module (web uri) #:use-module (web http) #:use-module (web client) @@ -46,30 +42,12 @@ #:use-module (fibers scheduler) #:use-module (fibers conditions) #:use-module (fibers operations) - #:use-module ((guix build syscalls) - #:select (set-thread-name)) - #:export (make-worker-thread-set - call-with-worker-thread - - call-with-time-logging + #:export (call-with-time-logging with-time-logging retry-on-error - create-work-queue - - check-locale! - - open-socket-for-uri* - - call-with-sigint - run-server/patched - - timeout-error? - - port-read-timeout-error? - port-write-timeout-error? - with-port-timeouts)) + check-locale!)) (define* (retry-on-error f #:key times delay ignore error-hook) (let loop ((attempt 1)) @@ -154,218 +132,6 @@ "Log under NAME the time taken to evaluate EXP." (call-with-time-logging name (lambda () exp ...))) -(define* (create-work-queue thread-count-parameter proc - #:key thread-start-delay - (thread-stop-delay - (make-time time-duration 0 0)) - (name "unnamed") - priority<?) - (let ((queue (make-q)) - (queue-mutex (make-mutex)) - (job-available (make-condition-variable)) - (running-job-args (make-hash-table))) - - (define get-thread-count - (cond - ((number? thread-count-parameter) - (const thread-count-parameter)) - ((eq? thread-count-parameter #f) - ;; Run one thread per job - (lambda () - (+ (q-length queue) - (hash-count (lambda (index val) - (list? val)) - running-job-args)))) - (else - thread-count-parameter))) - - (define process-job - (if priority<? - (lambda* (args #:key priority) - (with-mutex queue-mutex - (enq! queue (cons priority args)) - (set-car! - queue - (stable-sort! (car queue) - (lambda (a b) - (priority<? - (car a) - (car b))))) - (sync-q! queue) - (start-new-threads-if-necessary (get-thread-count)) - (signal-condition-variable job-available))) - (lambda args - (with-mutex queue-mutex - (enq! queue args) - (start-new-threads-if-necessary (get-thread-count)) - (signal-condition-variable job-available))))) - - (define (count-threads) - (with-mutex queue-mutex - (hash-count (const #t) running-job-args))) - - (define (count-jobs) - (with-mutex queue-mutex - (+ (q-length queue) - (hash-count (lambda (index val) - (list? val)) - running-job-args)))) - - (define (list-jobs) - (with-mutex queue-mutex - (append (if priority<? - (map cdr (car queue)) - (list-copy (car queue))) - (hash-fold (lambda (key val result) - (if val - (cons val result) - result)) - '() - running-job-args)))) - - (define (thread-process-job job-args) - (with-exception-handler - (lambda (exn) - (simple-format (current-error-port) - "~A work queue, job raised exception ~A: ~A\n" - name job-args exn)) - (lambda () - (with-throw-handler #t - (lambda () - (apply proc job-args)) - (lambda (key . args) - (simple-format - (current-error-port) - "~A work queue, exception when handling job: ~A ~A\n" - name key args) - (backtrace)))) - #:unwind? #t)) - - (define (start-thread thread-index) - (define (too-many-threads?) - (let ((running-jobs-count - (hash-count (lambda (index val) - (list? val)) - running-job-args)) - (desired-thread-count (get-thread-count))) - - (>= running-jobs-count - desired-thread-count))) - - (define (thread-idle-for-too-long? last-job-finished-at) - (time>=? - (time-difference (current-time time-monotonic) - last-job-finished-at) - thread-stop-delay)) - - (define (stop-thread) - (hash-remove! running-job-args - thread-index) - (unlock-mutex queue-mutex)) - - (call-with-new-thread - (lambda () - (catch 'system-error - (lambda () - (set-thread-name - (string-append name " q t " - (number->string thread-index)))) - (const #t)) - - (let loop ((last-job-finished-at (current-time time-monotonic))) - (lock-mutex queue-mutex) - - (if (too-many-threads?) - (stop-thread) - (let ((job-args - (if (q-empty? queue) - ;; #f from wait-condition-variable indicates a timeout - (if (wait-condition-variable - job-available - queue-mutex - (+ 9 (time-second (current-time)))) - ;; Another thread could have taken - ;; the job in the mean time - (if (q-empty? queue) - #f - (if priority<? - (cdr (deq! queue)) - (deq! queue))) - #f) - (if priority<? - (cdr (deq! queue)) - (deq! queue))))) - - (if job-args - (begin - (hash-set! running-job-args - thread-index - job-args) - - (unlock-mutex queue-mutex) - (thread-process-job job-args) - - (with-mutex queue-mutex - (hash-set! running-job-args - thread-index - #f)) - - (loop (current-time time-monotonic))) - (if (thread-idle-for-too-long? last-job-finished-at) - (stop-thread) - (begin - (unlock-mutex queue-mutex) - - (loop last-job-finished-at)))))))))) - - - (define start-new-threads-if-necessary - (let ((previous-thread-started-at (make-time time-monotonic 0 0))) - (lambda (desired-count) - (let* ((thread-count - (hash-count (const #t) running-job-args)) - (threads-to-start - (- desired-count thread-count))) - (when (> threads-to-start 0) - (for-each - (lambda (thread-index) - (when (eq? (hash-ref running-job-args - thread-index - 'slot-free) - 'slot-free) - (let* ((now (current-time time-monotonic)) - (elapsed (time-difference now - previous-thread-started-at))) - (when (or (eq? #f thread-start-delay) - (time>=? elapsed thread-start-delay)) - (set! previous-thread-started-at now) - (hash-set! running-job-args - thread-index - #f) - (start-thread thread-index))))) - (iota desired-count))))))) - - (if (procedure? thread-count-parameter) - (call-with-new-thread - (lambda () - (catch 'system-error - (lambda () - (set-thread-name - (string-append name " q t"))) - (const #t)) - - (while #t - (sleep 15) - (with-mutex queue-mutex - (let ((idle-threads (hash-count (lambda (index val) - (eq? #f val)) - running-job-args))) - (when (= 0 idle-threads) - (start-new-threads-if-necessary (get-thread-count)))))))) - (start-new-threads-if-necessary (get-thread-count))) - - (values process-job count-jobs count-threads list-jobs))) - (define (check-locale!) (with-exception-handler (lambda (exn) @@ -393,396 +159,3 @@ falling back to en_US.utf8\n" (lambda _ (setlocale LC_ALL "")) #:unwind? #t)) - -(define-record-type <worker-thread-set> - (worker-thread-set channel arguments-parameter) - worker-thread-set? - (channel worker-thread-set-channel) - (arguments-parameter worker-thread-set-arguments-parameter)) - -(define* (make-worker-thread-set initializer - #:key (parallelism 1) - (delay-logger (lambda _ #f)) - (duration-logger (const #f)) - destructor - lifetime - (log-exception? (const #t)) - (expire-on-exception? #f) - (name "unnamed")) - (define param - (make-parameter #f)) - - (define (initializer/safe) - (let ((args - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "exception running initializer in worker thread (~A): ~A:\n ~A\n" - name - initializer - exn) - #f) - (lambda () - (with-throw-handler #t - initializer - (lambda args - (backtrace)))) - #:unwind? #t))) - - (if args - args - ;; never give up, just keep retrying - (begin - (sleep 5) - (initializer/safe))))) - - (define (destructor/safe args) - (let ((success? - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "exception running destructor in worker thread (~A): ~A:\n ~A\n" - name - initializer - exn) - #f) - (lambda () - (with-throw-handler #t - (lambda () - (apply destructor args) - #t) - (lambda _ - (backtrace)))) - #:unwind? #t))) - - (or success? - #t - (begin - (sleep 5) - (destructor/safe args))))) - - (let ((channel (make-channel))) - (for-each - (lambda (thread-index) - (call-with-new-thread - (lambda () - (catch 'system-error - (lambda () - (set-thread-name - (string-append - name " w t " - (number->string thread-index)))) - (const #t)) - - (let init ((args (initializer/safe))) - (parameterize ((param args)) - (let loop ((current-lifetime lifetime)) - (let ((exception? - (match (get-message channel) - (((? channel? reply) sent-time (? procedure? proc)) - (let ((time-delay - (- (get-internal-real-time) - sent-time))) - (delay-logger (/ time-delay - internal-time-units-per-second) - proc) - - (let* ((start-time (get-internal-real-time)) - (response - (with-exception-handler - (lambda (exn) - (list 'worker-thread-error - (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second) - exn)) - (lambda () - (with-throw-handler #t - (lambda () - (call-with-values - (lambda () - (apply proc args)) - (lambda vals - (cons (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second) - vals)))) - (lambda args - (when (match args - (('%exception exn) - (log-exception? exn)) - (_ #t)) - (simple-format - (current-error-port) - "worker-thread: exception: ~A\n" args) - (backtrace))))) - #:unwind? #t))) - (put-message reply - response) - - (match response - (('worker-thread-error duration _) - (when duration-logger - (duration-logger duration proc)) - #t) - ((duration . _) - (when duration-logger - (duration-logger duration proc)) - #f)))))))) - - (unless (and expire-on-exception? - exception?) - (if (number? current-lifetime) - (unless (< current-lifetime 0) - (loop (if current-lifetime - (- current-lifetime 1) - #f))) - (loop #f)))))) - - (when destructor - (destructor/safe args)) - - (init (initializer/safe)))))) - (iota parallelism)) - - (worker-thread-set channel - param))) - -(define* (call-with-worker-thread record proc #:key duration-logger) - "Send PROC to the worker thread through CHANNEL. Return the result of PROC. -If already in the worker thread, call PROC immediately." - (let ((args ((worker-thread-set-arguments-parameter record)))) - (if args - (apply proc args) - (let ((reply (make-channel))) - (put-message (worker-thread-set-channel record) - (list reply (get-internal-real-time) proc)) - (match (get-message reply) - (('worker-thread-error duration exn) - (when duration-logger - (duration-logger duration)) - (raise-exception exn)) - ((duration . result) - (when duration-logger - (duration-logger duration)) - (apply values result))))))) - -(define* (open-socket-for-uri* uri - #:key (verify-certificate? #t)) - (define tls-wrap - (@@ (web client) tls-wrap)) - - (define https? - (eq? 'https (uri-scheme uri))) - - (define plain-uri - (if https? - (build-uri - 'http - #:userinfo (uri-userinfo uri) - #:host (uri-host uri) - #:port (or (uri-port uri) 443) - #:path (uri-path uri) - #:query (uri-query uri) - #:fragment (uri-fragment uri)) - uri)) - - (let ((s (open-socket-for-uri plain-uri))) - (values - (if https? - (let ((port - (tls-wrap s (uri-host uri) - #:verify-certificate? verify-certificate?))) - ;; Guile/guile-gnutls don't handle the handshake happening on a non - ;; blocking socket, so change the behavior here. - (let ((flags (fcntl s F_GETFL))) - (fcntl s F_SETFL (logior O_NONBLOCK flags))) - port) - (let ((flags (fcntl s F_GETFL))) - (fcntl s F_SETFL (logior O_NONBLOCK flags)) - s)) - s))) - -;; Copied from (fibers web server) -(define (call-with-sigint thunk cvar) - (let ((handler #f)) - (dynamic-wind - (lambda () - (set! handler - (sigaction SIGINT (lambda (sig) (signal-condition! cvar))))) - thunk - (lambda () - (if handler - ;; restore Scheme handler, SIG_IGN or SIG_DFL. - (sigaction SIGINT (car handler) (cdr handler)) - ;; restore original C handler. - (sigaction SIGINT #f)))))) - -;; This variant of run-server from the fibers library supports running -;; multiple servers within one process. -(define run-server/patched - (let ((fibers-web-server-module - (resolve-module '(fibers web server)))) - - (define set-nonblocking! - (module-ref fibers-web-server-module 'set-nonblocking!)) - - (define make-default-socket - (module-ref fibers-web-server-module 'make-default-socket)) - - (define socket-loop - (module-ref fibers-web-server-module 'socket-loop)) - - (lambda* (handler - #:key - (host #f) - (family AF_INET) - (addr (if host - (inet-pton family host) - INADDR_LOOPBACK)) - (port 8080) - (socket (make-default-socket family addr port))) - ;; We use a large backlog by default. If the server is suddenly hit - ;; with a number of connections on a small backlog, clients won't - ;; receive confirmation for their SYN, leading them to retry -- - ;; probably successfully, but with a large latency. - (listen socket 1024) - (set-nonblocking! socket) - (sigaction SIGPIPE SIG_IGN) - (spawn-fiber (lambda () (socket-loop socket handler)))))) - -;; These procedure are subject to spurious wakeups. - -(define (readable? port) - "Test if PORT is writable." - (match (select (vector port) #() #() 0) - ((#() #() #()) #f) - ((#(_) #() #()) #t))) - -(define (writable? port) - "Test if PORT is writable." - (match (select #() (vector port) #() 0) - ((#() #() #()) #f) - ((#() #(_) #()) #t))) - -(define (make-wait-operation ready? schedule-when-ready port port-ready-fd this-procedure) - (make-base-operation #f - (lambda _ - (and (ready? (port-ready-fd port)) values)) - (lambda (flag sched resume) - (define (commit) - (match (atomic-box-compare-and-swap! flag 'W 'S) - ('W (resume values)) - ('C (commit)) - ('S #f))) - (schedule-when-ready - sched (port-ready-fd port) commit)))) - -(define (wait-until-port-readable-operation port) - "Make an operation that will succeed when PORT is readable." - (unless (input-port? port) - (error "refusing to wait forever for input on non-input port")) - (make-wait-operation readable? schedule-task-when-fd-readable port - port-read-wait-fd - wait-until-port-readable-operation)) - -(define (wait-until-port-writable-operation port) - "Make an operation that will succeed when PORT is writable." - (unless (output-port? port) - (error "refusing to wait forever for output on non-output port")) - (make-wait-operation writable? schedule-task-when-fd-writable port - port-write-wait-fd - wait-until-port-writable-operation)) - - - -(define &port-timeout - (make-exception-type '&port-timeout - &external-error - '(port))) - -(define make-port-timeout-error - (record-constructor &port-timeout)) - -(define port-timeout-error? - (record-predicate &port-timeout)) - -(define &port-read-timeout - (make-exception-type '&port-read-timeout - &port-timeout - '())) - -(define make-port-read-timeout-error - (record-constructor &port-read-timeout)) - -(define port-read-timeout-error? - (record-predicate &port-read-timeout)) - -(define &port-write-timeout - (make-exception-type '&port-write-timeout - &port-timeout - '())) - -(define make-port-write-timeout-error - (record-constructor &port-write-timeout)) - -(define port-write-timeout-error? - (record-predicate &port-write-timeout)) - -(define* (with-port-timeouts thunk - #:key timeout - (read-timeout timeout) - (write-timeout timeout)) - (define (no-fibers-wait port mode timeout) - (define poll-timeout-ms 200) - - ;; When the GC runs, it restarts the poll syscall, but the timeout - ;; remains unchanged! When the timeout is longer than the time - ;; between the syscall restarting, I think this renders the - ;; timeout useless. Therefore, this code uses a short timeout, and - ;; repeatedly calls poll while watching the clock to see if it has - ;; timed out overall. - (let ((timeout-internal - (+ (get-internal-real-time) - (* internal-time-units-per-second - (/ timeout 1000))))) - (let loop ((poll-value - (port-poll port mode poll-timeout-ms))) - (if (= poll-value 0) - (if (> (get-internal-real-time) - timeout-internal) - (raise-exception - (if (string=? mode "r") - (make-port-read-timeout-error port) - (make-port-write-timeout-error port))) - (loop (port-poll port mode poll-timeout-ms))) - poll-value)))) - - (parameterize - ((current-read-waiter - (lambda (port) - (if (current-scheduler) - (perform-operation - (choice-operation - (wait-until-port-readable-operation port) - (wrap-operation - (sleep-operation read-timeout) - (lambda () - (raise-exception - (make-port-read-timeout-error thunk port)))))) - (no-fibers-wait port "r" read-timeout)))) - (current-write-waiter - (lambda (port) - (if (current-scheduler) - (perform-operation - (choice-operation - (wait-until-port-writable-operation port) - (wrap-operation - (sleep-operation write-timeout) - (lambda () - (raise-exception - (make-port-write-timeout-error thunk port)))))) - (no-fibers-wait port "w" write-timeout))))) - (thunk))) diff --git a/scripts/nar-herder.in b/scripts/nar-herder.in index 67515dc..812ec98 100644 --- a/scripts/nar-herder.in +++ b/scripts/nar-herder.in @@ -148,9 +148,16 @@ "none" (string->number arg)) (alist-delete 'storage-limit result)))) + (option '("storage-minimum-free-space") #t #f + (lambda (opt name arg result) + (alist-cons 'storage-minimum-free-space + (if (string=? arg "none") + "none" + (string->number arg)) + (alist-delete 'storage-minimum-free-space result)))) - ;; stored-on=https://other-nar-herder-server - ;; stored-on=https://other-nar-herder-server&stored-on=https://different-server + ;; (stored-on https://other-nar-herder-server) + ;; and=((stored-on https://other-nar-herder-server) (stored-on https://different-server)) (option '("storage-nar-removal-criteria") #t #f (lambda (opt name arg result) (alist-cons 'storage-nar-removal-criteria @@ -391,37 +398,78 @@ (assq-ref opts 'arguments))) (len (length narinfos)) (progress - (progress-reporter/bar len - (format #f "importing ~a narinfos" - len) - (current-error-port)))) + (if (= 1 len) + progress-reporter/silent + (progress-reporter/bar len + (format #f "importing ~a narinfos" + len) + (current-error-port))))) (call-with-progress-reporter progress (lambda (report) (database-call-with-transaction database (lambda (db) - (let ((read-narinfos - (map - (lambda (narinfo-file) - (let ((narinfo - (call-with-input-file narinfo-file - (lambda (port) - ;; Set url to a dummy value as this doesn't - ;; matter - (read-narinfo port - "https://narherderdummyvalue"))))) - - (database-insert-narinfo - database - narinfo - #:tags (or (assq-ref opts 'tags) - '())) - - (report) - - narinfo)) - narinfos))) + (let* ((canonical-storage + (and=> (assq-ref opts 'storage) + canonicalize-path)) + (read-narinfos + (map + (lambda (narinfo-file) + (let ((narinfo + (call-with-input-file narinfo-file + (lambda (port) + ;; Set url to a dummy value as this doesn't + ;; matter + (read-narinfo port + "https://narherderdummyvalue"))))) + + (define (check-size! file size) + (let ((actual-size (stat:size (stat file)))) + (unless (= size actual-size) + (error + (simple-format + #f + "error importing ~A, ~A should be ~A bytes but is ~A" + narinfo-file + file + size + actual-size))))) + + (database-insert-narinfo + database + narinfo + #:tags (or (assq-ref opts 'tags) + '())) + + (when canonical-storage + (for-each + (lambda (uri size) + (let* ((nar-path + (uri-decode (uri-path uri))) + (source + (string-append + (dirname narinfo-file) "/" nar-path))) + (if (string=? canonical-storage + (dirname narinfo-file)) + (check-size! source size) + (let ((dest + (string-append + canonical-storage "/" nar-path))) + (check-size! source size) + (simple-format (current-error-port) + "moving ~A to ~A\n" + source dest) + (rename-file source dest) + ;; Re-check file size + (check-size! dest size))))) + (narinfo-uris narinfo) + (narinfo-file-sizes narinfo))) + + (report) + + narinfo)) + narinfos))) (when (assq-ref opts 'ensure-references-exist) (for-each @@ -443,7 +491,9 @@ "missing reference to ~A\n" reference)))) (narinfo-references narinfo)))) - read-narinfos)))))))))) + read-narinfos))))))) + (when (= 1 len) + (simple-format (current-error-port) "imported narinfo\n"))))) (("remove" rest ...) (let* ((opts (parse-options %base-options %base-option-defaults @@ -458,11 +508,14 @@ (port-log (make <port-log> #:port (current-output-port) #:formatter - (lambda (lvl time str) + ;; In guile-lib v0.2.8 onwards, the formatter is + ;; called with more arguments + (lambda args ; lvl, time, str (format #f "~a (~5a): ~a~%" - (strftime "%F %H:%M:%S" (localtime time)) - lvl - str))))) + (strftime "%F %H:%M:%S" (localtime + (second args))) + (first args) + (third args)))))) (add-handler! lgr port-log) (open-log! lgr) @@ -507,8 +560,9 @@ ;; that'll stop these files appearing in narinfos (database-remove-cached-narinfo-file database - narinfo-id - (symbol->string compression))) + (assq-ref narinfo-details 'id) + (symbol->string + (assq-ref cached-narinfo-details 'compression)))) cached-narinfo-files) (database-remove-narinfo database store-path)) @@ -524,11 +578,14 @@ (port-log (make <port-log> #:port (current-output-port) #:formatter - (lambda (lvl time str) + ;; In guile-lib v0.2.8 onwards, the formatter is + ;; called with more arguments + (lambda args ; lvl, time, str (format #f "~a (~5a): ~a~%" - (strftime "%F %H:%M:%S" (localtime time)) - lvl - str)))) + (strftime "%F %H:%M:%S" (localtime + (second args))) + (first args) + (third args))))) (metrics-registry (make-metrics-registry #:namespace "narherder"))) @@ -545,7 +602,8 @@ (loop (cdr levels))))) (let* ((database (setup-database (assq-ref opts 'database) - metrics-registry)) + metrics-registry + #:readonly? #t)) (canonical-storage (and=> (assq-ref opts 'storage) canonicalize-path))) @@ -567,11 +625,14 @@ (port-log (make <port-log> #:port (current-output-port) #:formatter - (lambda (lvl time str) + ;; In guile-lib v0.2.8 onwards, the formatter is + ;; called with more arguments + (lambda args ; lvl, time, str (format #f "~a (~5a): ~a~%" - (strftime "%F %H:%M:%S" (localtime time)) - lvl - str))))) + (strftime "%F %H:%M:%S" (localtime + (second args))) + (first args) + (third args)))))) (add-handler! lgr port-log) (open-log! lgr) @@ -602,4 +663,5 @@ (lambda (port) (simple-format port "~A\n" (getpid)))))) - (run-nar-herder-service opts lgr)))) + (with-fluids ((%file-port-name-canonicalization 'none)) + (run-nar-herder-service opts lgr))))) |