diff options
Diffstat (limited to 'nar-herder')
-rw-r--r-- | nar-herder/cached-compression.scm | 753 | ||||
-rw-r--r-- | nar-herder/database.scm | 1006 | ||||
-rw-r--r-- | nar-herder/mirror.scm | 85 | ||||
-rw-r--r-- | nar-herder/recent-changes.scm | 159 | ||||
-rw-r--r-- | nar-herder/server.scm | 795 | ||||
-rw-r--r-- | nar-herder/storage.scm | 730 | ||||
-rw-r--r-- | nar-herder/utils.scm | 836 |
7 files changed, 3524 insertions, 840 deletions
diff --git a/nar-herder/cached-compression.scm b/nar-herder/cached-compression.scm new file mode 100644 index 0000000..5c257dc --- /dev/null +++ b/nar-herder/cached-compression.scm @@ -0,0 +1,753 @@ +;;; Nar Herder +;;; +;;; Copyright © 2022, 2023 Christopher Baines <mail@cbaines.net> +;;; +;;; This program is free software: you can redistribute it and/or +;;; modify it under the terms of the GNU Affero General Public License +;;; as published by the Free Software Foundation, either version 3 of +;;; the License, or (at your option) any later version. +;;; +;;; This program is distributed in the hope that it will be useful, +;;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;;; Affero General Public License for more details. +;;; +;;; You should have received a copy of the GNU Affero General Public +;;; License along with this program. If not, see +;;; <http://www.gnu.org/licenses/>. + +(define-module (nar-herder cached-compression) + #:use-module (srfi srfi-1) + #:use-module (srfi srfi-19) + #:use-module (srfi srfi-26) + #:use-module (srfi srfi-71) + #:use-module (ice-9 ftw) + #:use-module (ice-9 match) + #:use-module (ice-9 atomic) + #:use-module (ice-9 threads) + #:use-module (logging logger) + #:use-module (prometheus) + #:use-module (fibers) + #:use-module (fibers timers) + #:use-module (fibers channels) + #:use-module (fibers operations) + #:use-module (web uri) + #:use-module (web client) + #:use-module (web response) + #:use-module (guix store) + #:use-module ((guix utils) #:select (compressed-file? + call-with-decompressed-port)) + #:use-module ((guix build utils) + #:select (dump-port mkdir-p)) + #:use-module (nar-herder utils) + #:use-module (nar-herder database) + #:export (start-cached-compression-management-fiber + start-cached-compression-removal-fiber + start-cached-compression-schedule-removal-fiber)) + +;; Nar caching overview +;; +;; On start +;; - Compute the size of each cached compression directory +;; - Remove database entries if they're missing from the directory +;; - Remove files if they're missing from the database +;; +;; On nar usage +;; - Bump count +;; - If count is sufficient +;; - Trigger generation of the cached nar +;; - Skip if the work queue already includes this job +;; - At the start of the job, check for a database entry and exit +;; early if one exists bumping the atime of the file +;; - If the file doesn't exist, check if there's free space +;; - If there's free space, create the file +;; +;; Nar removal fiber +;; - Periodically remove nars that have been scheduled for removal +;; (when the scheduled time passes) +;; +;; Nar schedule removal fiber +;; - Periodically check for nars that haven't been accessed in some +;; time and schedule them for removal + +(define (perform-cached-compression-startup database + enabled-cached-compressions + nar-cache-files) + (let* ((database-entries-missing-files + ;; List tracking entries in the database for cached files, + ;; where the file is missing from the disk. + ;; + ;; These database entries will be removed at the end of the + ;; startup procecss. + '()) + + ;; alist of compression to hash table of files + ;; + ;; Entries from the hash tables will be removed when the + ;; database entry is processed below, so these hash tables + ;; will be left reflecting files in the directories, but with + ;; no entry in the database. + ;; + ;; These files will be deleted at the end of the startup + ;; process. + (files-by-compression + (map + (match-lambda + ((compression . details) + (let ((result (make-hash-table))) + (for-each + (lambda (file) + (hash-set! result file #t)) + (scandir (assq-ref details 'directory) + (negate (cut member <> '("." ".."))))) + (cons compression + result)))) + enabled-cached-compressions)) + + (cached-bytes-by-compression + (database-fold-cached-narinfo-files + database + (lambda (details result) + (let ((compression + (assq-ref details 'compression)) + (filename + (store-path-base + (assq-ref details 'store-path)))) + + (let ((files-hash + (assq-ref files-by-compression compression))) + (if (hash-ref files-hash filename) + (begin + (hash-remove! files-hash filename) + + (metric-increment nar-cache-files + #:label-values + `((compression . ,compression))) + + `((,compression . ,(+ (assq-ref details 'size) + (or (assq-ref result compression) + 0))) + ,@(alist-delete compression result))) + + ;; Database entry, but file missing + (begin + (set! database-entries-missing-files + (cons details + database-entries-missing-files)) + result))))) + (map + (lambda (compression) + (cons compression 0)) + (map car enabled-cached-compressions))))) + + ;; Delete cached files with no database entries + (for-each + (match-lambda + ((compression . hash-table) + (let ((count (hash-count (const #t) + hash-table))) + (unless (= 0 count) + (let ((directory + (assq-ref + (assq-ref enabled-cached-compressions compression) + 'directory))) + (simple-format #t "deleting ~A cached files from ~A\n" + count + directory) + (hash-for-each + (lambda (filename _) + (delete-file (string-append directory "/" filename))) + hash-table)))))) + files-by-compression) + + ;; Delete database entries where the file is missing + (let ((count (length database-entries-missing-files))) + (unless (= 0 count) + (simple-format + #t + "deleting ~A cached_narinfo_files entries due to missing files\n" + count) + + (for-each + (lambda (details) + (database-remove-cached-narinfo-file database + (assq-ref details 'narinfo-id) + (symbol->string + (assq-ref details 'compression)))) + database-entries-missing-files))) + + cached-bytes-by-compression)) + +;; This fiber manages metadata around cached compressions, and +;; delegates tasks to the thread pool to generate newly compressed +;; nars +(define* (start-cached-compression-management-fiber + database + metrics-registry + nar-source + enabled-cached-compressions + cached-compression-min-uses + #:key (cached-compression-workers 2) + scheduler) + + (define nar-cache-bytes-metric + (make-gauge-metric metrics-registry + "nar_cache_size_bytes" + #:labels '(compression))) + + (define nar-cache-files + (make-gauge-metric metrics-registry + "nar_cache_files" + #:labels '(compression))) + + (define channel + (make-channel)) + + (let ((process-job + count-jobs + count-threads + list-jobs + (create-work-queue cached-compression-workers + (lambda (thunk) + (thunk)) + #:name "cached compression"))) + + (define (consider-narinfo cached-bytes-by-compression + usage-hash-table + narinfo-id) + (let* ((existing-cached-files + ;; This is important both to avoid trying to create + ;; files twice, but also in the case where additional + ;; compressions are enabled and more files need to be + ;; generated + (database-select-cached-narinfo-files-by-narinfo-id + database + narinfo-id)) + (existing-compressions + (map (lambda (details) + (assq-ref details 'compression)) + existing-cached-files)) + (missing-compressions + (lset-difference eq? + (map car enabled-cached-compressions) + existing-compressions)) + (narinfo-files + (database-select-narinfo-files-by-narinfo-id + database + narinfo-id)) + + (compress-file? + (let ((url (assq-ref (first narinfo-files) 'url))) + ;; TODO: Maybe this should be configurable? + (not (compressed-file? url))))) + + (when (and compress-file? + (not (null? missing-compressions))) + (let ((new-count + (let ((val (+ 1 + (or (hash-ref usage-hash-table narinfo-id) + 0)))) + (hash-set! usage-hash-table + narinfo-id + val) + val))) + + (when (and (> new-count + cached-compression-min-uses)) + (let* ((narinfo-details + (database-select-narinfo database narinfo-id)) + (nar-size + (assq-ref narinfo-details 'nar-size)) + (compressions-with-space + (filter + (lambda (compression) + (let ((directory-max-size + (assq-ref + (assq-ref enabled-cached-compressions + compression) + 'directory-max-size)) + (current-directory-size + (assq-ref cached-bytes-by-compression + compression))) + + (unless current-directory-size + (log-msg 'ERROR "current-directory-size unset: " + current-directory-size)) + (unless nar-size + (log-msg 'ERROR "nar-size unset: " + nar-size)) + + (if directory-max-size + (< (+ current-directory-size + ;; Assume the commpressed nar could be + ;; as big as the uncompressed nar + nar-size) + directory-max-size) + #t))) + missing-compressions))) + (for-each + (lambda (compression) + (spawn-fiber + (lambda () + (process-job + (lambda () + (let ((new-bytes + (make-compressed-nar + narinfo-files + nar-source + enabled-cached-compressions + compression + #:level (assq-ref + (assq-ref enabled-cached-compressions + compression) + 'level)))) + (put-message channel + (list 'cached-narinfo-added + narinfo-id + compression + new-bytes + #f)))))))) + compressions-with-space))))))) + + (spawn-fiber + (lambda () + (let ((initial-cached-bytes-by-compression + (perform-cached-compression-startup + database + enabled-cached-compressions + nar-cache-files)) + (nar-cached-compression-usage-hash-table + (make-hash-table 65536))) + + (for-each + (match-lambda + ((compression . bytes) + (metric-set + nar-cache-bytes-metric + bytes + #:label-values `((compression . ,compression))))) + initial-cached-bytes-by-compression) + + (let loop ((cached-bytes-by-compression + initial-cached-bytes-by-compression)) + (match (get-message channel) + (('narinfo-id . narinfo-id) + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "exception considering narinfo (" + narinfo-id "): " exn) + #f) + (lambda () + (consider-narinfo cached-bytes-by-compression + nar-cached-compression-usage-hash-table + narinfo-id)) + #:unwind? #t) + (loop cached-bytes-by-compression)) + + (((and (or 'cached-narinfo-added 'cached-narinfo-removed) + action) + narinfo-id compression size reply) + (let ((updated-bytes + ((if (eq? action 'cached-narinfo-added) + + + -) + (or (assq-ref cached-bytes-by-compression + compression) + 0) + size))) + + (metric-set + nar-cache-bytes-metric + updated-bytes + #:label-values `((compression . ,compression))) + + ((if (eq? action 'cached-narinfo-added) + metric-increment + metric-decrement) + nar-cache-files + #:label-values `((compression . ,compression))) + + ;; Use an explicit transaction as it handles the + ;; database being busy, + (database-call-with-transaction + database + (lambda _ + (if (eq? action 'cached-narinfo-added) + (database-insert-cached-narinfo-file + database + narinfo-id + size + compression) + (let ((cached-narinfo-details + (database-select-cached-narinfo-file-by-narinfo-id-and-compression + database + narinfo-id + compression))) + + ;; It might not have been scheduled for + ;; removal, but remove any schedule that + ;; exists + (let ((schedule-removed? + (database-delete-scheduled-cached-narinfo-removal + database + (assq-ref cached-narinfo-details 'id)))) + (when schedule-removed? + (metric-decrement + (metrics-registry-fetch-metric + metrics-registry + "database_scheduled_cached_narinfo_removal_total")))) + + ;; Remove all the database entries first, as + ;; that'll stop these files appearing in narinfos + (database-remove-cached-narinfo-file + database + narinfo-id + (symbol->string compression)))))) + + (hash-remove! nar-cached-compression-usage-hash-table + narinfo-id) + + (when reply + (put-message reply #t)) + + (loop (alist-cons + compression + updated-bytes + (alist-delete compression + cached-bytes-by-compression))))))))) + scheduler) + + channel)) + +;; Periodically check for nars that haven't been accessed in some time +;; and schedule them for removal +(define (start-cached-compression-schedule-removal-fiber + database + metrics-registry + cached-compression-management-channel + enabled-cached-compressions + cached-compression-removal-fiber-wakeup-channel + base-ttl) + + (define (files-to-schedule-for-removal compression-details) + (let* ((directory (assq-ref compression-details 'directory)) + (unused-removal-duration + (assq-ref compression-details 'unused-removal-duration)) + (atime-threshold + (time-second + (subtract-duration (current-time) + unused-removal-duration)))) + (scandir + directory + (lambda (filename) + (and + (< (stat:atime (stat (string-append directory "/" filename))) + atime-threshold) + (not (member filename '("." "..")))))))) + + (define (schedule-removal compression compression-details) + (let ((files + (let ((files + (with-time-logging "files-to-schedule-for-removal" + (files-to-schedule-for-removal compression-details)))) + (log-msg 'INFO "cached-compression-schedule-removal-fiber " + "looking at " (length files) " files") + files)) + (count-metric + (metrics-registry-fetch-metric + metrics-registry + "database_scheduled_cached_narinfo_removal_total"))) + + (with-time-logging "inserting scheduled-cached-narinfo-removals" + (for-each + (lambda (file) + (let* ((cached-narinfo-file-details + (database-select-cached-narinfo-file-by-hash + database + (string-take file 32) ; hash part + compression)) + (existing-scheduled-removal + (database-select-scheduled-cached-narinfo-removal + database + (assq-ref cached-narinfo-file-details 'id)))) + (unless existing-scheduled-removal + (let ((removal-time + ;; The earliest this can be removed is the current + ;; time, plus the TTL + (add-duration + (current-time) + (make-time time-duration + 0 + (or (assq-ref compression-details 'ttl) + base-ttl))))) + (database-insert-scheduled-cached-narinfo-removal + database + (assq-ref cached-narinfo-file-details 'id) + removal-time) + + (metric-increment count-metric))))) + files)) + + ;; Wake the cached compression removal fiber in case one of + ;; the new scheduled removals is before it's scheduled to wake + ;; up + (put-message cached-compression-removal-fiber-wakeup-channel + #t))) + + (spawn-fiber + (lambda () + (let ((sleep-duration + (max + (* 60 60 6) ; Maybe this be confirgurable + (apply min + (map (lambda (compression-details) + (/ (time-second + (assq-ref compression-details + 'unused-removal-duration)) + 4)) + enabled-cached-compressions)))) + (start-count + (database-count-scheduled-cached-narinfo-removal + database))) + + (metric-set + (or (metrics-registry-fetch-metric + metrics-registry + "database_scheduled_cached_narinfo_removal_total") + (make-gauge-metric + metrics-registry + "database_scheduled_cached_narinfo_removal_total")) + start-count) + + (while #t + (log-msg 'INFO "cached-compression-schedule-removal-fiber starting pass") + + (for-each + (match-lambda + ((compression . details) + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "cached-compression-schedule-removal-fiber: " + "exception: " exn)) + (lambda () + (schedule-removal compression details)) + #:unwind? #t))) + enabled-cached-compressions) + + (log-msg 'INFO "cached-compression-schedule-removal-fiber sleeping for " + sleep-duration) + (sleep sleep-duration)))))) + +;; Process the scheduled removals for cached nars +(define (start-cached-compression-removal-fiber + database + cached-compression-management-channel + enabled-cached-compressions) + + (define wakeup-channel + (make-channel)) + + (define (make-pass) + (log-msg 'INFO "cached-compression-removal-fiber starting pass") + + (let ((scheduled-cached-narinfo-removal + (database-select-oldest-scheduled-cached-narinfo-removal + database))) + + (log-msg 'INFO "scheduled removal: " scheduled-cached-narinfo-removal) + + (if scheduled-cached-narinfo-removal + (if (time<=? (assq-ref scheduled-cached-narinfo-removal + 'scheduled-removal-time) + (current-time)) + (let ((id (assq-ref scheduled-cached-narinfo-removal 'id)) + (narinfo-id (assq-ref scheduled-cached-narinfo-removal + 'narinfo-id)) + (compression (assq-ref scheduled-cached-narinfo-removal + 'compression)) + (size (assq-ref scheduled-cached-narinfo-removal + 'size)) + (store-path (assq-ref scheduled-cached-narinfo-removal + 'store-path))) + (let ((reply (make-channel))) + (put-message + cached-compression-management-channel + (list 'cached-narinfo-removed + narinfo-id + compression + size + reply)) + + ;; Wait for the management fiber to delete the + ;; database entry before removing the file. + (get-message reply)) + + (let ((directory + (assq-ref (assq-ref enabled-cached-compressions + compression) + 'directory))) + (let ((filename + (string-append + directory "/" + (basename store-path)))) + (log-msg 'INFO "deleting " filename) + (delete-file filename)))) + + (let ((duration + (time-difference + (assq-ref scheduled-cached-narinfo-removal + 'scheduled-removal-time) + (current-time)))) + (perform-operation + (choice-operation + (sleep-operation (max 1 (+ 1 (time-second duration)))) + (get-operation wakeup-channel))))) + + ;; Sleep until woken + (get-message wakeup-channel)))) + + (spawn-fiber + (lambda () + (while #t + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "exception in cached-compression-removal-fiber: " + exn)) + (lambda () + (with-throw-handler #t + make-pass + (lambda _ + (backtrace)))) + #:unwind? #t)))) + + wakeup-channel) + +(define* (make-compressed-nar narinfo-files + nar-source + enabled-cached-compressions + target-compression + #:key level) + (define cached-compression-details + (assq-ref enabled-cached-compressions target-compression)) + + (log-msg 'INFO "making " target-compression " for " + (uri-decode + (basename + (assq-ref (first narinfo-files) 'url)))) + + (let* ((source-narinfo-file + ;; There's no specific logic to this, it should be possible + ;; to use any file + (first narinfo-files)) + (source-filename + (cond + ((string-prefix? "http" nar-source) + (let* ((output-port (mkstemp "/tmp/nar-herder-source-nar-XXXXXX")) + (filename + (port-filename output-port)) + (uri + (string->uri + (string-append nar-source + (assq-ref source-narinfo-file 'url))))) + + (log-msg 'DEBUG "downloading " (uri->string uri)) + (with-exception-handler + (lambda (exn) + (close-port output-port) + (delete-file filename) + (raise-exception exn)) + (lambda () + (with-port-timeouts + (lambda () + (call-with-values + (lambda () + (let ((port + socket + (open-socket-for-uri* uri))) + (http-get uri + #:port port + #:decode-body? #f + #:streaming? #t))) + (lambda (response body) + (unless (= (response-code response) + 200) + (error "unknown response code" + (response-code response))) + + (dump-port body output-port))) + (close-port output-port)) + #:timeout 30)) + #:unwind? #t) + + filename)) + ((string-prefix? "/" nar-source) + (string-append + ;; If it's a filename, then it's the canonical path to + ;; the storage directory + nar-source + (uri-decode (assq-ref source-narinfo-file 'url)))) + (else + (error "unknown nar source"))))) + + (let* ((dest-directory + (assq-ref cached-compression-details + 'directory)) + (dest-filename + (string-append + dest-directory + "/" + (uri-decode + (basename + (assq-ref source-narinfo-file 'url))))) + (tmp-dest-filename + (string-append dest-filename ".tmp"))) + + (when (file-exists? tmp-dest-filename) + (delete-file tmp-dest-filename)) + (when (file-exists? dest-filename) + (delete-file dest-filename)) + + (mkdir-p dest-directory) + + (call-with-input-file + source-filename + (lambda (source-port) + (call-with-decompressed-port + (string->symbol + (assq-ref source-narinfo-file + 'compression)) + source-port + (lambda (decompressed-source-port) + (let ((call-with-compressed-output-port* + (match target-compression + ('gzip + (@ (zlib) call-with-gzip-output-port)) + ('lzip + (@ (lzlib) call-with-lzip-output-port)) + ('zstd + (@ (zstd) call-with-zstd-output-port)) + ('none + (lambda (port proc) + (proc port)))))) + (apply + call-with-compressed-output-port* + (open-output-file tmp-dest-filename) + (lambda (compressed-port) + (dump-port decompressed-source-port + compressed-port)) + (if level + `(#:level ,level) + '()))))))) + (rename-file + tmp-dest-filename + dest-filename) + + (when (string-prefix? "http" nar-source) + (log-msg 'DEBUG "deleting temporary file " source-filename) + (delete-file source-filename)) + + (let ((bytes + (stat:size (stat dest-filename)))) + (log-msg 'INFO "created " dest-filename) + + bytes)))) diff --git a/nar-herder/database.scm b/nar-herder/database.scm index 61f5bb1..ded7c2c 100644 --- a/nar-herder/database.scm +++ b/nar-herder/database.scm @@ -28,10 +28,12 @@ #:use-module (sqlite3) #:use-module (fibers) #:use-module (prometheus) + #:use-module (guix store) #:use-module (guix narinfo) #:use-module (guix derivations) #:use-module (nar-herder utils) #:export (setup-database + update-database-metrics! database-optimize database-spawn-fibers @@ -42,8 +44,11 @@ database-insert-narinfo database-remove-narinfo + database-select-narinfo + database-select-narinfo-by-hash database-select-narinfo-contents-by-hash + database-count-recent-changes database-select-recent-changes database-select-latest-recent-change-datetime database-get-recent-changes-id-for-deletion @@ -51,8 +56,25 @@ database-select-narinfo-for-file database-select-narinfo-files - - database-map-all-narinfo-files)) + database-select-narinfo-files-by-narinfo-id + + database-fold-all-narinfo-files + database-map-all-narinfo-files + database-count-narinfo-files + + database-insert-cached-narinfo-file + database-select-cached-narinfo-file-by-hash + database-select-cached-narinfo-file-by-narinfo-id-and-compression + database-select-cached-narinfo-files-by-narinfo-id + database-fold-cached-narinfo-files + database-remove-cached-narinfo-file + + database-select-scheduled-narinfo-removal + database-select-scheduled-cached-narinfo-removal + database-delete-scheduled-cached-narinfo-removal + database-select-oldest-scheduled-cached-narinfo-removal + database-count-scheduled-cached-narinfo-removal + database-insert-scheduled-cached-narinfo-removal)) (define-record-type <database> (make-database database-file reader-thread-channel writer-thread-channel @@ -85,7 +107,8 @@ CREATE TABLE narinfos ( nar_size INTEGER NOT NULL, deriver TEXT, system TEXT, - contents NOT NULL + contents NOT NULL, + added_at TEXT ); CREATE UNIQUE INDEX narinfos_store_hash ON narinfos (substr(store_path, 12, 32)); @@ -97,6 +120,8 @@ CREATE TABLE narinfo_files ( url TEXT NOT NULL ); +CREATE INDEX narinfo_files_narinfo_id ON narinfo_files (narinfo_id); + CREATE TABLE narinfo_references ( narinfo_id INTEGER NOT NULL REFERENCES narinfos (id), store_path TEXT NOT NULL @@ -120,28 +145,124 @@ CREATE TABLE recent_changes ( datetime TEXT NOT NULL, change TEXT NOT NULl, data TEXT NOT NULL +); + +CREATE TABLE cached_narinfo_files ( + id INTEGER PRIMARY KEY ASC, + narinfo_id INTEGER NOT NULL REFERENCES narinfos (id), + size INTEGER NOT NULL, + compression TEXT +); + +CREATE INDEX cached_narinfo_files_narinfo_id + ON cached_narinfo_files (narinfo_id); + +CREATE TABLE scheduled_narinfo_removal ( + narinfo_id INTEGER PRIMARY KEY ASC REFERENCES narinfos (id), + removal_datetime TEXT NOT NULL +); + +CREATE TABLE scheduled_cached_narinfo_removal ( + cached_narinfo_file_id INTEGER PRIMARY KEY ASC REFERENCES cached_narinfo_files (id), + removal_datetime TEXT NOT NULL );") (sqlite-exec db schema)) -(define (update-schema db) +(define (table-exists? db name) (let ((statement (sqlite-prepare db " -SELECT name FROM sqlite_master WHERE type = 'table' AND name = :name"))) +SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = :name"))) (sqlite-bind-arguments statement - #:name "narinfos") + #:name name) + + (let ((result + (match (sqlite-step statement) + (#f #f) + (#(1) #t)))) + (sqlite-finalize statement) - (match (sqlite-step statement) - (#f (perform-initial-database-setup db)) - (_ #f)) + result))) + +(define (column-exists? db table-name column-name) + (let ((statement + (sqlite-prepare + db + (simple-format #f "PRAGMA table_info(~A);" table-name)))) + + (let ((columns + (sqlite-map + (lambda (row) + (vector-ref row 1)) + statement))) + (sqlite-finalize statement) + + (member column-name columns)))) + +(define (update-schema db) + (unless (table-exists? db "narinfos") + (perform-initial-database-setup db)) + + (unless (table-exists? db "cached_narinfo_files") + (sqlite-exec + db + " +CREATE TABLE cached_narinfo_files ( + id INTEGER PRIMARY KEY ASC, + narinfo_id INTEGER NOT NULL REFERENCES narinfos (id), + size INTEGER NOT NULL, + compression TEXT NOT NULL +); - (sqlite-finalize statement))) +CREATE INDEX cached_narinfo_files_narinfo_id + ON cached_narinfo_files (narinfo_id);")) + + (unless (column-exists? db "narinfos" "added_at") + (sqlite-exec + db + "ALTER TABLE narinfos ADD COLUMN added_at TEXT;")) + + (unless (table-exists? db "scheduled_narinfo_removal") + (sqlite-exec + db + " +CREATE TABLE scheduled_narinfo_removal ( + narinfo_id INTEGER PRIMARY KEY ASC REFERENCES narinfos (id), + removal_datetime TEXT NOT NULL +);")) + + (unless (table-exists? db "scheduled_cached_narinfo_removal") + (sqlite-exec + db + " +CREATE TABLE scheduled_cached_narinfo_removal ( + cached_narinfo_file_id INTEGER PRIMARY KEY ASC REFERENCES cached_narinfo_files (id), + removal_datetime TEXT NOT NULL +);")) + + (sqlite-exec + db + "CREATE INDEX IF NOT EXISTS narinfo_tags_narinfo_id + ON narinfo_tags (narinfo_id);") + + (sqlite-exec + db + "CREATE INDEX IF NOT EXISTS narinfo_references_narinfo_id + ON narinfo_references (narinfo_id);") + + (sqlite-exec + db + "CREATE INDEX IF NOT EXISTS narinfo_files_narinfo_id + ON narinfo_files (narinfo_id);")) + +(define* (setup-database database-file metrics-registry + #:key (reader-threads 1)) + (define mmap-size #f) -(define (setup-database database-file metrics-registry) (let ((db (db-open database-file))) (sqlite-exec db "PRAGMA journal_mode=WAL;") (sqlite-exec db "PRAGMA optimize;") @@ -149,44 +270,77 @@ SELECT name FROM sqlite_master WHERE type = 'table' AND name = :name"))) (update-schema db) + ;; (let ((requested-mmap-bytes 2147418112) + ;; (statement + ;; (sqlite-prepare + ;; db + ;; (simple-format #f "PRAGMA mmap_size=~A;" + ;; 2147418112)))) + ;; (match (sqlite-step statement) + ;; (#(result-mmap-size) + ;; (sqlite-finalize statement) + ;; (set! mmap-size + ;; result-mmap-size)))) + (sqlite-close db)) (let ((reader-thread-channel - (make-worker-thread-channel + (make-worker-thread-set (lambda () (let ((db (db-open database-file #:write? #f))) (sqlite-exec db "PRAGMA busy_timeout = 5000;") + (when mmap-size + (sqlite-exec + db + (simple-format #f "PRAGMA mmap_size=~A;" + (number->string mmap-size)))) (list db))) #:destructor (lambda (db) (sqlite-close db)) #:lifetime 50000 + #:name "db r" ;; Use a minimum of 2 and a maximum of 8 threads - #:parallelism - (min (max (current-processor-count) - 2) - 64) + #:parallelism reader-threads #:delay-logger (let ((delay-metric (make-histogram-metric metrics-registry "database_read_delay_seconds"))) - (lambda (seconds-delayed) + (lambda (seconds-delayed proc) (metric-observe delay-metric seconds-delayed) (when (> seconds-delayed 1) - (format - (current-error-port) - "warning: database read delayed by ~1,2f seconds~%" - seconds-delayed)))))) + (display + (format + #f + "warning: database read (~a) delayed by ~1,2f seconds~%" + proc + seconds-delayed) + (current-error-port))))) + #:duration-logger + (lambda (duration proc) + (when (> duration 5) + (display + (format + #f + "warning: database read took ~1,2f seconds (~a)~%" + duration + proc) + (current-error-port)))))) (writer-thread-channel - (make-worker-thread-channel + (make-worker-thread-set (lambda () (let ((db (db-open database-file))) (sqlite-exec db "PRAGMA busy_timeout = 5000;") (sqlite-exec db "PRAGMA foreign_keys = ON;") + (when mmap-size + (sqlite-exec + db + (simple-format #f "PRAGMA mmap_size=~A;" + (number->string mmap-size)))) (list db))) #:destructor (lambda (db) @@ -195,6 +349,7 @@ SELECT name FROM sqlite_master WHERE type = 'table' AND name = :name"))) (sqlite-close db)) #:lifetime 500 + #:name "db w" ;; SQLite doesn't support parallel writes #:parallelism 1 @@ -202,19 +357,56 @@ SELECT name FROM sqlite_master WHERE type = 'table' AND name = :name"))) (make-histogram-metric metrics-registry "database_write_delay_seconds"))) - (lambda (seconds-delayed) + (lambda (seconds-delayed proc) (metric-observe delay-metric seconds-delayed) (when (> seconds-delayed 1) - (format - (current-error-port) - "warning: database write delayed by ~1,2f seconds~%" - seconds-delayed))))))) + (display + (format + #f + "warning: database write (~a) delayed by ~1,2f seconds~%" + proc + seconds-delayed) + (current-error-port))))) + #:duration-logger + (lambda (duration proc) + (when (> duration 5) + (display + (format + #f + "warning: database write took ~1,2f seconds (~a)~%" + duration + proc) + (current-error-port))))))) (make-database database-file reader-thread-channel writer-thread-channel metrics-registry))) +(define (update-database-metrics! database) + (let* ((db-filename (database-file database)) + (db-wal-filename + (string-append db-filename "-wal")) + + (registry (database-metrics-registry database)) + (db-bytes + (or (metrics-registry-fetch-metric registry + "database_bytes") + (make-gauge-metric + registry "database_bytes" + #:docstring "Size of the SQLite database file"))) + (db-wal-bytes + (or (metrics-registry-fetch-metric registry + "database_wal_bytes") + (make-gauge-metric + registry "database_wal_bytes" + #:docstring "Size of the SQLite Write Ahead Log file")))) + + + (metric-set db-bytes (stat:size (stat db-filename))) + (metric-set db-wal-bytes (stat:size (stat db-wal-filename)))) + #t) + (define (db-optimize db db-filename) (define (wal-size) (let ((db-wal-filename @@ -267,43 +459,110 @@ PRAGMA optimize;"))) (string-append "database_" thing "_duration_seconds")) (if registry - (let* ((metric - (or (metrics-registry-fetch-metric registry metric-name) - (make-histogram-metric registry - metric-name))) - (start-time (get-internal-real-time))) - (let ((result (thunk))) - (metric-observe metric - (/ (- (get-internal-real-time) start-time) - internal-time-units-per-second)) - result)) + (call-with-duration-metric registry + metric-name + thunk) (thunk))) (define %current-transaction-proc (make-parameter #f)) (define* (database-call-with-transaction database proc - #:key - readonly?) + #:key + readonly? + (immediate? (not readonly?))) (define (run-proc-within-transaction db) - (if (%current-transaction-proc) - (proc db) ; already in transaction - (begin - (sqlite-exec db "BEGIN TRANSACTION;") - (with-exception-handler - (lambda (exn) - (simple-format (current-error-port) - "error: sqlite rolling back transaction\n") - (sqlite-exec db "ROLLBACK TRANSACTION;") - (raise-exception exn)) + (define (attempt-begin) + (with-exception-handler + (lambda (exn) + (match (exception-args exn) + (('sqlite-exec 5 msg) + (simple-format + (current-error-port) + "warning: issue starting transaction (code: 5, proc: ~A): ~A\n" + proc msg) + #f) + (_ + (simple-format (current-error-port) + "exception starting transaction: ~A\n" exn) + (raise-exception exn)))) + (lambda () + (sqlite-exec db (if immediate? + "BEGIN IMMEDIATE TRANSACTION;" + "BEGIN TRANSACTION;")) + #t) + #:unwind? #t)) + + (define (attempt-commit) + (with-exception-handler + (lambda (exn) + (match (exception-args exn) + (('sqlite-exec 5 msg) + (simple-format + (current-error-port) + "warning: attempt commit (code: 5, proc: ~A): ~A\n" + proc msg) + #f) + (_ + (simple-format (current-error-port) + "exception committing transaction: ~A\n" exn) + (raise-exception exn)))) + (lambda () + (sqlite-exec db "COMMIT TRANSACTION;") + #t) + #:unwind? #t)) + + (if (attempt-begin) + (call-with-values (lambda () - (call-with-values - (lambda () - (parameterize ((%current-transaction-proc proc)) - (proc db))) - (lambda vals - (sqlite-exec db "COMMIT TRANSACTION;") - (apply values vals)))))))) + (with-exception-handler + (lambda (exn) + (simple-format (current-error-port) + "error: sqlite rolling back transaction (~A)\n" + exn) + (sqlite-exec db "ROLLBACK TRANSACTION;") + (raise-exception exn)) + (lambda () + (parameterize ((%current-transaction-proc proc)) + (proc-with-duration-timing db))) + #:unwind? #t)) + (lambda vals + (let loop ((success? (attempt-commit))) + (if success? + (apply values vals) + (loop (attempt-commit)))))) + + ;; Database is busy, so retry + (run-proc-within-transaction db))) + + (define (proc-with-duration-timing db) + (let ((start-time (get-internal-real-time))) + (call-with-values + (lambda () + (with-throw-handler #t + (lambda () + (proc db)) + (lambda (key . args) + (simple-format + (current-error-port) + "exception in transaction: ~A: ~A\n" + key args) + (backtrace)))) + (lambda vals + (let ((duration-seconds + (/ (- (get-internal-real-time) start-time) + internal-time-units-per-second))) + (when (and (not readonly?) + (> duration-seconds 2)) + (display + (format + #f + "warning: ~a:\n took ~4f seconds in transaction\n" + proc + duration-seconds) + (current-error-port))) + + (cons duration-seconds vals)))))) (match (call-with-worker-thread ((if readonly? @@ -311,25 +570,9 @@ PRAGMA optimize;"))) database-writer-thread-channel) database) (lambda (db) - (let ((start-time (get-internal-real-time))) - (call-with-values - (lambda () - (run-proc-within-transaction db)) - (lambda vals - (let ((duration-seconds - (/ (- (get-internal-real-time) start-time) - internal-time-units-per-second))) - (when (and (not readonly?) - (> duration-seconds 2)) - (display - (format - #f - "warning: ~a:\n took ~4f seconds in transaction\n" - proc - duration-seconds) - (current-error-port))) - - (cons duration-seconds vals))))))) + (if (%current-transaction-proc) + (proc-with-duration-timing db) ; already in transaction + (run-proc-within-transaction db)))) ((duration vals ...) (apply values vals)))) @@ -417,9 +660,9 @@ SELECT id FROM tags WHERE key = :key AND value = :value" db " INSERT INTO narinfos ( - store_path, nar_hash, nar_size, deriver, system, contents + store_path, nar_hash, nar_size, deriver, system, contents, added_at ) VALUES ( - :store_path, :nar_hash, :nar_size, :deriver, :system, :contents + :store_path, :nar_hash, :nar_size, :deriver, :system, :contents, :added_at )" #:cache? #t))) (sqlite-bind-arguments @@ -429,7 +672,8 @@ INSERT INTO narinfos ( #:nar_size (narinfo-size narinfo) #:deriver (narinfo-deriver narinfo) #:system (narinfo-system narinfo) - #:contents (narinfo-contents narinfo)) + #:contents (narinfo-contents narinfo) + #:added_at (date->string (current-date) "~1 ~3")) (sqlite-step statement) (sqlite-reset statement) @@ -563,23 +807,6 @@ INSERT INTO narinfo_tags (narinfo_id, tag_id) VALUES (:narinfo_id, :tag_id)" (define* (database-remove-narinfo database store-path #:key change-datetime) - (define (store-path->narinfo-id db) - (let ((statement - (sqlite-prepare - db - " -SELECT id FROM narinfos WHERE store_path = :store_path" - #:cache? #t))) - - (sqlite-bind-arguments - statement - #:store_path store-path) - - (let ((result (vector-ref (sqlite-step statement) 0))) - (sqlite-reset statement) - - result))) - (define (remove-narinfo-record db id) (let ((statement (sqlite-prepare @@ -677,18 +904,90 @@ DELETE FROM narinfo_tags WHERE narinfo_id = :narinfo_id" (database-call-with-transaction database (lambda (db) - (let ((narinfo-id (store-path->narinfo-id db))) - (if change-datetime - (insert-change-with-datetime db store-path - change-datetime) - (insert-change db store-path)) + (let ((narinfo-details + (database-select-narinfo-by-hash + database + (store-path-hash-part store-path)))) + (if narinfo-details + (let ((narinfo-id (assq-ref narinfo-details + 'id))) + (if change-datetime + (insert-change-with-datetime db store-path + change-datetime) + (insert-change db store-path)) + + (remove-narinfo-files db narinfo-id) + (remove-narinfo-references db narinfo-id) + (remove-tags db narinfo-id) + + (remove-narinfo-record db narinfo-id) + #t) + #f))))) + +(define (database-select-narinfo database id) + (call-with-time-tracking + database + "select_narinfo" + (lambda () + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT store_path, nar_hash, nar_size, deriver, system +FROM narinfos +WHERE id = :id" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:id id) - (remove-narinfo-files db narinfo-id) - (remove-narinfo-references db narinfo-id) - (remove-tags db narinfo-id) - (remove-narinfo-record db narinfo-id) + (match (let ((result (sqlite-step statement))) + (sqlite-reset statement) + result) + (#(store_path nar_hash nar_size deriver system) + `((store-path . ,store_path) + (nar-hash . ,nar_hash) + (nar-size . ,nar_size) + (deriver . ,deriver) + (system . ,system))) + (_ + #f)))))))) + +(define (database-select-narinfo-by-hash database hash) + (call-with-time-tracking + database + "select_narinfo_by_hash" + (lambda () + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT id, store_path, nar_hash, nar_size, deriver, system +FROM narinfos +WHERE substr(store_path, 12, 32) = :hash" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:hash hash) - #t)))) + (match (let ((result (sqlite-step statement))) + (sqlite-reset statement) + result) + (#(id store_path nar_hash nar_size deriver system) + `((id . ,id) + (store-path . ,store_path) + (nar-hash . ,nar_hash) + (nar-size . ,nar_size) + (deriver . ,deriver) + (system . ,system))) + (_ + #f)))))))) (define (database-select-narinfo-contents-by-hash database hash) (call-with-time-tracking @@ -702,7 +1001,7 @@ DELETE FROM narinfo_tags WHERE narinfo_id = :narinfo_id" (sqlite-prepare db " -SELECT contents FROM narinfos WHERE substr(store_path, 12, 32) = :hash" +SELECT id, contents FROM narinfos WHERE substr(store_path, 12, 32) = :hash" #:cache? #t))) (sqlite-bind-arguments statement @@ -711,8 +1010,27 @@ SELECT contents FROM narinfos WHERE substr(store_path, 12, 32) = :hash" (match (let ((result (sqlite-step statement))) (sqlite-reset statement) result) - (#(contents) contents) - (_ #f)))))))) + (#(id contents) + (values contents id)) + (_ + (values #f #f))))))))) + +(define (database-count-recent-changes database) + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT COUNT(*) FROM recent_changes" + #:cache? #t))) + + (let ((result + (match (sqlite-step statement) + (#(count) count)))) + (sqlite-reset statement) + result))))) (define* (database-select-recent-changes database after-date #:key (limit 8192)) (call-with-worker-thread @@ -722,26 +1040,27 @@ SELECT contents FROM narinfos WHERE substr(store_path, 12, 32) = :hash" (sqlite-prepare db " -SELECT datetime, change, data FROM recent_changes WHERE datetime >= :datetime LIMIT :limit" +SELECT datetime, change, data +FROM recent_changes +WHERE datetime >= :datetime +ORDER BY datetime ASC +LIMIT :limit" #:cache? #t))) (sqlite-bind-arguments statement #:datetime after-date #:limit limit) - (let loop ((row (sqlite-step statement)) - (result '())) - (match row - (#(datetime change data) - (loop (sqlite-step statement) - (cons `((datetime . ,datetime) - (change . ,change) - (data . ,data)) - result))) - (#f - (sqlite-reset statement) - - (reverse result)))))))) + (let ((result + (sqlite-map + (match-lambda + (#(datetime change data) + `((datetime . ,datetime) + (change . ,change) + (data . ,data)))) + statement))) + (sqlite-reset statement) + result))))) (define (database-select-latest-recent-change-datetime database) (call-with-worker-thread @@ -869,7 +1188,42 @@ WHERE substr(narinfos.store_path, 12, 32) = :hash" result))))))) -(define (database-map-all-narinfo-files database proc) +(define (database-select-narinfo-files-by-narinfo-id database narinfo-id) + (call-with-time-tracking + database + "select_narinfo_files_by_narinfo_id" + (lambda () + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT narinfo_files.size, narinfo_files.compression, narinfo_files.url +FROM narinfos +INNER JOIN narinfo_files + ON narinfos.id = narinfo_files.narinfo_id +WHERE narinfos.id = :narinfo_id" + #:cache? #t))) + + (sqlite-bind-arguments + statement + #:narinfo_id narinfo-id) + + (let ((result + (sqlite-map + (match-lambda + (#(size compression url) + `((size . ,size) + (compression . ,compression) + (url . ,url)))) + statement))) + (sqlite-reset statement) + + result))))))) + +(define (database-fold-all-narinfo-files database proc init) (call-with-worker-thread (database-reader-thread-channel database) (lambda (db) @@ -885,12 +1239,410 @@ FROM narinfo_files" (lambda (row result) (match row (#(size compression url) - (cons (proc `((size . ,size) - (compression . ,compression) - (url . ,url))) + (proc `((size . ,size) + (compression . ,compression) + (url . ,url)) + result)))) + init + statement))) + (sqlite-reset statement) + + result-list))))) + +(define (database-map-all-narinfo-files database proc) + (database-fold-all-narinfo-files + database + (lambda (nar-file result) + (cons (proc nar-file) + result)) + '())) + +(define (database-count-narinfo-files database) + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT COUNT(*) FROM narinfo_files" + #:cache? #t))) + + (let ((result + (vector-ref (sqlite-step statement) + 0))) + (sqlite-reset statement) + + result))))) + +(define (database-insert-cached-narinfo-file database + narinfo-id + size + compression) + (call-with-worker-thread + (database-writer-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +INSERT INTO cached_narinfo_files ( + narinfo_id, size, compression +) VALUES ( + :narinfo_id, :size, :compression +)" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:narinfo_id narinfo-id + #:size size + #:compression (symbol->string compression)) + + (sqlite-step statement) + (sqlite-reset statement) + + (last-insert-rowid db))))) + +(define (database-select-cached-narinfo-file-by-hash database + hash + compression) + (call-with-time-tracking + database + "select_cached_narinfo_file_by_hash" + (lambda () + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT cached_narinfo_files.id, cached_narinfo_files.size +FROM narinfos +INNER JOIN cached_narinfo_files + ON cached_narinfo_files.narinfo_id = narinfos.id +WHERE substr(narinfos.store_path, 12, 32) = :hash + AND cached_narinfo_files.compression = :compression" + #:cache? #t))) + + (sqlite-bind-arguments + statement + #:hash hash + #:compression (symbol->string compression)) + + (let ((result + (match (sqlite-step statement) + (#(id size) + `((id . ,id) + (size . ,size))) + (#f #f)))) + (sqlite-reset statement) + + result))))))) + +(define (database-select-cached-narinfo-files-by-narinfo-id + database + narinfo-id) + (call-with-time-tracking + database + "select_cached_narinfo_file_by_narinfo_id" + (lambda () + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT cached_narinfo_files.id, + narinfos.store_path, + cached_narinfo_files.size, + cached_narinfo_files.compression +FROM cached_narinfo_files +INNER JOIN narinfos + ON cached_narinfo_files.narinfo_id = narinfos.id +WHERE narinfo_id = :narinfo_id" + #:cache? #t))) + + (sqlite-bind-arguments + statement + #:narinfo_id narinfo-id) + + (let ((result + (sqlite-map + (match-lambda + (#(id store_path size compression) + `((id . ,id) + (store-path . ,store_path) + (size . ,size) + (compression . ,(string->symbol compression))))) + statement))) + (sqlite-reset statement) + + result))))))) + +(define (database-select-cached-narinfo-file-by-narinfo-id-and-compression + database + narinfo-id + compression) + (call-with-time-tracking + database + "select_cached_narinfo_file_by_narinfo_id_and_compression" + (lambda () + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT cached_narinfo_files.id, + narinfos.store_path, + cached_narinfo_files.size, + cached_narinfo_files.compression +FROM cached_narinfo_files +INNER JOIN narinfos + ON cached_narinfo_files.narinfo_id = narinfos.id +WHERE narinfo_id = :narinfo_id + AND compression = :compression" + #:cache? #t))) + + (sqlite-bind-arguments + statement + #:narinfo_id narinfo-id + #:compression (symbol->string compression)) + + (let ((result + (match (sqlite-step statement) + (#(id store_path size compression) + `((id . ,id) + (store-path . ,store_path) + (size . ,size) + (compression . ,(string->symbol compression))))))) + (sqlite-reset statement) + + result))))))) + +(define (database-fold-cached-narinfo-files database + proc + init) + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT store_path, size, compression, narinfo_id +FROM cached_narinfo_files +INNER JOIN narinfos + ON cached_narinfo_files.narinfo_id = narinfos.id" + #:cache? #t))) + (let ((result-list + (sqlite-fold + (lambda (row result) + (match row + (#(store_path size compression narinfo_id) + (proc `((size . ,size) + (compression . ,(string->symbol compression)) + (store-path . ,store_path) + (narinfo-id . ,narinfo_id)) result)))) - '() + init statement))) (sqlite-reset statement) result-list))))) + +(define (database-remove-cached-narinfo-file database narinfo-id compression) + (call-with-worker-thread + (database-writer-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +DELETE FROM cached_narinfo_files +WHERE narinfo_id = :narinfo_id + AND compression = :compression" + #:cache? #t))) + + (sqlite-bind-arguments + statement + #:narinfo_id narinfo-id + #:compression compression) + + (sqlite-step statement) + (sqlite-reset statement))))) + +(define (database-select-scheduled-narinfo-removal database narinfo-id) + (call-with-time-tracking + database + "select_scheduled_narinfo_removal" + (lambda () + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT removal_datetime +FROM scheduled_narinfo_removal +WHERE narinfo_id = :narinfo_id" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:narinfo_id narinfo-id) + + (let ((result + (match (sqlite-step statement) + (#(datetime) + (date->time-utc + (string->date + datetime + "~Y-~m-~d ~H:~M:~S"))) + (#f #f)))) + (sqlite-reset statement) + result))))))) + +(define (database-select-scheduled-cached-narinfo-removal database + cached-narinfo-file-id) + (call-with-time-tracking + database + "select_scheduled_narinfo_removal" + (lambda () + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT removal_datetime +FROM scheduled_cached_narinfo_removal +WHERE cached_narinfo_file_id = :cached_narinfo_file_id" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:cached_narinfo_file_id cached-narinfo-file-id) + + (let ((result + (match (sqlite-step statement) + (#(datetime) + (date->time-utc + (string->date + datetime + "~Y-~m-~d ~H:~M:~S"))) + (#f #f)))) + (sqlite-reset statement) + result))))))) + +(define (database-delete-scheduled-cached-narinfo-removal database + cached-narinfo-file-id) + (call-with-worker-thread + (database-writer-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +DELETE FROM scheduled_cached_narinfo_removal +WHERE cached_narinfo_file_id = :cached_narinfo_file_id +RETURNING 1" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:cached_narinfo_file_id cached-narinfo-file-id) + + (let ((result (->bool (sqlite-step statement)))) + (sqlite-reset statement) + + result))))) + +(define (database-select-oldest-scheduled-cached-narinfo-removal database) + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT cached_narinfo_files.id, + cached_narinfo_files.narinfo_id, + cached_narinfo_files.size, + cached_narinfo_files.compression, + narinfos.store_path, + scheduled_cached_narinfo_removal.removal_datetime +FROM scheduled_cached_narinfo_removal +INNER JOIN cached_narinfo_files + ON scheduled_cached_narinfo_removal.cached_narinfo_file_id = + cached_narinfo_files.id +INNER JOIN narinfos + ON cached_narinfo_files.narinfo_id = narinfos.id +ORDER BY scheduled_cached_narinfo_removal.removal_datetime DESC +LIMIT 1" + #:cache? #t))) + + (let ((result + (match (sqlite-step statement) + (#(id narinfo_id size compression store_path datetime) + `((id . ,id) + (narinfo-id . ,narinfo_id) + (size . ,size) + (compression . ,(string->symbol compression)) + (store-path . ,store_path) + (scheduled-removal-time . ,(date->time-utc + (string->date + datetime + "~Y-~m-~d ~H:~M:~S"))))) + (#f #f)))) + (sqlite-reset statement) + result))))) + +(define (database-count-scheduled-cached-narinfo-removal database) + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT COUNT(*) FROM scheduled_cached_narinfo_removal" + #:cache? #t))) + + (let ((result + (vector-ref (sqlite-step statement) + 0))) + (sqlite-reset statement) + + result))))) + +(define (database-insert-scheduled-cached-narinfo-removal database + cached-narinfo-file-id + removal-datetime) + (call-with-worker-thread + (database-writer-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +INSERT INTO scheduled_cached_narinfo_removal ( + cached_narinfo_file_id, removal_datetime +) VALUES ( + :cached_narinfo_file_id, :removal_datetime +)" + #:cache? #t))) + + (sqlite-bind-arguments + statement + #:cached_narinfo_file_id cached-narinfo-file-id + #:removal_datetime (date->string + (time-utc->date removal-datetime) + "~Y-~m-~d ~H:~M:~S")) + + (sqlite-step statement) + (sqlite-reset statement) + + #t)))) diff --git a/nar-herder/mirror.scm b/nar-herder/mirror.scm index 6b9f4f7..8aae845 100644 --- a/nar-herder/mirror.scm +++ b/nar-herder/mirror.scm @@ -19,6 +19,7 @@ (define-module (nar-herder mirror) #:use-module (srfi srfi-1) #:use-module (srfi srfi-43) + #:use-module (srfi srfi-71) #:use-module (ice-9 match) #:use-module (ice-9 threads) #:use-module (ice-9 exceptions) @@ -29,22 +30,18 @@ #:use-module (prometheus) #:use-module (logging logger) #:use-module (json) + #:use-module (fibers) + #:use-module (fibers channels) #:use-module (guix narinfo) #:use-module ((guix store) #:select (store-path-hash-part)) #:use-module (nar-herder utils) #:use-module (nar-herder database) #:use-module (nar-herder storage) - #:export (start-fetch-changes-thread)) - -(define (start-fetch-changes-thread database storage-root - mirror metrics-registry) - (define nar-files-metric - (or (metrics-registry-fetch-metric metrics-registry - "nar_files_total") - (make-gauge-metric metrics-registry - "nar_files_total" - #:labels '(stored)))) + #:export (start-fetch-changes-fiber)) +(define (start-fetch-changes-fiber database metrics-registry + storage-root mirror + cached-compression-management-channel) (define (request-recent-changes) (define latest-recent-change (database-select-latest-recent-change-datetime database)) @@ -72,18 +69,25 @@ (lambda () (retry-on-error (lambda () - (http-get uri #:decode-body? #f)) + (log-msg 'INFO "querying for recent changes since " + latest-recent-change) + (with-port-timeouts + (lambda () + (let ((port + socket + (open-socket-for-uri* uri))) + (http-get uri + #:port port + #:streaming? #t))) + #:timeout 30)) #:times 3 #:delay 15)) (lambda (response body) (if (= (response-code response) 200) - (let* ((json-body (json-string->scm - (utf8->string body))) + (let* ((json-body (json->scm body)) (recent-changes (assoc-ref json-body "recent_changes"))) - (log-msg 'INFO "queried for recent changes since " - latest-recent-change) (log-msg 'INFO "got " (vector-length recent-changes) " changes") ;; Switch to symbol keys and standardise the key order @@ -116,48 +120,62 @@ narinfo #:change-datetime (assq-ref change-details - 'datetime)) - - (let ((new-files-count (length (narinfo-uris narinfo)))) - (metric-increment nar-files-metric - #:by new-files-count - ;; TODO This should be - ;; checked, rather than - ;; assumed to be false - #:label-values '((stored . "false")))))) + 'datetime)))) + ((string=? change "removal") (let ((store-path (assq-ref change-details 'data))) (log-msg 'INFO "processing removal change for " store-path " (" (assq-ref change-details 'datetime) ")") + (let* ((hash (store-path-hash-part store-path)) + (narinfo-details + (database-select-narinfo-by-hash + database + hash))) + (when storage-root (remove-nar-files-by-hash database storage-root metrics-registry - (store-path-hash-part store-path))) + hash)) + + (let ((cached-narinfo-files + (database-select-cached-narinfo-files-by-narinfo-id + database + (assq-ref narinfo-details 'id)))) + (for-each + (lambda (cached-narinfo-file-details) + ;; TODO Delete the file as well + + (let ((reply (make-channel))) + (put-message + cached-compression-management-channel + (list 'cached-narinfo-removed + (assq-ref narinfo-details 'id) + (assq-ref cached-narinfo-files 'compression) + (assq-ref cached-narinfo-files 'size) + reply)) + (get-message reply))) + cached-narinfo-files)) (database-remove-narinfo database store-path #:change-datetime (assq-ref change-details - 'datetime)))) + 'datetime))))) (else (error "unimplemented")))))) recent-changes)) (raise-exception (make-exception-with-message - (simple-format #f "unknown response: ~A\n code: ~A response: ~A" + (simple-format #f "unknown response: ~A code: ~A" (uri->string uri) - (response-code response) - (utf8->string body)))))))) + (response-code response)))))))) - (call-with-new-thread + (spawn-fiber (lambda () - ;; This will initialise the nar_files_total metric - (get-nar-files database storage-root metrics-registry) - (while #t (with-exception-handler (lambda (exn) @@ -165,4 +183,5 @@ request-recent-changes #:unwind? #t) + (log-msg 'DEBUG "finished requesting recent changes, sleeping") (sleep 60))))) diff --git a/nar-herder/recent-changes.scm b/nar-herder/recent-changes.scm index fee63f3..62bd604 100644 --- a/nar-herder/recent-changes.scm +++ b/nar-herder/recent-changes.scm @@ -18,15 +18,25 @@ (define-module (nar-herder recent-changes) #:use-module (srfi srfi-1) + #:use-module (ice-9 match) #:use-module (ice-9 threads) + #:use-module (fibers) + #:use-module (fibers channels) + #:use-module (logging logger) + #:use-module (prometheus) + #:use-module (web uri) + #:use-module (guix narinfo) #:use-module (nar-herder database) - #:export (start-recent-change-removal-and-database-dump-thread)) + #:export (start-recent-change-removal-and-database-dump-fiber + start-recent-change-listener-fiber)) -(define (start-recent-change-removal-and-database-dump-thread database - database-dump-filename - check-interval - recent-changes-limit) +(define (start-recent-change-removal-and-database-dump-fiber database + metrics-registry + database-dump-filename + check-interval + recent-changes-limit) (define (update-database-dump) + (log-msg 'DEBUG "updating the database dump at " database-dump-filename) (let ((temp-database-dump-filename (string-append database-dump-filename ".tmp"))) @@ -41,23 +51,130 @@ (simple-format (current-error-port) "updated database dump\n"))) - (call-with-new-thread + (define recent-changes-count-metric + (metrics-registry-fetch-metric metrics-registry "recent_changes_count")) + + (spawn-fiber (lambda () (while #t - (let ((recent-changes-id-for-deletion - (database-get-recent-changes-id-for-deletion database - recent-changes-limit))) - (when recent-changes-id-for-deletion - (update-database-dump) - - (let ((deleted-recent-changes - (database-delete-recent-changes-with-id-below - database - recent-changes-id-for-deletion))) - (simple-format (current-error-port) - "deleted ~A recent changes\n" - deleted-recent-changes))) - - (sleep check-interval)))))) + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "exception in recent change removal thread: ~A\n" + exn) + (sleep 120)) + (lambda () + (let ((recent-changes-id-for-deletion + (database-get-recent-changes-id-for-deletion database + recent-changes-limit))) + (when recent-changes-id-for-deletion + (when database-dump-filename + (update-database-dump)) + + (let ((deleted-recent-changes + (database-call-with-transaction + database + (lambda _ + (database-delete-recent-changes-with-id-below + database + recent-changes-id-for-deletion))))) + + (metric-decrement recent-changes-count-metric + #:by deleted-recent-changes) + + (simple-format (current-error-port) + "deleted ~A recent changes\n" + deleted-recent-changes))) + + (sleep check-interval))) + #:unwind? #t))))) + +(define (start-recent-change-listener-fiber database + metrics-registry + addition-channel + removal-channel) + (define recent-changes-count-metric + (metrics-registry-fetch-metric metrics-registry "recent_changes_count")) + + (define (process-addition-change change-details) + (let ((narinfo + (call-with-input-string + (assq-ref change-details 'data) + (lambda (port) + (read-narinfo port + "https://narherderdummyvalue"))))) + (for-each + (lambda (uri) + (log-msg 'DEBUG "processing recent addition of " (uri-path uri)) + (put-message addition-channel (list 'addition (uri-path uri)))) + (narinfo-uris narinfo)))) + + (define (process-removal-change change-details) + (log-msg 'DEBUG "processing recent change triggered removal of " + (assq-ref change-details 'data)) + (put-message removal-channel + (list 'remove (assq-ref change-details 'data)))) + + (spawn-fiber + (lambda () + (let ((recent-changes-count + (database-count-recent-changes database))) + (metric-set recent-changes-count-metric recent-changes-count) + (log-msg 'DEBUG recent-changes-count " recent changes in the database")) + + (log-msg 'DEBUG "starting to listen for recent changes") + (let ((after-initial + (database-select-latest-recent-change-datetime database))) + (let loop ((after after-initial) + (last-processed-recent-changes + (database-select-recent-changes database after-initial))) + (sleep 10) + + (match + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "exception in recent change listener " exn) + #f) + (lambda () + (with-throw-handler #t + (lambda () + (let* ((recent-changes + (database-select-recent-changes database after)) + (unprocessed-recent-changes + (remove + (lambda (change-details) + (member change-details last-processed-recent-changes)) + recent-changes))) + + (unless (null? unprocessed-recent-changes) + (log-msg 'INFO "processing " (length unprocessed-recent-changes) + " recent changes") + + (for-each + (lambda (change-details) + (let ((change (assq-ref change-details 'change))) + (cond + ((string=? change "addition") + (process-addition-change change-details)) + ((string=? change "removal") + (process-removal-change change-details)) + (else #f)))) + unprocessed-recent-changes) + (metric-increment recent-changes-count-metric + #:by (length unprocessed-recent-changes))) + ;; Use the unprocessed recent changes here to carry + ;; forward all processed changes to the next pass + unprocessed-recent-changes)) + (lambda _ + (backtrace)))) + #:unwind? #t) + (#f (loop after '())) + (recent-changes + (if (null? recent-changes) + (loop after last-processed-recent-changes) + (loop (assq-ref (last recent-changes) + 'datetime) + recent-changes))))))))) diff --git a/nar-herder/server.scm b/nar-herder/server.scm index 522ff3f..2e2c1e7 100644 --- a/nar-herder/server.scm +++ b/nar-herder/server.scm @@ -19,39 +19,65 @@ (define-module (nar-herder server) #:use-module (srfi srfi-1) #:use-module (srfi srfi-11) + #:use-module (srfi srfi-19) #:use-module (srfi srfi-34) + #:use-module (srfi srfi-71) + #:use-module (ice-9 ftw) + #:use-module (ice-9 iconv) #:use-module (ice-9 match) + #:use-module (ice-9 threads) #:use-module (ice-9 binary-ports) #:use-module (rnrs bytevectors) #:use-module (web uri) + #:use-module (web client) #:use-module (web response) #:use-module (web request) #:use-module (logging logger) + #:use-module (fibers) + #:use-module (fibers channels) + #:use-module (fibers scheduler) + #:use-module (fibers conditions) + #:use-module (fibers operations) #:use-module (prometheus) #:use-module (json) #:use-module ((system foreign) #:select (bytevector->pointer pointer->bytevector)) #:use-module (guix store) #:use-module (guix base32) + #:use-module (guix progress) #:use-module (guix serialization) #:use-module ((guix utils) #:select (decompressed-port)) #:use-module ((guix build utils) #:select (dump-port)) + #:use-module ((guix build syscalls) #:select (set-thread-name)) #:use-module (nar-herder database) #:use-module (nar-herder storage) + #:use-module (nar-herder utils) + #:use-module (nar-herder mirror) + #:use-module (nar-herder recent-changes) + #:use-module (nar-herder cached-compression) #:use-module (ice-9 textual-ports) - #:export (make-request-handler)) + #:export (%compression-options + + run-nar-herder-service + make-request-handler)) + +(define %compression-options + '(gzip lzip zstd none)) (define* (render-json json #:key (extra-headers '()) (code 200)) (values (build-response #:code code #:headers (append extra-headers - '((content-type . (application/json)) + '((content-type . (application/json + (charset . "utf-8"))) (vary . (accept))))) - (lambda (port) - (scm->json json port)))) + (call-with-encoded-output-string + "utf-8" + (lambda (port) + (scm->json json port))))) (define (parse-query-string query) (let lp ((lst (map uri-decode (string-split query (char-set #\& #\=))))) @@ -61,32 +87,6 @@ (("") '()) (() '())))) -(define (get-gc-metrics-updater registry) - (define metrics - `((gc-time-taken - . ,(make-gauge-metric registry "guile_gc_time_taken")) - (heap-size - . ,(make-gauge-metric registry "guile_heap_size")) - (heap-free-size - . ,(make-gauge-metric registry "guile_heap_free_size")) - (heap-total-allocated - . ,(make-gauge-metric registry "guile_heap_total_allocated")) - (heap-allocated-since-gc - . ,(make-gauge-metric registry "guile_allocated_since_gc")) - (protected-objects - . ,(make-gauge-metric registry "guile_gc_protected_objects")) - (gc-times - . ,(make-gauge-metric registry "guile_gc_times")))) - - (lambda () - (let ((stats (gc-stats))) - (for-each - (match-lambda - ((name . metric) - (let ((value (assq-ref stats name))) - (metric-set metric value)))) - metrics)))) - (define (serve-fixed-output-file input compression proc) ;; TODO It's hard with fold-archive from (guix serialization) to ;; read just the singular file from the archive, so the following @@ -121,10 +121,11 @@ (define (read-string p) (utf8->string (read-byte-string p))) - (let*-values (((port pids) - (decompressed-port - (string->symbol compression) - input))) + (let ((port + pids + (decompressed-port + (string->symbol compression) + input))) ;; The decompressor can be an external program, so wait for it to ;; exit @@ -147,26 +148,78 @@ (proc port size))))) +(define (add-cached-compressions-to-narinfo initial-narinfo-contents + cached-narinfo-files) + (let ((cached-nar-strings + (map (lambda (cached-nar-details) + (let ((compression + (symbol->string + (assq-ref cached-nar-details 'compression)))) + (string-append + "URL: nar/" compression "/" + (uri-encode + (store-path-base + (assq-ref cached-nar-details 'store-path))) + "\n" + "Compression: " compression "\n" + "FileSize: " (number->string + (assq-ref cached-nar-details 'size)) + "\n"))) + cached-narinfo-files))) + (string-append + initial-narinfo-contents + (string-join + cached-nar-strings + "\n")))) + (define* (make-request-handler database storage-root - #:key ttl negative-ttl logger - metrics-registry) + #:key base-ttl base-cached-compressions-ttl + negative-ttl logger + metrics-registry + maybe-trigger-creation-of-cached-nars + cached-compression-nar-requested-hook) + (define hostname + (gethostname)) + (define (narinfo? str) (and (= (string-length str) 40) (string-suffix? ".narinfo" str))) + (define plain-metrics-registry + (make-metrics-registry)) + (define gc-metrics-updater - (get-gc-metrics-updater metrics-registry)) + (get-gc-metrics-updater plain-metrics-registry)) + + (define process-metrics-updater + (get-process-metrics-updater plain-metrics-registry)) + + (define guile-time-metrics-updater + (let ((internal-real-time + (make-gauge-metric plain-metrics-registry "guile_internal_real_time")) + (internal-run-time + (make-gauge-metric plain-metrics-registry "guile_internal_run_time"))) + (lambda () + (metric-set internal-real-time + (get-internal-real-time)) + (metric-set internal-run-time + (get-internal-run-time))))) (define requests-total-metric (make-counter-metric metrics-registry "server_requests_total")) - (define (increment-request-metric category response-code) + (define* (increment-request-metric category response-code #:key (labels '())) (metric-increment requests-total-metric #:label-values `((category . ,category) - (response_code . ,response-code)))) + (response_code . ,response-code) + ,@labels))) + + (define %compression-strings + (map symbol->string + %compression-options)) (lambda (request body) (log-msg logger @@ -178,30 +231,81 @@ (match (cons (request-method request) (split-and-decode-uri-path (uri-path (request-uri request)))) - (('GET (? narinfo? narinfo)) - (let ((narinfo-contents + (((or 'HEAD 'GET) (? narinfo? narinfo)) + (let ((base-narinfo-contents + narinfo-id (database-select-narinfo-contents-by-hash database (string-take narinfo 32)))) (increment-request-metric "narinfo" - (if narinfo-contents + (if base-narinfo-contents "200" "404")) - (if narinfo-contents - (values `((content-type . (text/plain)) - ,@(if ttl - `((cache-control (max-age . ,ttl))) - '())) - narinfo-contents) + (if base-narinfo-contents + (let* ((cached-narinfo-files + (database-select-cached-narinfo-files-by-narinfo-id + database + narinfo-id)) + (narinfo-contents + (if (null? cached-narinfo-files) + base-narinfo-contents + (add-cached-compressions-to-narinfo + base-narinfo-contents + cached-narinfo-files))) + (potential-ttls + (remove + not + `(,(if (null? cached-narinfo-files) + base-ttl + base-cached-compressions-ttl) + + ,(and=> (database-select-scheduled-narinfo-removal + database + narinfo-id) + (lambda (scheduled-removal-time) + (list + (max + (- (time-second + (time-difference scheduled-removal-time + (current-time))) + 60) + 0)))) + + ,@(if (null? cached-narinfo-files) + '() + (map + (lambda (details) + (and=> + (database-select-scheduled-cached-narinfo-removal + database + (assq-ref details 'id)) + (lambda (scheduled-removal-time) + (max + (- (time-second + (time-difference scheduled-removal-time + (current-time))) + 60) + 0)))) + cached-narinfo-files))))) + (ttl + (cond + ((null? potential-ttls) #f) + (else (apply min potential-ttls))))) + + (values `((content-type . (text/plain)) + ,@(if ttl + `((cache-control (max-age . ,ttl))) + '())) + narinfo-contents)) (values (build-response #:code 404 #:headers (if negative-ttl `((cache-control (max-age . ,negative-ttl))) '())) "404")))) - (('GET (? narinfo? narinfo) "info") + (((or 'HEAD 'GET) (? narinfo? narinfo) "info") (let ((narinfo-contents (database-select-narinfo-contents-by-hash database @@ -220,42 +324,133 @@ (string-take narinfo 32))))) (values (build-response #:code 404) "404")))) - (('GET "nar" compression filename) - (let* ((hash (string-take filename 32)) + (((or 'HEAD 'GET) "nar" compression filename) + (let* ((hash (and (>= (string-length filename) 32) + (string-take filename 32))) + (narinfo + (and hash + (database-select-narinfo-by-hash + database + hash))) (narinfo-files - (database-select-narinfo-files - database - hash)) + (and=> (assq-ref narinfo 'id) + (lambda (id) + (database-select-narinfo-files-by-narinfo-id + database + id)))) (narinfo-file-for-compression (find (lambda (file) - (string=? (assq-ref file 'compression) - compression)) - narinfo-files))) - - (when (or narinfo-file-for-compression - ;; Check for a common compression to avoid lots of - ;; metrics being generated if compression is random - (member compression '("gzip" "lzip" "zstd"))) - (increment-request-metric - (string-append "nar/" - compression) - (if narinfo-file-for-compression "200" "404"))) + (and (string=? (assq-ref file 'compression) + compression) + (string=? + (last (string-split (assq-ref file 'url) + #\/)) + (uri-encode filename)))) + (or narinfo-files '()))) + (compression-symbol + (if (member + compression + %compression-strings + string=?) + (string->symbol compression) + #f))) (if narinfo-file-for-compression - (values (build-response - #:code 200 - #:headers `((X-Accel-Redirect - . ,(string-append - "/internal/nar/" - compression "/" - (uri-encode filename))))) - #f) - (values (build-response #:code 404) - "404")))) - (('GET "file" name algo hash) + (let ((loop? + (any + (lambda (via) + (string=? (last (string-split via #\space)) + hostname)) + (request-via request)))) + + (when (and (not loop?) + maybe-trigger-creation-of-cached-nars) + (maybe-trigger-creation-of-cached-nars + (assq-ref narinfo 'id))) + + (when loop? + (log-msg logger 'WARN + (request-method request) + " " + (uri-path (request-uri request)) + ": loop detected (" hostname "): " + (string-join (request-via request) ", "))) + + (increment-request-metric + (string-append "nar/" + compression) + (if loop? + "500" + "200") + #:labels + (let ((system (assq-ref narinfo 'system))) + (if (string? system) + `((system . ,system)) + '()))) + + (if loop? + (values (build-response #:code 500) + (simple-format #f "loop detected (~A): ~A\n" + hostname + (request-via request))) + (values (build-response + #:code 200 + #:headers `((X-Accel-Redirect + . ,(string-append + "/internal/nar/" + compression "/" + (uri-encode filename))))) + #f))) + (let ((cached-narinfo-file + (and narinfo ; must be a known hash + compression-symbol ; must be a known compression + ;; Check that the filename given in the + ;; request matches the narinfo store-path + (string=? filename + (basename + (assq-ref narinfo 'store-path))) + (database-select-cached-narinfo-file-by-hash + database + hash + compression-symbol)))) + + (when (or cached-narinfo-file + ;; Check for a common compression to avoid lots of + ;; metrics being generated if compression is random + compression-symbol) + (increment-request-metric + (string-append "nar/" + compression) + (if cached-narinfo-file "200" "404") + #:labels + (if cached-narinfo-file + (let ((system (assq-ref narinfo 'system))) + (if (string? system) + `((system . ,system)) + '())) + '()))) + + (when cached-narinfo-file + (cached-compression-nar-requested-hook compression-symbol + filename)) + + (if cached-narinfo-file + (values (build-response + #:code 200 + #:headers `((X-Accel-Redirect + . ,(string-append + "/internal/cached-nar/" + compression "/" + (uri-encode filename))))) + #f) + (values (build-response #:code 404) + "404")))))) + (((or 'HEAD 'GET) "file" name algo hash) (guard (c ((invalid-base32-character? c) (values (build-response #:code 404) - "404"))) + (if (eq? (request-method request) 'HEAD) + #f + "404")))) (let ((hash-bytevector (nix-base32-string->bytevector hash))) (if (and (string=? algo "sha256") (= 32 (bytevector-length hash-bytevector))) @@ -272,43 +467,54 @@ store-path-hash)) (selected-narinfo-file ;; TODO Select intelligently - (first narinfo-files))) + (if (null? narinfo-files) + #f + (first narinfo-files))) + (filename + (and selected-narinfo-file + (let ((filename + (string-append + storage-root + (uri-decode + (assq-ref selected-narinfo-file 'url))))) + (and (file-exists? filename) + filename))))) (increment-request-metric "file" - (if selected-narinfo-file "200" "404")) - - (if selected-narinfo-file - (let* ((url - (assq-ref selected-narinfo-file 'url)) - (filename - (string-append storage-root - (uri-decode url)))) - - (serve-fixed-output-file - (open-input-file filename) - (assq-ref selected-narinfo-file - 'compression) - (lambda (nar-port bytes) - (values `((content-type . (application/octet-stream - (charset . "ISO-8859-1"))) - (content-length . ,bytes)) - (lambda (output-port) - (dump-port nar-port - output-port - bytes) - - (close-port output-port)))))) + (if filename "200" "404")) + + (if filename + (serve-fixed-output-file + (open-input-file filename) + (assq-ref selected-narinfo-file + 'compression) + (lambda (nar-port bytes) + (values `((content-type . (application/octet-stream + (charset . "ISO-8859-1"))) + (content-length . ,bytes)) + (if (eq? (request-method request) 'HEAD) + #f + (lambda (output-port) + (dump-port nar-port + output-port + bytes) + + (close-port nar-port)))))) (values (build-response #:code 404) - "404"))) + (if (eq? (request-method request) 'HEAD) + #f + "404")))) (begin (increment-request-metric "file" "404") (values (build-response #:code 404) - "404")))))) + (if (eq? (request-method request) 'HEAD) + #f + "404"))))))) - (('GET "recent-changes") + (((or 'HEAD 'GET) "recent-changes") (let ((query-parameters (or (and=> (uri-query (request-uri request)) parse-query-string) @@ -323,7 +529,7 @@ (or (assoc-ref query-parameters "since") "1970-01-01 00:00:01")))))))) - (('GET "latest-database-dump") + (((or 'HEAD 'GET) "latest-database-dump") (increment-request-metric "latest-database-dump" "200") @@ -331,20 +537,389 @@ #:code 200 #:headers '((X-Accel-Redirect . "/internal/database/nar_herder_dump.db"))) #f)) - (('GET "metrics") + (((or 'HEAD 'GET) "metrics") (gc-metrics-updater) + (process-metrics-updater) + (guile-time-metrics-updater) + (update-database-metrics! database) (increment-request-metric "metrics" "200") (values (build-response #:code 200 #:headers '((content-type . (text/plain)) (vary . (accept)))) - (lambda (port) - (write-metrics metrics-registry - port)))) + (call-with-output-string + (lambda (port) + (write-metrics metrics-registry port) + (write-metrics plain-metrics-registry port))))) (_ (increment-request-metric "unhandled" "404") (values (build-response #:code 404) "404"))))) +(define* (run-nar-herder-service opts lgr) + (define (download-database) + (let ((database-uri + (string->uri + (string-append (assq-ref opts 'mirror) + "/latest-database-dump")))) + (with-port-timeouts + (lambda () + (call-with-values + (lambda () + (simple-format (current-error-port) + "starting downloading the database\n") + (let ((port + socket + (open-socket-for-uri* database-uri))) + (http-get database-uri + #:port port + #:streaming? #t))) + (lambda (response body) + (when (not (= (response-code response) 200)) + (error "unable to fetch database from mirror")) + + (let* ((reporter (progress-reporter/file + (uri->string database-uri) + (response-content-length response) + (current-error-port))) + (port + (progress-report-port + reporter + body + #:download-size (response-content-length response)))) + + (call-with-output-file (assq-ref opts 'database) + (lambda (output-port) + (dump-port port output-port))) + + (close-port port)) + + (simple-format (current-error-port) + "finished downloading the database\n")))) + #:timeout 30))) + + (define metrics-registry + (make-metrics-registry + #:namespace + "narherder")) + + (and=> + (assq-ref opts 'mirror) + (lambda (mirror) + (let ((database-file (assq-ref opts 'database))) + (if (file-exists? database-file) + (begin + ;; TODO Open the database, and check if the + ;; latest changes in the database are visible on + ;; the source to mirror. If they're not, then + ;; delete the database and download it to get + ;; back in sync + + #f) + (download-database))))) + + ;; Used elsewhere + (make-gauge-metric metrics-registry "recent_changes_count") + + (let ((recent-changes-metric + (make-gauge-metric metrics-registry "recent_changes_limit"))) + (metric-set recent-changes-metric (assq-ref opts 'recent-changes-limit))) + + (define maintenance-scheduler + (make-scheduler #:parallelism 1)) + + (let* ((database (setup-database (assq-ref opts 'database) + metrics-registry + #:reader-threads + (assq-ref opts 'database-reader-threads))) + (canonical-storage (and=> (assq-ref opts 'storage) + canonicalize-path)) + + (enabled-cached-compressions + (let ((explicit-cached-compression-directories + (filter-map + (match-lambda + (('cached-compression-directory . details) details) + (_ #f)) + opts)) + (cached-compression-directories-max-sizes + (filter-map + (match-lambda + (('cached-compression-directory-max-size . details) details) + (_ #f)) + opts)) + (cached-compression-ttls + (filter-map + (match-lambda + (('cached-compression-ttl . details) details) + (_ #f)) + opts)) + (cached-compression-new-ttls + (filter-map + (match-lambda + (('cached-compression-new-ttl . details) details) + (_ #f)) + opts)) + (cached-compression-unused-removal-durations + (filter-map + (match-lambda + (('cached-compression-unused-removal-duration . details) + details) + (_ #f)) + opts))) + + (filter-map + (match-lambda + (('cached-compression . details) + (let ((compression + (assq-ref details 'type))) + (cons compression + `(,@(alist-delete 'type details) + (directory + . ,(or (assq-ref explicit-cached-compression-directories + compression) + (simple-format #f "/var/cache/nar-herder/nar/~A" + compression))) + (directory-max-size + . ,(assq-ref cached-compression-directories-max-sizes + compression)) + (ttl + . ,(assq-ref cached-compression-ttls + compression)) + (new-ttl + . ,(assq-ref cached-compression-new-ttls + compression)) + (unused-removal-duration + . ,(assq-ref cached-compression-unused-removal-durations + compression)))))) + (_ #f)) + opts))) + + (cached-compression-min-uses + (assq-ref opts 'cached-compression-min-uses)) + + (cached-compression-management-channel + (if (null? enabled-cached-compressions) + #f + (start-cached-compression-management-fiber + database + metrics-registry + (or (assq-ref opts 'cached-compression-nar-source) + canonical-storage) + enabled-cached-compressions + cached-compression-min-uses + #:cached-compression-workers + (assq-ref opts 'cached-compression-workers) + #:scheduler maintenance-scheduler))) + + (maybe-trigger-creation-of-cached-nars + (if (null? enabled-cached-compressions) + #f + (lambda (narinfo-id) + (spawn-fiber + (lambda () + (put-message cached-compression-management-channel + (cons 'narinfo-id narinfo-id))) + maintenance-scheduler)))) + + (cached-compression-nar-requested-hook + (if (null? enabled-cached-compressions) + #f + (lambda (compression filename) + (spawn-fiber + (lambda () + (let* ((directory + (assq-ref (assq-ref enabled-cached-compressions + compression) + 'directory))) + (utime (string-append directory "/" filename)))) + maintenance-scheduler))))) + + (if (string=? (assq-ref opts 'database-dump) + "disabled") + (log-msg 'INFO "database dump disabled") + (when (not (file-exists? (assq-ref opts 'database-dump))) + (log-msg 'INFO "dumping database...") + (dump-database database (assq-ref opts 'database-dump)))) + + (let ((finished? (make-condition))) + (call-with-new-thread + (lambda () + (catch 'system-error + (lambda () + (set-thread-name "maintenance")) + (const #t)) + + (run-fibers + (lambda () + (initialise-storage-metrics + database + canonical-storage + metrics-registry) + + (start-recent-change-removal-and-database-dump-fiber + database + metrics-registry + (let ((filename (assq-ref opts 'database-dump))) + (if (string=? filename "disabled") + #f + filename)) + (* 24 3600) ; 24 hours + (assq-ref opts 'recent-changes-limit)) + + (let ((mirror-channel + (and=> + (assq-ref opts 'mirror) + (lambda (mirror) + (start-fetch-changes-fiber + database + metrics-registry + canonical-storage ; might be #f, but that's fine here + mirror + cached-compression-management-channel) + + (if (assq-ref opts 'storage) + (start-mirroring-fiber database + mirror + (assq-ref opts 'storage-limit) + canonical-storage + metrics-registry) + #f)))) + (removal-channel + (let ((nar-removal-criteria + (filter-map + (match-lambda + ((key . val) + (if (eq? key 'storage-nar-removal-criteria) + val + #f))) + opts))) + (if (and (assq-ref opts 'storage) + (number? (assq-ref opts 'storage-limit)) + (not (null? nar-removal-criteria))) + (start-nar-removal-fiber database + canonical-storage + (assq-ref opts 'storage-limit) + metrics-registry + nar-removal-criteria) + #f))) + (addition-channel (make-channel))) + + (spawn-fiber + (lambda () + (while #t + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "exception processing addition-channel: " + exn)) + (lambda () + (match (get-message addition-channel) + (('addition file) + (apply update-nar-files-metric + metrics-registry + '() + (if (and canonical-storage + (file-exists? + (string-append canonical-storage + (uri-decode file)))) + '(#:stored-addition-count 1) + '(#:not-stored-addition-count 1))) + + (when mirror-channel + (spawn-fiber + (lambda () + (put-message mirror-channel + `(fetch ,file))))) + (when removal-channel + (spawn-fiber + (lambda () + (sleep 60) + (removal-channel-remove-nar-from-storage removal-channel + file) + (sleep (* 5 60)) + (removal-channel-remove-nar-from-storage removal-channel + file) + (sleep (* 15 60)) + (removal-channel-remove-nar-from-storage removal-channel + file) + (sleep 3600) + (removal-channel-remove-nar-from-storage removal-channel + file))))))) + #:unwind? #t)))) + + (start-recent-change-listener-fiber + database + metrics-registry + addition-channel + removal-channel)) + + (unless (null? enabled-cached-compressions) + (let ((cached-compression-removal-fiber-wakeup-channel + (start-cached-compression-removal-fiber + database + cached-compression-management-channel + enabled-cached-compressions))) + (start-cached-compression-schedule-removal-fiber + database + metrics-registry + cached-compression-management-channel + enabled-cached-compressions + cached-compression-removal-fiber-wakeup-channel + (or (assq-ref opts 'narinfo-ttl) + ;; Default from (guix substitutes) + (* 36 3600))))) + + (log-msg 'DEBUG "finished maintenance setup") + (wait finished?)) + #:scheduler maintenance-scheduler + #:hz 0 + #:parallelism 1))) + + (call-with-sigint + (lambda () + (run-fibers + (lambda () + (let* ((current (current-scheduler)) + (schedulers + (cons current (scheduler-remote-peers current)))) + (for-each + (lambda (i sched) + (spawn-fiber + (lambda () + (catch 'system-error + (lambda () + (set-thread-name + (string-append "fibers " (number->string i)))) + (const #t))) + sched)) + (iota (length schedulers)) + schedulers)) + + (log-msg 'INFO "starting server, listening on " + (assq-ref opts 'host) ":" (assq-ref opts 'port)) + + (run-server/patched + (make-request-handler + database + canonical-storage + #:base-ttl (or (assq-ref opts 'new-narinfo-ttl) + (assq-ref opts 'narinfo-ttl)) + #:base-cached-compressions-ttl + (or (assq-ref opts 'new-cached-compressions-narinfo-ttl) + (assq-ref opts 'cached-compressions-narinfo-ttl)) + #:negative-ttl (assq-ref opts 'narinfo-negative-ttl) + #:logger lgr + #:metrics-registry metrics-registry + #:maybe-trigger-creation-of-cached-nars + maybe-trigger-creation-of-cached-nars + #:cached-compression-nar-requested-hook + cached-compression-nar-requested-hook) + #:host (assq-ref opts 'host) + #:port (assq-ref opts 'port)) + + (wait finished?)) + #:hz 0 + #:parallelism (assq-ref opts 'parallelism))) + finished?)))) diff --git a/nar-herder/storage.scm b/nar-herder/storage.scm index c017685..e85d745 100644 --- a/nar-herder/storage.scm +++ b/nar-herder/storage.scm @@ -18,6 +18,7 @@ (define-module (nar-herder storage) #:use-module (srfi srfi-1) + #:use-module (srfi srfi-71) #:use-module (ice-9 ftw) #:use-module (ice-9 match) #:use-module (ice-9 threads) @@ -25,21 +26,28 @@ #:use-module (web uri) #:use-module (web client) #:use-module (web response) + #:use-module (fibers) + #:use-module (fibers channels) #:use-module (logging logger) #:use-module (logging port-log) #:use-module (prometheus) #:use-module (json) #:use-module ((guix build utils) #:select (dump-port mkdir-p)) #:use-module ((guix store) #:select (store-path-hash-part)) + #:use-module (guix progress) #:use-module (nar-herder utils) #:use-module (nar-herder database) #:export (store-item-in-local-storage? remove-nar-files-by-hash - get-nar-files + initialise-storage-metrics + update-nar-files-metric + check-storage - start-nar-removal-thread - start-mirroring-thread)) + removal-channel-remove-nar-from-storage + + start-nar-removal-fiber + start-mirroring-fiber)) (define (store-item-in-local-storage? database storage-root hash) (let ((narinfo-files (database-select-narinfo-files database hash))) @@ -52,17 +60,12 @@ (assq-ref file 'url))))) narinfo-files))) -(define (remove-nar-files-by-hash database storage-root metrics-registry - hash) - (define nar-files-metric - (or (metrics-registry-fetch-metric metrics-registry - "nar_files_total") - (make-gauge-metric metrics-registry - "nar_files_total" - #:labels '(stored)))) - +(define* (remove-nar-files-by-hash database storage-root metrics-registry + hash + #:key (error-unless-files-to-remove? #t)) (let ((narinfo-files (database-select-narinfo-files database hash))) - (when (null? narinfo-files) + (when (and (null? narinfo-files) + error-unless-files-to-remove?) (error "no narinfo files")) (for-each (lambda (file) @@ -76,14 +79,21 @@ (remove-nar-from-storage storage-root (assq-ref file 'url))) - (metric-decrement nar-files-metric - #:label-values - `((stored . ,(if exists? "true" "false")))))) + (and=> (metrics-registry-fetch-metric metrics-registry + "nar_files_total") + (lambda (metric) + ;; Just update this metric if it exists, since if it + ;; does, it should be set to a value + (metric-decrement + metric + #:label-values `((stored . ,(if exists? "true" "false")))))))) narinfo-files))) (define (get-storage-size storage-root) (define enter? (const #t)) (define (leaf name stat result) + ;; Allow other fibers to run + (sleep 0) (+ result (or (and=> (stat:blocks stat) (lambda (blocks) @@ -159,50 +169,234 @@ (unrecognised-files . ,(hash-map->list (lambda (key _) key) files-hash))))) -(define* (get-nar-files database storage-root metrics-registry - #:key stored?) - (define nar-files-metric - (or (metrics-registry-fetch-metric metrics-registry - "nar_files_total") - (make-gauge-metric metrics-registry - "nar_files_total" - #:labels '(stored)))) - - (let* ((index (index-storage database storage-root)) - (selected-files - (filter - (lambda (file) - (eq? (assq-ref file 'stored?) stored?)) - (assq-ref index 'narinfo-files)))) - - (let ((selected-files-count - (length selected-files)) - (all-files-count - (length (assq-ref index 'narinfo-files)))) - - (metric-set nar-files-metric - selected-files-count - #:label-values `((stored . ,(if stored? "true" "false")))) - (metric-set nar-files-metric - (- all-files-count selected-files-count) - #:label-values `((stored . ,(if stored? "false" "true"))))) - - selected-files)) - -(define (start-nar-removal-thread database - storage-root storage-limit - metrics-registry - nar-removal-criteria) +(define* (fold-nar-files database storage-root + proc init + #:key stored?) + (define stored-files-count 0) + (define not-stored-files-count 0) + + (let ((result + (database-fold-all-narinfo-files + database + (lambda (nar result) + (let* ((url + (uri-decode + (assq-ref nar 'url))) + (nar-stored? + (if storage-root + (file-exists? + (string-append storage-root url)) + #f))) + + (if nar-stored? + (set! stored-files-count (1+ stored-files-count)) + (set! not-stored-files-count (1+ not-stored-files-count))) + + (if (or (eq? stored? 'both) + (and stored? nar-stored?) + (and (not stored?) + (not nar-stored?))) + (proc nar result) + result))) + init))) + + (values result + `((stored . ,stored-files-count) + (not-stored . ,not-stored-files-count))))) + +(define* (update-nar-files-metric metrics-registry + nar-file-counts + #:key fetched-count removed-count + not-stored-addition-count + stored-addition-count) + + ;; Avoid incrementing or decrementing the metric if it hasn't been + ;; set yet + (when (or (metrics-registry-fetch-metric metrics-registry + "nar_files_total") + (= (length nar-file-counts) 2)) + + (let ((nar-files-metric + (or (metrics-registry-fetch-metric metrics-registry + "nar_files_total") + (make-gauge-metric metrics-registry + "nar_files_total" + #:labels '(stored))))) + + ;; Set the values if the counts are known + (and=> + (assq-ref nar-file-counts 'stored) + (lambda (stored-count) + (metric-set nar-files-metric + stored-count + #:label-values '((stored . "true"))))) + (and=> + (assq-ref nar-file-counts 'not-stored) + (lambda (not-stored-count) + (metric-set nar-files-metric + not-stored-count + #:label-values '((stored . "false"))))) + + ;; Then adjust by the fetched or removed counts + (when fetched-count + (metric-increment nar-files-metric + #:by fetched-count + #:label-values '((stored . "true"))) + (metric-decrement nar-files-metric + #:by fetched-count + #:label-values '((stored . "false")))) + (when removed-count + (metric-decrement nar-files-metric + #:by removed-count + #:label-values '((stored . "true"))) + (metric-increment nar-files-metric + #:by removed-count + #:label-values '((stored . "false")))) + + (when not-stored-addition-count + (metric-increment nar-files-metric + #:by not-stored-addition-count + #:label-values '((stored . "false")))) + + (when stored-addition-count + (metric-increment nar-files-metric + #:by stored-addition-count + #:label-values '((stored . "true"))))))) + +(define (initialise-storage-metrics database storage-root metrics-registry) + ;; Use a database transaction to block changes + (database-call-with-transaction + database + (lambda _ + (log-msg 'INFO "starting to initialise storage metrics") + (let ((_ + counts + (fold-nar-files + database + storage-root + (const #f) + #f + #:stored? 'both))) + (update-nar-files-metric + metrics-registry + counts)) + (log-msg 'INFO "finished initialising storage metrics")))) + +(define (check-storage database storage-root metrics-registry) + (define files-count + (database-count-narinfo-files database)) + + (call-with-progress-reporter + (progress-reporter/bar files-count + (simple-format #f "checking ~A files" files-count) + (current-error-port)) + (lambda (report) + (fold-nar-files + database + storage-root + (lambda (file _) + (let* ((full-filename + (string-append storage-root + (uri-decode (assq-ref file 'url)))) + (file-size + (stat:size (stat full-filename))) + (database-size + (assq-ref file 'size))) + (report) + (unless (= file-size database-size) + (newline) + (log-msg 'WARN "file " full-filename + " has inconsistent size (database: " + database-size ", file: " file-size ")")) + #f)) + #f + #:stored? 'both)))) + +(define (at-most max-length lst) + "If LST is shorter than MAX-LENGTH, return it and the empty list; otherwise +return its MAX-LENGTH first elements and its tail." + (let loop ((len 0) + (lst lst) + (result '())) + (match lst + (() + (values (reverse result) '())) + ((head . tail) + (if (>= len max-length) + (values (reverse result) lst) + (loop (+ 1 len) tail (cons head result))))))) + +(define %max-cached-connections + ;; Maximum number of connections kept in cache by + ;; 'open-connection-for-uri/cached'. + 16) + +(define open-socket-for-uri/cached + (let ((cache '())) + (lambda* (uri #:key fresh? verify-certificate?) + "Return a connection for URI, possibly reusing a cached connection. +When FRESH? is true, delete any cached connections for URI and open a new one. +Return #f if URI's scheme is 'file' or #f. + +When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates." + (define host (uri-host uri)) + (define scheme (uri-scheme uri)) + (define key (list host scheme (uri-port uri))) + + (and (not (memq scheme '(file #f))) + (match (assoc-ref cache key) + (#f + ;; Open a new connection to URI and evict old entries from + ;; CACHE, if any. + (let ((socket + (open-socket-for-uri* + uri + #:verify-certificate? verify-certificate?)) + (new-cache evicted + (at-most (- %max-cached-connections 1) cache))) + (for-each (match-lambda + ((_ . port) + (false-if-exception (close-port port)))) + evicted) + (set! cache (alist-cons key socket new-cache)) + socket)) + (socket + (if (or fresh? (port-closed? socket)) + (begin + (false-if-exception (close-port socket)) + (set! cache (alist-delete key cache)) + (open-socket-for-uri/cached uri + #:verify-certificate? + verify-certificate?)) + (begin + ;; Drain input left from the previous use. + (drain-input socket) + socket)))))))) + +(define (call-with-cached-connection uri proc) + (let ((port (open-socket-for-uri/cached uri))) + (with-throw-handler #t + (lambda () + (proc port)) + (lambda _ + (close-port port))))) + +(define (removal-channel-remove-nar-from-storage + channel file) + (let ((reply (make-channel))) + (put-message channel (list 'remove-from-storage reply file)) + (get-message reply))) + +(define (start-nar-removal-fiber database + storage-root storage-limit + metrics-registry + nar-removal-criteria) (define storage-size-metric (make-gauge-metric metrics-registry "storage_size_bytes")) - (define nar-files-metric - (or (metrics-registry-fetch-metric metrics-registry - "nar_files_total") - (make-gauge-metric metrics-registry - "nar_files_total" - #:labels '(stored)))) + (define removal-channel + (make-channel)) (define (check-removal-criteria nar criteria) (define narinfo @@ -222,21 +416,29 @@ (store-path-hash-part (assq-ref narinfo 'store-path)) ".narinfo/info")))) - (call-with-values - (lambda () - (retry-on-error + (with-port-timeouts + (lambda () + (call-with-values (lambda () - (http-get uri #:decode-body? #f)) - #:times 3 - #:delay 5)) - (lambda (response body) - (and (= (response-code response) - 200) - - (let ((json-body (json-string->scm - (utf8->string body)))) - (eq? (assoc-ref json-body "stored") - #t))))))))) + (retry-on-error + (lambda () + (call-with-cached-connection uri + (lambda (port) + (http-get uri + #:port port + #:decode-body? #f + #:keep-alive? #t + #:streaming? #t)))) + #:times 3 + #:delay 5)) + (lambda (response body) + (and (= (response-code response) + 200) + + (let ((json-body (json->scm body))) + (eq? (assoc-ref json-body "stored") + #t)))))) + #:timeout 30))))) (define (nar-can-be-removed? nar) (any (lambda (criteria) @@ -252,43 +454,97 @@ (metric-set storage-size-metric initial-storage-size) - (let loop ((storage-size - initial-storage-size) - (stored-nar-files - (with-time-logging "getting stored nar files" - (get-nar-files database storage-root metrics-registry - #:stored? #t)))) - ;; Look through items in local storage, check if the removal - ;; criteria have been met, and if so, delete it - - (unless (null? stored-nar-files) - (let ((nar-to-consider (car stored-nar-files))) - (if (nar-can-be-removed? nar-to-consider) - (begin - (remove-nar-from-storage - storage-root - (uri-decode - (assq-ref nar-to-consider 'url))) - - (metric-decrement nar-files-metric - #:label-values '((stored . "true"))) - (metric-increment nar-files-metric - #:label-values '((stored . "false"))) - - (let ((storage-size-estimate - (- storage-size - (assq-ref nar-to-consider 'size)))) - (when (> storage-size-estimate storage-limit) - (loop storage-size-estimate - (cdr stored-nar-files))))) - (loop storage-size - (cdr stored-nar-files))))))) - (log-msg 'INFO "finished looking for nars to remove")) + ;; Look through items in local storage, check if the removal + ;; criteria have been met, and if so, delete it + (let ((result + nar-file-counts + (fold-nar-files + database + storage-root + (lambda (nar result) + (match result + ((storage-size . removed-count) + (if (and (> storage-size storage-limit) + (nar-can-be-removed? nar)) + (let ((response + (removal-channel-remove-nar-from-storage + removal-channel + (assq-ref nar 'url)))) + + (if (eq? response 'removed) + (let ((storage-size-estimate + (- storage-size + (assq-ref nar 'size)))) + (cons storage-size-estimate + (+ removed-count 1))) + (cons storage-size + removed-count))) + (cons storage-size + removed-count))))) + (cons initial-storage-size 0) + #:stored? #t))) + + (match result + ((storage-size . removed-count) + + (log-msg 'INFO "finished looking for nars to remove, removed " + removed-count " files")))))) (when (null? nar-removal-criteria) (error "must be some removal criteria")) - (call-with-new-thread + (spawn-fiber + (lambda () + (while #t + (match (get-message removal-channel) + (('remove-from-storage reply file) + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "nar remove from storage failed (" + file "): " exn) + (put-message reply + (cons 'exn exn))) + (lambda () + (with-throw-handler #t + (lambda () + (cond + ((not (file-exists? + (string-append storage-root + (uri-decode file)))) + (put-message reply 'does-not-exist)) + ((not (nar-can-be-removed? + `((url . ,file)))) + (put-message reply + 'removal-criteria-not-met)) + (else + (remove-nar-from-storage + storage-root + (uri-decode file)) + + (update-nar-files-metric + metrics-registry + '() + #:removed-count 1) + + (put-message reply 'removed)))) + (lambda _ + (backtrace)))) + #:unwind? #t)) + (('remove file) + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "failed to remove " file ": " exn)) + (lambda () + ;; TODO: Do more checking at this point + (remove-nar-from-storage + storage-root + (uri-decode file)) + (update-nar-files-metric metrics-registry + '() + #:removed-count 1)) + #:unwind? #t)))))) + + (spawn-fiber (lambda () (while #t (with-exception-handler @@ -296,11 +552,12 @@ (log-msg 'ERROR "nar removal pass failed " exn)) run-removal-pass #:unwind? #t) + (sleep (* 60 60 24))))) - (sleep 300))))) + removal-channel) -(define (start-mirroring-thread database mirror storage-limit storage-root - metrics-registry) +(define (start-mirroring-fiber database mirror storage-limit storage-root + metrics-registry) (define no-storage-limit? (not (integer? storage-limit))) @@ -309,13 +566,6 @@ (make-gauge-metric metrics-registry "storage_size_bytes")) - (define nar-files-metric - (or (metrics-registry-fetch-metric metrics-registry - "nar_files_total") - (make-gauge-metric metrics-registry - "nar_files_total" - #:labels '(stored)))) - (define (fetch-file file) (let* ((string-url (string-append mirror file)) @@ -333,83 +583,147 @@ (when (file-exists? tmp-file-name) (delete-file tmp-file-name)) - (call-with-values - (lambda () - (http-get uri - #:decode-body? #f - #:streaming? #t)) - (lambda (response body) - (unless (= (response-code response) - 200) - (error "unknown response code")) - - (call-with-output-file tmp-file-name - (lambda (output-port) - (dump-port body output-port))) - (rename-file tmp-file-name - destination-file-name) - - (metric-increment nar-files-metric - #:label-values '((stored . "true"))) - (metric-decrement nar-files-metric - #:label-values '((stored . "false"))))))) + (with-exception-handler + (lambda (exn) + (when (file-exists? tmp-file-name) + (delete-file tmp-file-name)) + + (raise-exception exn)) + (lambda () + (with-port-timeouts + (lambda () + (call-with-values + (lambda () + (let ((port + socket + (open-socket-for-uri* uri))) + (http-get uri + #:port port + #:decode-body? #f + #:streaming? #t))) + (lambda (response body) + (unless (= (response-code response) + 200) + (error "unknown response code" + (response-code response))) + + (call-with-output-file tmp-file-name + (lambda (output-port) + (dump-port body output-port)))))) + #:timeout 30)) + #:unwind? #t) + + (rename-file tmp-file-name + destination-file-name) + + (update-nar-files-metric + metrics-registry + '() + #:fetched-count 1))) (define (download-nars initial-storage-size) ;; If there's free space, then consider downloading missing nars - (when (< initial-storage-size storage-limit) - (let loop ((storage-size initial-storage-size) - (missing-nar-files (get-nar-files - database storage-root metrics-registry - #:stored? #f))) - (unless (null? missing-nar-files) - (let ((file (car missing-nar-files))) - (log-msg 'DEBUG "considering " - (assq-ref file 'url)) - (let ((file-bytes (assq-ref file 'size))) - (if (or no-storage-limit? - (< (+ storage-size file-bytes) - storage-limit)) - (let ((success? - (with-exception-handler - (lambda (exn) - (log-msg 'ERROR "failed to fetch " - (assq-ref file 'url) - ": " exn) - #f) - (lambda () - (fetch-file (assq-ref file 'url)) - #t) - #:unwind? #t))) - (loop (if success? - (+ storage-size file-bytes) - storage-size) - (cdr missing-nar-files))) - ;; This file won't fit, so try the next one - (loop storage-size - (cdr missing-nar-files))))))))) + (if (< initial-storage-size storage-limit) + (let ((result + nar-file-counts + (fold-nar-files + database + storage-root + (lambda (file result) + (log-msg 'DEBUG "considering " + (assq-ref file 'url)) + (match result + ((storage-size . fetched-count) + (let ((file-bytes (assq-ref file 'size))) + (if (or no-storage-limit? + (< (+ storage-size file-bytes) + storage-limit)) + (let ((success? + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "failed to fetch " + (assq-ref file 'url) + ": " exn) + #f) + (lambda () + (with-throw-handler #t + (lambda () + (retry-on-error + (lambda () + (fetch-file (assq-ref file 'url))) + #:times 3 + #:delay 5)) + (lambda _ + (backtrace))) + #t) + #:unwind? #t))) + (if success? + (cons (+ storage-size file-bytes) + (1+ fetched-count)) + result)) + ;; This file won't fit, so try the next one + result))))) + initial-storage-size + #:stored? #f))) + + (match result + ((storage-size . fetched-count) + fetched-count))) + 0)) (define (fast-download-nars) (define parallelism 3) - (let ((missing-nar-files (get-nar-files - database storage-root metrics-registry - #:stored? #f))) - (n-par-for-each - parallelism - (lambda (file) - (log-msg 'DEBUG "considering " - (assq-ref file 'url)) - (with-exception-handler - (lambda (exn) - (log-msg 'ERROR "failed to fetch " - (assq-ref file 'url) - ": " exn) - #f) - (lambda () - (fetch-file (assq-ref file 'url)) - #t) - #:unwind? #t)) - missing-nar-files))) + (let ((channel (make-channel))) + (for-each + (lambda _ + (spawn-fiber + (lambda () + (let loop ((fetched-count 0)) + (match (get-message channel) + (('finished . reply) + (put-message reply fetched-count)) + (url + (log-msg 'DEBUG "considering " url) + (loop + (+ fetched-count + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "failed to fetch " url ": " exn) + 0) + (lambda () + (retry-on-error + (lambda () + (fetch-file url)) + #:times 3 + #:delay 5) + 1) + #:unwind? #t))))))))) + (iota parallelism)) + + (let ((result + nar-file-counts + (fold-nar-files + database + storage-root + (lambda (nar _) + (put-message channel + (assq-ref nar 'url)) + #f) + #f + #:stored? #f))) + + (let* ((reply-channel (make-channel)) + (fetched-count + (apply + + + (map + (lambda _ + (put-message channel + (cons 'finished reply-channel)) + (get-message reply-channel)) + (iota parallelism))))) + fetched-count)))) (define (run-mirror-pass) (log-msg 'DEBUG "running mirror pass") @@ -417,18 +731,38 @@ (get-storage-size storage-root)))) (metric-set storage-size-metric initial-storage-size) - (if no-storage-limit? - (fast-download-nars) - (download-nars initial-storage-size))) - (log-msg 'DEBUG "finished mirror pass")) - - (call-with-new-thread - (lambda () - (while #t - (with-exception-handler - (lambda (exn) - (log-msg 'ERROR "mirror pass failed " exn)) - run-mirror-pass - #:unwind? #t) - - (sleep 300))))) + (let ((fetched-count + (if no-storage-limit? + (fast-download-nars) + (download-nars initial-storage-size)))) + (log-msg 'DEBUG "finished mirror pass (fetched " fetched-count " nars)")))) + + (let ((channel (make-channel))) + (spawn-fiber + (lambda () + (while #t + (match (get-message channel) + ('full-pass + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "mirror pass failed " exn)) + run-mirror-pass + #:unwind? #t)) + (('fetch file) + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "failed to mirror " file ": " exn)) + (lambda () + (unless (file-exists? + (string-append storage-root + (uri-decode file))) + (fetch-file file))) + #:unwind? #t)))))) + + (spawn-fiber + (lambda () + (while #t + (put-message channel 'full-pass) + (sleep (* 60 60 24))))) + + channel)) diff --git a/nar-herder/utils.scm b/nar-herder/utils.scm index 2d62360..4261b05 100644 --- a/nar-herder/utils.scm +++ b/nar-herder/utils.scm @@ -18,44 +18,37 @@ (define-module (nar-herder utils) #:use-module (srfi srfi-1) + #:use-module (srfi srfi-9) #:use-module (srfi srfi-19) ; time #:use-module (ice-9 q) - ;; #:use-module (ice-9 ftw) - ;; #:use-module (ice-9 popen) #:use-module (ice-9 iconv) #:use-module (ice-9 match) #:use-module (ice-9 format) #:use-module (ice-9 threads) + #:use-module (ice-9 atomic) #:use-module (ice-9 textual-ports) #:use-module (ice-9 rdelim) #:use-module (ice-9 binary-ports) #:use-module (ice-9 exceptions) #:use-module (rnrs bytevectors) + #:use-module (ice-9 suspendable-ports) + #:use-module ((ice-9 ports internal) #:select (port-poll + port-read-wait-fd + port-write-wait-fd)) #:use-module (web uri) #:use-module (web http) #:use-module (web client) #:use-module (web request) #:use-module (web response) #:use-module (fibers) + #:use-module (fibers timers) #:use-module (fibers channels) + #:use-module (fibers scheduler) #:use-module (fibers conditions) - ;; #:use-module (gcrypt pk-crypto) - ;; #:use-module (gcrypt hash) - ;; #:use-module (gcrypt random) - ;; #:use-module (json) - ;; #:use-module (guix pki) - ;; #:use-module (guix utils) - ;; #:use-module (guix config) - ;; #:use-module (guix store) - ;; #:use-module (guix status) - ;; #:use-module (guix base64) - ;; #:use-module (guix scripts substitute) - #:export (call-with-streaming-http-request - &chunked-input-ended-prematurely - chunked-input-ended-prematurely-error? - make-chunked-input-port* - - make-worker-thread-channel + #:use-module (fibers operations) + #:use-module ((guix build syscalls) + #:select (set-thread-name)) + #:export (make-worker-thread-set call-with-worker-thread call-with-time-logging @@ -65,171 +58,20 @@ create-work-queue - check-locale!)) - -;; Chunked Responses -(define (read-chunk-header port) - "Read a chunk header from PORT and return the size in bytes of the -upcoming chunk." - (match (read-line port) - ((? eof-object?) - ;; Connection closed prematurely: there's nothing left to read. - (error "chunked input ended prematurely")) - (str - (let ((extension-start (string-index str - (lambda (c) - (or (char=? c #\;) - (char=? c #\return)))))) - (string->number (if extension-start ; unnecessary? - (substring str 0 extension-start) - str) - 16))))) - -(define &chunked-input-ended-prematurely - (make-exception-type '&chunked-input-error-prematurely - &external-error - '())) + check-locale! -(define make-chunked-input-ended-prematurely-error - (record-constructor &chunked-input-ended-prematurely)) - -(define chunked-input-ended-prematurely-error? - (record-predicate &chunked-input-ended-prematurely)) - -(define* (make-chunked-input-port* port #:key (keep-alive? #f)) - (define (close) - (unless keep-alive? - (close-port port))) - - (define chunk-size 0) ;size of the current chunk - (define remaining 0) ;number of bytes left from the current chunk - (define finished? #f) ;did we get all the chunks? - - (define (read! bv idx to-read) - (define (loop to-read num-read) - (cond ((or finished? (zero? to-read)) - num-read) - ((zero? remaining) ;get a new chunk - (let ((size (read-chunk-header port))) - (set! chunk-size size) - (set! remaining size) - (cond - ((zero? size) - (set! finished? #t) - (get-bytevector-n port 2) ; \r\n follows the last chunk - num-read) - (else - (loop to-read num-read))))) - (else ;read from the current chunk - (let* ((ask-for (min to-read remaining)) - (read (get-bytevector-n! port bv (+ idx num-read) - ask-for))) - (cond - ((eof-object? read) ;premature termination - (raise-exception - (make-chunked-input-ended-prematurely-error))) - (else - (let ((left (- remaining read))) - (set! remaining left) - (when (zero? left) - ;; We're done with this chunk; read CR and LF. - (get-u8 port) (get-u8 port)) - (loop (- to-read read) - (+ num-read read))))))))) - (loop to-read 0)) - - (make-custom-binary-input-port "chunked input port" read! #f #f close)) - -(define* (make-chunked-output-port* port #:key (keep-alive? #f) - (buffering 1200) - report-bytes-sent) - (define heap-allocated-limit - (expt 2 20)) ;; 1MiB - - (define (%put-string s) - (unless (string-null? s) - (let* ((bv (string->bytevector s "ISO-8859-1")) - (length (bytevector-length bv))) - (put-string port (number->string length 16)) - (put-string port "\r\n") - (put-bytevector port bv) - (put-string port "\r\n") - - (when report-bytes-sent - (report-bytes-sent length)) - (let* ((stats (gc-stats)) - (initial-gc-times - (assq-ref stats 'gc-times))) - (when (> (assq-ref stats 'heap-allocated-since-gc) - heap-allocated-limit) - (while (let ((updated-stats (gc-stats))) - (= (assq-ref updated-stats 'gc-times) - initial-gc-times)) - (gc) - (usleep 50))))))) - - (define (%put-char c) - (%put-string (list->string (list c)))) - - (define (flush) #t) - (define (close) - (put-string port "0\r\n\r\n") - (force-output port) - (unless keep-alive? - (close-port port))) - (let ((ret (make-soft-port - (vector %put-char %put-string flush #f close) "w"))) - (setvbuf ret 'block buffering) - ret)) - -(define* (call-with-streaming-http-request uri callback - #:key (headers '()) - (method 'PUT) - report-bytes-sent) - (let* ((port (open-socket-for-uri uri)) - (request - (build-request - uri - #:method method - #:version '(1 . 1) - #:headers `((connection close) - (Transfer-Encoding . "chunked") - (Content-Type . "application/octet-stream") - ,@headers) - #:port port))) - - (set-port-encoding! port "ISO-8859-1") - (setvbuf port 'block (expt 2 13)) - (with-exception-handler - (lambda (exp) - (simple-format #t "error: PUT ~A: ~A\n" (uri-path uri) exp) - (close-port port) - (raise-exception exp)) - (lambda () - (let ((request (write-request request port))) - (let* ((chunked-output-port - (make-chunked-output-port* - port - #:buffering (expt 2 12) - #:keep-alive? #t - #:report-bytes-sent report-bytes-sent))) - - ;; A SIGPIPE will kill Guile, so ignore it - (sigaction SIGPIPE - (lambda (arg) - (simple-format (current-error-port) "warning: SIGPIPE\n"))) - - (set-port-encoding! chunked-output-port "ISO-8859-1") - (callback chunked-output-port) - (close-port chunked-output-port) - - (let ((response (read-response port))) - (let ((body (read-response-body response))) - (close-port port) - (values response - body))))))))) - -(define* (retry-on-error f #:key times delay ignore) + open-socket-for-uri* + + call-with-sigint + run-server/patched + + timeout-error? + + port-read-timeout-error? + port-write-timeout-error? + with-port-timeouts)) + +(define* (retry-on-error f #:key times delay ignore error-hook) (let loop ((attempt 1)) (match (with-exception-handler (lambda (exn) @@ -259,15 +101,26 @@ upcoming chunk." times)) (apply values return-values)) ((#f . exn) - (if (>= attempt times) + (if (>= attempt + (- times 1)) (begin (simple-format (current-error-port) - "error: ~A:\n ~A,\n giving up after ~A attempts\n" + "error: ~A:\n ~A,\n attempt ~A of ~A, last retry in ~A\n" f exn - times) - (raise-exception exn)) + attempt + times + delay) + (when error-hook + (error-hook attempt exn)) + (sleep delay) + (simple-format + (current-error-port) + "running last retry of ~A after ~A failed attempts\n" + f + attempt) + (f)) (begin (simple-format (current-error-port) @@ -277,71 +130,11 @@ upcoming chunk." attempt times delay) + (when error-hook + (error-hook attempt exn)) (sleep delay) (loop (+ 1 attempt)))))))) -(define delay-logging-fluid - (make-thread-local-fluid)) -(define delay-logging-depth-fluid - (make-thread-local-fluid 0)) - -(define (log-delay proc duration) - (and=> (fluid-ref delay-logging-fluid) - (lambda (recorder) - (recorder proc duration)))) - -(define* (call-with-delay-logging proc #:key (threshold 1) (args '())) - (let ((start (get-internal-real-time)) - (trace '()) - (root-logger? (eq? #f (fluid-ref delay-logging-fluid)))) - - (define (format-seconds seconds) - (format #f "~4f" seconds)) - - (call-with-values - (lambda () - (with-fluid* delay-logging-depth-fluid - (+ 1 (fluid-ref delay-logging-depth-fluid)) - (lambda () - (if root-logger? - (with-fluid* delay-logging-fluid - (lambda (proc duration) - (set! trace - (cons (list proc - duration - (fluid-ref delay-logging-depth-fluid)) - trace)) - #t) - (lambda () - (apply proc args))) - (apply proc args))))) - (lambda vals - (let ((elapsed-seconds - (/ (- (get-internal-real-time) - start) - internal-time-units-per-second))) - (if (and (> elapsed-seconds threshold) - root-logger?) - (let ((lines - (cons - (simple-format #f "warning: delay of ~A seconds: ~A" - (format-seconds elapsed-seconds) - proc) - (map (match-lambda - ((proc duration depth) - (string-append - (make-string (* 2 depth) #\space) - (simple-format #f "~A: ~A" - (format-seconds duration) - proc)))) - trace)))) - (display (string-append - (string-join lines "\n") - "\n"))) - (unless root-logger? - ((fluid-ref delay-logging-fluid) proc elapsed-seconds)))) - (apply values vals))))) - (define (call-with-time-logging name thunk) (let ((start (current-time time-utc))) (call-with-values @@ -364,7 +157,9 @@ upcoming chunk." (define* (create-work-queue thread-count-parameter proc #:key thread-start-delay (thread-stop-delay - (make-time time-duration 0 0))) + (make-time time-duration 0 0)) + (name "unnamed") + priority<?) (let ((queue (make-q)) (queue-mutex (make-mutex)) (job-available (make-condition-variable)) @@ -384,11 +179,26 @@ upcoming chunk." (else thread-count-parameter))) - (define (process-job . args) - (with-mutex queue-mutex - (enq! queue args) - (start-new-threads-if-necessary (get-thread-count)) - (signal-condition-variable job-available))) + (define process-job + (if priority<? + (lambda* (args #:key priority) + (with-mutex queue-mutex + (enq! queue (cons priority args)) + (set-car! + queue + (stable-sort! (car queue) + (lambda (a b) + (priority<? + (car a) + (car b))))) + (sync-q! queue) + (start-new-threads-if-necessary (get-thread-count)) + (signal-condition-variable job-available))) + (lambda args + (with-mutex queue-mutex + (enq! queue args) + (start-new-threads-if-necessary (get-thread-count)) + (signal-condition-variable job-available))))) (define (count-threads) (with-mutex queue-mutex @@ -403,11 +213,12 @@ upcoming chunk." (define (list-jobs) (with-mutex queue-mutex - (append (list-copy - (car queue)) + (append (if priority<? + (map cdr (car queue)) + (list-copy (car queue))) (hash-fold (lambda (key val result) - (or (and val - (cons val result)) + (if val + (cons val result) result)) '() running-job-args)))) @@ -416,16 +227,17 @@ upcoming chunk." (with-exception-handler (lambda (exn) (simple-format (current-error-port) - "job raised exception: ~A\n" - job-args)) + "~A work queue, job raised exception ~A: ~A\n" + name job-args exn)) (lambda () (with-throw-handler #t (lambda () (apply proc job-args)) (lambda (key . args) - (simple-format (current-error-port) - "exception when handling job: ~A ~A\n" - key args) + (simple-format + (current-error-port) + "~A work queue, exception when handling job: ~A ~A\n" + name key args) (backtrace)))) #:unwind? #t)) @@ -453,6 +265,13 @@ upcoming chunk." (call-with-new-thread (lambda () + (catch 'system-error + (lambda () + (set-thread-name + (string-append name " q t " + (number->string thread-index)))) + (const #t)) + (let loop ((last-job-finished-at (current-time time-monotonic))) (lock-mutex queue-mutex) @@ -469,9 +288,13 @@ upcoming chunk." ;; the job in the mean time (if (q-empty? queue) #f - (deq! queue)) + (if priority<? + (cdr (deq! queue)) + (deq! queue))) #f) - (deq! queue)))) + (if priority<? + (cdr (deq! queue)) + (deq! queue))))) (if job-args (begin @@ -499,32 +322,38 @@ upcoming chunk." (define start-new-threads-if-necessary (let ((previous-thread-started-at (make-time time-monotonic 0 0))) (lambda (desired-count) - (let* ((thread-count - (hash-count (const #t) running-job-args)) - (threads-to-start - (- desired-count thread-count))) - (when (> threads-to-start 0) - (for-each - (lambda (thread-index) - (when (eq? (hash-ref running-job-args - thread-index - 'slot-free) - 'slot-free) - (let* ((now (current-time time-monotonic)) - (elapsed (time-difference now - previous-thread-started-at))) - (when (or (eq? #f thread-start-delay) - (time>=? elapsed thread-start-delay)) - (set! previous-thread-started-at now) - (hash-set! running-job-args - thread-index - #f) - (start-thread thread-index))))) - (iota desired-count))))))) + (let* ((thread-count + (hash-count (const #t) running-job-args)) + (threads-to-start + (- desired-count thread-count))) + (when (> threads-to-start 0) + (for-each + (lambda (thread-index) + (when (eq? (hash-ref running-job-args + thread-index + 'slot-free) + 'slot-free) + (let* ((now (current-time time-monotonic)) + (elapsed (time-difference now + previous-thread-started-at))) + (when (or (eq? #f thread-start-delay) + (time>=? elapsed thread-start-delay)) + (set! previous-thread-started-at now) + (hash-set! running-job-args + thread-index + #f) + (start-thread thread-index))))) + (iota desired-count))))))) (if (procedure? thread-count-parameter) (call-with-new-thread (lambda () + (catch 'system-error + (lambda () + (set-thread-name + (string-append name " q t"))) + (const #t)) + (while #t (sleep 15) (with-mutex queue-mutex @@ -565,83 +394,171 @@ falling back to en_US.utf8\n" (setlocale LC_ALL "")) #:unwind? #t)) -(define %worker-thread-args - (make-parameter #f)) - -(define* (make-worker-thread-channel initializer - #:key (parallelism 1) - (delay-logger (lambda _ #f)) - destructor - lifetime - (log-exception? (const #t))) - "Return a channel used to offload work to a dedicated thread. ARGS are the -arguments of the worker thread procedure." +(define-record-type <worker-thread-set> + (worker-thread-set channel arguments-parameter) + worker-thread-set? + (channel worker-thread-set-channel) + (arguments-parameter worker-thread-set-arguments-parameter)) + +(define* (make-worker-thread-set initializer + #:key (parallelism 1) + (delay-logger (lambda _ #f)) + (duration-logger (const #f)) + destructor + lifetime + (log-exception? (const #t)) + (expire-on-exception? #f) + (name "unnamed")) + (define param + (make-parameter #f)) + + (define (initializer/safe) + (let ((args + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "exception running initializer in worker thread (~A): ~A:\n ~A\n" + name + initializer + exn) + #f) + (lambda () + (with-throw-handler #t + initializer + (lambda args + (backtrace)))) + #:unwind? #t))) + + (if args + args + ;; never give up, just keep retrying + (begin + (sleep 5) + (initializer/safe))))) + + (define (destructor/safe args) + (let ((success? + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "exception running destructor in worker thread (~A): ~A:\n ~A\n" + name + initializer + exn) + #f) + (lambda () + (with-throw-handler #t + (lambda () + (apply destructor args) + #t) + (lambda _ + (backtrace)))) + #:unwind? #t))) + + (or success? + #t + (begin + (sleep 5) + (destructor/safe args))))) + (let ((channel (make-channel))) (for-each - (lambda _ + (lambda (thread-index) (call-with-new-thread (lambda () - (let init ((args (initializer))) - (parameterize ((%worker-thread-args args)) + (catch 'system-error + (lambda () + (set-thread-name + (string-append + name " w t " + (number->string thread-index)))) + (const #t)) + + (let init ((args (initializer/safe))) + (parameterize ((param args)) (let loop ((current-lifetime lifetime)) - (match (get-message channel) - (((? channel? reply) sent-time (? procedure? proc)) - (let ((time-delay - (- (get-internal-real-time) - sent-time))) - (delay-logger (/ time-delay - internal-time-units-per-second)) - (put-message - reply - (let ((start-time (get-internal-real-time))) - (with-exception-handler - (lambda (exn) - (list 'worker-thread-error - (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second) - exn)) - (lambda () - (with-throw-handler #t - (lambda () - (call-with-values - (lambda () - (apply proc args)) - (lambda vals - (cons (/ (- (get-internal-real-time) - start-time) + (let ((exception? + (match (get-message channel) + (((? channel? reply) sent-time (? procedure? proc)) + (let ((time-delay + (- (get-internal-real-time) + sent-time))) + (delay-logger (/ time-delay internal-time-units-per-second) - vals)))) - (lambda args - (when (match args - (('%exception exn) - (log-exception? exn)) - (_ #t)) - (simple-format - (current-error-port) - "worker-thread: exception: ~A\n" args) - (backtrace))))) - #:unwind? #t)))))) - (if (number? current-lifetime) - (unless (< current-lifetime 0) - (loop (if current-lifetime - (- current-lifetime 1) - #f))) - (loop #f)))) + proc) + + (let* ((start-time (get-internal-real-time)) + (response + (with-exception-handler + (lambda (exn) + (list 'worker-thread-error + (/ (- (get-internal-real-time) + start-time) + internal-time-units-per-second) + exn)) + (lambda () + (with-throw-handler #t + (lambda () + (call-with-values + (lambda () + (apply proc args)) + (lambda vals + (cons (/ (- (get-internal-real-time) + start-time) + internal-time-units-per-second) + vals)))) + (lambda args + (when (match args + (('%exception exn) + (log-exception? exn)) + (_ #t)) + (simple-format + (current-error-port) + "worker-thread: exception: ~A\n" args) + (backtrace))))) + #:unwind? #t))) + (put-message reply + response) + + (match response + (('worker-thread-error duration _) + (when duration-logger + (duration-logger duration proc)) + #t) + ((duration . _) + (when duration-logger + (duration-logger duration proc)) + #f)))))))) + + (unless (and expire-on-exception? + exception?) + (if (number? current-lifetime) + (unless (< current-lifetime 0) + (loop (if current-lifetime + (- current-lifetime 1) + #f))) + (loop #f)))))) + (when destructor - (apply destructor args)) - (init (initializer)))))) + (destructor/safe args)) + + (init (initializer/safe)))))) (iota parallelism)) - channel)) -(define* (call-with-worker-thread channel proc #:key duration-logger) + (worker-thread-set channel + param))) + +(define* (call-with-worker-thread record proc #:key duration-logger) "Send PROC to the worker thread through CHANNEL. Return the result of PROC. If already in the worker thread, call PROC immediately." - (let ((args (%worker-thread-args))) + (let ((args ((worker-thread-set-arguments-parameter record)))) (if args (apply proc args) (let ((reply (make-channel))) - (put-message channel (list reply (get-internal-real-time) proc)) + (put-message (worker-thread-set-channel record) + (list reply (get-internal-real-time) proc)) (match (get-message reply) (('worker-thread-error duration exn) (when duration-logger @@ -651,3 +568,220 @@ If already in the worker thread, call PROC immediately." (when duration-logger (duration-logger duration)) (apply values result))))))) + +(define* (open-socket-for-uri* uri + #:key (verify-certificate? #t)) + (define tls-wrap + (@@ (web client) tls-wrap)) + + (define https? + (eq? 'https (uri-scheme uri))) + + (define plain-uri + (if https? + (build-uri + 'http + #:userinfo (uri-userinfo uri) + #:host (uri-host uri) + #:port (or (uri-port uri) 443) + #:path (uri-path uri) + #:query (uri-query uri) + #:fragment (uri-fragment uri)) + uri)) + + (let ((s (open-socket-for-uri plain-uri))) + (values + (if https? + (let ((port + (tls-wrap s (uri-host uri) + #:verify-certificate? verify-certificate?))) + ;; Guile/guile-gnutls don't handle the handshake happening on a non + ;; blocking socket, so change the behavior here. + (let ((flags (fcntl s F_GETFL))) + (fcntl s F_SETFL (logior O_NONBLOCK flags))) + port) + (let ((flags (fcntl s F_GETFL))) + (fcntl s F_SETFL (logior O_NONBLOCK flags)) + s)) + s))) + +;; Copied from (fibers web server) +(define (call-with-sigint thunk cvar) + (let ((handler #f)) + (dynamic-wind + (lambda () + (set! handler + (sigaction SIGINT (lambda (sig) (signal-condition! cvar))))) + thunk + (lambda () + (if handler + ;; restore Scheme handler, SIG_IGN or SIG_DFL. + (sigaction SIGINT (car handler) (cdr handler)) + ;; restore original C handler. + (sigaction SIGINT #f)))))) + +;; This variant of run-server from the fibers library supports running +;; multiple servers within one process. +(define run-server/patched + (let ((fibers-web-server-module + (resolve-module '(fibers web server)))) + + (define set-nonblocking! + (module-ref fibers-web-server-module 'set-nonblocking!)) + + (define make-default-socket + (module-ref fibers-web-server-module 'make-default-socket)) + + (define socket-loop + (module-ref fibers-web-server-module 'socket-loop)) + + (lambda* (handler + #:key + (host #f) + (family AF_INET) + (addr (if host + (inet-pton family host) + INADDR_LOOPBACK)) + (port 8080) + (socket (make-default-socket family addr port))) + ;; We use a large backlog by default. If the server is suddenly hit + ;; with a number of connections on a small backlog, clients won't + ;; receive confirmation for their SYN, leading them to retry -- + ;; probably successfully, but with a large latency. + (listen socket 1024) + (set-nonblocking! socket) + (sigaction SIGPIPE SIG_IGN) + (spawn-fiber (lambda () (socket-loop socket handler)))))) + +;; These procedure are subject to spurious wakeups. + +(define (readable? port) + "Test if PORT is writable." + (match (select (vector port) #() #() 0) + ((#() #() #()) #f) + ((#(_) #() #()) #t))) + +(define (writable? port) + "Test if PORT is writable." + (match (select #() (vector port) #() 0) + ((#() #() #()) #f) + ((#() #(_) #()) #t))) + +(define (make-wait-operation ready? schedule-when-ready port port-ready-fd this-procedure) + (make-base-operation #f + (lambda _ + (and (ready? (port-ready-fd port)) values)) + (lambda (flag sched resume) + (define (commit) + (match (atomic-box-compare-and-swap! flag 'W 'S) + ('W (resume values)) + ('C (commit)) + ('S #f))) + (schedule-when-ready + sched (port-ready-fd port) commit)))) + +(define (wait-until-port-readable-operation port) + "Make an operation that will succeed when PORT is readable." + (unless (input-port? port) + (error "refusing to wait forever for input on non-input port")) + (make-wait-operation readable? schedule-task-when-fd-readable port + port-read-wait-fd + wait-until-port-readable-operation)) + +(define (wait-until-port-writable-operation port) + "Make an operation that will succeed when PORT is writable." + (unless (output-port? port) + (error "refusing to wait forever for output on non-output port")) + (make-wait-operation writable? schedule-task-when-fd-writable port + port-write-wait-fd + wait-until-port-writable-operation)) + + + +(define &port-timeout + (make-exception-type '&port-timeout + &external-error + '(thunk port))) + +(define make-port-timeout-error + (record-constructor &port-timeout)) + +(define port-timeout-error? + (record-predicate &port-timeout)) + +(define &port-read-timeout + (make-exception-type '&port-read-timeout + &port-timeout + '())) + +(define make-port-read-timeout-error + (record-constructor &port-read-timeout)) + +(define port-read-timeout-error? + (record-predicate &port-read-timeout)) + +(define &port-write-timeout + (make-exception-type '&port-write-timeout + &port-timeout + '())) + +(define make-port-write-timeout-error + (record-constructor &port-write-timeout)) + +(define port-write-timeout-error? + (record-predicate &port-write-timeout)) + +(define* (with-port-timeouts thunk + #:key timeout + (read-timeout timeout) + (write-timeout timeout)) + (define (no-fibers-wait thunk port mode timeout) + (define poll-timeout-ms 200) + + ;; When the GC runs, it restarts the poll syscall, but the timeout + ;; remains unchanged! When the timeout is longer than the time + ;; between the syscall restarting, I think this renders the + ;; timeout useless. Therefore, this code uses a short timeout, and + ;; repeatedly calls poll while watching the clock to see if it has + ;; timed out overall. + (let ((timeout-internal + (+ (get-internal-real-time) + (* timeout internal-time-units-per-second)))) + (let loop ((poll-value + (port-poll port mode poll-timeout-ms))) + (if (= poll-value 0) + (if (> (get-internal-real-time) + timeout-internal) + (raise-exception + (if (string=? mode "r") + (make-port-read-timeout-error thunk port) + (make-port-write-timeout-error thunk port))) + (loop (port-poll port mode poll-timeout-ms))) + poll-value)))) + + (parameterize + ((current-read-waiter + (lambda (port) + (if (current-scheduler) + (perform-operation + (choice-operation + (wait-until-port-readable-operation port) + (wrap-operation + (sleep-operation read-timeout) + (lambda () + (raise-exception + (make-port-read-timeout-error thunk port)))))) + (no-fibers-wait thunk port "r" read-timeout)))) + (current-write-waiter + (lambda (port) + (if (current-scheduler) + (perform-operation + (choice-operation + (wait-until-port-writable-operation port) + (wrap-operation + (sleep-operation write-timeout) + (lambda () + (raise-exception + (make-port-write-timeout-error thunk port)))))) + (no-fibers-wait thunk port "w" write-timeout))))) + (thunk))) |