aboutsummaryrefslogtreecommitdiff
path: root/nar-herder
diff options
context:
space:
mode:
Diffstat (limited to 'nar-herder')
-rw-r--r--nar-herder/cached-compression.scm753
-rw-r--r--nar-herder/database.scm1006
-rw-r--r--nar-herder/mirror.scm85
-rw-r--r--nar-herder/recent-changes.scm159
-rw-r--r--nar-herder/server.scm795
-rw-r--r--nar-herder/storage.scm730
-rw-r--r--nar-herder/utils.scm836
7 files changed, 3524 insertions, 840 deletions
diff --git a/nar-herder/cached-compression.scm b/nar-herder/cached-compression.scm
new file mode 100644
index 0000000..5c257dc
--- /dev/null
+++ b/nar-herder/cached-compression.scm
@@ -0,0 +1,753 @@
+;;; Nar Herder
+;;;
+;;; Copyright © 2022, 2023 Christopher Baines <mail@cbaines.net>
+;;;
+;;; This program is free software: you can redistribute it and/or
+;;; modify it under the terms of the GNU Affero General Public License
+;;; as published by the Free Software Foundation, either version 3 of
+;;; the License, or (at your option) any later version.
+;;;
+;;; This program is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;; Affero General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU Affero General Public
+;;; License along with this program. If not, see
+;;; <http://www.gnu.org/licenses/>.
+
+(define-module (nar-herder cached-compression)
+ #:use-module (srfi srfi-1)
+ #:use-module (srfi srfi-19)
+ #:use-module (srfi srfi-26)
+ #:use-module (srfi srfi-71)
+ #:use-module (ice-9 ftw)
+ #:use-module (ice-9 match)
+ #:use-module (ice-9 atomic)
+ #:use-module (ice-9 threads)
+ #:use-module (logging logger)
+ #:use-module (prometheus)
+ #:use-module (fibers)
+ #:use-module (fibers timers)
+ #:use-module (fibers channels)
+ #:use-module (fibers operations)
+ #:use-module (web uri)
+ #:use-module (web client)
+ #:use-module (web response)
+ #:use-module (guix store)
+ #:use-module ((guix utils) #:select (compressed-file?
+ call-with-decompressed-port))
+ #:use-module ((guix build utils)
+ #:select (dump-port mkdir-p))
+ #:use-module (nar-herder utils)
+ #:use-module (nar-herder database)
+ #:export (start-cached-compression-management-fiber
+ start-cached-compression-removal-fiber
+ start-cached-compression-schedule-removal-fiber))
+
+;; Nar caching overview
+;;
+;; On start
+;; - Compute the size of each cached compression directory
+;; - Remove database entries if they're missing from the directory
+;; - Remove files if they're missing from the database
+;;
+;; On nar usage
+;; - Bump count
+;; - If count is sufficient
+;; - Trigger generation of the cached nar
+;; - Skip if the work queue already includes this job
+;; - At the start of the job, check for a database entry and exit
+;;       early if one exists, bumping the atime of the file
+;; - If the file doesn't exist, check if there's free space
+;; - If there's free space, create the file
+;;
+;; Nar removal fiber
+;; - Periodically remove nars that have been scheduled for removal
+;; (when the scheduled time passes)
+;;
+;; Nar schedule removal fiber
+;; - Periodically check for nars that haven't been accessed in some
+;; time and schedule them for removal
+
+(define (perform-cached-compression-startup database
+ enabled-cached-compressions
+ nar-cache-files)
+ (let* ((database-entries-missing-files
+ ;; List tracking entries in the database for cached files,
+ ;; where the file is missing from the disk.
+ ;;
+ ;; These database entries will be removed at the end of the
+         ;; startup process.
+ '())
+
+ ;; alist of compression to hash table of files
+ ;;
+ ;; Entries from the hash tables will be removed when the
+ ;; database entry is processed below, so these hash tables
+ ;; will be left reflecting files in the directories, but with
+ ;; no entry in the database.
+ ;;
+ ;; These files will be deleted at the end of the startup
+ ;; process.
+ (files-by-compression
+ (map
+ (match-lambda
+ ((compression . details)
+ (let ((result (make-hash-table)))
+ (for-each
+ (lambda (file)
+ (hash-set! result file #t))
+ (scandir (assq-ref details 'directory)
+ (negate (cut member <> '("." "..")))))
+ (cons compression
+ result))))
+ enabled-cached-compressions))
+
+ (cached-bytes-by-compression
+ (database-fold-cached-narinfo-files
+ database
+ (lambda (details result)
+ (let ((compression
+ (assq-ref details 'compression))
+ (filename
+ (store-path-base
+ (assq-ref details 'store-path))))
+
+ (let ((files-hash
+ (assq-ref files-by-compression compression)))
+ (if (hash-ref files-hash filename)
+ (begin
+ (hash-remove! files-hash filename)
+
+ (metric-increment nar-cache-files
+ #:label-values
+ `((compression . ,compression)))
+
+ `((,compression . ,(+ (assq-ref details 'size)
+ (or (assq-ref result compression)
+ 0)))
+ ,@(alist-delete compression result)))
+
+ ;; Database entry, but file missing
+ (begin
+ (set! database-entries-missing-files
+ (cons details
+ database-entries-missing-files))
+ result)))))
+ (map
+ (lambda (compression)
+ (cons compression 0))
+ (map car enabled-cached-compressions)))))
+
+ ;; Delete cached files with no database entries
+ (for-each
+ (match-lambda
+ ((compression . hash-table)
+ (let ((count (hash-count (const #t)
+ hash-table)))
+ (unless (= 0 count)
+ (let ((directory
+ (assq-ref
+ (assq-ref enabled-cached-compressions compression)
+ 'directory)))
+ (simple-format #t "deleting ~A cached files from ~A\n"
+ count
+ directory)
+ (hash-for-each
+ (lambda (filename _)
+ (delete-file (string-append directory "/" filename)))
+ hash-table))))))
+ files-by-compression)
+
+ ;; Delete database entries where the file is missing
+ (let ((count (length database-entries-missing-files)))
+ (unless (= 0 count)
+ (simple-format
+ #t
+ "deleting ~A cached_narinfo_files entries due to missing files\n"
+ count)
+
+ (for-each
+ (lambda (details)
+ (database-remove-cached-narinfo-file database
+ (assq-ref details 'narinfo-id)
+ (symbol->string
+ (assq-ref details 'compression))))
+ database-entries-missing-files)))
+
+ cached-bytes-by-compression))
+
+;; This fiber manages metadata around cached compressions, and
+;; delegates tasks to the thread pool to generate newly compressed
+;; nars
+(define* (start-cached-compression-management-fiber
+ database
+ metrics-registry
+ nar-source
+ enabled-cached-compressions
+ cached-compression-min-uses
+ #:key (cached-compression-workers 2)
+ scheduler)
+
+ (define nar-cache-bytes-metric
+ (make-gauge-metric metrics-registry
+ "nar_cache_size_bytes"
+ #:labels '(compression)))
+
+ (define nar-cache-files
+ (make-gauge-metric metrics-registry
+ "nar_cache_files"
+ #:labels '(compression)))
+
+ (define channel
+ (make-channel))
+
+ (let ((process-job
+ count-jobs
+ count-threads
+ list-jobs
+ (create-work-queue cached-compression-workers
+ (lambda (thunk)
+ (thunk))
+ #:name "cached compression")))
+
+ (define (consider-narinfo cached-bytes-by-compression
+ usage-hash-table
+ narinfo-id)
+ (let* ((existing-cached-files
+ ;; This is important both to avoid trying to create
+ ;; files twice, but also in the case where additional
+ ;; compressions are enabled and more files need to be
+ ;; generated
+ (database-select-cached-narinfo-files-by-narinfo-id
+ database
+ narinfo-id))
+ (existing-compressions
+ (map (lambda (details)
+ (assq-ref details 'compression))
+ existing-cached-files))
+ (missing-compressions
+ (lset-difference eq?
+ (map car enabled-cached-compressions)
+ existing-compressions))
+ (narinfo-files
+ (database-select-narinfo-files-by-narinfo-id
+ database
+ narinfo-id))
+
+ (compress-file?
+ (let ((url (assq-ref (first narinfo-files) 'url)))
+ ;; TODO: Maybe this should be configurable?
+ (not (compressed-file? url)))))
+
+ (when (and compress-file?
+ (not (null? missing-compressions)))
+ (let ((new-count
+ (let ((val (+ 1
+ (or (hash-ref usage-hash-table narinfo-id)
+ 0))))
+ (hash-set! usage-hash-table
+ narinfo-id
+ val)
+ val)))
+
+ (when (and (> new-count
+ cached-compression-min-uses))
+ (let* ((narinfo-details
+ (database-select-narinfo database narinfo-id))
+ (nar-size
+ (assq-ref narinfo-details 'nar-size))
+ (compressions-with-space
+ (filter
+ (lambda (compression)
+ (let ((directory-max-size
+ (assq-ref
+ (assq-ref enabled-cached-compressions
+ compression)
+ 'directory-max-size))
+ (current-directory-size
+ (assq-ref cached-bytes-by-compression
+ compression)))
+
+ (unless current-directory-size
+ (log-msg 'ERROR "current-directory-size unset: "
+ current-directory-size))
+ (unless nar-size
+ (log-msg 'ERROR "nar-size unset: "
+ nar-size))
+
+ (if directory-max-size
+ (< (+ current-directory-size
+                           ;; Assume the compressed nar could be
+ ;; as big as the uncompressed nar
+ nar-size)
+ directory-max-size)
+ #t)))
+ missing-compressions)))
+ (for-each
+ (lambda (compression)
+ (spawn-fiber
+ (lambda ()
+ (process-job
+ (lambda ()
+ (let ((new-bytes
+ (make-compressed-nar
+ narinfo-files
+ nar-source
+ enabled-cached-compressions
+ compression
+ #:level (assq-ref
+ (assq-ref enabled-cached-compressions
+ compression)
+ 'level))))
+ (put-message channel
+ (list 'cached-narinfo-added
+ narinfo-id
+ compression
+ new-bytes
+ #f))))))))
+ compressions-with-space)))))))
+
+ (spawn-fiber
+ (lambda ()
+ (let ((initial-cached-bytes-by-compression
+ (perform-cached-compression-startup
+ database
+ enabled-cached-compressions
+ nar-cache-files))
+ (nar-cached-compression-usage-hash-table
+ (make-hash-table 65536)))
+
+ (for-each
+ (match-lambda
+ ((compression . bytes)
+ (metric-set
+ nar-cache-bytes-metric
+ bytes
+ #:label-values `((compression . ,compression)))))
+ initial-cached-bytes-by-compression)
+
+ (let loop ((cached-bytes-by-compression
+ initial-cached-bytes-by-compression))
+ (match (get-message channel)
+ (('narinfo-id . narinfo-id)
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "exception considering narinfo ("
+ narinfo-id "): " exn)
+ #f)
+ (lambda ()
+ (consider-narinfo cached-bytes-by-compression
+ nar-cached-compression-usage-hash-table
+ narinfo-id))
+ #:unwind? #t)
+ (loop cached-bytes-by-compression))
+
+ (((and (or 'cached-narinfo-added 'cached-narinfo-removed)
+ action)
+ narinfo-id compression size reply)
+ (let ((updated-bytes
+ ((if (eq? action 'cached-narinfo-added)
+ +
+ -)
+ (or (assq-ref cached-bytes-by-compression
+ compression)
+ 0)
+ size)))
+
+ (metric-set
+ nar-cache-bytes-metric
+ updated-bytes
+ #:label-values `((compression . ,compression)))
+
+ ((if (eq? action 'cached-narinfo-added)
+ metric-increment
+ metric-decrement)
+ nar-cache-files
+ #:label-values `((compression . ,compression)))
+
+ ;; Use an explicit transaction as it handles the
+              ;; database being busy.
+ (database-call-with-transaction
+ database
+ (lambda _
+ (if (eq? action 'cached-narinfo-added)
+ (database-insert-cached-narinfo-file
+ database
+ narinfo-id
+ size
+ compression)
+ (let ((cached-narinfo-details
+ (database-select-cached-narinfo-file-by-narinfo-id-and-compression
+ database
+ narinfo-id
+ compression)))
+
+ ;; It might not have been scheduled for
+ ;; removal, but remove any schedule that
+ ;; exists
+ (let ((schedule-removed?
+ (database-delete-scheduled-cached-narinfo-removal
+ database
+ (assq-ref cached-narinfo-details 'id))))
+ (when schedule-removed?
+ (metric-decrement
+ (metrics-registry-fetch-metric
+ metrics-registry
+ "database_scheduled_cached_narinfo_removal_total"))))
+
+ ;; Remove all the database entries first, as
+ ;; that'll stop these files appearing in narinfos
+ (database-remove-cached-narinfo-file
+ database
+ narinfo-id
+ (symbol->string compression))))))
+
+ (hash-remove! nar-cached-compression-usage-hash-table
+ narinfo-id)
+
+ (when reply
+ (put-message reply #t))
+
+ (loop (alist-cons
+ compression
+ updated-bytes
+ (alist-delete compression
+ cached-bytes-by-compression)))))))))
+ scheduler)
+
+ channel))
+
+;; Periodically check for nars that haven't been accessed in some time
+;; and schedule them for removal
+(define (start-cached-compression-schedule-removal-fiber
+ database
+ metrics-registry
+ cached-compression-management-channel
+ enabled-cached-compressions
+ cached-compression-removal-fiber-wakeup-channel
+ base-ttl)
+
+ (define (files-to-schedule-for-removal compression-details)
+ (let* ((directory (assq-ref compression-details 'directory))
+ (unused-removal-duration
+ (assq-ref compression-details 'unused-removal-duration))
+ (atime-threshold
+ (time-second
+ (subtract-duration (current-time)
+ unused-removal-duration))))
+ (scandir
+ directory
+ (lambda (filename)
+ (and
+ (< (stat:atime (stat (string-append directory "/" filename)))
+ atime-threshold)
+ (not (member filename '("." ".."))))))))
+
+ (define (schedule-removal compression compression-details)
+ (let ((files
+ (let ((files
+ (with-time-logging "files-to-schedule-for-removal"
+ (files-to-schedule-for-removal compression-details))))
+ (log-msg 'INFO "cached-compression-schedule-removal-fiber "
+ "looking at " (length files) " files")
+ files))
+ (count-metric
+ (metrics-registry-fetch-metric
+ metrics-registry
+ "database_scheduled_cached_narinfo_removal_total")))
+
+ (with-time-logging "inserting scheduled-cached-narinfo-removals"
+ (for-each
+ (lambda (file)
+ (let* ((cached-narinfo-file-details
+ (database-select-cached-narinfo-file-by-hash
+ database
+ (string-take file 32) ; hash part
+ compression))
+ (existing-scheduled-removal
+ (database-select-scheduled-cached-narinfo-removal
+ database
+ (assq-ref cached-narinfo-file-details 'id))))
+ (unless existing-scheduled-removal
+ (let ((removal-time
+ ;; The earliest this can be removed is the current
+ ;; time, plus the TTL
+ (add-duration
+ (current-time)
+ (make-time time-duration
+ 0
+ (or (assq-ref compression-details 'ttl)
+ base-ttl)))))
+ (database-insert-scheduled-cached-narinfo-removal
+ database
+ (assq-ref cached-narinfo-file-details 'id)
+ removal-time)
+
+ (metric-increment count-metric)))))
+ files))
+
+ ;; Wake the cached compression removal fiber in case one of
+ ;; the new scheduled removals is before it's scheduled to wake
+ ;; up
+ (put-message cached-compression-removal-fiber-wakeup-channel
+ #t)))
+
+ (spawn-fiber
+ (lambda ()
+ (let ((sleep-duration
+ (max
+             (* 60 60 6) ; Maybe this should be configurable
+ (apply min
+ (map (lambda (compression-details)
+ (/ (time-second
+ (assq-ref compression-details
+ 'unused-removal-duration))
+ 4))
+ enabled-cached-compressions))))
+ (start-count
+ (database-count-scheduled-cached-narinfo-removal
+ database)))
+
+ (metric-set
+ (or (metrics-registry-fetch-metric
+ metrics-registry
+ "database_scheduled_cached_narinfo_removal_total")
+ (make-gauge-metric
+ metrics-registry
+ "database_scheduled_cached_narinfo_removal_total"))
+ start-count)
+
+ (while #t
+ (log-msg 'INFO "cached-compression-schedule-removal-fiber starting pass")
+
+ (for-each
+ (match-lambda
+ ((compression . details)
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "cached-compression-schedule-removal-fiber: "
+ "exception: " exn))
+ (lambda ()
+ (schedule-removal compression details))
+ #:unwind? #t)))
+ enabled-cached-compressions)
+
+ (log-msg 'INFO "cached-compression-schedule-removal-fiber sleeping for "
+ sleep-duration)
+ (sleep sleep-duration))))))
+
+;; Process the scheduled removals for cached nars
+(define (start-cached-compression-removal-fiber
+ database
+ cached-compression-management-channel
+ enabled-cached-compressions)
+
+ (define wakeup-channel
+ (make-channel))
+
+ (define (make-pass)
+ (log-msg 'INFO "cached-compression-removal-fiber starting pass")
+
+ (let ((scheduled-cached-narinfo-removal
+ (database-select-oldest-scheduled-cached-narinfo-removal
+ database)))
+
+ (log-msg 'INFO "scheduled removal: " scheduled-cached-narinfo-removal)
+
+ (if scheduled-cached-narinfo-removal
+ (if (time<=? (assq-ref scheduled-cached-narinfo-removal
+ 'scheduled-removal-time)
+ (current-time))
+ (let ((id (assq-ref scheduled-cached-narinfo-removal 'id))
+ (narinfo-id (assq-ref scheduled-cached-narinfo-removal
+ 'narinfo-id))
+ (compression (assq-ref scheduled-cached-narinfo-removal
+ 'compression))
+ (size (assq-ref scheduled-cached-narinfo-removal
+ 'size))
+ (store-path (assq-ref scheduled-cached-narinfo-removal
+ 'store-path)))
+ (let ((reply (make-channel)))
+ (put-message
+ cached-compression-management-channel
+ (list 'cached-narinfo-removed
+ narinfo-id
+ compression
+ size
+ reply))
+
+ ;; Wait for the management fiber to delete the
+ ;; database entry before removing the file.
+ (get-message reply))
+
+ (let ((directory
+ (assq-ref (assq-ref enabled-cached-compressions
+ compression)
+ 'directory)))
+ (let ((filename
+ (string-append
+ directory "/"
+ (basename store-path))))
+ (log-msg 'INFO "deleting " filename)
+ (delete-file filename))))
+
+ (let ((duration
+ (time-difference
+ (assq-ref scheduled-cached-narinfo-removal
+ 'scheduled-removal-time)
+ (current-time))))
+ (perform-operation
+ (choice-operation
+ (sleep-operation (max 1 (+ 1 (time-second duration))))
+ (get-operation wakeup-channel)))))
+
+ ;; Sleep until woken
+ (get-message wakeup-channel))))
+
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "exception in cached-compression-removal-fiber: "
+ exn))
+ (lambda ()
+ (with-throw-handler #t
+ make-pass
+ (lambda _
+ (backtrace))))
+ #:unwind? #t))))
+
+ wakeup-channel)
+
+(define* (make-compressed-nar narinfo-files
+ nar-source
+ enabled-cached-compressions
+ target-compression
+ #:key level)
+ (define cached-compression-details
+ (assq-ref enabled-cached-compressions target-compression))
+
+ (log-msg 'INFO "making " target-compression " for "
+ (uri-decode
+ (basename
+ (assq-ref (first narinfo-files) 'url))))
+
+ (let* ((source-narinfo-file
+ ;; There's no specific logic to this, it should be possible
+ ;; to use any file
+ (first narinfo-files))
+ (source-filename
+ (cond
+ ((string-prefix? "http" nar-source)
+ (let* ((output-port (mkstemp "/tmp/nar-herder-source-nar-XXXXXX"))
+ (filename
+ (port-filename output-port))
+ (uri
+ (string->uri
+ (string-append nar-source
+ (assq-ref source-narinfo-file 'url)))))
+
+ (log-msg 'DEBUG "downloading " (uri->string uri))
+ (with-exception-handler
+ (lambda (exn)
+ (close-port output-port)
+ (delete-file filename)
+ (raise-exception exn))
+ (lambda ()
+ (with-port-timeouts
+ (lambda ()
+ (call-with-values
+ (lambda ()
+ (let ((port
+ socket
+ (open-socket-for-uri* uri)))
+ (http-get uri
+ #:port port
+ #:decode-body? #f
+ #:streaming? #t)))
+ (lambda (response body)
+ (unless (= (response-code response)
+ 200)
+ (error "unknown response code"
+ (response-code response)))
+
+ (dump-port body output-port)))
+ (close-port output-port))
+ #:timeout 30))
+ #:unwind? #t)
+
+ filename))
+ ((string-prefix? "/" nar-source)
+ (string-append
+ ;; If it's a filename, then it's the canonical path to
+ ;; the storage directory
+ nar-source
+ (uri-decode (assq-ref source-narinfo-file 'url))))
+ (else
+ (error "unknown nar source")))))
+
+ (let* ((dest-directory
+ (assq-ref cached-compression-details
+ 'directory))
+ (dest-filename
+ (string-append
+ dest-directory
+ "/"
+ (uri-decode
+ (basename
+ (assq-ref source-narinfo-file 'url)))))
+ (tmp-dest-filename
+ (string-append dest-filename ".tmp")))
+
+ (when (file-exists? tmp-dest-filename)
+ (delete-file tmp-dest-filename))
+ (when (file-exists? dest-filename)
+ (delete-file dest-filename))
+
+ (mkdir-p dest-directory)
+
+ (call-with-input-file
+ source-filename
+ (lambda (source-port)
+ (call-with-decompressed-port
+ (string->symbol
+ (assq-ref source-narinfo-file
+ 'compression))
+ source-port
+ (lambda (decompressed-source-port)
+ (let ((call-with-compressed-output-port*
+ (match target-compression
+ ('gzip
+ (@ (zlib) call-with-gzip-output-port))
+ ('lzip
+ (@ (lzlib) call-with-lzip-output-port))
+ ('zstd
+ (@ (zstd) call-with-zstd-output-port))
+ ('none
+ (lambda (port proc)
+ (proc port))))))
+ (apply
+ call-with-compressed-output-port*
+ (open-output-file tmp-dest-filename)
+ (lambda (compressed-port)
+ (dump-port decompressed-source-port
+ compressed-port))
+ (if level
+ `(#:level ,level)
+ '())))))))
+ (rename-file
+ tmp-dest-filename
+ dest-filename)
+
+ (when (string-prefix? "http" nar-source)
+ (log-msg 'DEBUG "deleting temporary file " source-filename)
+ (delete-file source-filename))
+
+ (let ((bytes
+ (stat:size (stat dest-filename))))
+ (log-msg 'INFO "created " dest-filename)
+
+ bytes))))
diff --git a/nar-herder/database.scm b/nar-herder/database.scm
index 61f5bb1..ded7c2c 100644
--- a/nar-herder/database.scm
+++ b/nar-herder/database.scm
@@ -28,10 +28,12 @@
#:use-module (sqlite3)
#:use-module (fibers)
#:use-module (prometheus)
+ #:use-module (guix store)
#:use-module (guix narinfo)
#:use-module (guix derivations)
#:use-module (nar-herder utils)
#:export (setup-database
+ update-database-metrics!
database-optimize
database-spawn-fibers
@@ -42,8 +44,11 @@
database-insert-narinfo
database-remove-narinfo
+ database-select-narinfo
+ database-select-narinfo-by-hash
database-select-narinfo-contents-by-hash
+ database-count-recent-changes
database-select-recent-changes
database-select-latest-recent-change-datetime
database-get-recent-changes-id-for-deletion
@@ -51,8 +56,25 @@
database-select-narinfo-for-file
database-select-narinfo-files
-
- database-map-all-narinfo-files))
+ database-select-narinfo-files-by-narinfo-id
+
+ database-fold-all-narinfo-files
+ database-map-all-narinfo-files
+ database-count-narinfo-files
+
+ database-insert-cached-narinfo-file
+ database-select-cached-narinfo-file-by-hash
+ database-select-cached-narinfo-file-by-narinfo-id-and-compression
+ database-select-cached-narinfo-files-by-narinfo-id
+ database-fold-cached-narinfo-files
+ database-remove-cached-narinfo-file
+
+ database-select-scheduled-narinfo-removal
+ database-select-scheduled-cached-narinfo-removal
+ database-delete-scheduled-cached-narinfo-removal
+ database-select-oldest-scheduled-cached-narinfo-removal
+ database-count-scheduled-cached-narinfo-removal
+ database-insert-scheduled-cached-narinfo-removal))
(define-record-type <database>
(make-database database-file reader-thread-channel writer-thread-channel
@@ -85,7 +107,8 @@ CREATE TABLE narinfos (
nar_size INTEGER NOT NULL,
deriver TEXT,
system TEXT,
- contents NOT NULL
+ contents NOT NULL,
+ added_at TEXT
);
CREATE UNIQUE INDEX narinfos_store_hash ON narinfos (substr(store_path, 12, 32));
@@ -97,6 +120,8 @@ CREATE TABLE narinfo_files (
url TEXT NOT NULL
);
+CREATE INDEX narinfo_files_narinfo_id ON narinfo_files (narinfo_id);
+
CREATE TABLE narinfo_references (
narinfo_id INTEGER NOT NULL REFERENCES narinfos (id),
store_path TEXT NOT NULL
@@ -120,28 +145,124 @@ CREATE TABLE recent_changes (
datetime TEXT NOT NULL,
change TEXT NOT NULl,
data TEXT NOT NULL
+);
+
+CREATE TABLE cached_narinfo_files (
+ id INTEGER PRIMARY KEY ASC,
+ narinfo_id INTEGER NOT NULL REFERENCES narinfos (id),
+ size INTEGER NOT NULL,
+ compression TEXT
+);
+
+CREATE INDEX cached_narinfo_files_narinfo_id
+ ON cached_narinfo_files (narinfo_id);
+
+CREATE TABLE scheduled_narinfo_removal (
+ narinfo_id INTEGER PRIMARY KEY ASC REFERENCES narinfos (id),
+ removal_datetime TEXT NOT NULL
+);
+
+CREATE TABLE scheduled_cached_narinfo_removal (
+ cached_narinfo_file_id INTEGER PRIMARY KEY ASC REFERENCES cached_narinfo_files (id),
+ removal_datetime TEXT NOT NULL
);")
(sqlite-exec db schema))
-(define (update-schema db)
+(define (table-exists? db name)
(let ((statement
(sqlite-prepare
db
"
-SELECT name FROM sqlite_master WHERE type = 'table' AND name = :name")))
+SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = :name")))
(sqlite-bind-arguments
statement
- #:name "narinfos")
+ #:name name)
+
+ (let ((result
+ (match (sqlite-step statement)
+ (#f #f)
+ (#(1) #t))))
+ (sqlite-finalize statement)
- (match (sqlite-step statement)
- (#f (perform-initial-database-setup db))
- (_ #f))
+ result)))
+
+(define (column-exists? db table-name column-name)
+ (let ((statement
+ (sqlite-prepare
+ db
+ (simple-format #f "PRAGMA table_info(~A);" table-name))))
+
+ (let ((columns
+ (sqlite-map
+ (lambda (row)
+ (vector-ref row 1))
+ statement)))
+ (sqlite-finalize statement)
+
+ (member column-name columns))))
+
+(define (update-schema db)
+ (unless (table-exists? db "narinfos")
+ (perform-initial-database-setup db))
+
+ (unless (table-exists? db "cached_narinfo_files")
+ (sqlite-exec
+ db
+ "
+CREATE TABLE cached_narinfo_files (
+ id INTEGER PRIMARY KEY ASC,
+ narinfo_id INTEGER NOT NULL REFERENCES narinfos (id),
+ size INTEGER NOT NULL,
+ compression TEXT NOT NULL
+);
- (sqlite-finalize statement)))
+CREATE INDEX cached_narinfo_files_narinfo_id
+ ON cached_narinfo_files (narinfo_id);"))
+
+ (unless (column-exists? db "narinfos" "added_at")
+ (sqlite-exec
+ db
+ "ALTER TABLE narinfos ADD COLUMN added_at TEXT;"))
+
+ (unless (table-exists? db "scheduled_narinfo_removal")
+ (sqlite-exec
+ db
+ "
+CREATE TABLE scheduled_narinfo_removal (
+ narinfo_id INTEGER PRIMARY KEY ASC REFERENCES narinfos (id),
+ removal_datetime TEXT NOT NULL
+);"))
+
+ (unless (table-exists? db "scheduled_cached_narinfo_removal")
+ (sqlite-exec
+ db
+ "
+CREATE TABLE scheduled_cached_narinfo_removal (
+ cached_narinfo_file_id INTEGER PRIMARY KEY ASC REFERENCES cached_narinfo_files (id),
+ removal_datetime TEXT NOT NULL
+);"))
+
+ (sqlite-exec
+ db
+ "CREATE INDEX IF NOT EXISTS narinfo_tags_narinfo_id
+ ON narinfo_tags (narinfo_id);")
+
+ (sqlite-exec
+ db
+ "CREATE INDEX IF NOT EXISTS narinfo_references_narinfo_id
+ ON narinfo_references (narinfo_id);")
+
+ (sqlite-exec
+ db
+ "CREATE INDEX IF NOT EXISTS narinfo_files_narinfo_id
+ ON narinfo_files (narinfo_id);"))
+
+(define* (setup-database database-file metrics-registry
+ #:key (reader-threads 1))
+ (define mmap-size #f)
-(define (setup-database database-file metrics-registry)
(let ((db (db-open database-file)))
(sqlite-exec db "PRAGMA journal_mode=WAL;")
(sqlite-exec db "PRAGMA optimize;")
@@ -149,44 +270,77 @@ SELECT name FROM sqlite_master WHERE type = 'table' AND name = :name")))
(update-schema db)
+ ;; (let ((requested-mmap-bytes 2147418112)
+ ;; (statement
+ ;; (sqlite-prepare
+ ;; db
+ ;; (simple-format #f "PRAGMA mmap_size=~A;"
+ ;; 2147418112))))
+ ;; (match (sqlite-step statement)
+ ;; (#(result-mmap-size)
+ ;; (sqlite-finalize statement)
+ ;; (set! mmap-size
+ ;; result-mmap-size))))
+
(sqlite-close db))
(let ((reader-thread-channel
- (make-worker-thread-channel
+ (make-worker-thread-set
(lambda ()
(let ((db
(db-open database-file #:write? #f)))
(sqlite-exec db "PRAGMA busy_timeout = 5000;")
+ (when mmap-size
+ (sqlite-exec
+ db
+ (simple-format #f "PRAGMA mmap_size=~A;"
+ (number->string mmap-size))))
(list db)))
#:destructor
(lambda (db)
(sqlite-close db))
#:lifetime 50000
+ #:name "db r"
;; Use a minimum of 2 and a maximum of 8 threads
- #:parallelism
- (min (max (current-processor-count)
- 2)
- 64)
+ #:parallelism reader-threads
#:delay-logger (let ((delay-metric
(make-histogram-metric
metrics-registry
"database_read_delay_seconds")))
- (lambda (seconds-delayed)
+ (lambda (seconds-delayed proc)
(metric-observe delay-metric seconds-delayed)
(when (> seconds-delayed 1)
- (format
- (current-error-port)
- "warning: database read delayed by ~1,2f seconds~%"
- seconds-delayed))))))
+ (display
+ (format
+ #f
+ "warning: database read (~a) delayed by ~1,2f seconds~%"
+ proc
+ seconds-delayed)
+ (current-error-port)))))
+ #:duration-logger
+ (lambda (duration proc)
+ (when (> duration 5)
+ (display
+ (format
+ #f
+ "warning: database read took ~1,2f seconds (~a)~%"
+ duration
+ proc)
+ (current-error-port))))))
(writer-thread-channel
- (make-worker-thread-channel
+ (make-worker-thread-set
(lambda ()
(let ((db
(db-open database-file)))
(sqlite-exec db "PRAGMA busy_timeout = 5000;")
(sqlite-exec db "PRAGMA foreign_keys = ON;")
+ (when mmap-size
+ (sqlite-exec
+ db
+ (simple-format #f "PRAGMA mmap_size=~A;"
+ (number->string mmap-size))))
(list db)))
#:destructor
(lambda (db)
@@ -195,6 +349,7 @@ SELECT name FROM sqlite_master WHERE type = 'table' AND name = :name")))
(sqlite-close db))
#:lifetime 500
+ #:name "db w"
;; SQLite doesn't support parallel writes
#:parallelism 1
@@ -202,19 +357,56 @@ SELECT name FROM sqlite_master WHERE type = 'table' AND name = :name")))
(make-histogram-metric
metrics-registry
"database_write_delay_seconds")))
- (lambda (seconds-delayed)
+ (lambda (seconds-delayed proc)
(metric-observe delay-metric seconds-delayed)
(when (> seconds-delayed 1)
- (format
- (current-error-port)
- "warning: database write delayed by ~1,2f seconds~%"
- seconds-delayed)))))))
+ (display
+ (format
+ #f
+ "warning: database write (~a) delayed by ~1,2f seconds~%"
+ proc
+ seconds-delayed)
+ (current-error-port)))))
+ #:duration-logger
+ (lambda (duration proc)
+ (when (> duration 5)
+ (display
+ (format
+ #f
+ "warning: database write took ~1,2f seconds (~a)~%"
+ duration
+ proc)
+ (current-error-port)))))))
(make-database database-file
reader-thread-channel
writer-thread-channel
metrics-registry)))
+(define (update-database-metrics! database)
+ (let* ((db-filename (database-file database))
+ (db-wal-filename
+ (string-append db-filename "-wal"))
+
+ (registry (database-metrics-registry database))
+ (db-bytes
+ (or (metrics-registry-fetch-metric registry
+ "database_bytes")
+ (make-gauge-metric
+ registry "database_bytes"
+ #:docstring "Size of the SQLite database file")))
+ (db-wal-bytes
+ (or (metrics-registry-fetch-metric registry
+ "database_wal_bytes")
+ (make-gauge-metric
+ registry "database_wal_bytes"
+ #:docstring "Size of the SQLite Write Ahead Log file"))))
+
+
+ (metric-set db-bytes (stat:size (stat db-filename)))
+ (metric-set db-wal-bytes (stat:size (stat db-wal-filename))))
+ #t)
+
(define (db-optimize db db-filename)
(define (wal-size)
(let ((db-wal-filename
@@ -267,43 +459,110 @@ PRAGMA optimize;")))
(string-append "database_" thing "_duration_seconds"))
(if registry
- (let* ((metric
- (or (metrics-registry-fetch-metric registry metric-name)
- (make-histogram-metric registry
- metric-name)))
- (start-time (get-internal-real-time)))
- (let ((result (thunk)))
- (metric-observe metric
- (/ (- (get-internal-real-time) start-time)
- internal-time-units-per-second))
- result))
+ (call-with-duration-metric registry
+ metric-name
+ thunk)
(thunk)))
(define %current-transaction-proc
(make-parameter #f))
(define* (database-call-with-transaction database proc
- #:key
- readonly?)
+ #:key
+ readonly?
+ (immediate? (not readonly?)))
(define (run-proc-within-transaction db)
- (if (%current-transaction-proc)
- (proc db) ; already in transaction
- (begin
- (sqlite-exec db "BEGIN TRANSACTION;")
- (with-exception-handler
- (lambda (exn)
- (simple-format (current-error-port)
- "error: sqlite rolling back transaction\n")
- (sqlite-exec db "ROLLBACK TRANSACTION;")
- (raise-exception exn))
+ (define (attempt-begin)
+ (with-exception-handler
+ (lambda (exn)
+ (match (exception-args exn)
+ (('sqlite-exec 5 msg)
+ (simple-format
+ (current-error-port)
+ "warning: issue starting transaction (code: 5, proc: ~A): ~A\n"
+ proc msg)
+ #f)
+ (_
+ (simple-format (current-error-port)
+ "exception starting transaction: ~A\n" exn)
+ (raise-exception exn))))
+ (lambda ()
+ (sqlite-exec db (if immediate?
+ "BEGIN IMMEDIATE TRANSACTION;"
+ "BEGIN TRANSACTION;"))
+ #t)
+ #:unwind? #t))
+
+ (define (attempt-commit)
+ (with-exception-handler
+ (lambda (exn)
+ (match (exception-args exn)
+ (('sqlite-exec 5 msg)
+ (simple-format
+ (current-error-port)
+ "warning: attempt commit (code: 5, proc: ~A): ~A\n"
+ proc msg)
+ #f)
+ (_
+ (simple-format (current-error-port)
+ "exception committing transaction: ~A\n" exn)
+ (raise-exception exn))))
+ (lambda ()
+ (sqlite-exec db "COMMIT TRANSACTION;")
+ #t)
+ #:unwind? #t))
+
+ (if (attempt-begin)
+ (call-with-values
(lambda ()
- (call-with-values
- (lambda ()
- (parameterize ((%current-transaction-proc proc))
- (proc db)))
- (lambda vals
- (sqlite-exec db "COMMIT TRANSACTION;")
- (apply values vals))))))))
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format (current-error-port)
+ "error: sqlite rolling back transaction (~A)\n"
+ exn)
+ (sqlite-exec db "ROLLBACK TRANSACTION;")
+ (raise-exception exn))
+ (lambda ()
+ (parameterize ((%current-transaction-proc proc))
+ (proc-with-duration-timing db)))
+ #:unwind? #t))
+ (lambda vals
+ (let loop ((success? (attempt-commit)))
+ (if success?
+ (apply values vals)
+ (loop (attempt-commit))))))
+
+ ;; Database is busy, so retry
+ (run-proc-within-transaction db)))
+
+ (define (proc-with-duration-timing db)
+ (let ((start-time (get-internal-real-time)))
+ (call-with-values
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (proc db))
+ (lambda (key . args)
+ (simple-format
+ (current-error-port)
+ "exception in transaction: ~A: ~A\n"
+ key args)
+ (backtrace))))
+ (lambda vals
+ (let ((duration-seconds
+ (/ (- (get-internal-real-time) start-time)
+ internal-time-units-per-second)))
+ (when (and (not readonly?)
+ (> duration-seconds 2))
+ (display
+ (format
+ #f
+ "warning: ~a:\n took ~4f seconds in transaction\n"
+ proc
+ duration-seconds)
+ (current-error-port)))
+
+ (cons duration-seconds vals))))))
(match (call-with-worker-thread
((if readonly?
@@ -311,25 +570,9 @@ PRAGMA optimize;")))
database-writer-thread-channel)
database)
(lambda (db)
- (let ((start-time (get-internal-real-time)))
- (call-with-values
- (lambda ()
- (run-proc-within-transaction db))
- (lambda vals
- (let ((duration-seconds
- (/ (- (get-internal-real-time) start-time)
- internal-time-units-per-second)))
- (when (and (not readonly?)
- (> duration-seconds 2))
- (display
- (format
- #f
- "warning: ~a:\n took ~4f seconds in transaction\n"
- proc
- duration-seconds)
- (current-error-port)))
-
- (cons duration-seconds vals)))))))
+ (if (%current-transaction-proc)
+ (proc-with-duration-timing db) ; already in transaction
+ (run-proc-within-transaction db))))
((duration vals ...)
(apply values vals))))
@@ -417,9 +660,9 @@ SELECT id FROM tags WHERE key = :key AND value = :value"
db
"
INSERT INTO narinfos (
- store_path, nar_hash, nar_size, deriver, system, contents
+ store_path, nar_hash, nar_size, deriver, system, contents, added_at
) VALUES (
- :store_path, :nar_hash, :nar_size, :deriver, :system, :contents
+ :store_path, :nar_hash, :nar_size, :deriver, :system, :contents, :added_at
)"
#:cache? #t)))
(sqlite-bind-arguments
@@ -429,7 +672,8 @@ INSERT INTO narinfos (
#:nar_size (narinfo-size narinfo)
#:deriver (narinfo-deriver narinfo)
#:system (narinfo-system narinfo)
- #:contents (narinfo-contents narinfo))
+ #:contents (narinfo-contents narinfo)
+ #:added_at (date->string (current-date) "~1 ~3"))
(sqlite-step statement)
(sqlite-reset statement)
@@ -563,23 +807,6 @@ INSERT INTO narinfo_tags (narinfo_id, tag_id) VALUES (:narinfo_id, :tag_id)"
(define* (database-remove-narinfo database store-path
#:key change-datetime)
- (define (store-path->narinfo-id db)
- (let ((statement
- (sqlite-prepare
- db
- "
-SELECT id FROM narinfos WHERE store_path = :store_path"
- #:cache? #t)))
-
- (sqlite-bind-arguments
- statement
- #:store_path store-path)
-
- (let ((result (vector-ref (sqlite-step statement) 0)))
- (sqlite-reset statement)
-
- result)))
-
(define (remove-narinfo-record db id)
(let ((statement
(sqlite-prepare
@@ -677,18 +904,90 @@ DELETE FROM narinfo_tags WHERE narinfo_id = :narinfo_id"
(database-call-with-transaction
database
(lambda (db)
- (let ((narinfo-id (store-path->narinfo-id db)))
- (if change-datetime
- (insert-change-with-datetime db store-path
- change-datetime)
- (insert-change db store-path))
+ (let ((narinfo-details
+ (database-select-narinfo-by-hash
+ database
+ (store-path-hash-part store-path))))
+ (if narinfo-details
+ (let ((narinfo-id (assq-ref narinfo-details
+ 'id)))
+ (if change-datetime
+ (insert-change-with-datetime db store-path
+ change-datetime)
+ (insert-change db store-path))
+
+ (remove-narinfo-files db narinfo-id)
+ (remove-narinfo-references db narinfo-id)
+ (remove-tags db narinfo-id)
+
+ (remove-narinfo-record db narinfo-id)
+ #t)
+ #f)))))
+
+(define (database-select-narinfo database id)
+ (call-with-time-tracking
+ database
+ "select_narinfo"
+ (lambda ()
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT store_path, nar_hash, nar_size, deriver, system
+FROM narinfos
+WHERE id = :id"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:id id)
- (remove-narinfo-files db narinfo-id)
- (remove-narinfo-references db narinfo-id)
- (remove-tags db narinfo-id)
- (remove-narinfo-record db narinfo-id)
+ (match (let ((result (sqlite-step statement)))
+ (sqlite-reset statement)
+ result)
+ (#(store_path nar_hash nar_size deriver system)
+ `((store-path . ,store_path)
+ (nar-hash . ,nar_hash)
+ (nar-size . ,nar_size)
+ (deriver . ,deriver)
+ (system . ,system)))
+ (_
+ #f))))))))
+
+(define (database-select-narinfo-by-hash database hash)
+ (call-with-time-tracking
+ database
+ "select_narinfo_by_hash"
+ (lambda ()
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT id, store_path, nar_hash, nar_size, deriver, system
+FROM narinfos
+WHERE substr(store_path, 12, 32) = :hash"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:hash hash)
- #t))))
+ (match (let ((result (sqlite-step statement)))
+ (sqlite-reset statement)
+ result)
+ (#(id store_path nar_hash nar_size deriver system)
+ `((id . ,id)
+ (store-path . ,store_path)
+ (nar-hash . ,nar_hash)
+ (nar-size . ,nar_size)
+ (deriver . ,deriver)
+ (system . ,system)))
+ (_
+ #f))))))))
(define (database-select-narinfo-contents-by-hash database hash)
(call-with-time-tracking
@@ -702,7 +1001,7 @@ DELETE FROM narinfo_tags WHERE narinfo_id = :narinfo_id"
(sqlite-prepare
db
"
-SELECT contents FROM narinfos WHERE substr(store_path, 12, 32) = :hash"
+SELECT id, contents FROM narinfos WHERE substr(store_path, 12, 32) = :hash"
#:cache? #t)))
(sqlite-bind-arguments
statement
@@ -711,8 +1010,27 @@ SELECT contents FROM narinfos WHERE substr(store_path, 12, 32) = :hash"
(match (let ((result (sqlite-step statement)))
(sqlite-reset statement)
result)
- (#(contents) contents)
- (_ #f))))))))
+ (#(id contents)
+ (values contents id))
+ (_
+ (values #f #f)))))))))
+
+(define (database-count-recent-changes database)
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT COUNT(*) FROM recent_changes"
+ #:cache? #t)))
+
+ (let ((result
+ (match (sqlite-step statement)
+ (#(count) count))))
+ (sqlite-reset statement)
+ result)))))
(define* (database-select-recent-changes database after-date #:key (limit 8192))
(call-with-worker-thread
@@ -722,26 +1040,27 @@ SELECT contents FROM narinfos WHERE substr(store_path, 12, 32) = :hash"
(sqlite-prepare
db
"
-SELECT datetime, change, data FROM recent_changes WHERE datetime >= :datetime LIMIT :limit"
+SELECT datetime, change, data
+FROM recent_changes
+WHERE datetime >= :datetime
+ORDER BY datetime ASC
+LIMIT :limit"
#:cache? #t)))
(sqlite-bind-arguments
statement
#:datetime after-date
#:limit limit)
- (let loop ((row (sqlite-step statement))
- (result '()))
- (match row
- (#(datetime change data)
- (loop (sqlite-step statement)
- (cons `((datetime . ,datetime)
- (change . ,change)
- (data . ,data))
- result)))
- (#f
- (sqlite-reset statement)
-
- (reverse result))))))))
+ (let ((result
+ (sqlite-map
+ (match-lambda
+ (#(datetime change data)
+ `((datetime . ,datetime)
+ (change . ,change)
+ (data . ,data))))
+ statement)))
+ (sqlite-reset statement)
+ result)))))
(define (database-select-latest-recent-change-datetime database)
(call-with-worker-thread
@@ -869,7 +1188,42 @@ WHERE substr(narinfos.store_path, 12, 32) = :hash"
result)))))))
-(define (database-map-all-narinfo-files database proc)
+(define (database-select-narinfo-files-by-narinfo-id database narinfo-id)
+ (call-with-time-tracking
+ database
+ "select_narinfo_files_by_narinfo_id"
+ (lambda ()
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT narinfo_files.size, narinfo_files.compression, narinfo_files.url
+FROM narinfos
+INNER JOIN narinfo_files
+ ON narinfos.id = narinfo_files.narinfo_id
+WHERE narinfos.id = :narinfo_id"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:narinfo_id narinfo-id)
+
+ (let ((result
+ (sqlite-map
+ (match-lambda
+ (#(size compression url)
+ `((size . ,size)
+ (compression . ,compression)
+ (url . ,url))))
+ statement)))
+ (sqlite-reset statement)
+
+ result)))))))
+
+(define (database-fold-all-narinfo-files database proc init)
(call-with-worker-thread
(database-reader-thread-channel database)
(lambda (db)
@@ -885,12 +1239,410 @@ FROM narinfo_files"
(lambda (row result)
(match row
(#(size compression url)
- (cons (proc `((size . ,size)
- (compression . ,compression)
- (url . ,url)))
+ (proc `((size . ,size)
+ (compression . ,compression)
+ (url . ,url))
+ result))))
+ init
+ statement)))
+ (sqlite-reset statement)
+
+ result-list)))))
+
+(define (database-map-all-narinfo-files database proc)
+ (database-fold-all-narinfo-files
+ database
+ (lambda (nar-file result)
+ (cons (proc nar-file)
+ result))
+ '()))
+
+(define (database-count-narinfo-files database)
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT COUNT(*) FROM narinfo_files"
+ #:cache? #t)))
+
+ (let ((result
+ (vector-ref (sqlite-step statement)
+ 0)))
+ (sqlite-reset statement)
+
+ result)))))
+
+(define (database-insert-cached-narinfo-file database
+ narinfo-id
+ size
+ compression)
+ (call-with-worker-thread
+ (database-writer-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+INSERT INTO cached_narinfo_files (
+ narinfo_id, size, compression
+) VALUES (
+ :narinfo_id, :size, :compression
+)"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:narinfo_id narinfo-id
+ #:size size
+ #:compression (symbol->string compression))
+
+ (sqlite-step statement)
+ (sqlite-reset statement)
+
+ (last-insert-rowid db)))))
+
+(define (database-select-cached-narinfo-file-by-hash database
+ hash
+ compression)
+ (call-with-time-tracking
+ database
+ "select_cached_narinfo_file_by_hash"
+ (lambda ()
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT cached_narinfo_files.id, cached_narinfo_files.size
+FROM narinfos
+INNER JOIN cached_narinfo_files
+ ON cached_narinfo_files.narinfo_id = narinfos.id
+WHERE substr(narinfos.store_path, 12, 32) = :hash
+ AND cached_narinfo_files.compression = :compression"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:hash hash
+ #:compression (symbol->string compression))
+
+ (let ((result
+ (match (sqlite-step statement)
+ (#(id size)
+ `((id . ,id)
+ (size . ,size)))
+ (#f #f))))
+ (sqlite-reset statement)
+
+ result)))))))
+
+(define (database-select-cached-narinfo-files-by-narinfo-id
+ database
+ narinfo-id)
+ (call-with-time-tracking
+ database
+ "select_cached_narinfo_file_by_narinfo_id"
+ (lambda ()
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT cached_narinfo_files.id,
+ narinfos.store_path,
+ cached_narinfo_files.size,
+ cached_narinfo_files.compression
+FROM cached_narinfo_files
+INNER JOIN narinfos
+ ON cached_narinfo_files.narinfo_id = narinfos.id
+WHERE narinfo_id = :narinfo_id"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:narinfo_id narinfo-id)
+
+ (let ((result
+ (sqlite-map
+ (match-lambda
+ (#(id store_path size compression)
+ `((id . ,id)
+ (store-path . ,store_path)
+ (size . ,size)
+ (compression . ,(string->symbol compression)))))
+ statement)))
+ (sqlite-reset statement)
+
+ result)))))))
+
+(define (database-select-cached-narinfo-file-by-narinfo-id-and-compression
+ database
+ narinfo-id
+ compression)
+ (call-with-time-tracking
+ database
+ "select_cached_narinfo_file_by_narinfo_id_and_compression"
+ (lambda ()
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT cached_narinfo_files.id,
+ narinfos.store_path,
+ cached_narinfo_files.size,
+ cached_narinfo_files.compression
+FROM cached_narinfo_files
+INNER JOIN narinfos
+ ON cached_narinfo_files.narinfo_id = narinfos.id
+WHERE narinfo_id = :narinfo_id
+ AND compression = :compression"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:narinfo_id narinfo-id
+ #:compression (symbol->string compression))
+
+ (let ((result
+ (match (sqlite-step statement)
+ (#(id store_path size compression)
+ `((id . ,id)
+ (store-path . ,store_path)
+ (size . ,size)
+ (compression . ,(string->symbol compression)))))))
+ (sqlite-reset statement)
+
+ result)))))))
+
+(define (database-fold-cached-narinfo-files database
+ proc
+ init)
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT store_path, size, compression, narinfo_id
+FROM cached_narinfo_files
+INNER JOIN narinfos
+ ON cached_narinfo_files.narinfo_id = narinfos.id"
+ #:cache? #t)))
+ (let ((result-list
+ (sqlite-fold
+ (lambda (row result)
+ (match row
+ (#(store_path size compression narinfo_id)
+ (proc `((size . ,size)
+ (compression . ,(string->symbol compression))
+ (store-path . ,store_path)
+ (narinfo-id . ,narinfo_id))
result))))
- '()
+ init
statement)))
(sqlite-reset statement)
result-list)))))
+
+(define (database-remove-cached-narinfo-file database narinfo-id compression)
+ (call-with-worker-thread
+ (database-writer-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+DELETE FROM cached_narinfo_files
+WHERE narinfo_id = :narinfo_id
+ AND compression = :compression"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:narinfo_id narinfo-id
+ #:compression compression)
+
+ (sqlite-step statement)
+ (sqlite-reset statement)))))
+
+(define (database-select-scheduled-narinfo-removal database narinfo-id)
+ (call-with-time-tracking
+ database
+ "select_scheduled_narinfo_removal"
+ (lambda ()
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT removal_datetime
+FROM scheduled_narinfo_removal
+WHERE narinfo_id = :narinfo_id"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:narinfo_id narinfo-id)
+
+ (let ((result
+ (match (sqlite-step statement)
+ (#(datetime)
+ (date->time-utc
+ (string->date
+ datetime
+ "~Y-~m-~d ~H:~M:~S")))
+ (#f #f))))
+ (sqlite-reset statement)
+ result)))))))
+
+(define (database-select-scheduled-cached-narinfo-removal database
+ cached-narinfo-file-id)
+ (call-with-time-tracking
+ database
+ "select_scheduled_narinfo_removal"
+ (lambda ()
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT removal_datetime
+FROM scheduled_cached_narinfo_removal
+WHERE cached_narinfo_file_id = :cached_narinfo_file_id"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:cached_narinfo_file_id cached-narinfo-file-id)
+
+ (let ((result
+ (match (sqlite-step statement)
+ (#(datetime)
+ (date->time-utc
+ (string->date
+ datetime
+ "~Y-~m-~d ~H:~M:~S")))
+ (#f #f))))
+ (sqlite-reset statement)
+ result)))))))
+
+(define (database-delete-scheduled-cached-narinfo-removal database
+ cached-narinfo-file-id)
+ (call-with-worker-thread
+ (database-writer-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+DELETE FROM scheduled_cached_narinfo_removal
+WHERE cached_narinfo_file_id = :cached_narinfo_file_id
+RETURNING 1"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:cached_narinfo_file_id cached-narinfo-file-id)
+
+ (let ((result (->bool (sqlite-step statement))))
+ (sqlite-reset statement)
+
+ result)))))
+
+(define (database-select-oldest-scheduled-cached-narinfo-removal database)
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT cached_narinfo_files.id,
+ cached_narinfo_files.narinfo_id,
+ cached_narinfo_files.size,
+ cached_narinfo_files.compression,
+ narinfos.store_path,
+ scheduled_cached_narinfo_removal.removal_datetime
+FROM scheduled_cached_narinfo_removal
+INNER JOIN cached_narinfo_files
+ ON scheduled_cached_narinfo_removal.cached_narinfo_file_id =
+ cached_narinfo_files.id
+INNER JOIN narinfos
+ ON cached_narinfo_files.narinfo_id = narinfos.id
+ORDER BY scheduled_cached_narinfo_removal.removal_datetime DESC
+LIMIT 1"
+ #:cache? #t)))
+
+ (let ((result
+ (match (sqlite-step statement)
+ (#(id narinfo_id size compression store_path datetime)
+ `((id . ,id)
+ (narinfo-id . ,narinfo_id)
+ (size . ,size)
+ (compression . ,(string->symbol compression))
+ (store-path . ,store_path)
+ (scheduled-removal-time . ,(date->time-utc
+ (string->date
+ datetime
+ "~Y-~m-~d ~H:~M:~S")))))
+ (#f #f))))
+ (sqlite-reset statement)
+ result)))))
+
+(define (database-count-scheduled-cached-narinfo-removal database)
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT COUNT(*) FROM scheduled_cached_narinfo_removal"
+ #:cache? #t)))
+
+ (let ((result
+ (vector-ref (sqlite-step statement)
+ 0)))
+ (sqlite-reset statement)
+
+ result)))))
+
+(define (database-insert-scheduled-cached-narinfo-removal database
+ cached-narinfo-file-id
+ removal-datetime)
+ (call-with-worker-thread
+ (database-writer-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+INSERT INTO scheduled_cached_narinfo_removal (
+ cached_narinfo_file_id, removal_datetime
+) VALUES (
+ :cached_narinfo_file_id, :removal_datetime
+)"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:cached_narinfo_file_id cached-narinfo-file-id
+ #:removal_datetime (date->string
+ (time-utc->date removal-datetime)
+ "~Y-~m-~d ~H:~M:~S"))
+
+ (sqlite-step statement)
+ (sqlite-reset statement)
+
+ #t))))
diff --git a/nar-herder/mirror.scm b/nar-herder/mirror.scm
index 6b9f4f7..8aae845 100644
--- a/nar-herder/mirror.scm
+++ b/nar-herder/mirror.scm
@@ -19,6 +19,7 @@
(define-module (nar-herder mirror)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-43)
+ #:use-module (srfi srfi-71)
#:use-module (ice-9 match)
#:use-module (ice-9 threads)
#:use-module (ice-9 exceptions)
@@ -29,22 +30,18 @@
#:use-module (prometheus)
#:use-module (logging logger)
#:use-module (json)
+ #:use-module (fibers)
+ #:use-module (fibers channels)
#:use-module (guix narinfo)
#:use-module ((guix store) #:select (store-path-hash-part))
#:use-module (nar-herder utils)
#:use-module (nar-herder database)
#:use-module (nar-herder storage)
- #:export (start-fetch-changes-thread))
-
-(define (start-fetch-changes-thread database storage-root
- mirror metrics-registry)
- (define nar-files-metric
- (or (metrics-registry-fetch-metric metrics-registry
- "nar_files_total")
- (make-gauge-metric metrics-registry
- "nar_files_total"
- #:labels '(stored))))
+ #:export (start-fetch-changes-fiber))
+(define (start-fetch-changes-fiber database metrics-registry
+ storage-root mirror
+ cached-compression-management-channel)
(define (request-recent-changes)
(define latest-recent-change
(database-select-latest-recent-change-datetime database))
@@ -72,18 +69,25 @@
(lambda ()
(retry-on-error
(lambda ()
- (http-get uri #:decode-body? #f))
+ (log-msg 'INFO "querying for recent changes since "
+ latest-recent-change)
+ (with-port-timeouts
+ (lambda ()
+ (let ((port
+ socket
+ (open-socket-for-uri* uri)))
+ (http-get uri
+ #:port port
+ #:streaming? #t)))
+ #:timeout 30))
#:times 3
#:delay 15))
(lambda (response body)
(if (= (response-code response) 200)
- (let* ((json-body (json-string->scm
- (utf8->string body)))
+ (let* ((json-body (json->scm body))
(recent-changes
(assoc-ref json-body "recent_changes")))
- (log-msg 'INFO "queried for recent changes since "
- latest-recent-change)
(log-msg 'INFO "got " (vector-length recent-changes) " changes")
;; Switch to symbol keys and standardise the key order
@@ -116,48 +120,62 @@
narinfo
#:change-datetime
(assq-ref change-details
- 'datetime))
-
- (let ((new-files-count (length (narinfo-uris narinfo))))
- (metric-increment nar-files-metric
- #:by new-files-count
- ;; TODO This should be
- ;; checked, rather than
- ;; assumed to be false
- #:label-values '((stored . "false"))))))
+ 'datetime))))
+
((string=? change "removal")
(let ((store-path (assq-ref change-details 'data)))
(log-msg 'INFO "processing removal change for "
store-path
" (" (assq-ref change-details 'datetime) ")")
+ (let* ((hash (store-path-hash-part store-path))
+ (narinfo-details
+ (database-select-narinfo-by-hash
+ database
+ hash)))
+
(when storage-root
(remove-nar-files-by-hash
database
storage-root
metrics-registry
- (store-path-hash-part store-path)))
+ hash))
+
+ (let ((cached-narinfo-files
+ (database-select-cached-narinfo-files-by-narinfo-id
+ database
+ (assq-ref narinfo-details 'id))))
+ (for-each
+ (lambda (cached-narinfo-file-details)
+ ;; TODO Delete the file as well
+
+ (let ((reply (make-channel)))
+ (put-message
+ cached-compression-management-channel
+ (list 'cached-narinfo-removed
+ (assq-ref narinfo-details 'id)
+ (assq-ref cached-narinfo-files 'compression)
+ (assq-ref cached-narinfo-files 'size)
+ reply))
+ (get-message reply)))
+ cached-narinfo-files))
(database-remove-narinfo database
store-path
#:change-datetime
(assq-ref change-details
- 'datetime))))
+ 'datetime)))))
(else
(error "unimplemented"))))))
recent-changes))
(raise-exception
(make-exception-with-message
- (simple-format #f "unknown response: ~A\n code: ~A response: ~A"
+ (simple-format #f "unknown response: ~A code: ~A"
(uri->string uri)
- (response-code response)
- (utf8->string body))))))))
+ (response-code response))))))))
- (call-with-new-thread
+ (spawn-fiber
(lambda ()
- ;; This will initialise the nar_files_total metric
- (get-nar-files database storage-root metrics-registry)
-
(while #t
(with-exception-handler
(lambda (exn)
@@ -165,4 +183,5 @@
request-recent-changes
#:unwind? #t)
+ (log-msg 'DEBUG "finished requesting recent changes, sleeping")
(sleep 60)))))
diff --git a/nar-herder/recent-changes.scm b/nar-herder/recent-changes.scm
index fee63f3..62bd604 100644
--- a/nar-herder/recent-changes.scm
+++ b/nar-herder/recent-changes.scm
@@ -18,15 +18,25 @@
(define-module (nar-herder recent-changes)
#:use-module (srfi srfi-1)
+ #:use-module (ice-9 match)
#:use-module (ice-9 threads)
+ #:use-module (fibers)
+ #:use-module (fibers channels)
+ #:use-module (logging logger)
+ #:use-module (prometheus)
+ #:use-module (web uri)
+ #:use-module (guix narinfo)
#:use-module (nar-herder database)
- #:export (start-recent-change-removal-and-database-dump-thread))
+ #:export (start-recent-change-removal-and-database-dump-fiber
+ start-recent-change-listener-fiber))
-(define (start-recent-change-removal-and-database-dump-thread database
- database-dump-filename
- check-interval
- recent-changes-limit)
+(define (start-recent-change-removal-and-database-dump-fiber database
+ metrics-registry
+ database-dump-filename
+ check-interval
+ recent-changes-limit)
(define (update-database-dump)
+ (log-msg 'DEBUG "updating the database dump at " database-dump-filename)
(let ((temp-database-dump-filename
(string-append database-dump-filename ".tmp")))
@@ -41,23 +51,130 @@
(simple-format (current-error-port)
"updated database dump\n")))
- (call-with-new-thread
+ (define recent-changes-count-metric
+ (metrics-registry-fetch-metric metrics-registry "recent_changes_count"))
+
+ (spawn-fiber
(lambda ()
(while #t
- (let ((recent-changes-id-for-deletion
- (database-get-recent-changes-id-for-deletion database
- recent-changes-limit)))
- (when recent-changes-id-for-deletion
- (update-database-dump)
-
- (let ((deleted-recent-changes
- (database-delete-recent-changes-with-id-below
- database
- recent-changes-id-for-deletion)))
- (simple-format (current-error-port)
- "deleted ~A recent changes\n"
- deleted-recent-changes)))
-
- (sleep check-interval))))))
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "exception in recent change removal thread: ~A\n"
+ exn)
+ (sleep 120))
+ (lambda ()
+ (let ((recent-changes-id-for-deletion
+ (database-get-recent-changes-id-for-deletion database
+ recent-changes-limit)))
+ (when recent-changes-id-for-deletion
+ (when database-dump-filename
+ (update-database-dump))
+
+ (let ((deleted-recent-changes
+ (database-call-with-transaction
+ database
+ (lambda _
+ (database-delete-recent-changes-with-id-below
+ database
+ recent-changes-id-for-deletion)))))
+
+ (metric-decrement recent-changes-count-metric
+ #:by deleted-recent-changes)
+
+ (simple-format (current-error-port)
+ "deleted ~A recent changes\n"
+ deleted-recent-changes)))
+
+ (sleep check-interval)))
+ #:unwind? #t)))))
+
+(define (start-recent-change-listener-fiber database
+ metrics-registry
+ addition-channel
+ removal-channel)
+ (define recent-changes-count-metric
+ (metrics-registry-fetch-metric metrics-registry "recent_changes_count"))
+
+ (define (process-addition-change change-details)
+ (let ((narinfo
+ (call-with-input-string
+ (assq-ref change-details 'data)
+ (lambda (port)
+ (read-narinfo port
+ "https://narherderdummyvalue")))))
+ (for-each
+ (lambda (uri)
+ (log-msg 'DEBUG "processing recent addition of " (uri-path uri))
+ (put-message addition-channel (list 'addition (uri-path uri))))
+ (narinfo-uris narinfo))))
+
+ (define (process-removal-change change-details)
+ (log-msg 'DEBUG "processing recent change triggered removal of "
+ (assq-ref change-details 'data))
+ (put-message removal-channel
+ (list 'remove (assq-ref change-details 'data))))
+
+ (spawn-fiber
+ (lambda ()
+ (let ((recent-changes-count
+ (database-count-recent-changes database)))
+ (metric-set recent-changes-count-metric recent-changes-count)
+ (log-msg 'DEBUG recent-changes-count " recent changes in the database"))
+
+ (log-msg 'DEBUG "starting to listen for recent changes")
+ (let ((after-initial
+ (database-select-latest-recent-change-datetime database)))
+ (let loop ((after after-initial)
+ (last-processed-recent-changes
+ (database-select-recent-changes database after-initial)))
+ (sleep 10)
+
+ (match
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "exception in recent change listener " exn)
+ #f)
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (let* ((recent-changes
+ (database-select-recent-changes database after))
+ (unprocessed-recent-changes
+ (remove
+ (lambda (change-details)
+ (member change-details last-processed-recent-changes))
+ recent-changes)))
+
+ (unless (null? unprocessed-recent-changes)
+ (log-msg 'INFO "processing " (length unprocessed-recent-changes)
+ " recent changes")
+
+ (for-each
+ (lambda (change-details)
+ (let ((change (assq-ref change-details 'change)))
+ (cond
+ ((string=? change "addition")
+ (process-addition-change change-details))
+ ((string=? change "removal")
+ (process-removal-change change-details))
+ (else #f))))
+ unprocessed-recent-changes)
+ (metric-increment recent-changes-count-metric
+ #:by (length unprocessed-recent-changes)))
+ ;; Use the unprocessed recent changes here to carry
+ ;; forward all processed changes to the next pass
+ unprocessed-recent-changes))
+ (lambda _
+ (backtrace))))
+ #:unwind? #t)
+ (#f (loop after '()))
+ (recent-changes
+ (if (null? recent-changes)
+ (loop after last-processed-recent-changes)
+ (loop (assq-ref (last recent-changes)
+ 'datetime)
+ recent-changes)))))))))
diff --git a/nar-herder/server.scm b/nar-herder/server.scm
index 522ff3f..2e2c1e7 100644
--- a/nar-herder/server.scm
+++ b/nar-herder/server.scm
@@ -19,39 +19,65 @@
(define-module (nar-herder server)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-11)
+ #:use-module (srfi srfi-19)
#:use-module (srfi srfi-34)
+ #:use-module (srfi srfi-71)
+ #:use-module (ice-9 ftw)
+ #:use-module (ice-9 iconv)
#:use-module (ice-9 match)
+ #:use-module (ice-9 threads)
#:use-module (ice-9 binary-ports)
#:use-module (rnrs bytevectors)
#:use-module (web uri)
+ #:use-module (web client)
#:use-module (web response)
#:use-module (web request)
#:use-module (logging logger)
+ #:use-module (fibers)
+ #:use-module (fibers channels)
+ #:use-module (fibers scheduler)
+ #:use-module (fibers conditions)
+ #:use-module (fibers operations)
#:use-module (prometheus)
#:use-module (json)
#:use-module ((system foreign)
#:select (bytevector->pointer pointer->bytevector))
#:use-module (guix store)
#:use-module (guix base32)
+ #:use-module (guix progress)
#:use-module (guix serialization)
#:use-module ((guix utils)
#:select (decompressed-port))
#:use-module ((guix build utils)
#:select (dump-port))
+ #:use-module ((guix build syscalls) #:select (set-thread-name))
#:use-module (nar-herder database)
#:use-module (nar-herder storage)
+ #:use-module (nar-herder utils)
+ #:use-module (nar-herder mirror)
+ #:use-module (nar-herder recent-changes)
+ #:use-module (nar-herder cached-compression)
#:use-module (ice-9 textual-ports)
- #:export (make-request-handler))
+ #:export (%compression-options
+
+ run-nar-herder-service
+ make-request-handler))
+
+(define %compression-options
+ '(gzip lzip zstd none))
(define* (render-json json #:key (extra-headers '())
(code 200))
(values (build-response
#:code code
#:headers (append extra-headers
- '((content-type . (application/json))
+ '((content-type . (application/json
+ (charset . "utf-8")))
(vary . (accept)))))
- (lambda (port)
- (scm->json json port))))
+ (call-with-encoded-output-string
+ "utf-8"
+ (lambda (port)
+ (scm->json json port)))))
(define (parse-query-string query)
(let lp ((lst (map uri-decode (string-split query (char-set #\& #\=)))))
@@ -61,32 +87,6 @@
(("") '())
(() '()))))
-(define (get-gc-metrics-updater registry)
- (define metrics
- `((gc-time-taken
- . ,(make-gauge-metric registry "guile_gc_time_taken"))
- (heap-size
- . ,(make-gauge-metric registry "guile_heap_size"))
- (heap-free-size
- . ,(make-gauge-metric registry "guile_heap_free_size"))
- (heap-total-allocated
- . ,(make-gauge-metric registry "guile_heap_total_allocated"))
- (heap-allocated-since-gc
- . ,(make-gauge-metric registry "guile_allocated_since_gc"))
- (protected-objects
- . ,(make-gauge-metric registry "guile_gc_protected_objects"))
- (gc-times
- . ,(make-gauge-metric registry "guile_gc_times"))))
-
- (lambda ()
- (let ((stats (gc-stats)))
- (for-each
- (match-lambda
- ((name . metric)
- (let ((value (assq-ref stats name)))
- (metric-set metric value))))
- metrics))))
-
(define (serve-fixed-output-file input compression proc)
;; TODO It's hard with fold-archive from (guix serialization) to
;; read just the singular file from the archive, so the following
@@ -121,10 +121,11 @@
(define (read-string p)
(utf8->string (read-byte-string p)))
- (let*-values (((port pids)
- (decompressed-port
- (string->symbol compression)
- input)))
+ (let ((port
+ pids
+ (decompressed-port
+ (string->symbol compression)
+ input)))
;; The decompressor can be an external program, so wait for it to
;; exit
@@ -147,26 +148,78 @@
(proc port size)))))
+(define (add-cached-compressions-to-narinfo initial-narinfo-contents
+ cached-narinfo-files)
+ (let ((cached-nar-strings
+ (map (lambda (cached-nar-details)
+ (let ((compression
+ (symbol->string
+ (assq-ref cached-nar-details 'compression))))
+ (string-append
+ "URL: nar/" compression "/"
+ (uri-encode
+ (store-path-base
+ (assq-ref cached-nar-details 'store-path)))
+ "\n"
+ "Compression: " compression "\n"
+ "FileSize: " (number->string
+ (assq-ref cached-nar-details 'size))
+ "\n")))
+ cached-narinfo-files)))
+ (string-append
+ initial-narinfo-contents
+ (string-join
+ cached-nar-strings
+ "\n"))))
+
(define* (make-request-handler database storage-root
- #:key ttl negative-ttl logger
- metrics-registry)
+ #:key base-ttl base-cached-compressions-ttl
+ negative-ttl logger
+ metrics-registry
+ maybe-trigger-creation-of-cached-nars
+ cached-compression-nar-requested-hook)
+ (define hostname
+ (gethostname))
+
(define (narinfo? str)
(and
(= (string-length str) 40)
(string-suffix? ".narinfo" str)))
+ (define plain-metrics-registry
+ (make-metrics-registry))
+
(define gc-metrics-updater
- (get-gc-metrics-updater metrics-registry))
+ (get-gc-metrics-updater plain-metrics-registry))
+
+ (define process-metrics-updater
+ (get-process-metrics-updater plain-metrics-registry))
+
+ (define guile-time-metrics-updater
+ (let ((internal-real-time
+ (make-gauge-metric plain-metrics-registry "guile_internal_real_time"))
+ (internal-run-time
+ (make-gauge-metric plain-metrics-registry "guile_internal_run_time")))
+ (lambda ()
+ (metric-set internal-real-time
+ (get-internal-real-time))
+ (metric-set internal-run-time
+ (get-internal-run-time)))))
(define requests-total-metric
(make-counter-metric metrics-registry
"server_requests_total"))
- (define (increment-request-metric category response-code)
+ (define* (increment-request-metric category response-code #:key (labels '()))
(metric-increment
requests-total-metric
#:label-values `((category . ,category)
- (response_code . ,response-code))))
+ (response_code . ,response-code)
+ ,@labels)))
+
+ (define %compression-strings
+ (map symbol->string
+ %compression-options))
(lambda (request body)
(log-msg logger
@@ -178,30 +231,81 @@
(match (cons (request-method request)
(split-and-decode-uri-path
(uri-path (request-uri request))))
- (('GET (? narinfo? narinfo))
- (let ((narinfo-contents
+ (((or 'HEAD 'GET) (? narinfo? narinfo))
+ (let ((base-narinfo-contents
+ narinfo-id
(database-select-narinfo-contents-by-hash
database
(string-take narinfo 32))))
(increment-request-metric "narinfo"
- (if narinfo-contents
+ (if base-narinfo-contents
"200"
"404"))
- (if narinfo-contents
- (values `((content-type . (text/plain))
- ,@(if ttl
- `((cache-control (max-age . ,ttl)))
- '()))
- narinfo-contents)
+ (if base-narinfo-contents
+ (let* ((cached-narinfo-files
+ (database-select-cached-narinfo-files-by-narinfo-id
+ database
+ narinfo-id))
+ (narinfo-contents
+ (if (null? cached-narinfo-files)
+ base-narinfo-contents
+ (add-cached-compressions-to-narinfo
+ base-narinfo-contents
+ cached-narinfo-files)))
+ (potential-ttls
+ (remove
+ not
+ `(,(if (null? cached-narinfo-files)
+ base-ttl
+ base-cached-compressions-ttl)
+
+ ,(and=> (database-select-scheduled-narinfo-removal
+ database
+ narinfo-id)
+ (lambda (scheduled-removal-time)
+ (list
+ (max
+ (- (time-second
+ (time-difference scheduled-removal-time
+ (current-time)))
+ 60)
+ 0))))
+
+ ,@(if (null? cached-narinfo-files)
+ '()
+ (map
+ (lambda (details)
+ (and=>
+ (database-select-scheduled-cached-narinfo-removal
+ database
+ (assq-ref details 'id))
+ (lambda (scheduled-removal-time)
+ (max
+ (- (time-second
+ (time-difference scheduled-removal-time
+ (current-time)))
+ 60)
+ 0))))
+ cached-narinfo-files)))))
+ (ttl
+ (cond
+ ((null? potential-ttls) #f)
+ (else (apply min potential-ttls)))))
+
+ (values `((content-type . (text/plain))
+ ,@(if ttl
+ `((cache-control (max-age . ,ttl)))
+ '()))
+ narinfo-contents))
(values (build-response #:code 404
#:headers (if negative-ttl
`((cache-control
(max-age . ,negative-ttl)))
'()))
"404"))))
- (('GET (? narinfo? narinfo) "info")
+ (((or 'HEAD 'GET) (? narinfo? narinfo) "info")
(let ((narinfo-contents
(database-select-narinfo-contents-by-hash
database
@@ -220,42 +324,133 @@
(string-take narinfo 32)))))
(values (build-response #:code 404)
"404"))))
- (('GET "nar" compression filename)
- (let* ((hash (string-take filename 32))
+ (((or 'HEAD 'GET) "nar" compression filename)
+ (let* ((hash (and (>= (string-length filename) 32)
+ (string-take filename 32)))
+ (narinfo
+ (and hash
+ (database-select-narinfo-by-hash
+ database
+ hash)))
(narinfo-files
- (database-select-narinfo-files
- database
- hash))
+ (and=> (assq-ref narinfo 'id)
+ (lambda (id)
+ (database-select-narinfo-files-by-narinfo-id
+ database
+ id))))
(narinfo-file-for-compression
(find (lambda (file)
- (string=? (assq-ref file 'compression)
- compression))
- narinfo-files)))
-
- (when (or narinfo-file-for-compression
- ;; Check for a common compression to avoid lots of
- ;; metrics being generated if compression is random
- (member compression '("gzip" "lzip" "zstd")))
- (increment-request-metric
- (string-append "nar/"
- compression)
- (if narinfo-file-for-compression "200" "404")))
+ (and (string=? (assq-ref file 'compression)
+ compression)
+ (string=?
+ (last (string-split (assq-ref file 'url)
+ #\/))
+ (uri-encode filename))))
+ (or narinfo-files '())))
+ (compression-symbol
+ (if (member
+ compression
+ %compression-strings
+ string=?)
+ (string->symbol compression)
+ #f)))
(if narinfo-file-for-compression
- (values (build-response
- #:code 200
- #:headers `((X-Accel-Redirect
- . ,(string-append
- "/internal/nar/"
- compression "/"
- (uri-encode filename)))))
- #f)
- (values (build-response #:code 404)
- "404"))))
- (('GET "file" name algo hash)
+ (let ((loop?
+ (any
+ (lambda (via)
+ (string=? (last (string-split via #\space))
+ hostname))
+ (request-via request))))
+
+ (when (and (not loop?)
+ maybe-trigger-creation-of-cached-nars)
+ (maybe-trigger-creation-of-cached-nars
+ (assq-ref narinfo 'id)))
+
+ (when loop?
+ (log-msg logger 'WARN
+ (request-method request)
+ " "
+ (uri-path (request-uri request))
+ ": loop detected (" hostname "): "
+ (string-join (request-via request) ", ")))
+
+ (increment-request-metric
+ (string-append "nar/"
+ compression)
+ (if loop?
+ "500"
+ "200")
+ #:labels
+ (let ((system (assq-ref narinfo 'system)))
+ (if (string? system)
+ `((system . ,system))
+ '())))
+
+ (if loop?
+ (values (build-response #:code 500)
+ (simple-format #f "loop detected (~A): ~A\n"
+ hostname
+ (request-via request)))
+ (values (build-response
+ #:code 200
+ #:headers `((X-Accel-Redirect
+ . ,(string-append
+ "/internal/nar/"
+ compression "/"
+ (uri-encode filename)))))
+ #f)))
+ (let ((cached-narinfo-file
+ (and narinfo ; must be a known hash
+ compression-symbol ; must be a known compression
+ ;; Check that the filename given in the
+ ;; request matches the narinfo store-path
+ (string=? filename
+ (basename
+ (assq-ref narinfo 'store-path)))
+ (database-select-cached-narinfo-file-by-hash
+ database
+ hash
+ compression-symbol))))
+
+ (when (or cached-narinfo-file
+                 ;; Only count requests for known compressions, to avoid
+                 ;; unbounded metrics being generated if compression is random
+ compression-symbol)
+ (increment-request-metric
+ (string-append "nar/"
+ compression)
+ (if cached-narinfo-file "200" "404")
+ #:labels
+ (if cached-narinfo-file
+ (let ((system (assq-ref narinfo 'system)))
+ (if (string? system)
+ `((system . ,system))
+ '()))
+ '())))
+
+ (when cached-narinfo-file
+ (cached-compression-nar-requested-hook compression-symbol
+ filename))
+
+ (if cached-narinfo-file
+ (values (build-response
+ #:code 200
+ #:headers `((X-Accel-Redirect
+ . ,(string-append
+ "/internal/cached-nar/"
+ compression "/"
+ (uri-encode filename)))))
+ #f)
+ (values (build-response #:code 404)
+ "404"))))))
+ (((or 'HEAD 'GET) "file" name algo hash)
(guard (c ((invalid-base32-character? c)
(values (build-response #:code 404)
- "404")))
+ (if (eq? (request-method request) 'HEAD)
+ #f
+ "404"))))
(let ((hash-bytevector (nix-base32-string->bytevector hash)))
(if (and (string=? algo "sha256") (= 32 (bytevector-length
hash-bytevector)))
@@ -272,43 +467,54 @@
store-path-hash))
(selected-narinfo-file
;; TODO Select intelligently
- (first narinfo-files)))
+ (if (null? narinfo-files)
+ #f
+ (first narinfo-files)))
+ (filename
+ (and selected-narinfo-file
+ (let ((filename
+ (string-append
+ storage-root
+ (uri-decode
+ (assq-ref selected-narinfo-file 'url)))))
+ (and (file-exists? filename)
+ filename)))))
(increment-request-metric
"file"
- (if selected-narinfo-file "200" "404"))
-
- (if selected-narinfo-file
- (let* ((url
- (assq-ref selected-narinfo-file 'url))
- (filename
- (string-append storage-root
- (uri-decode url))))
-
- (serve-fixed-output-file
- (open-input-file filename)
- (assq-ref selected-narinfo-file
- 'compression)
- (lambda (nar-port bytes)
- (values `((content-type . (application/octet-stream
- (charset . "ISO-8859-1")))
- (content-length . ,bytes))
- (lambda (output-port)
- (dump-port nar-port
- output-port
- bytes)
-
- (close-port output-port))))))
+ (if filename "200" "404"))
+
+ (if filename
+ (serve-fixed-output-file
+ (open-input-file filename)
+ (assq-ref selected-narinfo-file
+ 'compression)
+ (lambda (nar-port bytes)
+ (values `((content-type . (application/octet-stream
+ (charset . "ISO-8859-1")))
+ (content-length . ,bytes))
+ (if (eq? (request-method request) 'HEAD)
+ #f
+ (lambda (output-port)
+ (dump-port nar-port
+ output-port
+ bytes)
+
+ (close-port nar-port))))))
(values (build-response #:code 404)
- "404")))
+ (if (eq? (request-method request) 'HEAD)
+ #f
+ "404"))))
(begin
(increment-request-metric "file" "404")
(values (build-response #:code 404)
- "404"))))))
+ (if (eq? (request-method request) 'HEAD)
+ #f
+ "404")))))))
- (('GET "recent-changes")
+ (((or 'HEAD 'GET) "recent-changes")
(let ((query-parameters
(or (and=> (uri-query (request-uri request))
parse-query-string)
@@ -323,7 +529,7 @@
(or
(assoc-ref query-parameters "since")
"1970-01-01 00:00:01"))))))))
- (('GET "latest-database-dump")
+ (((or 'HEAD 'GET) "latest-database-dump")
(increment-request-metric "latest-database-dump" "200")
@@ -331,20 +537,389 @@
#:code 200
#:headers '((X-Accel-Redirect . "/internal/database/nar_herder_dump.db")))
#f))
- (('GET "metrics")
+ (((or 'HEAD 'GET) "metrics")
(gc-metrics-updater)
+ (process-metrics-updater)
+ (guile-time-metrics-updater)
+ (update-database-metrics! database)
(increment-request-metric "metrics" "200")
(values (build-response
#:code 200
#:headers '((content-type . (text/plain))
(vary . (accept))))
- (lambda (port)
- (write-metrics metrics-registry
- port))))
+ (call-with-output-string
+ (lambda (port)
+ (write-metrics metrics-registry port)
+ (write-metrics plain-metrics-registry port)))))
(_
(increment-request-metric "unhandled" "404")
(values (build-response #:code 404)
"404")))))
+(define* (run-nar-herder-service opts lgr)
+ (define (download-database)
+ (let ((database-uri
+ (string->uri
+ (string-append (assq-ref opts 'mirror)
+ "/latest-database-dump"))))
+ (with-port-timeouts
+ (lambda ()
+ (call-with-values
+ (lambda ()
+ (simple-format (current-error-port)
+ "starting downloading the database\n")
+ (let ((port
+ socket
+ (open-socket-for-uri* database-uri)))
+ (http-get database-uri
+ #:port port
+ #:streaming? #t)))
+ (lambda (response body)
+ (when (not (= (response-code response) 200))
+ (error "unable to fetch database from mirror"))
+
+ (let* ((reporter (progress-reporter/file
+ (uri->string database-uri)
+ (response-content-length response)
+ (current-error-port)))
+ (port
+ (progress-report-port
+ reporter
+ body
+ #:download-size (response-content-length response))))
+
+ (call-with-output-file (assq-ref opts 'database)
+ (lambda (output-port)
+ (dump-port port output-port)))
+
+ (close-port port))
+
+ (simple-format (current-error-port)
+ "finished downloading the database\n"))))
+ #:timeout 30)))
+
+ (define metrics-registry
+ (make-metrics-registry
+ #:namespace
+ "narherder"))
+
+ (and=>
+ (assq-ref opts 'mirror)
+ (lambda (mirror)
+ (let ((database-file (assq-ref opts 'database)))
+ (if (file-exists? database-file)
+ (begin
+ ;; TODO Open the database, and check if the
+ ;; latest changes in the database are visible on
+ ;; the source to mirror. If they're not, then
+ ;; delete the database and download it to get
+ ;; back in sync
+
+ #f)
+ (download-database)))))
+
+ ;; Used elsewhere
+ (make-gauge-metric metrics-registry "recent_changes_count")
+
+ (let ((recent-changes-metric
+ (make-gauge-metric metrics-registry "recent_changes_limit")))
+ (metric-set recent-changes-metric (assq-ref opts 'recent-changes-limit)))
+
+ (define maintenance-scheduler
+ (make-scheduler #:parallelism 1))
+
+ (let* ((database (setup-database (assq-ref opts 'database)
+ metrics-registry
+ #:reader-threads
+ (assq-ref opts 'database-reader-threads)))
+ (canonical-storage (and=> (assq-ref opts 'storage)
+ canonicalize-path))
+
+ (enabled-cached-compressions
+ (let ((explicit-cached-compression-directories
+ (filter-map
+ (match-lambda
+ (('cached-compression-directory . details) details)
+ (_ #f))
+ opts))
+ (cached-compression-directories-max-sizes
+ (filter-map
+ (match-lambda
+ (('cached-compression-directory-max-size . details) details)
+ (_ #f))
+ opts))
+ (cached-compression-ttls
+ (filter-map
+ (match-lambda
+ (('cached-compression-ttl . details) details)
+ (_ #f))
+ opts))
+ (cached-compression-new-ttls
+ (filter-map
+ (match-lambda
+ (('cached-compression-new-ttl . details) details)
+ (_ #f))
+ opts))
+ (cached-compression-unused-removal-durations
+ (filter-map
+ (match-lambda
+ (('cached-compression-unused-removal-duration . details)
+ details)
+ (_ #f))
+ opts)))
+
+ (filter-map
+ (match-lambda
+ (('cached-compression . details)
+ (let ((compression
+ (assq-ref details 'type)))
+ (cons compression
+ `(,@(alist-delete 'type details)
+ (directory
+ . ,(or (assq-ref explicit-cached-compression-directories
+ compression)
+ (simple-format #f "/var/cache/nar-herder/nar/~A"
+ compression)))
+ (directory-max-size
+ . ,(assq-ref cached-compression-directories-max-sizes
+ compression))
+ (ttl
+ . ,(assq-ref cached-compression-ttls
+ compression))
+ (new-ttl
+ . ,(assq-ref cached-compression-new-ttls
+ compression))
+ (unused-removal-duration
+ . ,(assq-ref cached-compression-unused-removal-durations
+ compression))))))
+ (_ #f))
+ opts)))
+
+ (cached-compression-min-uses
+ (assq-ref opts 'cached-compression-min-uses))
+
+ (cached-compression-management-channel
+ (if (null? enabled-cached-compressions)
+ #f
+ (start-cached-compression-management-fiber
+ database
+ metrics-registry
+ (or (assq-ref opts 'cached-compression-nar-source)
+ canonical-storage)
+ enabled-cached-compressions
+ cached-compression-min-uses
+ #:cached-compression-workers
+ (assq-ref opts 'cached-compression-workers)
+ #:scheduler maintenance-scheduler)))
+
+ (maybe-trigger-creation-of-cached-nars
+ (if (null? enabled-cached-compressions)
+ #f
+ (lambda (narinfo-id)
+ (spawn-fiber
+ (lambda ()
+ (put-message cached-compression-management-channel
+ (cons 'narinfo-id narinfo-id)))
+ maintenance-scheduler))))
+
+ (cached-compression-nar-requested-hook
+ (if (null? enabled-cached-compressions)
+ #f
+ (lambda (compression filename)
+ (spawn-fiber
+ (lambda ()
+ (let* ((directory
+ (assq-ref (assq-ref enabled-cached-compressions
+ compression)
+ 'directory)))
+ (utime (string-append directory "/" filename))))
+ maintenance-scheduler)))))
+
+ (if (string=? (assq-ref opts 'database-dump)
+ "disabled")
+ (log-msg 'INFO "database dump disabled")
+ (when (not (file-exists? (assq-ref opts 'database-dump)))
+ (log-msg 'INFO "dumping database...")
+ (dump-database database (assq-ref opts 'database-dump))))
+
+ (let ((finished? (make-condition)))
+ (call-with-new-thread
+ (lambda ()
+ (catch 'system-error
+ (lambda ()
+ (set-thread-name "maintenance"))
+ (const #t))
+
+ (run-fibers
+ (lambda ()
+ (initialise-storage-metrics
+ database
+ canonical-storage
+ metrics-registry)
+
+ (start-recent-change-removal-and-database-dump-fiber
+ database
+ metrics-registry
+ (let ((filename (assq-ref opts 'database-dump)))
+ (if (string=? filename "disabled")
+ #f
+ filename))
+ (* 24 3600) ; 24 hours
+ (assq-ref opts 'recent-changes-limit))
+
+ (let ((mirror-channel
+ (and=>
+ (assq-ref opts 'mirror)
+ (lambda (mirror)
+ (start-fetch-changes-fiber
+ database
+ metrics-registry
+ canonical-storage ; might be #f, but that's fine here
+ mirror
+ cached-compression-management-channel)
+
+ (if (assq-ref opts 'storage)
+ (start-mirroring-fiber database
+ mirror
+ (assq-ref opts 'storage-limit)
+ canonical-storage
+ metrics-registry)
+ #f))))
+ (removal-channel
+ (let ((nar-removal-criteria
+ (filter-map
+ (match-lambda
+ ((key . val)
+ (if (eq? key 'storage-nar-removal-criteria)
+ val
+ #f)))
+ opts)))
+ (if (and (assq-ref opts 'storage)
+ (number? (assq-ref opts 'storage-limit))
+ (not (null? nar-removal-criteria)))
+ (start-nar-removal-fiber database
+ canonical-storage
+ (assq-ref opts 'storage-limit)
+ metrics-registry
+ nar-removal-criteria)
+ #f)))
+ (addition-channel (make-channel)))
+
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "exception processing addition-channel: "
+ exn))
+ (lambda ()
+ (match (get-message addition-channel)
+ (('addition file)
+ (apply update-nar-files-metric
+ metrics-registry
+ '()
+ (if (and canonical-storage
+ (file-exists?
+ (string-append canonical-storage
+ (uri-decode file))))
+ '(#:stored-addition-count 1)
+ '(#:not-stored-addition-count 1)))
+
+ (when mirror-channel
+ (spawn-fiber
+ (lambda ()
+ (put-message mirror-channel
+ `(fetch ,file)))))
+ (when removal-channel
+ (spawn-fiber
+ (lambda ()
+ (sleep 60)
+ (removal-channel-remove-nar-from-storage removal-channel
+ file)
+ (sleep (* 5 60))
+ (removal-channel-remove-nar-from-storage removal-channel
+ file)
+ (sleep (* 15 60))
+ (removal-channel-remove-nar-from-storage removal-channel
+ file)
+ (sleep 3600)
+ (removal-channel-remove-nar-from-storage removal-channel
+ file)))))))
+ #:unwind? #t))))
+
+ (start-recent-change-listener-fiber
+ database
+ metrics-registry
+ addition-channel
+ removal-channel))
+
+ (unless (null? enabled-cached-compressions)
+ (let ((cached-compression-removal-fiber-wakeup-channel
+ (start-cached-compression-removal-fiber
+ database
+ cached-compression-management-channel
+ enabled-cached-compressions)))
+ (start-cached-compression-schedule-removal-fiber
+ database
+ metrics-registry
+ cached-compression-management-channel
+ enabled-cached-compressions
+ cached-compression-removal-fiber-wakeup-channel
+ (or (assq-ref opts 'narinfo-ttl)
+ ;; Default from (guix substitutes)
+ (* 36 3600)))))
+
+ (log-msg 'DEBUG "finished maintenance setup")
+ (wait finished?))
+ #:scheduler maintenance-scheduler
+ #:hz 0
+ #:parallelism 1)))
+
+ (call-with-sigint
+ (lambda ()
+ (run-fibers
+ (lambda ()
+ (let* ((current (current-scheduler))
+ (schedulers
+ (cons current (scheduler-remote-peers current))))
+ (for-each
+ (lambda (i sched)
+ (spawn-fiber
+ (lambda ()
+ (catch 'system-error
+ (lambda ()
+ (set-thread-name
+ (string-append "fibers " (number->string i))))
+ (const #t)))
+ sched))
+ (iota (length schedulers))
+ schedulers))
+
+ (log-msg 'INFO "starting server, listening on "
+ (assq-ref opts 'host) ":" (assq-ref opts 'port))
+
+ (run-server/patched
+ (make-request-handler
+ database
+ canonical-storage
+ #:base-ttl (or (assq-ref opts 'new-narinfo-ttl)
+ (assq-ref opts 'narinfo-ttl))
+ #:base-cached-compressions-ttl
+ (or (assq-ref opts 'new-cached-compressions-narinfo-ttl)
+ (assq-ref opts 'cached-compressions-narinfo-ttl))
+ #:negative-ttl (assq-ref opts 'narinfo-negative-ttl)
+ #:logger lgr
+ #:metrics-registry metrics-registry
+ #:maybe-trigger-creation-of-cached-nars
+ maybe-trigger-creation-of-cached-nars
+ #:cached-compression-nar-requested-hook
+ cached-compression-nar-requested-hook)
+ #:host (assq-ref opts 'host)
+ #:port (assq-ref opts 'port))
+
+ (wait finished?))
+ #:hz 0
+ #:parallelism (assq-ref opts 'parallelism)))
+ finished?))))
diff --git a/nar-herder/storage.scm b/nar-herder/storage.scm
index c017685..e85d745 100644
--- a/nar-herder/storage.scm
+++ b/nar-herder/storage.scm
@@ -18,6 +18,7 @@
(define-module (nar-herder storage)
#:use-module (srfi srfi-1)
+ #:use-module (srfi srfi-71)
#:use-module (ice-9 ftw)
#:use-module (ice-9 match)
#:use-module (ice-9 threads)
@@ -25,21 +26,28 @@
#:use-module (web uri)
#:use-module (web client)
#:use-module (web response)
+ #:use-module (fibers)
+ #:use-module (fibers channels)
#:use-module (logging logger)
#:use-module (logging port-log)
#:use-module (prometheus)
#:use-module (json)
#:use-module ((guix build utils) #:select (dump-port mkdir-p))
#:use-module ((guix store) #:select (store-path-hash-part))
+ #:use-module (guix progress)
#:use-module (nar-herder utils)
#:use-module (nar-herder database)
#:export (store-item-in-local-storage?
remove-nar-files-by-hash
- get-nar-files
+ initialise-storage-metrics
+ update-nar-files-metric
+ check-storage
- start-nar-removal-thread
- start-mirroring-thread))
+ removal-channel-remove-nar-from-storage
+
+ start-nar-removal-fiber
+ start-mirroring-fiber))
(define (store-item-in-local-storage? database storage-root hash)
(let ((narinfo-files (database-select-narinfo-files database hash)))
@@ -52,17 +60,12 @@
(assq-ref file 'url)))))
narinfo-files)))
-(define (remove-nar-files-by-hash database storage-root metrics-registry
- hash)
- (define nar-files-metric
- (or (metrics-registry-fetch-metric metrics-registry
- "nar_files_total")
- (make-gauge-metric metrics-registry
- "nar_files_total"
- #:labels '(stored))))
-
+(define* (remove-nar-files-by-hash database storage-root metrics-registry
+ hash
+ #:key (error-unless-files-to-remove? #t))
(let ((narinfo-files (database-select-narinfo-files database hash)))
- (when (null? narinfo-files)
+ (when (and (null? narinfo-files)
+ error-unless-files-to-remove?)
(error "no narinfo files"))
(for-each
(lambda (file)
@@ -76,14 +79,21 @@
(remove-nar-from-storage storage-root
(assq-ref file 'url)))
- (metric-decrement nar-files-metric
- #:label-values
- `((stored . ,(if exists? "true" "false"))))))
+ (and=> (metrics-registry-fetch-metric metrics-registry
+ "nar_files_total")
+ (lambda (metric)
+ ;; Just update this metric if it exists, since if it
+ ;; does, it should be set to a value
+ (metric-decrement
+ metric
+ #:label-values `((stored . ,(if exists? "true" "false"))))))))
narinfo-files)))
(define (get-storage-size storage-root)
(define enter? (const #t))
(define (leaf name stat result)
+ ;; Allow other fibers to run
+ (sleep 0)
(+ result
(or (and=> (stat:blocks stat)
(lambda (blocks)
@@ -159,50 +169,234 @@
(unrecognised-files . ,(hash-map->list (lambda (key _) key)
files-hash)))))
-(define* (get-nar-files database storage-root metrics-registry
- #:key stored?)
- (define nar-files-metric
- (or (metrics-registry-fetch-metric metrics-registry
- "nar_files_total")
- (make-gauge-metric metrics-registry
- "nar_files_total"
- #:labels '(stored))))
-
- (let* ((index (index-storage database storage-root))
- (selected-files
- (filter
- (lambda (file)
- (eq? (assq-ref file 'stored?) stored?))
- (assq-ref index 'narinfo-files))))
-
- (let ((selected-files-count
- (length selected-files))
- (all-files-count
- (length (assq-ref index 'narinfo-files))))
-
- (metric-set nar-files-metric
- selected-files-count
- #:label-values `((stored . ,(if stored? "true" "false"))))
- (metric-set nar-files-metric
- (- all-files-count selected-files-count)
- #:label-values `((stored . ,(if stored? "false" "true")))))
-
- selected-files))
-
-(define (start-nar-removal-thread database
- storage-root storage-limit
- metrics-registry
- nar-removal-criteria)
+(define* (fold-nar-files database storage-root
+ proc init
+ #:key stored?)
+ (define stored-files-count 0)
+ (define not-stored-files-count 0)
+
+ (let ((result
+ (database-fold-all-narinfo-files
+ database
+ (lambda (nar result)
+ (let* ((url
+ (uri-decode
+ (assq-ref nar 'url)))
+ (nar-stored?
+ (if storage-root
+ (file-exists?
+ (string-append storage-root url))
+ #f)))
+
+ (if nar-stored?
+ (set! stored-files-count (1+ stored-files-count))
+ (set! not-stored-files-count (1+ not-stored-files-count)))
+
+ (if (or (eq? stored? 'both)
+ (and stored? nar-stored?)
+ (and (not stored?)
+ (not nar-stored?)))
+ (proc nar result)
+ result)))
+ init)))
+
+ (values result
+ `((stored . ,stored-files-count)
+ (not-stored . ,not-stored-files-count)))))
+
+(define* (update-nar-files-metric metrics-registry
+ nar-file-counts
+ #:key fetched-count removed-count
+ not-stored-addition-count
+ stored-addition-count)
+
+ ;; Avoid incrementing or decrementing the metric if it hasn't been
+ ;; set yet
+ (when (or (metrics-registry-fetch-metric metrics-registry
+ "nar_files_total")
+ (= (length nar-file-counts) 2))
+
+ (let ((nar-files-metric
+ (or (metrics-registry-fetch-metric metrics-registry
+ "nar_files_total")
+ (make-gauge-metric metrics-registry
+ "nar_files_total"
+ #:labels '(stored)))))
+
+ ;; Set the values if the counts are known
+ (and=>
+ (assq-ref nar-file-counts 'stored)
+ (lambda (stored-count)
+ (metric-set nar-files-metric
+ stored-count
+ #:label-values '((stored . "true")))))
+ (and=>
+ (assq-ref nar-file-counts 'not-stored)
+ (lambda (not-stored-count)
+ (metric-set nar-files-metric
+ not-stored-count
+ #:label-values '((stored . "false")))))
+
+ ;; Then adjust by the fetched or removed counts
+ (when fetched-count
+ (metric-increment nar-files-metric
+ #:by fetched-count
+ #:label-values '((stored . "true")))
+ (metric-decrement nar-files-metric
+ #:by fetched-count
+ #:label-values '((stored . "false"))))
+ (when removed-count
+ (metric-decrement nar-files-metric
+ #:by removed-count
+ #:label-values '((stored . "true")))
+ (metric-increment nar-files-metric
+ #:by removed-count
+ #:label-values '((stored . "false"))))
+
+ (when not-stored-addition-count
+ (metric-increment nar-files-metric
+ #:by not-stored-addition-count
+ #:label-values '((stored . "false"))))
+
+ (when stored-addition-count
+ (metric-increment nar-files-metric
+ #:by stored-addition-count
+ #:label-values '((stored . "true")))))))
+
+(define (initialise-storage-metrics database storage-root metrics-registry)
+ ;; Use a database transaction to block changes
+ (database-call-with-transaction
+ database
+ (lambda _
+ (log-msg 'INFO "starting to initialise storage metrics")
+ (let ((_
+ counts
+ (fold-nar-files
+ database
+ storage-root
+ (const #f)
+ #f
+ #:stored? 'both)))
+ (update-nar-files-metric
+ metrics-registry
+ counts))
+ (log-msg 'INFO "finished initialising storage metrics"))))
+
+(define (check-storage database storage-root metrics-registry)
+ (define files-count
+ (database-count-narinfo-files database))
+
+ (call-with-progress-reporter
+ (progress-reporter/bar files-count
+ (simple-format #f "checking ~A files" files-count)
+ (current-error-port))
+ (lambda (report)
+ (fold-nar-files
+ database
+ storage-root
+ (lambda (file _)
+ (let* ((full-filename
+ (string-append storage-root
+ (uri-decode (assq-ref file 'url))))
+ (file-size
+ (stat:size (stat full-filename)))
+ (database-size
+ (assq-ref file 'size)))
+ (report)
+ (unless (= file-size database-size)
+ (newline)
+ (log-msg 'WARN "file " full-filename
+ " has inconsistent size (database: "
+ database-size ", file: " file-size ")"))
+ #f))
+ #f
+ #:stored? 'both))))
+
+(define (at-most max-length lst)
+ "If LST is shorter than MAX-LENGTH, return it and the empty list; otherwise
+return its MAX-LENGTH first elements and its tail."
+ (let loop ((len 0)
+ (lst lst)
+ (result '()))
+ (match lst
+ (()
+ (values (reverse result) '()))
+ ((head . tail)
+ (if (>= len max-length)
+ (values (reverse result) lst)
+ (loop (+ 1 len) tail (cons head result)))))))
+
+(define %max-cached-connections
+  ;; Maximum number of connections kept in cache by
+  ;; 'open-socket-for-uri/cached'.
+ 16)
+
+(define open-socket-for-uri/cached
+ (let ((cache '()))
+ (lambda* (uri #:key fresh? verify-certificate?)
+ "Return a connection for URI, possibly reusing a cached connection.
+When FRESH? is true, delete any cached connections for URI and open a new one.
+Return #f if URI's scheme is 'file' or #f.
+
+When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates."
+ (define host (uri-host uri))
+ (define scheme (uri-scheme uri))
+ (define key (list host scheme (uri-port uri)))
+
+ (and (not (memq scheme '(file #f)))
+ (match (assoc-ref cache key)
+ (#f
+ ;; Open a new connection to URI and evict old entries from
+ ;; CACHE, if any.
+ (let ((socket
+ (open-socket-for-uri*
+ uri
+ #:verify-certificate? verify-certificate?))
+ (new-cache evicted
+ (at-most (- %max-cached-connections 1) cache)))
+ (for-each (match-lambda
+ ((_ . port)
+ (false-if-exception (close-port port))))
+ evicted)
+ (set! cache (alist-cons key socket new-cache))
+ socket))
+ (socket
+ (if (or fresh? (port-closed? socket))
+ (begin
+ (false-if-exception (close-port socket))
+ (set! cache (alist-delete key cache))
+ (open-socket-for-uri/cached uri
+ #:verify-certificate?
+ verify-certificate?))
+ (begin
+ ;; Drain input left from the previous use.
+ (drain-input socket)
+ socket))))))))
+
+(define (call-with-cached-connection uri proc)
+ (let ((port (open-socket-for-uri/cached uri)))
+ (with-throw-handler #t
+ (lambda ()
+ (proc port))
+ (lambda _
+ (close-port port)))))
+
+(define (removal-channel-remove-nar-from-storage
+ channel file)
+ (let ((reply (make-channel)))
+ (put-message channel (list 'remove-from-storage reply file))
+ (get-message reply)))
+
+(define (start-nar-removal-fiber database
+ storage-root storage-limit
+ metrics-registry
+ nar-removal-criteria)
(define storage-size-metric
(make-gauge-metric metrics-registry
"storage_size_bytes"))
- (define nar-files-metric
- (or (metrics-registry-fetch-metric metrics-registry
- "nar_files_total")
- (make-gauge-metric metrics-registry
- "nar_files_total"
- #:labels '(stored))))
+ (define removal-channel
+ (make-channel))
(define (check-removal-criteria nar criteria)
(define narinfo
@@ -222,21 +416,29 @@
(store-path-hash-part
(assq-ref narinfo 'store-path))
".narinfo/info"))))
- (call-with-values
- (lambda ()
- (retry-on-error
+ (with-port-timeouts
+ (lambda ()
+ (call-with-values
(lambda ()
- (http-get uri #:decode-body? #f))
- #:times 3
- #:delay 5))
- (lambda (response body)
- (and (= (response-code response)
- 200)
-
- (let ((json-body (json-string->scm
- (utf8->string body))))
- (eq? (assoc-ref json-body "stored")
- #t)))))))))
+ (retry-on-error
+ (lambda ()
+ (call-with-cached-connection uri
+ (lambda (port)
+ (http-get uri
+ #:port port
+ #:decode-body? #f
+ #:keep-alive? #t
+ #:streaming? #t))))
+ #:times 3
+ #:delay 5))
+ (lambda (response body)
+ (and (= (response-code response)
+ 200)
+
+ (let ((json-body (json->scm body)))
+ (eq? (assoc-ref json-body "stored")
+ #t))))))
+ #:timeout 30)))))
(define (nar-can-be-removed? nar)
(any (lambda (criteria)
@@ -252,43 +454,97 @@
(metric-set storage-size-metric
initial-storage-size)
- (let loop ((storage-size
- initial-storage-size)
- (stored-nar-files
- (with-time-logging "getting stored nar files"
- (get-nar-files database storage-root metrics-registry
- #:stored? #t))))
- ;; Look through items in local storage, check if the removal
- ;; criteria have been met, and if so, delete it
-
- (unless (null? stored-nar-files)
- (let ((nar-to-consider (car stored-nar-files)))
- (if (nar-can-be-removed? nar-to-consider)
- (begin
- (remove-nar-from-storage
- storage-root
- (uri-decode
- (assq-ref nar-to-consider 'url)))
-
- (metric-decrement nar-files-metric
- #:label-values '((stored . "true")))
- (metric-increment nar-files-metric
- #:label-values '((stored . "false")))
-
- (let ((storage-size-estimate
- (- storage-size
- (assq-ref nar-to-consider 'size))))
- (when (> storage-size-estimate storage-limit)
- (loop storage-size-estimate
- (cdr stored-nar-files)))))
- (loop storage-size
- (cdr stored-nar-files)))))))
- (log-msg 'INFO "finished looking for nars to remove"))
+ ;; Look through items in local storage, check if the removal
+ ;; criteria have been met, and if so, delete it
+ (let ((result
+ nar-file-counts
+ (fold-nar-files
+ database
+ storage-root
+ (lambda (nar result)
+ (match result
+ ((storage-size . removed-count)
+ (if (and (> storage-size storage-limit)
+ (nar-can-be-removed? nar))
+ (let ((response
+ (removal-channel-remove-nar-from-storage
+ removal-channel
+ (assq-ref nar 'url))))
+
+ (if (eq? response 'removed)
+ (let ((storage-size-estimate
+ (- storage-size
+ (assq-ref nar 'size))))
+ (cons storage-size-estimate
+ (+ removed-count 1)))
+ (cons storage-size
+ removed-count)))
+ (cons storage-size
+ removed-count)))))
+ (cons initial-storage-size 0)
+ #:stored? #t)))
+
+ (match result
+ ((storage-size . removed-count)
+
+ (log-msg 'INFO "finished looking for nars to remove, removed "
+ removed-count " files"))))))
(when (null? nar-removal-criteria)
(error "must be some removal criteria"))
- (call-with-new-thread
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (match (get-message removal-channel)
+ (('remove-from-storage reply file)
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "nar remove from storage failed ("
+ file "): " exn)
+ (put-message reply
+ (cons 'exn exn)))
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (cond
+ ((not (file-exists?
+ (string-append storage-root
+ (uri-decode file))))
+ (put-message reply 'does-not-exist))
+ ((not (nar-can-be-removed?
+ `((url . ,file))))
+ (put-message reply
+ 'removal-criteria-not-met))
+ (else
+ (remove-nar-from-storage
+ storage-root
+ (uri-decode file))
+
+ (update-nar-files-metric
+ metrics-registry
+ '()
+ #:removed-count 1)
+
+ (put-message reply 'removed))))
+ (lambda _
+ (backtrace))))
+ #:unwind? #t))
+ (('remove file)
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "failed to remove " file ": " exn))
+ (lambda ()
+ ;; TODO: Do more checking at this point
+ (remove-nar-from-storage
+ storage-root
+ (uri-decode file))
+ (update-nar-files-metric metrics-registry
+ '()
+ #:removed-count 1))
+ #:unwind? #t))))))
+
+ (spawn-fiber
(lambda ()
(while #t
(with-exception-handler
@@ -296,11 +552,12 @@
(log-msg 'ERROR "nar removal pass failed " exn))
run-removal-pass
#:unwind? #t)
+ (sleep (* 60 60 24)))))
- (sleep 300)))))
+ removal-channel)
-(define (start-mirroring-thread database mirror storage-limit storage-root
- metrics-registry)
+(define (start-mirroring-fiber database mirror storage-limit storage-root
+ metrics-registry)
(define no-storage-limit?
(not (integer? storage-limit)))
@@ -309,13 +566,6 @@
(make-gauge-metric metrics-registry
"storage_size_bytes"))
- (define nar-files-metric
- (or (metrics-registry-fetch-metric metrics-registry
- "nar_files_total")
- (make-gauge-metric metrics-registry
- "nar_files_total"
- #:labels '(stored))))
-
(define (fetch-file file)
(let* ((string-url
(string-append mirror file))
@@ -333,83 +583,147 @@
(when (file-exists? tmp-file-name)
(delete-file tmp-file-name))
- (call-with-values
- (lambda ()
- (http-get uri
- #:decode-body? #f
- #:streaming? #t))
- (lambda (response body)
- (unless (= (response-code response)
- 200)
- (error "unknown response code"))
-
- (call-with-output-file tmp-file-name
- (lambda (output-port)
- (dump-port body output-port)))
- (rename-file tmp-file-name
- destination-file-name)
-
- (metric-increment nar-files-metric
- #:label-values '((stored . "true")))
- (metric-decrement nar-files-metric
- #:label-values '((stored . "false")))))))
+ (with-exception-handler
+ (lambda (exn)
+ (when (file-exists? tmp-file-name)
+ (delete-file tmp-file-name))
+
+ (raise-exception exn))
+ (lambda ()
+ (with-port-timeouts
+ (lambda ()
+ (call-with-values
+ (lambda ()
+ (let ((port
+ socket
+ (open-socket-for-uri* uri)))
+ (http-get uri
+ #:port port
+ #:decode-body? #f
+ #:streaming? #t)))
+ (lambda (response body)
+ (unless (= (response-code response)
+ 200)
+ (error "unknown response code"
+ (response-code response)))
+
+ (call-with-output-file tmp-file-name
+ (lambda (output-port)
+ (dump-port body output-port))))))
+ #:timeout 30))
+ #:unwind? #t)
+
+ (rename-file tmp-file-name
+ destination-file-name)
+
+ (update-nar-files-metric
+ metrics-registry
+ '()
+ #:fetched-count 1)))
(define (download-nars initial-storage-size)
;; If there's free space, then consider downloading missing nars
- (when (< initial-storage-size storage-limit)
- (let loop ((storage-size initial-storage-size)
- (missing-nar-files (get-nar-files
- database storage-root metrics-registry
- #:stored? #f)))
- (unless (null? missing-nar-files)
- (let ((file (car missing-nar-files)))
- (log-msg 'DEBUG "considering "
- (assq-ref file 'url))
- (let ((file-bytes (assq-ref file 'size)))
- (if (or no-storage-limit?
- (< (+ storage-size file-bytes)
- storage-limit))
- (let ((success?
- (with-exception-handler
- (lambda (exn)
- (log-msg 'ERROR "failed to fetch "
- (assq-ref file 'url)
- ": " exn)
- #f)
- (lambda ()
- (fetch-file (assq-ref file 'url))
- #t)
- #:unwind? #t)))
- (loop (if success?
- (+ storage-size file-bytes)
- storage-size)
- (cdr missing-nar-files)))
- ;; This file won't fit, so try the next one
- (loop storage-size
- (cdr missing-nar-files)))))))))
+ (if (< initial-storage-size storage-limit)
+ (let ((result
+ nar-file-counts
+ (fold-nar-files
+ database
+ storage-root
+ (lambda (file result)
+ (log-msg 'DEBUG "considering "
+ (assq-ref file 'url))
+ (match result
+ ((storage-size . fetched-count)
+ (let ((file-bytes (assq-ref file 'size)))
+ (if (or no-storage-limit?
+ (< (+ storage-size file-bytes)
+ storage-limit))
+ (let ((success?
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "failed to fetch "
+ (assq-ref file 'url)
+ ": " exn)
+ #f)
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (retry-on-error
+ (lambda ()
+ (fetch-file (assq-ref file 'url)))
+ #:times 3
+ #:delay 5))
+ (lambda _
+ (backtrace)))
+ #t)
+ #:unwind? #t)))
+ (if success?
+ (cons (+ storage-size file-bytes)
+ (1+ fetched-count))
+ result))
+ ;; This file won't fit, so try the next one
+ result)))))
+ initial-storage-size
+ #:stored? #f)))
+
+ (match result
+ ((storage-size . fetched-count)
+ fetched-count)))
+ 0))
(define (fast-download-nars)
(define parallelism 3)
- (let ((missing-nar-files (get-nar-files
- database storage-root metrics-registry
- #:stored? #f)))
- (n-par-for-each
- parallelism
- (lambda (file)
- (log-msg 'DEBUG "considering "
- (assq-ref file 'url))
- (with-exception-handler
- (lambda (exn)
- (log-msg 'ERROR "failed to fetch "
- (assq-ref file 'url)
- ": " exn)
- #f)
- (lambda ()
- (fetch-file (assq-ref file 'url))
- #t)
- #:unwind? #t))
- missing-nar-files)))
+ (let ((channel (make-channel)))
+ (for-each
+ (lambda _
+ (spawn-fiber
+ (lambda ()
+ (let loop ((fetched-count 0))
+ (match (get-message channel)
+ (('finished . reply)
+ (put-message reply fetched-count))
+ (url
+ (log-msg 'DEBUG "considering " url)
+ (loop
+ (+ fetched-count
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "failed to fetch " url ": " exn)
+ 0)
+ (lambda ()
+ (retry-on-error
+ (lambda ()
+ (fetch-file url))
+ #:times 3
+ #:delay 5)
+ 1)
+ #:unwind? #t)))))))))
+ (iota parallelism))
+
+ (let ((result
+ nar-file-counts
+ (fold-nar-files
+ database
+ storage-root
+ (lambda (nar _)
+ (put-message channel
+ (assq-ref nar 'url))
+ #f)
+ #f
+ #:stored? #f)))
+
+ (let* ((reply-channel (make-channel))
+ (fetched-count
+ (apply
+ +
+ (map
+ (lambda _
+ (put-message channel
+ (cons 'finished reply-channel))
+ (get-message reply-channel))
+ (iota parallelism)))))
+ fetched-count))))
(define (run-mirror-pass)
(log-msg 'DEBUG "running mirror pass")
@@ -417,18 +731,38 @@
(get-storage-size storage-root))))
(metric-set storage-size-metric
initial-storage-size)
- (if no-storage-limit?
- (fast-download-nars)
- (download-nars initial-storage-size)))
- (log-msg 'DEBUG "finished mirror pass"))
-
- (call-with-new-thread
- (lambda ()
- (while #t
- (with-exception-handler
- (lambda (exn)
- (log-msg 'ERROR "mirror pass failed " exn))
- run-mirror-pass
- #:unwind? #t)
-
- (sleep 300)))))
+ (let ((fetched-count
+ (if no-storage-limit?
+ (fast-download-nars)
+ (download-nars initial-storage-size))))
+ (log-msg 'DEBUG "finished mirror pass (fetched " fetched-count " nars)"))))
+
+ (let ((channel (make-channel)))
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (match (get-message channel)
+ ('full-pass
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "mirror pass failed " exn))
+ run-mirror-pass
+ #:unwind? #t))
+ (('fetch file)
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "failed to mirror " file ": " exn))
+ (lambda ()
+ (unless (file-exists?
+ (string-append storage-root
+ (uri-decode file)))
+ (fetch-file file)))
+ #:unwind? #t))))))
+
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (put-message channel 'full-pass)
+ (sleep (* 60 60 24)))))
+
+ channel))
diff --git a/nar-herder/utils.scm b/nar-herder/utils.scm
index 2d62360..4261b05 100644
--- a/nar-herder/utils.scm
+++ b/nar-herder/utils.scm
@@ -18,44 +18,37 @@
(define-module (nar-herder utils)
#:use-module (srfi srfi-1)
+ #:use-module (srfi srfi-9)
#:use-module (srfi srfi-19) ; time
#:use-module (ice-9 q)
- ;; #:use-module (ice-9 ftw)
- ;; #:use-module (ice-9 popen)
#:use-module (ice-9 iconv)
#:use-module (ice-9 match)
#:use-module (ice-9 format)
#:use-module (ice-9 threads)
+ #:use-module (ice-9 atomic)
#:use-module (ice-9 textual-ports)
#:use-module (ice-9 rdelim)
#:use-module (ice-9 binary-ports)
#:use-module (ice-9 exceptions)
#:use-module (rnrs bytevectors)
+ #:use-module (ice-9 suspendable-ports)
+ #:use-module ((ice-9 ports internal) #:select (port-poll
+ port-read-wait-fd
+ port-write-wait-fd))
#:use-module (web uri)
#:use-module (web http)
#:use-module (web client)
#:use-module (web request)
#:use-module (web response)
#:use-module (fibers)
+ #:use-module (fibers timers)
#:use-module (fibers channels)
+ #:use-module (fibers scheduler)
#:use-module (fibers conditions)
- ;; #:use-module (gcrypt pk-crypto)
- ;; #:use-module (gcrypt hash)
- ;; #:use-module (gcrypt random)
- ;; #:use-module (json)
- ;; #:use-module (guix pki)
- ;; #:use-module (guix utils)
- ;; #:use-module (guix config)
- ;; #:use-module (guix store)
- ;; #:use-module (guix status)
- ;; #:use-module (guix base64)
- ;; #:use-module (guix scripts substitute)
- #:export (call-with-streaming-http-request
- &chunked-input-ended-prematurely
- chunked-input-ended-prematurely-error?
- make-chunked-input-port*
-
- make-worker-thread-channel
+ #:use-module (fibers operations)
+ #:use-module ((guix build syscalls)
+ #:select (set-thread-name))
+ #:export (make-worker-thread-set
call-with-worker-thread
call-with-time-logging
@@ -65,171 +58,20 @@
create-work-queue
- check-locale!))
-
-;; Chunked Responses
-(define (read-chunk-header port)
- "Read a chunk header from PORT and return the size in bytes of the
-upcoming chunk."
- (match (read-line port)
- ((? eof-object?)
- ;; Connection closed prematurely: there's nothing left to read.
- (error "chunked input ended prematurely"))
- (str
- (let ((extension-start (string-index str
- (lambda (c)
- (or (char=? c #\;)
- (char=? c #\return))))))
- (string->number (if extension-start ; unnecessary?
- (substring str 0 extension-start)
- str)
- 16)))))
-
-(define &chunked-input-ended-prematurely
- (make-exception-type '&chunked-input-error-prematurely
- &external-error
- '()))
+ check-locale!
-(define make-chunked-input-ended-prematurely-error
- (record-constructor &chunked-input-ended-prematurely))
-
-(define chunked-input-ended-prematurely-error?
- (record-predicate &chunked-input-ended-prematurely))
-
-(define* (make-chunked-input-port* port #:key (keep-alive? #f))
- (define (close)
- (unless keep-alive?
- (close-port port)))
-
- (define chunk-size 0) ;size of the current chunk
- (define remaining 0) ;number of bytes left from the current chunk
- (define finished? #f) ;did we get all the chunks?
-
- (define (read! bv idx to-read)
- (define (loop to-read num-read)
- (cond ((or finished? (zero? to-read))
- num-read)
- ((zero? remaining) ;get a new chunk
- (let ((size (read-chunk-header port)))
- (set! chunk-size size)
- (set! remaining size)
- (cond
- ((zero? size)
- (set! finished? #t)
- (get-bytevector-n port 2) ; \r\n follows the last chunk
- num-read)
- (else
- (loop to-read num-read)))))
- (else ;read from the current chunk
- (let* ((ask-for (min to-read remaining))
- (read (get-bytevector-n! port bv (+ idx num-read)
- ask-for)))
- (cond
- ((eof-object? read) ;premature termination
- (raise-exception
- (make-chunked-input-ended-prematurely-error)))
- (else
- (let ((left (- remaining read)))
- (set! remaining left)
- (when (zero? left)
- ;; We're done with this chunk; read CR and LF.
- (get-u8 port) (get-u8 port))
- (loop (- to-read read)
- (+ num-read read)))))))))
- (loop to-read 0))
-
- (make-custom-binary-input-port "chunked input port" read! #f #f close))
-
-(define* (make-chunked-output-port* port #:key (keep-alive? #f)
- (buffering 1200)
- report-bytes-sent)
- (define heap-allocated-limit
- (expt 2 20)) ;; 1MiB
-
- (define (%put-string s)
- (unless (string-null? s)
- (let* ((bv (string->bytevector s "ISO-8859-1"))
- (length (bytevector-length bv)))
- (put-string port (number->string length 16))
- (put-string port "\r\n")
- (put-bytevector port bv)
- (put-string port "\r\n")
-
- (when report-bytes-sent
- (report-bytes-sent length))
- (let* ((stats (gc-stats))
- (initial-gc-times
- (assq-ref stats 'gc-times)))
- (when (> (assq-ref stats 'heap-allocated-since-gc)
- heap-allocated-limit)
- (while (let ((updated-stats (gc-stats)))
- (= (assq-ref updated-stats 'gc-times)
- initial-gc-times))
- (gc)
- (usleep 50)))))))
-
- (define (%put-char c)
- (%put-string (list->string (list c))))
-
- (define (flush) #t)
- (define (close)
- (put-string port "0\r\n\r\n")
- (force-output port)
- (unless keep-alive?
- (close-port port)))
- (let ((ret (make-soft-port
- (vector %put-char %put-string flush #f close) "w")))
- (setvbuf ret 'block buffering)
- ret))
-
-(define* (call-with-streaming-http-request uri callback
- #:key (headers '())
- (method 'PUT)
- report-bytes-sent)
- (let* ((port (open-socket-for-uri uri))
- (request
- (build-request
- uri
- #:method method
- #:version '(1 . 1)
- #:headers `((connection close)
- (Transfer-Encoding . "chunked")
- (Content-Type . "application/octet-stream")
- ,@headers)
- #:port port)))
-
- (set-port-encoding! port "ISO-8859-1")
- (setvbuf port 'block (expt 2 13))
- (with-exception-handler
- (lambda (exp)
- (simple-format #t "error: PUT ~A: ~A\n" (uri-path uri) exp)
- (close-port port)
- (raise-exception exp))
- (lambda ()
- (let ((request (write-request request port)))
- (let* ((chunked-output-port
- (make-chunked-output-port*
- port
- #:buffering (expt 2 12)
- #:keep-alive? #t
- #:report-bytes-sent report-bytes-sent)))
-
- ;; A SIGPIPE will kill Guile, so ignore it
- (sigaction SIGPIPE
- (lambda (arg)
- (simple-format (current-error-port) "warning: SIGPIPE\n")))
-
- (set-port-encoding! chunked-output-port "ISO-8859-1")
- (callback chunked-output-port)
- (close-port chunked-output-port)
-
- (let ((response (read-response port)))
- (let ((body (read-response-body response)))
- (close-port port)
- (values response
- body)))))))))
-
-(define* (retry-on-error f #:key times delay ignore)
+ open-socket-for-uri*
+
+ call-with-sigint
+ run-server/patched
+
+ timeout-error?
+
+ port-read-timeout-error?
+ port-write-timeout-error?
+ with-port-timeouts))
+
+(define* (retry-on-error f #:key times delay ignore error-hook)
(let loop ((attempt 1))
(match (with-exception-handler
(lambda (exn)
@@ -259,15 +101,26 @@ upcoming chunk."
times))
(apply values return-values))
((#f . exn)
- (if (>= attempt times)
+ (if (>= attempt
+ (- times 1))
(begin
(simple-format
(current-error-port)
- "error: ~A:\n ~A,\n giving up after ~A attempts\n"
+ "error: ~A:\n ~A,\n attempt ~A of ~A, last retry in ~A\n"
f
exn
- times)
- (raise-exception exn))
+ attempt
+ times
+ delay)
+ (when error-hook
+ (error-hook attempt exn))
+ (sleep delay)
+ (simple-format
+ (current-error-port)
+ "running last retry of ~A after ~A failed attempts\n"
+ f
+ attempt)
+ (f))
(begin
(simple-format
(current-error-port)
@@ -277,71 +130,11 @@ upcoming chunk."
attempt
times
delay)
+ (when error-hook
+ (error-hook attempt exn))
(sleep delay)
(loop (+ 1 attempt))))))))
-(define delay-logging-fluid
- (make-thread-local-fluid))
-(define delay-logging-depth-fluid
- (make-thread-local-fluid 0))
-
-(define (log-delay proc duration)
- (and=> (fluid-ref delay-logging-fluid)
- (lambda (recorder)
- (recorder proc duration))))
-
-(define* (call-with-delay-logging proc #:key (threshold 1) (args '()))
- (let ((start (get-internal-real-time))
- (trace '())
- (root-logger? (eq? #f (fluid-ref delay-logging-fluid))))
-
- (define (format-seconds seconds)
- (format #f "~4f" seconds))
-
- (call-with-values
- (lambda ()
- (with-fluid* delay-logging-depth-fluid
- (+ 1 (fluid-ref delay-logging-depth-fluid))
- (lambda ()
- (if root-logger?
- (with-fluid* delay-logging-fluid
- (lambda (proc duration)
- (set! trace
- (cons (list proc
- duration
- (fluid-ref delay-logging-depth-fluid))
- trace))
- #t)
- (lambda ()
- (apply proc args)))
- (apply proc args)))))
- (lambda vals
- (let ((elapsed-seconds
- (/ (- (get-internal-real-time)
- start)
- internal-time-units-per-second)))
- (if (and (> elapsed-seconds threshold)
- root-logger?)
- (let ((lines
- (cons
- (simple-format #f "warning: delay of ~A seconds: ~A"
- (format-seconds elapsed-seconds)
- proc)
- (map (match-lambda
- ((proc duration depth)
- (string-append
- (make-string (* 2 depth) #\space)
- (simple-format #f "~A: ~A"
- (format-seconds duration)
- proc))))
- trace))))
- (display (string-append
- (string-join lines "\n")
- "\n")))
- (unless root-logger?
- ((fluid-ref delay-logging-fluid) proc elapsed-seconds))))
- (apply values vals)))))
-
(define (call-with-time-logging name thunk)
(let ((start (current-time time-utc)))
(call-with-values
@@ -364,7 +157,9 @@ upcoming chunk."
(define* (create-work-queue thread-count-parameter proc
#:key thread-start-delay
(thread-stop-delay
- (make-time time-duration 0 0)))
+ (make-time time-duration 0 0))
+ (name "unnamed")
+ priority<?)
(let ((queue (make-q))
(queue-mutex (make-mutex))
(job-available (make-condition-variable))
@@ -384,11 +179,26 @@ upcoming chunk."
(else
thread-count-parameter)))
- (define (process-job . args)
- (with-mutex queue-mutex
- (enq! queue args)
- (start-new-threads-if-necessary (get-thread-count))
- (signal-condition-variable job-available)))
+ (define process-job
+ (if priority<?
+ (lambda* (args #:key priority)
+ (with-mutex queue-mutex
+ (enq! queue (cons priority args))
+ (set-car!
+ queue
+ (stable-sort! (car queue)
+ (lambda (a b)
+ (priority<?
+ (car a)
+ (car b)))))
+ (sync-q! queue)
+ (start-new-threads-if-necessary (get-thread-count))
+ (signal-condition-variable job-available)))
+ (lambda args
+ (with-mutex queue-mutex
+ (enq! queue args)
+ (start-new-threads-if-necessary (get-thread-count))
+ (signal-condition-variable job-available)))))
(define (count-threads)
(with-mutex queue-mutex
@@ -403,11 +213,12 @@ upcoming chunk."
(define (list-jobs)
(with-mutex queue-mutex
- (append (list-copy
- (car queue))
+ (append (if priority<?
+ (map cdr (car queue))
+ (list-copy (car queue)))
(hash-fold (lambda (key val result)
- (or (and val
- (cons val result))
+ (if val
+ (cons val result)
result))
'()
running-job-args))))
@@ -416,16 +227,17 @@ upcoming chunk."
(with-exception-handler
(lambda (exn)
(simple-format (current-error-port)
- "job raised exception: ~A\n"
- job-args))
+ "~A work queue, job raised exception ~A: ~A\n"
+ name job-args exn))
(lambda ()
(with-throw-handler #t
(lambda ()
(apply proc job-args))
(lambda (key . args)
- (simple-format (current-error-port)
- "exception when handling job: ~A ~A\n"
- key args)
+ (simple-format
+ (current-error-port)
+ "~A work queue, exception when handling job: ~A ~A\n"
+ name key args)
(backtrace))))
#:unwind? #t))
@@ -453,6 +265,13 @@ upcoming chunk."
(call-with-new-thread
(lambda ()
+ (catch 'system-error
+ (lambda ()
+ (set-thread-name
+ (string-append name " q t "
+ (number->string thread-index))))
+ (const #t))
+
(let loop ((last-job-finished-at (current-time time-monotonic)))
(lock-mutex queue-mutex)
@@ -469,9 +288,13 @@ upcoming chunk."
;; the job in the mean time
(if (q-empty? queue)
#f
- (deq! queue))
+ (if priority<?
+ (cdr (deq! queue))
+ (deq! queue)))
#f)
- (deq! queue))))
+ (if priority<?
+ (cdr (deq! queue))
+ (deq! queue)))))
(if job-args
(begin
@@ -499,32 +322,38 @@ upcoming chunk."
(define start-new-threads-if-necessary
(let ((previous-thread-started-at (make-time time-monotonic 0 0)))
(lambda (desired-count)
- (let* ((thread-count
- (hash-count (const #t) running-job-args))
- (threads-to-start
- (- desired-count thread-count)))
- (when (> threads-to-start 0)
- (for-each
- (lambda (thread-index)
- (when (eq? (hash-ref running-job-args
- thread-index
- 'slot-free)
- 'slot-free)
- (let* ((now (current-time time-monotonic))
- (elapsed (time-difference now
- previous-thread-started-at)))
- (when (or (eq? #f thread-start-delay)
- (time>=? elapsed thread-start-delay))
- (set! previous-thread-started-at now)
- (hash-set! running-job-args
- thread-index
- #f)
- (start-thread thread-index)))))
- (iota desired-count)))))))
+ (let* ((thread-count
+ (hash-count (const #t) running-job-args))
+ (threads-to-start
+ (- desired-count thread-count)))
+ (when (> threads-to-start 0)
+ (for-each
+ (lambda (thread-index)
+ (when (eq? (hash-ref running-job-args
+ thread-index
+ 'slot-free)
+ 'slot-free)
+ (let* ((now (current-time time-monotonic))
+ (elapsed (time-difference now
+ previous-thread-started-at)))
+ (when (or (eq? #f thread-start-delay)
+ (time>=? elapsed thread-start-delay))
+ (set! previous-thread-started-at now)
+ (hash-set! running-job-args
+ thread-index
+ #f)
+ (start-thread thread-index)))))
+ (iota desired-count)))))))
(if (procedure? thread-count-parameter)
(call-with-new-thread
(lambda ()
+ (catch 'system-error
+ (lambda ()
+ (set-thread-name
+ (string-append name " q t")))
+ (const #t))
+
(while #t
(sleep 15)
(with-mutex queue-mutex
@@ -565,83 +394,171 @@ falling back to en_US.utf8\n"
(setlocale LC_ALL ""))
#:unwind? #t))
-(define %worker-thread-args
- (make-parameter #f))
-
-(define* (make-worker-thread-channel initializer
- #:key (parallelism 1)
- (delay-logger (lambda _ #f))
- destructor
- lifetime
- (log-exception? (const #t)))
- "Return a channel used to offload work to a dedicated thread. ARGS are the
-arguments of the worker thread procedure."
+(define-record-type <worker-thread-set>
+ (worker-thread-set channel arguments-parameter)
+ worker-thread-set?
+ (channel worker-thread-set-channel)
+ (arguments-parameter worker-thread-set-arguments-parameter))
+
+(define* (make-worker-thread-set initializer
+ #:key (parallelism 1)
+ (delay-logger (lambda _ #f))
+ (duration-logger (const #f))
+ destructor
+ lifetime
+ (log-exception? (const #t))
+ (expire-on-exception? #f)
+ (name "unnamed"))
+ (define param
+ (make-parameter #f))
+
+ (define (initializer/safe)
+ (let ((args
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "exception running initializer in worker thread (~A): ~A:\n ~A\n"
+ name
+ initializer
+ exn)
+ #f)
+ (lambda ()
+ (with-throw-handler #t
+ initializer
+ (lambda args
+ (backtrace))))
+ #:unwind? #t)))
+
+ (if args
+ args
+ ;; never give up, just keep retrying
+ (begin
+ (sleep 5)
+ (initializer/safe)))))
+
+ (define (destructor/safe args)
+ (let ((success?
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "exception running destructor in worker thread (~A): ~A:\n ~A\n"
+ name
+ initializer
+ exn)
+ #f)
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (apply destructor args)
+ #t)
+ (lambda _
+ (backtrace))))
+ #:unwind? #t)))
+
+ (or success?
+ #t
+ (begin
+ (sleep 5)
+ (destructor/safe args)))))
+
(let ((channel (make-channel)))
(for-each
- (lambda _
+ (lambda (thread-index)
(call-with-new-thread
(lambda ()
- (let init ((args (initializer)))
- (parameterize ((%worker-thread-args args))
+ (catch 'system-error
+ (lambda ()
+ (set-thread-name
+ (string-append
+ name " w t "
+ (number->string thread-index))))
+ (const #t))
+
+ (let init ((args (initializer/safe)))
+ (parameterize ((param args))
(let loop ((current-lifetime lifetime))
- (match (get-message channel)
- (((? channel? reply) sent-time (? procedure? proc))
- (let ((time-delay
- (- (get-internal-real-time)
- sent-time)))
- (delay-logger (/ time-delay
- internal-time-units-per-second))
- (put-message
- reply
- (let ((start-time (get-internal-real-time)))
- (with-exception-handler
- (lambda (exn)
- (list 'worker-thread-error
- (/ (- (get-internal-real-time)
- start-time)
- internal-time-units-per-second)
- exn))
- (lambda ()
- (with-throw-handler #t
- (lambda ()
- (call-with-values
- (lambda ()
- (apply proc args))
- (lambda vals
- (cons (/ (- (get-internal-real-time)
- start-time)
+ (let ((exception?
+ (match (get-message channel)
+ (((? channel? reply) sent-time (? procedure? proc))
+ (let ((time-delay
+ (- (get-internal-real-time)
+ sent-time)))
+ (delay-logger (/ time-delay
internal-time-units-per-second)
- vals))))
- (lambda args
- (when (match args
- (('%exception exn)
- (log-exception? exn))
- (_ #t))
- (simple-format
- (current-error-port)
- "worker-thread: exception: ~A\n" args)
- (backtrace)))))
- #:unwind? #t))))))
- (if (number? current-lifetime)
- (unless (< current-lifetime 0)
- (loop (if current-lifetime
- (- current-lifetime 1)
- #f)))
- (loop #f))))
+ proc)
+
+ (let* ((start-time (get-internal-real-time))
+ (response
+ (with-exception-handler
+ (lambda (exn)
+ (list 'worker-thread-error
+ (/ (- (get-internal-real-time)
+ start-time)
+ internal-time-units-per-second)
+ exn))
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (call-with-values
+ (lambda ()
+ (apply proc args))
+ (lambda vals
+ (cons (/ (- (get-internal-real-time)
+ start-time)
+ internal-time-units-per-second)
+ vals))))
+ (lambda args
+ (when (match args
+ (('%exception exn)
+ (log-exception? exn))
+ (_ #t))
+ (simple-format
+ (current-error-port)
+ "worker-thread: exception: ~A\n" args)
+ (backtrace)))))
+ #:unwind? #t)))
+ (put-message reply
+ response)
+
+ (match response
+ (('worker-thread-error duration _)
+ (when duration-logger
+ (duration-logger duration proc))
+ #t)
+ ((duration . _)
+ (when duration-logger
+ (duration-logger duration proc))
+ #f))))))))
+
+ (unless (and expire-on-exception?
+ exception?)
+ (if (number? current-lifetime)
+ (unless (< current-lifetime 0)
+ (loop (if current-lifetime
+ (- current-lifetime 1)
+ #f)))
+ (loop #f))))))
+
(when destructor
- (apply destructor args))
- (init (initializer))))))
+ (destructor/safe args))
+
+ (init (initializer/safe))))))
(iota parallelism))
- channel))
-(define* (call-with-worker-thread channel proc #:key duration-logger)
+ (worker-thread-set channel
+ param)))
+
+(define* (call-with-worker-thread record proc #:key duration-logger)
"Send PROC to the worker thread through CHANNEL. Return the result of PROC.
If already in the worker thread, call PROC immediately."
- (let ((args (%worker-thread-args)))
+ (let ((args ((worker-thread-set-arguments-parameter record))))
(if args
(apply proc args)
(let ((reply (make-channel)))
- (put-message channel (list reply (get-internal-real-time) proc))
+ (put-message (worker-thread-set-channel record)
+ (list reply (get-internal-real-time) proc))
(match (get-message reply)
(('worker-thread-error duration exn)
(when duration-logger
@@ -651,3 +568,220 @@ If already in the worker thread, call PROC immediately."
(when duration-logger
(duration-logger duration))
(apply values result)))))))
+
+(define* (open-socket-for-uri* uri
+ #:key (verify-certificate? #t))
+ (define tls-wrap
+ (@@ (web client) tls-wrap))
+
+ (define https?
+ (eq? 'https (uri-scheme uri)))
+
+ (define plain-uri
+ (if https?
+ (build-uri
+ 'http
+ #:userinfo (uri-userinfo uri)
+ #:host (uri-host uri)
+ #:port (or (uri-port uri) 443)
+ #:path (uri-path uri)
+ #:query (uri-query uri)
+ #:fragment (uri-fragment uri))
+ uri))
+
+ (let ((s (open-socket-for-uri plain-uri)))
+ (values
+ (if https?
+ (let ((port
+ (tls-wrap s (uri-host uri)
+ #:verify-certificate? verify-certificate?)))
+ ;; Guile/guile-gnutls don't handle the handshake happening on a
+ ;; non-blocking socket, so change the behavior here.
+ (let ((flags (fcntl s F_GETFL)))
+ (fcntl s F_SETFL (logior O_NONBLOCK flags)))
+ port)
+ (let ((flags (fcntl s F_GETFL)))
+ (fcntl s F_SETFL (logior O_NONBLOCK flags))
+ s))
+ s)))
+
+;; Copied from (fibers web server)
+(define (call-with-sigint thunk cvar)
+ (let ((handler #f))
+ (dynamic-wind
+ (lambda ()
+ (set! handler
+ (sigaction SIGINT (lambda (sig) (signal-condition! cvar)))))
+ thunk
+ (lambda ()
+ (if handler
+ ;; restore Scheme handler, SIG_IGN or SIG_DFL.
+ (sigaction SIGINT (car handler) (cdr handler))
+ ;; restore original C handler.
+ (sigaction SIGINT #f))))))
+
+;; This variant of run-server from the fibers library supports running
+;; multiple servers within one process.
+(define run-server/patched
+ (let ((fibers-web-server-module
+ (resolve-module '(fibers web server))))
+
+ (define set-nonblocking!
+ (module-ref fibers-web-server-module 'set-nonblocking!))
+
+ (define make-default-socket
+ (module-ref fibers-web-server-module 'make-default-socket))
+
+ (define socket-loop
+ (module-ref fibers-web-server-module 'socket-loop))
+
+ (lambda* (handler
+ #:key
+ (host #f)
+ (family AF_INET)
+ (addr (if host
+ (inet-pton family host)
+ INADDR_LOOPBACK))
+ (port 8080)
+ (socket (make-default-socket family addr port)))
+ ;; We use a large backlog by default. If the server is suddenly hit
+ ;; with a number of connections on a small backlog, clients won't
+ ;; receive confirmation for their SYN, leading them to retry --
+ ;; probably successfully, but with a large latency.
+ (listen socket 1024)
+ (set-nonblocking! socket)
+ (sigaction SIGPIPE SIG_IGN)
+ (spawn-fiber (lambda () (socket-loop socket handler))))))
+
+;; These procedures are subject to spurious wakeups.
+
+(define (readable? port)
+ "Test if PORT is writable."
+ (match (select (vector port) #() #() 0)
+ ((#() #() #()) #f)
+ ((#(_) #() #()) #t)))
+
+(define (writable? port)
+ "Test if PORT is writable."
+ (match (select #() (vector port) #() 0)
+ ((#() #() #()) #f)
+ ((#() #(_) #()) #t)))
+
+(define (make-wait-operation ready? schedule-when-ready port port-ready-fd this-procedure)
+ (make-base-operation #f
+ (lambda _
+ (and (ready? (port-ready-fd port)) values))
+ (lambda (flag sched resume)
+ (define (commit)
+ (match (atomic-box-compare-and-swap! flag 'W 'S)
+ ('W (resume values))
+ ('C (commit))
+ ('S #f)))
+ (schedule-when-ready
+ sched (port-ready-fd port) commit))))
+
+(define (wait-until-port-readable-operation port)
+ "Make an operation that will succeed when PORT is readable."
+ (unless (input-port? port)
+ (error "refusing to wait forever for input on non-input port"))
+ (make-wait-operation readable? schedule-task-when-fd-readable port
+ port-read-wait-fd
+ wait-until-port-readable-operation))
+
+(define (wait-until-port-writable-operation port)
+ "Make an operation that will succeed when PORT is writable."
+ (unless (output-port? port)
+ (error "refusing to wait forever for output on non-output port"))
+ (make-wait-operation writable? schedule-task-when-fd-writable port
+ port-write-wait-fd
+ wait-until-port-writable-operation))
+
+
+
+(define &port-timeout
+ (make-exception-type '&port-timeout
+ &external-error
+ '(thunk port)))
+
+(define make-port-timeout-error
+ (record-constructor &port-timeout))
+
+(define port-timeout-error?
+ (record-predicate &port-timeout))
+
+(define &port-read-timeout
+ (make-exception-type '&port-read-timeout
+ &port-timeout
+ '()))
+
+(define make-port-read-timeout-error
+ (record-constructor &port-read-timeout))
+
+(define port-read-timeout-error?
+ (record-predicate &port-read-timeout))
+
+(define &port-write-timeout
+ (make-exception-type '&port-write-timeout
+ &port-timeout
+ '()))
+
+(define make-port-write-timeout-error
+ (record-constructor &port-write-timeout))
+
+(define port-write-timeout-error?
+ (record-predicate &port-write-timeout))
+
+(define* (with-port-timeouts thunk
+ #:key timeout
+ (read-timeout timeout)
+ (write-timeout timeout))
+ (define (no-fibers-wait thunk port mode timeout)
+ (define poll-timeout-ms 200)
+
+ ;; When the GC runs, it restarts the poll syscall, but the timeout
+ ;; remains unchanged! When the timeout is longer than the time
+ ;; between the syscall restarting, I think this renders the
+ ;; timeout useless. Therefore, this code uses a short timeout, and
+ ;; repeatedly calls poll while watching the clock to see if it has
+ ;; timed out overall.
+ (let ((timeout-internal
+ (+ (get-internal-real-time)
+ (* timeout internal-time-units-per-second))))
+ (let loop ((poll-value
+ (port-poll port mode poll-timeout-ms)))
+ (if (= poll-value 0)
+ (if (> (get-internal-real-time)
+ timeout-internal)
+ (raise-exception
+ (if (string=? mode "r")
+ (make-port-read-timeout-error thunk port)
+ (make-port-write-timeout-error thunk port)))
+ (loop (port-poll port mode poll-timeout-ms)))
+ poll-value))))
+
+ (parameterize
+ ((current-read-waiter
+ (lambda (port)
+ (if (current-scheduler)
+ (perform-operation
+ (choice-operation
+ (wait-until-port-readable-operation port)
+ (wrap-operation
+ (sleep-operation read-timeout)
+ (lambda ()
+ (raise-exception
+ (make-port-read-timeout-error thunk port))))))
+ (no-fibers-wait thunk port "r" read-timeout))))
+ (current-write-waiter
+ (lambda (port)
+ (if (current-scheduler)
+ (perform-operation
+ (choice-operation
+ (wait-until-port-writable-operation port)
+ (wrap-operation
+ (sleep-operation write-timeout)
+ (lambda ()
+ (raise-exception
+ (make-port-write-timeout-error thunk port))))))
+ (no-fibers-wait thunk port "w" write-timeout)))))
+ (thunk)))