aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.envrc13
-rw-r--r--.gitignore23
-rw-r--r--Makefile.am15
-rw-r--r--VERSION1
-rwxr-xr-xbootstrap.sh3
-rw-r--r--configure.ac35
-rw-r--r--guile.am22
-rw-r--r--guix-dev.scm71
-rw-r--r--nar-herder/database.scm532
-rw-r--r--nar-herder/mirror.scm82
-rw-r--r--nar-herder/server.scm99
-rw-r--r--nar-herder/storage.scm250
-rw-r--r--nar-herder/utils.scm651
-rw-r--r--nginx/conf/nginx.conf33
-rw-r--r--pre-inst-env.in13
-rw-r--r--scripts/nar-herder.in297
16 files changed, 2140 insertions, 0 deletions
diff --git a/.envrc b/.envrc
new file mode 100644
index 0000000..4e544a3
--- /dev/null
+++ b/.envrc
@@ -0,0 +1,13 @@
+# Unset the Guile paths to avoid mixing Guile major versions
+export GUILE_LOAD_PATH=""
+export GUILE_LOAD_COMPILED_PATH=""
+
+use guix --no-grafts -l guix-dev.scm
+
+export GUILE_LOAD_COMPILED_PATH="$PWD:$PWD/tests:$GUILE_LOAD_COMPILED_PATH"
+export GUILE_LOAD_PATH="$PWD:$GUILE_LOAD_PATH"
+export PATH="$PWD/scripts:$PATH"
+
+if [ -f .local.envrc ]; then
+ source_env .local.envrc
+fi
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..a112683
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,23 @@
+*.go
+Makefile.in
+Makefile
+aclocal.m4
+autom4te.cache
+config.log
+config.status
+configure
+
+build-aux/install-sh
+build-aux/missing
+
+pre-inst-env
+
+.local.envrc
+
+*.db
+*.db-shm
+*.db-wal
+
+scripts/nar-herder
+
+nar-herder/config.scm
diff --git a/Makefile.am b/Makefile.am
new file mode 100644
index 0000000..4438ba3
--- /dev/null
+++ b/Makefile.am
@@ -0,0 +1,15 @@
+include guile.am
+
+bin_SCRIPTS = \
+ scripts/nar-herder
+
+SOURCES = \
+ nar-herder/database.scm \
+ nar-herder/server.scm \
+ nar-herder/storage.scm \
+ nar-herder/mirror.scm \
+ nar-herder/utils.scm
+
+install-data-local:
+ mkdir -p "$(DESTDIR)$(docdir)";
+ cp README.org "$(DESTDIR)$(docdir)/README.org"
diff --git a/VERSION b/VERSION
new file mode 100644
index 0000000..ba66466
--- /dev/null
+++ b/VERSION
@@ -0,0 +1 @@
+0.0
diff --git a/bootstrap.sh b/bootstrap.sh
new file mode 100755
index 0000000..5af6611
--- /dev/null
+++ b/bootstrap.sh
@@ -0,0 +1,3 @@
+#! /bin/sh
+
+autoreconf --verbose --install --force
diff --git a/configure.ac b/configure.ac
new file mode 100644
index 0000000..8954c8c
--- /dev/null
+++ b/configure.ac
@@ -0,0 +1,35 @@
+AC_INIT([nar-herder], [m4_translit(m4_esyscmd([cat VERSION]),m4_newline)])
+AC_CONFIG_AUX_DIR([build-aux])
+AM_INIT_AUTOMAKE([gnu color-tests -Wall -Wno-portability foreign])
+
+GUILE_PKG([3.0])
+GUILE_PROGS
+if test "x$GUILD" = "x"; then
+ AC_MSG_ERROR(['guild' binary not found; please check your guile-3.x installation.])
+fi
+
+if test "$cross_compiling" != no; then
+ GUILE_TARGET="--target=$host_alias"
+ AC_SUBST([GUILE_TARGET])
+fi
+
+dnl Check for Guile-lzlib.
+GUILE_MODULE_AVAILABLE([have_guile_lzlib], [(lzlib)])
+if test "x$have_guile_lzlib" != "xyes"; then
+ AC_MSG_ERROR([Guile-lzlib is missing; please install it.])
+fi
+
+dnl Check for Guile-lib.
+GUILE_MODULE_AVAILABLE([have_guile_lib], [(logging logger)])
+if test "x$have_guile_lib" != "xyes"; then
+ AC_MSG_ERROR([Guile-lib is missing; please install it.])
+fi
+
+AC_CONFIG_FILES([Makefile])
+AC_CONFIG_FILES([pre-inst-env], [chmod +x pre-inst-env])
+AC_CONFIG_FILES(
+ [scripts/nar-herder],
+ [chmod +x scripts/nar-herder]
+)
+
+AC_OUTPUT
diff --git a/guile.am b/guile.am
new file mode 100644
index 0000000..7f07ca2
--- /dev/null
+++ b/guile.am
@@ -0,0 +1,22 @@
+moddir=$(datadir)/guile/site/$(GUILE_EFFECTIVE_VERSION)
+godir=$(libdir)/guile/$(GUILE_EFFECTIVE_VERSION)/site-ccache
+
+GOBJECTS = $(SOURCES:%.scm=%.go)
+
+nobase_mod_DATA = $(SOURCES) $(NOCOMP_SOURCES)
+nobase_go_DATA = $(GOBJECTS)
+
+# Make sure source files are installed first, so that the mtime of
+# installed compiled files is greater than that of installed source
+# files. See
+# <http://lists.gnu.org/archive/html/guile-devel/2010-07/msg00125.html>
+# for details.
+guile_install_go_files = install-nobase_goDATA
+$(guile_install_go_files): install-nobase_modDATA
+
+CLEANFILES = $(GOBJECTS)
+EXTRA_DIST = $(SOURCES) $(NOCOMP_SOURCES)
+GUILE_WARNINGS = -Wunbound-variable -Warity-mismatch -Wformat
+SUFFIXES = .scm .go
+.scm.go:
+ $(AM_V_GEN)$(top_builddir)/pre-inst-env $(GUILE_TOOLS) compile $(GUILE_TARGET) $(GUILE_WARNINGS) -o "$@" "$<"
diff --git a/guix-dev.scm b/guix-dev.scm
new file mode 100644
index 0000000..b4c9aba
--- /dev/null
+++ b/guix-dev.scm
@@ -0,0 +1,71 @@
+;;; nar-herder
+;;;
+;;; Copyright © 2021 Christopher Baines <mail@cbaines.net>
+;;;
+;;; This file is part of the nar-herder.
+;;;
+;;; guix-data-service is free software; you can redistribute it and/or modify it
+;;; under the terms of the GNU General Public License as published by
+;;; the Free Software Foundation; either version 3 of the License, or
+;;; (at your option) any later version.
+;;;
+;;; guix-data-service is distributed in the hope that it will be useful, but
+;;; WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;; General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU General Public License
+;;; along with the guix-data-service. If not, see
+;;; <http://www.gnu.org/licenses/>.
+
+;;; Run the following command to enter a development environment for
+;;; the nar-herder:
+;;;
+;;; $ guix environment -l guix-dev.scm
+
+(use-modules ((guix licenses) #:prefix license:)
+ (guix packages)
+ (guix download)
+ (guix git-download)
+ (guix utils)
+ (guix build-system gnu)
+ (gnu packages)
+ (gnu packages autotools)
+ (gnu packages compression)
+ (gnu packages databases)
+ (gnu packages gnupg)
+ (gnu packages guile)
+ (gnu packages guile-xyz)
+ (gnu packages package-management)
+ (gnu packages perl)
+ (gnu packages pkg-config)
+ (gnu packages ruby)
+ (gnu packages sqlite)
+ (gnu packages texinfo)
+ (gnu packages web)
+ (srfi srfi-1))
+
+(package
+ (name "nar-herder")
+ (version "0")
+ (source #f)
+ (build-system gnu-build-system)
+ (inputs
+ `(("guix" ,guix)
+ ("guile-json" ,guile-json-4)
+ ("guile-fibers" ,guile-fibers)
+ ("guile-gcrypt" ,guile-gcrypt)
+ ("guile-readline" ,guile-readline)
+ ("guile-lzlib" ,guile-lzlib)
+ ("guile" ,@(assoc-ref (package-native-inputs guix) "guile"))
+ ("guile-sqlite3" ,guile-sqlite3)
+ ("sqlite" ,sqlite)))
+ (native-inputs
+ `(("autoconf" ,autoconf)
+ ("automake" ,automake)
+ ("pkg-config" ,pkg-config)
+ ("nginx" ,nginx)))
+ (synopsis "TODO")
+ (description "TODO")
+ (home-page "TODO")
+ (license license:gpl3+))
diff --git a/nar-herder/database.scm b/nar-herder/database.scm
new file mode 100644
index 0000000..9cb04e2
--- /dev/null
+++ b/nar-herder/database.scm
@@ -0,0 +1,532 @@
+;;; Nar Herder
+;;;
+;;; Copyright © 2021 Christopher Baines <mail@cbaines.net>
+;;;
+;;; This program is free software: you can redistribute it and/or
+;;; modify it under the terms of the GNU Affero General Public License
+;;; as published by the Free Software Foundation, either version 3 of
+;;; the License, or (at your option) any later version.
+;;;
+;;; This program is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;; Affero General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU Affero General Public
+;;; License along with this program. If not, see
+;;; <http://www.gnu.org/licenses/>.
+
+(define-module (nar-herder database)
+ #:use-module (srfi srfi-1)
+ #:use-module (srfi srfi-9)
+ #:use-module (srfi srfi-19)
+ #:use-module (ice-9 match)
+ #:use-module (ice-9 format)
+ #:use-module (ice-9 threads)
+ #:use-module (ice-9 exceptions)
+ #:use-module (web uri)
+ #:use-module (sqlite3)
+ #:use-module (fibers)
+ #:use-module (guix narinfo)
+ #:use-module (guix derivations)
+ #:use-module (nar-herder utils)
+ #:export (setup-database
+
+ database-optimize
+ database-spawn-fibers
+
+ database-call-with-transaction
+
+ dump-database
+
+ database-insert-narinfo
+ database-select-narinfo-contents-by-hash
+
+ database-select-recent-changes
+ database-select-latest-recent-change-datetime
+
+ database-select-narinfo-files
+
+ database-map-all-narinfo-files))
+
+(define-record-type <database>
+ (make-database database-file reader-thread-channel writer-thread-channel)
+ database?
+ (database-file database-file)
+ (reader-thread-channel database-reader-thread-channel)
+ (writer-thread-channel database-writer-thread-channel))
+
+(define* (db-open database
+ #:key (write? #t))
+ (define flags
+ `(,@(if write?
+ (list SQLITE_OPEN_READWRITE
+ SQLITE_OPEN_CREATE)
+
+ (list SQLITE_OPEN_READONLY))
+ ,SQLITE_OPEN_NOMUTEX))
+
+ (sqlite-open database (apply logior flags)))
+
+(define (perform-initial-database-setup db)
+ (define schema
+ "
+CREATE TABLE narinfos (
+ id INTEGER PRIMARY KEY ASC,
+ store_path TEXT NOT NULL,
+ nar_hash TEXT NOT NULL,
+ nar_size INTEGER NOT NULL,
+ deriver TEXT,
+ system TEXT,
+ contents NOT NULL
+);
+
+CREATE UNIQUE INDEX narinfos_store_hash ON narinfos (substr(store_path, 12, 32));
+
+CREATE TABLE narinfo_files (
+ narinfo_id INTEGER NOT NULL REFERENCES narinfos (id),
+ size INTEGER NOT NULL,
+ compression TEXT,
+ url TEXT NOT NULL
+);
+
+CREATE TABLE narinfo_references (
+ narinfo_id INTEGER NOT NULL REFERENCES narinfos (id),
+ store_path TEXT NOT NULL
+);
+
+CREATE TABLE tags (
+ id INTEGER PRIMARY KEY ASC,
+ key TEXT NOT NULL,
+ value TEXT NOT NULL
+);
+
+CREATE UNIQUE INDEX tags_index ON tags (key, value);
+
+CREATE TABLE narinfo_tags (
+ narinfo_id INTEGER NOT NULL REFERENCES narinfos (id),
+ tag_id INTEGER NOT NULL REFERENCES tags (id)
+);
+
+CREATE TABLE recent_changes (
+ id INTEGER PRIMARY KEY ASC,
+ datetime TEXT NOT NULL,
+ change TEXT NOT NULl,
+ data TEXT NOT NULL
+);")
+
+ (sqlite-exec db schema))
+
+(define (update-schema db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT name FROM sqlite_master WHERE type = 'table' AND name = :name")))
+
+ (sqlite-bind-arguments
+ statement
+ #:name "narinfos")
+
+ (match (sqlite-step statement)
+ (#f (perform-initial-database-setup db))
+ (_ #f))
+
+ (sqlite-finalize statement)))
+
+(define (setup-database database-file)
+ (let ((db (db-open database-file)))
+ (sqlite-exec db "PRAGMA journal_mode=WAL;")
+ (sqlite-exec db "PRAGMA optimize;")
+ (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);")
+
+ (update-schema db)
+
+ (sqlite-close db))
+
+ (let ((reader-thread-channel
+ (make-worker-thread-channel
+ (lambda ()
+ (let ((db
+ (db-open database-file #:write? #f)))
+ (sqlite-exec db "PRAGMA busy_timeout = 5000;")
+ (list db)))
+ #:destructor
+ (lambda (db)
+ (sqlite-close db))
+ #:lifetime 50000
+
+ ;; Use a minimum of 2 and a maximum of 8 threads
+ #:parallelism
+ (min (max (current-processor-count)
+ 2)
+ 8)))
+
+ (writer-thread-channel
+ (make-worker-thread-channel
+ (lambda ()
+ (let ((db
+ (db-open database-file)))
+ (sqlite-exec db "PRAGMA busy_timeout = 5000;")
+ (sqlite-exec db "PRAGMA foreign_keys = ON;")
+ (list db)))
+ #:destructor
+ (lambda (db)
+ (db-optimize db
+ database-file)
+
+ (sqlite-close db))
+ #:lifetime 500
+
+ ;; SQLite doesn't support parallel writes
+ #:parallelism 1)))
+
+ (make-database database-file
+ reader-thread-channel
+ writer-thread-channel)))
+
+(define (db-optimize db db-filename)
+ (define (wal-size)
+ (let ((db-wal-filename
+ (string-append db-filename "-wal")))
+
+ (stat:size (stat db-wal-filename))))
+
+ (define MiB (* (expt 2 20) 1.))
+ (define wal-size-threshold
+ (* 5 MiB))
+
+ (when (> (wal-size) wal-size-threshold)
+ (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);")
+
+ (sqlite-exec db
+ "
+PRAGMA analysis_limit=1000;
+PRAGMA optimize;")))
+
+(define (database-optimize database)
+ (retry-on-error
+ (lambda ()
+ (call-with-worker-thread
+ (database-writer-thread-channel database)
+ (lambda (db)
+ (db-optimize
+ db
+ (database-file database)))))
+ #:times 5
+ #:delay 5))
+
+(define (database-spawn-fibers database)
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (sleep (* 60 5)) ; 5 minutes
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format (current-error-port)
+ "exception when performing WAL checkpoint: ~A\n"
+ exn))
+ (lambda ()
+ (database-optimize database))
+ #:unwind? #t)))
+ #:parallel? #t))
+
+(define %current-transaction-proc
+ (make-parameter #f))
+
+(define* (database-call-with-transaction database proc
+ #:key
+ readonly?)
+ (define (run-proc-within-transaction db)
+ (if (%current-transaction-proc)
+ (proc db) ; already in transaction
+ (begin
+ (sqlite-exec db "BEGIN TRANSACTION;")
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format (current-error-port)
+ "error: sqlite rolling back transaction\n")
+ (sqlite-exec db "ROLLBACK TRANSACTION;")
+ (raise-exception exn))
+ (lambda ()
+ (call-with-values
+ (lambda ()
+ (parameterize ((%current-transaction-proc proc))
+ (proc db)))
+ (lambda vals
+ (sqlite-exec db "COMMIT TRANSACTION;")
+ (apply values vals))))))))
+
+ (match (call-with-worker-thread
+ ((if readonly?
+ database-reader-thread-channel
+ database-writer-thread-channel)
+ database)
+ (lambda (db)
+ (let ((start-time (get-internal-real-time)))
+ (call-with-values
+ (lambda ()
+ (run-proc-within-transaction db))
+ (lambda vals
+ (let ((duration-seconds
+ (/ (- (get-internal-real-time) start-time)
+ internal-time-units-per-second)))
+ (when (and (not readonly?)
+ (> duration-seconds 2))
+ (display
+ (format
+ #f
+ "warning: ~a:\n took ~4f seconds in transaction\n"
+ proc
+ duration-seconds)
+ (current-error-port)))
+
+ (cons duration-seconds vals)))))))
+ ((duration vals ...)
+ (apply values vals))))
+
+(define (dump-database database name)
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (sqlite-exec
+ db
+ (string-append "VACUUM INTO '" name "';")))))
+
+(define (last-insert-rowid db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "SELECT last_insert_rowid();"
+ #:cache? #t)))
+ (let ((id
+ (vector-ref (sqlite-step statement)
+ 0)))
+
+ (sqlite-reset statement)
+
+ id)))
+
+(define (database-insert-narinfo database narinfo)
+ (define (insert-narinfo-record db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+INSERT INTO narinfos (
+ store_path, nar_hash, nar_size, deriver, system, contents
+) VALUES (
+ :store_path, :nar_hash, :nar_size, :deriver, :system, :contents
+)"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:store_path (narinfo-path narinfo)
+ #:nar_hash (narinfo-hash narinfo)
+ #:nar_size (narinfo-size narinfo)
+ #:deriver (narinfo-deriver narinfo)
+ #:system (narinfo-system narinfo)
+ #:contents (narinfo-contents narinfo))
+
+ (sqlite-step statement)
+ (sqlite-reset statement)
+
+ (last-insert-rowid db)))
+
+ (define (insert-narinfo-file db narinfo-id size compression url)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+INSERT INTO narinfo_files (
+ narinfo_id, size, compression, url
+) VALUES (
+ :narinfo_id, :size, :compression, :url
+)"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:narinfo_id narinfo-id
+ #:size size
+ #:compression compression
+ #:url url)
+
+ (sqlite-step statement)
+ (sqlite-reset statement)))
+
+ (define (insert-narinfo-references db narinfo-id store-path)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+INSERT INTO narinfo_references (
+ narinfo_id, store_path
+) VALUES (
+ :narinfo_id, :store_path
+)"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:narinfo_id narinfo-id
+ #:store_path store-path)
+
+ (sqlite-step statement)
+ (sqlite-reset statement)))
+
+ (define (insert-change db contents)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+INSERT INTO recent_changes (
+ datetime, change, data
+) VALUES (
+ datetime('now'), 'addition', :contents
+)"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:contents contents)
+
+ (sqlite-step statement)
+ (sqlite-reset statement)))
+
+ (database-call-with-transaction
+ database
+ (lambda (db)
+ (let ((narinfo-id (insert-narinfo-record db)))
+ (let ((len (length (narinfo-uris narinfo))))
+ (for-each insert-narinfo-file
+ (make-list len db)
+ (make-list len narinfo-id)
+ (narinfo-file-sizes narinfo)
+ (narinfo-compressions narinfo)
+ (map uri-path (narinfo-uris narinfo))))
+
+ (let ((references (narinfo-references narinfo)))
+ (for-each insert-narinfo-references
+ (make-list (length references) db)
+ (make-list (length references) narinfo-id)
+ references))
+
+ (insert-change db (narinfo-contents narinfo))
+
+ narinfo-id))))
+
+(define (database-select-narinfo-contents-by-hash database hash)
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT contents FROM narinfos WHERE substr(store_path, 12, 32) = :hash"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:hash hash)
+
+ (match (let ((result (sqlite-step statement)))
+ (sqlite-reset statement)
+ result)
+ (#(contents) contents)
+ (_ #f))))))
+
+(define (database-select-recent-changes database after-date)
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT datetime, change, data FROM recent_changes WHERE datetime >= :datetime"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:datetime after-date)
+
+ (let loop ((row (sqlite-step statement))
+ (result '()))
+ (match row
+ (#(datetime change data)
+ (loop (sqlite-step statement)
+ (cons `((datetime . ,datetime)
+ (change . ,change)
+ (data . ,data))
+ result)))
+ (#f
+ (sqlite-reset statement)
+
+ (reverse result))))))))
+
+(define (database-select-latest-recent-change-datetime database)
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT datetime FROM recent_changes ORDER BY datetime ASC LIMIT 1"
+ #:cache? #t)))
+
+ (let ((result (match (sqlite-step statement)
+ (#(date) date)
+ (#f #f))))
+ (sqlite-reset statement)
+
+ result)))))
+
+(define (database-select-narinfo-files database hash)
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT narinfo_files.size, narinfo_files.compression, narinfo_files.url
+FROM narinfos
+INNER JOIN narinfo_files
+ ON narinfos.id = narinfo_files.narinfo_id
+WHERE substr(narinfos.store_path, 12, 32) = :hash"
+ #:cache? #t)))
+ (let ((result
+ (sqlite-map
+ (match-lambda
+ (#(size compression url)
+ `((size . ,size)
+ (compression . ,compression)
+ (url . ,url))))
+ statement)))
+ (sqlite-reset statement)
+
+ result)))))
+
+(define (database-map-all-narinfo-files database proc)
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT size, compression, url
+FROM narinfo_files"
+ #:cache? #t)))
+ (let ((result-list
+ (sqlite-fold
+ (lambda (row result)
+ (match row
+ (#(size compression url)
+ (cons (proc `((size . ,size)
+ (compression . ,compression)
+ (url . ,url)))
+ result))))
+ '()
+ statement)))
+ (sqlite-reset statement)
+
+ result-list)))))
diff --git a/nar-herder/mirror.scm b/nar-herder/mirror.scm
new file mode 100644
index 0000000..01da2ba
--- /dev/null
+++ b/nar-herder/mirror.scm
@@ -0,0 +1,82 @@
+;;; Nar Herder
+;;;
+;;; Copyright © 2021 Christopher Baines <mail@cbaines.net>
+;;;
+;;; This program is free software: you can redistribute it and/or
+;;; modify it under the terms of the GNU Affero General Public License
+;;; as published by the Free Software Foundation, either version 3 of
+;;; the License, or (at your option) any later version.
+;;;
+;;; This program is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;; Affero General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU Affero General Public
+;;; License along with this program. If not, see
+;;; <http://www.gnu.org/licenses/>.
+
+(define-module (nar-herder mirror)
+ #:use-module (srfi srfi-1)
+ #:use-module (srfi srfi-43)
+ #:use-module (ice-9 threads)
+ #:use-module (rnrs bytevectors)
+ #:use-module (web uri)
+ #:use-module (web client)
+ #:use-module (web response)
+ #:use-module (logging logger)
+ #:use-module (json)
+ #:use-module (guix narinfo)
+ #:use-module (nar-herder database)
+ #:export (start-fetch-changes-thread))
+
+(define (start-fetch-changes-thread database mirror)
+ (define (request-recent-changes)
+ (define latest-recent-change
+ (database-select-latest-recent-change-datetime database))
+
+ (define processed-recent-changes
+ (database-select-recent-changes database latest-recent-change))
+
+ (call-with-values
+ (lambda ()
+ (http-get
+ (string->uri
+ (string-append mirror "/recent-changes"
+ (if latest-recent-change
+ (string-append "?since=" latest-recent-change)
+ "")))
+ #:decode-body? #f))
+ (lambda (response body)
+ (if (= (response-code response) 200)
+ (let ((json-body (json-string->scm
+ (utf8->string body))))
+ (vector-for-each
+ (lambda (_ change-details)
+ ;; Guard against processing changes that have already
+ ;; been processed
+ (unless (member processed-recent-changes change-details)
+ (let ((change (assoc-ref change-details "change")))
+ (cond
+ ((string=? change "addition")
+ (let ((narinfo
+ (call-with-input-string
+ (assoc-ref change-details "data")
+ (lambda (port)
+ (read-narinfo port
+ "https://narherderdummyvalue")))))
+ (log-msg 'INFO "processing addition change for "
+ (first (narinfo-uris narinfo)))
+ (database-insert-narinfo database
+ narinfo)))
+ (else
+ (error "unimplemented"))))))
+ (assoc-ref json-body "recent_changes")))
+ (error "unknown response code")))))
+
+ (call-with-new-thread
+ (lambda ()
+ (while #t
+ (request-recent-changes)
+
+ (sleep 60)))))
diff --git a/nar-herder/server.scm b/nar-herder/server.scm
new file mode 100644
index 0000000..6a77645
--- /dev/null
+++ b/nar-herder/server.scm
@@ -0,0 +1,99 @@
+;;; Nar Herder
+;;;
+;;; Copyright © 2021 Christopher Baines <mail@cbaines.net>
+;;;
+;;; This program is free software: you can redistribute it and/or
+;;; modify it under the terms of the GNU Affero General Public License
+;;; as published by the Free Software Foundation, either version 3 of
+;;; the License, or (at your option) any later version.
+;;;
+;;; This program is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;; Affero General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU Affero General Public
+;;; License along with this program. If not, see
+;;; <http://www.gnu.org/licenses/>.
+
+(define-module (nar-herder server)
+ #:use-module (ice-9 match)
+ #:use-module (web uri)
+ #:use-module (web response)
+ #:use-module (web request)
+ #:use-module (json)
+ #:use-module (nar-herder database)
+ #:use-module (nar-herder storage)
+ #:export (make-request-handler))
+
+(define* (render-json json #:key (extra-headers '())
+ (code 200))
+ (values (build-response
+ #:code code
+ #:headers (append extra-headers
+ '((content-type . (application/json))
+ (vary . (accept)))))
+ (lambda (port)
+ (scm->json json port))))
+
+(define (parse-query-string query)
+ (let lp ((lst (map uri-decode (string-split query (char-set #\& #\=)))))
+ (match lst
+ ((key value . rest)
+ (cons (cons key value) (lp rest)))
+ (("") '())
+ (() '()))))
+
+(define (make-request-handler database storage-root)
+ (define (narinfo? str)
+ (and
+ (= (string-length str) 40)
+ (string-suffix? ".narinfo" str)))
+
+ (lambda (request body)
+ (simple-format (current-error-port)
+ "~A ~A\n"
+ (request-method request)
+ (uri-path (request-uri request)))
+
+ (match (cons (request-method request)
+ (split-and-decode-uri-path
+ (uri-path (request-uri request))))
+ (('GET (? narinfo? narinfo))
+ (let ((narinfo-contents
+ (database-select-narinfo-contents-by-hash
+ database
+ (string-take narinfo 32))))
+ (if narinfo-contents
+ (values '((content-type . (text/plain)))
+ narinfo-contents)
+ (values (build-response #:code 404)
+ "404"))))
+ (('GET (? narinfo? narinfo) "info")
+ ;; TODO Check if narinfo exists?
+ (render-json
+ `((stored . ,(store-item-in-local-storage?
+ database
+ storage-root
+ (string-take narinfo 32))))))
+ (('GET "recent-changes")
+ (let ((query-parameters
+ (or (and=> (uri-query (request-uri request))
+ parse-query-string)
+ '())))
+ (render-json
+ `((recent_changes . ,(list->vector
+ (database-select-recent-changes
+ database
+ (or
+ (assoc-ref query-parameters "since")
+ "1970-01-01 00:00:01"))))))))
+ (('GET "latest-database-dump")
+ (values (build-response
+ #:code 200
+ #:headers '((X-Accel-Redirect . "/internal/database/nar_herder_dump.db")))
+ #f))
+ (_
+ (values (build-response #:code 404)
+ "404")))))
+
diff --git a/nar-herder/storage.scm b/nar-herder/storage.scm
new file mode 100644
index 0000000..96c84ed
--- /dev/null
+++ b/nar-herder/storage.scm
@@ -0,0 +1,250 @@
+;;; Nar Herder
+;;;
+;;; Copyright © 2021 Christopher Baines <mail@cbaines.net>
+;;;
+;;; This program is free software: you can redistribute it and/or
+;;; modify it under the terms of the GNU Affero General Public License
+;;; as published by the Free Software Foundation, either version 3 of
+;;; the License, or (at your option) any later version.
+;;;
+;;; This program is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;; Affero General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU Affero General Public
+;;; License along with this program. If not, see
+;;; <http://www.gnu.org/licenses/>.
+
+(define-module (nar-herder storage)
+ #:use-module (srfi srfi-1)
+ #:use-module (ice-9 ftw)
+ #:use-module (ice-9 match)
+ #:use-module (ice-9 threads)
+ #:use-module (web uri)
+ #:use-module (web client)
+ #:use-module (web response)
+ #:use-module (logging logger)
+ #:use-module (logging port-log)
+ #:use-module (json)
+ #:use-module ((guix build utils) #:select (dump-port mkdir-p))
+ #:use-module (nar-herder database)
+ #:export (store-item-in-local-storage?
+
+ start-nar-removal-thread
+ start-mirroring-thread))
+
+(define (store-item-in-local-storage? database storage-root hash)
+ (let ((narinfo-files (database-select-narinfo-files database hash)))
+ (every (lambda (file)
+ (file-exists?
+ (string-append storage-root "/" file)))
+ narinfo-files)))
+
+(define (get-storage-size storage-root)
+ (define enter? (const #t))
+ (define (leaf name stat result)
+ (+ result
+ (or (and=> (stat:blocks stat)
+ (lambda (blocks)
+ (* blocks 512)))
+ (stat:size stat))))
+
+ (define (down name stat result) result)
+ (define (up name stat result) result)
+ (define (skip name stat result) result)
+
+ (define (error name stat errno result)
+ (format (current-error-port) "warning: ~a: ~a~%"
+ name (strerror errno))
+ result)
+
+ (file-system-fold enter? leaf down up skip error
+ 0 ; Start counting at 0
+ storage-root))
+
+(define (remove-nar-from-storage storage-root nar-file-url)
+ (delete-file
+ (string-append storage-root "/" nar-file-url)))
+
+(define (index-storage database storage-root)
+ (define (get-files-hash)
+ (define (get-file-strings prefix children)
+ (append-map
+ (match-lambda
+ ((name stat)
+ (list (string-append prefix "/" name)))
+ ((name stat children ...)
+ (get-file-strings (string-append prefix "/" name)
+ children)))
+ children))
+
+ (let* ((lst
+ (match (third (file-system-tree storage-root))
+ ((name stat children ...)
+ (get-file-strings (string-append "/" name)
+ children))))
+ (hash-table
+ (make-hash-table (length lst))))
+
+ (for-each (lambda (s)
+ (hash-set! hash-table s #t))
+ lst)
+
+ hash-table))
+
+ (let* ((files-hash
+ (get-files-hash))
+ (narinfo-files
+ (database-map-all-narinfo-files
+ database
+ (lambda (file)
+ (let* ((url (assq-ref file 'url))
+ (stored? (hash-ref files-hash url)))
+ (when stored?
+ ;; Delete the hash entry, so
+ ;; that the hash at the end will
+ ;; just contain the unrecognised
+ ;; files
+ (hash-remove! files-hash url))
+
+ `(,@file
+ (stored? . ,stored?)))))))
+ `((narinfo-files . ,narinfo-files)
+ (unrecognised-files . ,(hash-map->list (lambda (key _) key)
+ files-hash)))))
+
+(define (start-nar-removal-thread database
+ storage-root storage-limit
+ nar-removal-criteria)
+
+ (define (check-removal-criteria nar criteria)
+ (match criteria
+ (('and and-criteria ...)
+ (every check-removal-criteria criteria))
+ (('stored-on url)
+ (let ((uri (string->uri
+ (string-append url (assq-ref nar 'url)))))
+ (call-with-values (http-get uri)
+ (lambda (response body)
+ (and (= (response-code response)
+ 200)
+
+ (let ((json-body (json-string->scm body)))
+ (eq? (assoc-ref json-body "stored")
+ #t)))))))))
+
+ (define (nar-can-be-removed? nar)
+ (any (lambda (criteria)
+ (check-removal-criteria nar criteria))
+ nar-removal-criteria))
+
+ (define (get-stored-nar-files)
+ (let ((index (index-storage database storage-root)))
+ (filter
+ (lambda (file)
+ (assq-ref file 'stored?))
+ (assq-ref index 'narinfo-files))))
+
+ (define (run-removal-pass)
+ (let loop ((storage-size (get-storage-size storage-root))
+ (stored-nar-files (get-stored-nar-files)))
+ ;; Look through items in local storage, check if the removal
+ ;; criteria have been met, and if so, delete it
+
+ (unless (null? stored-nar-files)
+ (let ((nar-to-consider (car stored-nar-files)))
+ (if (nar-can-be-removed? nar-to-consider)
+ (let ((removed-bytes
+ (remove-nar-from-storage storage-root nar-to-consider)))
+
+ (let ((storage-size-estimate
+ (- storage-size
+ removed-bytes)))
+ (when (> storage-size-estimate storage-limit)
+ (loop storage-size-estimate
+ (cdr stored-nar-files)))))
+ (loop storage-size
+ (cdr stored-nar-files)))))))
+
+ (call-with-new-thread
+ (lambda ()
+ (while #t
+ (run-removal-pass)
+
+ (sleep 300)))))
+
+(define (start-mirroring-thread database mirror storage-limit storage-root)
+ (define (get-missing-nar-files)
+ (let ((index (index-storage database storage-root)))
+ (filter
+ (lambda (file)
+ (not (assq-ref file 'stored?)))
+ (assq-ref index 'narinfo-files))))
+
+ (define (fetch-file file)
+ (let* ((string-url
+ (string-append mirror file))
+ (uri
+ (string->uri (string-append mirror file)))
+ (destination-file-name
+ (string-append storage-root file))
+ (tmp-file-name
+ (string-append destination-file-name "-tmp")))
+ (log-msg 'INFO "fetching " string-url)
+
+ (mkdir-p (dirname destination-file-name))
+
+ (when (file-exists? tmp-file-name)
+ (delete-file tmp-file-name))
+
+ (call-with-values
+ (lambda ()
+ (http-get uri
+ #:decode-body? #f
+ #:streaming? #t))
+ (lambda (response body)
+ (unless (= (response-code response)
+ 200)
+ (error "unknown response code"))
+
+ (call-with-output-file tmp-file-name
+ (lambda (output-port)
+ (dump-port body output-port)))
+ (rename-file tmp-file-name
+ destination-file-name)))))
+
+ (define (run-mirror-pass)
+ (define no-storage-limit?
+ (not (integer? storage-limit)))
+
+ (log-msg 'DEBUG "running mirror pass")
+ (let ((initial-storage-size (get-storage-size storage-root)))
+ ;; If there's free space, then consider downloading missing nars
+ (when (or no-storage-limit?
+ (< initial-storage-size storage-limit))
+ (let loop ((storage-size initial-storage-size)
+ (missing-nar-files (get-missing-nar-files)))
+ (unless (null? missing-nar-files)
+ (let ((file (car missing-nar-files)))
+ (log-msg 'DEBUG "considering "
+ (assq-ref file 'url))
+ (let ((file-bytes (assq-ref file 'size)))
+ (if (or no-storage-limit?
+ (< (+ storage-size file-bytes)
+ storage-limit))
+ (begin
+ (fetch-file (assq-ref file 'url))
+ (loop (+ storage-size file-bytes)
+ (cdr missing-nar-files)))
+ ;; This file won't fit, so try the next one
+ (loop storage-size
+ (cdr missing-nar-files))))))))))
+
+ (call-with-new-thread
+ (lambda ()
+ (while #t
+ (run-mirror-pass)
+ (log-msg 'DEBUG "finished mirror pass")
+
+ (sleep 300)))))
diff --git a/nar-herder/utils.scm b/nar-herder/utils.scm
new file mode 100644
index 0000000..a0a5171
--- /dev/null
+++ b/nar-herder/utils.scm
@@ -0,0 +1,651 @@
+;;; Nar Herder
+;;;
+;;; Copyright © 2021 Christopher Baines <mail@cbaines.net>
+;;;
+;;; This program is free software: you can redistribute it and/or
+;;; modify it under the terms of the GNU Affero General Public License
+;;; as published by the Free Software Foundation, either version 3 of
+;;; the License, or (at your option) any later version.
+;;;
+;;; This program is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;; Affero General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU Affero General Public
+;;; License along with this program. If not, see
+;;; <http://www.gnu.org/licenses/>.
+
+(define-module (nar-herder utils)
+ #:use-module (srfi srfi-1)
+ #:use-module (srfi srfi-19) ; time
+ #:use-module (ice-9 q)
+ ;; #:use-module (ice-9 ftw)
+ ;; #:use-module (ice-9 popen)
+ #:use-module (ice-9 iconv)
+ #:use-module (ice-9 match)
+ #:use-module (ice-9 format)
+ #:use-module (ice-9 threads)
+ #:use-module (ice-9 textual-ports)
+ #:use-module (ice-9 rdelim)
+ #:use-module (ice-9 binary-ports)
+ #:use-module (ice-9 exceptions)
+ #:use-module (rnrs bytevectors)
+ #:use-module (web uri)
+ #:use-module (web http)
+ #:use-module (web client)
+ #:use-module (web request)
+ #:use-module (web response)
+ #:use-module (fibers)
+ #:use-module (fibers channels)
+ #:use-module (fibers conditions)
+ ;; #:use-module (gcrypt pk-crypto)
+ ;; #:use-module (gcrypt hash)
+ ;; #:use-module (gcrypt random)
+ ;; #:use-module (json)
+ ;; #:use-module (guix pki)
+ ;; #:use-module (guix utils)
+ ;; #:use-module (guix config)
+ ;; #:use-module (guix store)
+ ;; #:use-module (guix status)
+ ;; #:use-module (guix base64)
+ ;; #:use-module (guix scripts substitute)
+ #:export (call-with-streaming-http-request
+ &chunked-input-ended-prematurely
+ chunked-input-ended-prematurely-error?
+ make-chunked-input-port*
+
+ make-worker-thread-channel
+ call-with-worker-thread
+
+ retry-on-error
+
+ create-work-queue
+
+ check-locale!))
+
+;; Chunked Responses
+(define (read-chunk-header port)
+ "Read a chunk header from PORT and return the size in bytes of the
+upcoming chunk."
+ (match (read-line port)
+ ((? eof-object?)
+ ;; Connection closed prematurely: there's nothing left to read.
+ (error "chunked input ended prematurely"))
+ (str
+ (let ((extension-start (string-index str
+ (lambda (c)
+ (or (char=? c #\;)
+ (char=? c #\return))))))
+ (string->number (if extension-start ; unnecessary?
+ (substring str 0 extension-start)
+ str)
+ 16)))))
+
+(define &chunked-input-ended-prematurely
+ (make-exception-type '&chunked-input-error-prematurely
+ &external-error
+ '()))
+
+(define make-chunked-input-ended-prematurely-error
+ (record-constructor &chunked-input-ended-prematurely))
+
+(define chunked-input-ended-prematurely-error?
+ (record-predicate &chunked-input-ended-prematurely))
+
+(define* (make-chunked-input-port* port #:key (keep-alive? #f))
+ (define (close)
+ (unless keep-alive?
+ (close-port port)))
+
+ (define chunk-size 0) ;size of the current chunk
+ (define remaining 0) ;number of bytes left from the current chunk
+ (define finished? #f) ;did we get all the chunks?
+
+ (define (read! bv idx to-read)
+ (define (loop to-read num-read)
+ (cond ((or finished? (zero? to-read))
+ num-read)
+ ((zero? remaining) ;get a new chunk
+ (let ((size (read-chunk-header port)))
+ (set! chunk-size size)
+ (set! remaining size)
+ (cond
+ ((zero? size)
+ (set! finished? #t)
+ (get-bytevector-n port 2) ; \r\n follows the last chunk
+ num-read)
+ (else
+ (loop to-read num-read)))))
+ (else ;read from the current chunk
+ (let* ((ask-for (min to-read remaining))
+ (read (get-bytevector-n! port bv (+ idx num-read)
+ ask-for)))
+ (cond
+ ((eof-object? read) ;premature termination
+ (raise-exception
+ (make-chunked-input-ended-prematurely-error)))
+ (else
+ (let ((left (- remaining read)))
+ (set! remaining left)
+ (when (zero? left)
+ ;; We're done with this chunk; read CR and LF.
+ (get-u8 port) (get-u8 port))
+ (loop (- to-read read)
+ (+ num-read read)))))))))
+ (loop to-read 0))
+
+ (make-custom-binary-input-port "chunked input port" read! #f #f close))
+
+(define* (make-chunked-output-port* port #:key (keep-alive? #f)
+ (buffering 1200)
+ report-bytes-sent)
+ (define heap-allocated-limit
+ (expt 2 20)) ;; 1MiB
+
+ (define (%put-string s)
+ (unless (string-null? s)
+ (let* ((bv (string->bytevector s "ISO-8859-1"))
+ (length (bytevector-length bv)))
+ (put-string port (number->string length 16))
+ (put-string port "\r\n")
+ (put-bytevector port bv)
+ (put-string port "\r\n")
+
+ (when report-bytes-sent
+ (report-bytes-sent length))
+ (let* ((stats (gc-stats))
+ (initial-gc-times
+ (assq-ref stats 'gc-times)))
+ (when (> (assq-ref stats 'heap-allocated-since-gc)
+ heap-allocated-limit)
+ (while (let ((updated-stats (gc-stats)))
+ (= (assq-ref updated-stats 'gc-times)
+ initial-gc-times))
+ (gc)
+ (usleep 50)))))))
+
+ (define (%put-char c)
+ (%put-string (list->string (list c))))
+
+ (define (flush) #t)
+ (define (close)
+ (put-string port "0\r\n\r\n")
+ (force-output port)
+ (unless keep-alive?
+ (close-port port)))
+ (let ((ret (make-soft-port
+ (vector %put-char %put-string flush #f close) "w")))
+ (setvbuf ret 'block buffering)
+ ret))
+
+(define* (call-with-streaming-http-request uri callback
+ #:key (headers '())
+ (method 'PUT)
+ report-bytes-sent)
+ (let* ((port (open-socket-for-uri uri))
+ (request
+ (build-request
+ uri
+ #:method method
+ #:version '(1 . 1)
+ #:headers `((connection close)
+ (Transfer-Encoding . "chunked")
+ (Content-Type . "application/octet-stream")
+ ,@headers)
+ #:port port)))
+
+ (set-port-encoding! port "ISO-8859-1")
+ (setvbuf port 'block (expt 2 13))
+ (with-exception-handler
+ (lambda (exp)
+ (simple-format #t "error: PUT ~A: ~A\n" (uri-path uri) exp)
+ (close-port port)
+ (raise-exception exp))
+ (lambda ()
+ (let ((request (write-request request port)))
+ (let* ((chunked-output-port
+ (make-chunked-output-port*
+ port
+ #:buffering (expt 2 12)
+ #:keep-alive? #t
+ #:report-bytes-sent report-bytes-sent)))
+
+ ;; A SIGPIPE will kill Guile, so ignore it
+ (sigaction SIGPIPE
+ (lambda (arg)
+ (simple-format (current-error-port) "warning: SIGPIPE\n")))
+
+ (set-port-encoding! chunked-output-port "ISO-8859-1")
+ (callback chunked-output-port)
+ (close-port chunked-output-port)
+
+ (let ((response (read-response port)))
+ (let ((body (read-response-body response)))
+ (close-port port)
+ (values response
+ body)))))))))
+
+(define* (retry-on-error f #:key times delay ignore)
+ (let loop ((attempt 1))
+ (match (with-exception-handler
+ (lambda (exn)
+ (when (cond
+ ((list? ignore)
+ (any (lambda (test)
+ (test exn))
+ ignore))
+ ((procedure? ignore)
+ (ignore exn))
+ (else #f))
+ (raise-exception exn))
+
+ (cons #f exn))
+ (lambda ()
+ (call-with-values f
+ (lambda vals
+ (cons #t vals))))
+ #:unwind? #t)
+ ((#t . return-values)
+ (when (> attempt 1)
+ (simple-format
+ (current-error-port)
+ "retry success: ~A\n on attempt ~A of ~A\n"
+ f
+ attempt
+ times))
+ (apply values return-values))
+ ((#f . exn)
+ (if (>= attempt times)
+ (begin
+ (simple-format
+ (current-error-port)
+ "error: ~A:\n ~A,\n giving up after ~A attempts\n"
+ f
+ exn
+ times)
+ (raise-exception exn))
+ (begin
+ (simple-format
+ (current-error-port)
+ "error: ~A:\n ~A,\n attempt ~A of ~A, retrying in ~A\n"
+ f
+ exn
+ attempt
+ times
+ delay)
+ (sleep delay)
+ (loop (+ 1 attempt))))))))
+
+(define delay-logging-fluid
+ (make-thread-local-fluid))
+(define delay-logging-depth-fluid
+ (make-thread-local-fluid 0))
+
+(define (log-delay proc duration)
+ (and=> (fluid-ref delay-logging-fluid)
+ (lambda (recorder)
+ (recorder proc duration))))
+
+(define* (call-with-delay-logging proc #:key (threshold 1) (args '()))
+ (let ((start (get-internal-real-time))
+ (trace '())
+ (root-logger? (eq? #f (fluid-ref delay-logging-fluid))))
+
+ (define (format-seconds seconds)
+ (format #f "~4f" seconds))
+
+ (call-with-values
+ (lambda ()
+ (with-fluid* delay-logging-depth-fluid
+ (+ 1 (fluid-ref delay-logging-depth-fluid))
+ (lambda ()
+ (if root-logger?
+ (with-fluid* delay-logging-fluid
+ (lambda (proc duration)
+ (set! trace
+ (cons (list proc
+ duration
+ (fluid-ref delay-logging-depth-fluid))
+ trace))
+ #t)
+ (lambda ()
+ (apply proc args)))
+ (apply proc args)))))
+ (lambda vals
+ (let ((elapsed-seconds
+ (/ (- (get-internal-real-time)
+ start)
+ internal-time-units-per-second)))
+ (if (and (> elapsed-seconds threshold)
+ root-logger?)
+ (let ((lines
+ (cons
+ (simple-format #f "warning: delay of ~A seconds: ~A"
+ (format-seconds elapsed-seconds)
+ proc)
+ (map (match-lambda
+ ((proc duration depth)
+ (string-append
+ (make-string (* 2 depth) #\space)
+ (simple-format #f "~A: ~A"
+ (format-seconds duration)
+ proc))))
+ trace))))
+ (display (string-append
+ (string-join lines "\n")
+ "\n")))
+ (unless root-logger?
+ ((fluid-ref delay-logging-fluid) proc elapsed-seconds))))
+ (apply values vals)))))
+
+(define (call-with-time-logging name thunk)
+ (let ((start (current-time time-utc)))
+ (call-with-values
+ thunk
+ (lambda vals
+ (let* ((end (current-time time-utc))
+ (elapsed (time-difference end start)))
+ (display
+ (format #f
+ "~a took ~f seconds~%"
+ name
+ (+ (time-second elapsed)
+ (/ (time-nanosecond elapsed) 1e9))))
+ (apply values vals))))))
+
+(define-syntax-rule (with-time-logging name exp ...)
+ "Log under NAME the time taken to evaluate EXP."
+ (call-with-time-logging name (lambda () exp ...)))
+
+(define* (create-work-queue thread-count-parameter proc
+ #:key thread-start-delay
+ (thread-stop-delay
+ (make-time time-duration 0 0)))
+ (let ((queue (make-q))
+ (queue-mutex (make-mutex))
+ (job-available (make-condition-variable))
+ (running-job-args (make-hash-table)))
+
+ (define get-thread-count
+ (cond
+ ((number? thread-count-parameter)
+ (const thread-count-parameter))
+ ((eq? thread-count-parameter #f)
+ ;; Run one thread per job
+ (lambda ()
+ (+ (q-length queue)
+ (hash-count (lambda (index val)
+ (list? val))
+ running-job-args))))
+ (else
+ thread-count-parameter)))
+
+ (define (process-job . args)
+ (with-mutex queue-mutex
+ (enq! queue args)
+ (start-new-threads-if-necessary (get-thread-count))
+ (signal-condition-variable job-available)))
+
+ (define (count-threads)
+ (with-mutex queue-mutex
+ (hash-count (const #t) running-job-args)))
+
+ (define (count-jobs)
+ (with-mutex queue-mutex
+ (+ (q-length queue)
+ (hash-count (lambda (index val)
+ (list? val))
+ running-job-args))))
+
+ (define (list-jobs)
+ (with-mutex queue-mutex
+ (append (list-copy
+ (car queue))
+ (hash-fold (lambda (key val result)
+ (or (and val
+ (cons val result))
+ result))
+ '()
+ running-job-args))))
+
+ (define (thread-process-job job-args)
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format (current-error-port)
+ "job raised exception: ~A\n"
+ job-args))
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (apply proc job-args))
+ (lambda (key . args)
+ (simple-format (current-error-port)
+ "exception when handling job: ~A ~A\n"
+ key args)
+ (backtrace))))
+ #:unwind? #t))
+
+ (define (start-thread thread-index)
+ (define (too-many-threads?)
+ (let ((running-jobs-count
+ (hash-count (lambda (index val)
+ (list? val))
+ running-job-args))
+ (desired-thread-count (get-thread-count)))
+
+ (>= running-jobs-count
+ desired-thread-count)))
+
+ (define (thread-idle-for-too-long? last-job-finished-at)
+ (time>=?
+ (time-difference (current-time time-monotonic)
+ last-job-finished-at)
+ thread-stop-delay))
+
+ (define (stop-thread)
+ (hash-remove! running-job-args
+ thread-index)
+ (unlock-mutex queue-mutex))
+
+ (call-with-new-thread
+ (lambda ()
+ (let loop ((last-job-finished-at (current-time time-monotonic)))
+ (lock-mutex queue-mutex)
+
+ (if (too-many-threads?)
+ (stop-thread)
+ (let ((job-args
+ (if (q-empty? queue)
+ ;; #f from wait-condition-variable indicates a timeout
+ (if (wait-condition-variable
+ job-available
+ queue-mutex
+ (+ 9 (time-second (current-time))))
+ ;; Another thread could have taken
+ ;; the job in the mean time
+ (if (q-empty? queue)
+ #f
+ (deq! queue))
+ #f)
+ (deq! queue))))
+
+ (if job-args
+ (begin
+ (hash-set! running-job-args
+ thread-index
+ job-args)
+
+ (unlock-mutex queue-mutex)
+ (thread-process-job job-args)
+
+ (with-mutex queue-mutex
+ (hash-set! running-job-args
+ thread-index
+ #f))
+
+ (loop (current-time time-monotonic)))
+ (if (thread-idle-for-too-long? last-job-finished-at)
+ (stop-thread)
+ (begin
+ (unlock-mutex queue-mutex)
+
+ (loop last-job-finished-at))))))))))
+
+
+ (define start-new-threads-if-necessary
+ (let ((previous-thread-started-at (make-time time-monotonic 0 0)))
+ (lambda (desired-count)
+ (let* ((thread-count
+ (hash-count (const #t) running-job-args))
+ (threads-to-start
+ (- desired-count thread-count)))
+ (when (> threads-to-start 0)
+ (for-each
+ (lambda (thread-index)
+ (when (eq? (hash-ref running-job-args
+ thread-index
+ 'slot-free)
+ 'slot-free)
+ (let* ((now (current-time time-monotonic))
+ (elapsed (time-difference now
+ previous-thread-started-at)))
+ (when (or (eq? #f thread-start-delay)
+ (time>=? elapsed thread-start-delay))
+ (set! previous-thread-started-at now)
+ (hash-set! running-job-args
+ thread-index
+ #f)
+ (start-thread thread-index)))))
+ (iota desired-count)))))))
+
+ (if (procedure? thread-count-parameter)
+ (call-with-new-thread
+ (lambda ()
+ (while #t
+ (sleep 15)
+ (with-mutex queue-mutex
+ (let ((idle-threads (hash-count (lambda (index val)
+ (eq? #f val))
+ running-job-args)))
+ (when (= 0 idle-threads)
+ (start-new-threads-if-necessary (get-thread-count))))))))
+ (start-new-threads-if-necessary (get-thread-count)))
+
+ (values process-job count-jobs count-threads list-jobs)))
+
+(define (check-locale!)
+ (with-exception-handler
+ (lambda (exn)
+ (display
+ (simple-format
+ #f
+ "exception when calling setlocale: ~A
+falling back to en_US.utf8\n"
+ exn)
+ (current-error-port))
+
+ (with-exception-handler
+ (lambda (exn)
+ (display
+ (simple-format
+ #f
+ "exception when calling setlocale with en_US.utf8: ~A\n"
+ exn)
+ (current-error-port))
+
+ (exit 1))
+ (lambda _
+ (setlocale LC_ALL "en_US.utf8"))
+ #:unwind? #t))
+ (lambda _
+ (setlocale LC_ALL ""))
+ #:unwind? #t))
+
+(define %worker-thread-args
+ (make-parameter #f))
+
+(define* (make-worker-thread-channel initializer
+ #:key (parallelism 1)
+ (delay-logger (lambda _ #f))
+ destructor
+ lifetime
+ (log-exception? (const #t)))
+ "Return a channel used to offload work to a dedicated thread. ARGS are the
+arguments of the worker thread procedure."
+ (parameterize (((@@ (fibers internal) current-fiber) #f))
+ (let ((channel (make-channel)))
+ (for-each
+ (lambda _
+ (call-with-new-thread
+ (lambda ()
+ (let init ((args (initializer)))
+ (parameterize ((%worker-thread-args args))
+ (let loop ((current-lifetime lifetime))
+ (match (get-message channel)
+ (((? channel? reply) sent-time (? procedure? proc))
+ (let ((time-delay
+ (- (get-internal-real-time)
+ sent-time)))
+ (delay-logger (/ time-delay
+ internal-time-units-per-second))
+ (put-message
+ reply
+ (let ((start-time (get-internal-real-time)))
+ (with-exception-handler
+ (lambda (exn)
+ (list 'worker-thread-error
+ (/ (- (get-internal-real-time)
+ start-time)
+ internal-time-units-per-second)
+ exn))
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (call-with-values
+ (lambda ()
+ (apply proc args))
+ (lambda vals
+ (cons (/ (- (get-internal-real-time)
+ start-time)
+ internal-time-units-per-second)
+ vals))))
+ (lambda args
+ (when (match args
+ (('%exception exn)
+ (log-exception? exn))
+ (_ #t))
+ (simple-format
+ (current-error-port)
+ "worker-thread: exception: ~A\n" args)
+ (backtrace)))))
+ #:unwind? #t))))))
+ (if (number? current-lifetime)
+ (unless (< current-lifetime 0)
+ (loop (if current-lifetime
+ (- current-lifetime 1)
+ #f)))
+ (loop #f))))
+ (when destructor
+ (apply destructor args))
+ (init (initializer))))))
+ (iota parallelism))
+ channel)))
+
+(define* (call-with-worker-thread channel proc #:key duration-logger)
+ "Send PROC to the worker thread through CHANNEL. Return the result of PROC.
+If already in the worker thread, call PROC immediately."
+ (let ((args (%worker-thread-args)))
+ (if args
+ (apply proc args)
+ (let ((reply (make-channel)))
+ (put-message channel (list reply (get-internal-real-time) proc))
+ (match (get-message reply)
+ (('worker-thread-error duration exn)
+ (when duration-logger
+ (duration-logger duration))
+ (raise-exception exn))
+ ((duration . result)
+ (when duration-logger
+ (duration-logger duration))
+ (apply values result)))))))
diff --git a/nginx/conf/nginx.conf b/nginx/conf/nginx.conf
new file mode 100644
index 0000000..2dffe12
--- /dev/null
+++ b/nginx/conf/nginx.conf
@@ -0,0 +1,33 @@
+daemon off;
+error_log /dev/stdout info;
+
+events {
+}
+
+http {
+ access_log /dev/stdout;
+
+ upstream nar-herder {
+ server 127.0.0.1:8080;
+ }
+
+ server {
+ listen 8081;
+
+ location ~ \.narinfo$ {
+ proxy_pass http://nar-herder;
+ }
+
+ location ~ ^/nar/(.*)$ {
+ alias /home/chris/Projects/Guix/nar-herder/data/nar/$1;
+ }
+
+ location = /latest-database-dump {
+ proxy_pass http://nar-herder;
+ }
+ location ~ ^/internal/database/(.*)$ {
+ internal;
+ alias /home/chris/Projects/Guix/nar-herder/$1;
+ }
+ }
+}
diff --git a/pre-inst-env.in b/pre-inst-env.in
new file mode 100644
index 0000000..ebf1a05
--- /dev/null
+++ b/pre-inst-env.in
@@ -0,0 +1,13 @@
+#!/bin/sh
+
+abs_top_srcdir="`cd "@abs_top_srcdir@" > /dev/null; pwd`"
+abs_top_builddir="`cd "@abs_top_builddir@" > /dev/null; pwd`"
+
+GUILE_LOAD_COMPILED_PATH="$abs_top_builddir${GUILE_LOAD_COMPILED_PATH:+:}$GUILE_LOAD_COMPILED_PATH"
+GUILE_LOAD_PATH="$abs_top_builddir:$abs_top_srcdir${GUILE_LOAD_PATH:+:}:$GUILE_LOAD_PATH"
+export GUILE_LOAD_COMPILED_PATH GUILE_LOAD_PATH
+
+PATH="$abs_top_builddir:$PATH"
+export PATH
+
+exec "$@"
diff --git a/scripts/nar-herder.in b/scripts/nar-herder.in
new file mode 100644
index 0000000..372dd39
--- /dev/null
+++ b/scripts/nar-herder.in
@@ -0,0 +1,297 @@
+#!@GUILE@ --no-auto-compile
+-*- scheme -*-
+-*- geiser-scheme-implementation: guile -*-
+!#
+;;; Nar Herder
+;;;
+;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
+;;;
+;;; This file is part of the nar-herder.
+;;;
+;;; The Nar Herder is free software; you can redistribute it and/or
+;;; modify it under the terms of the GNU General Public License as
+;;; published by the Free Software Foundation; either version 3 of the
+;;; License, or (at your option) any later version.
+;;;
+;;; The Nar Herder is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;; General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU General Public License
+;;; along with the guix-data-service. If not, see
+;;; <http://www.gnu.org/licenses/>.
+
+(setvbuf (current-output-port) 'line)
+(setvbuf (current-error-port) 'line)
+
+(let ((columns (string->number (or (getenv "COLUMNS") ""))))
+ (setenv "COLUMNS"
+ (number->string
+ (if columns
+ (max 256 columns)
+ 256))))
+
+(use-modules (srfi srfi-1)
+ (srfi srfi-19)
+ (srfi srfi-37)
+ (srfi srfi-43)
+ (ice-9 ftw)
+ (ice-9 match)
+ (ice-9 format)
+ (web uri)
+ (web client)
+ (web response)
+ (oop goops)
+ (logging logger)
+ (logging port-log)
+ (fibers)
+ (fibers conditions)
+ (fibers web server)
+ ((guix ui) #:select (read/eval))
+ (guix progress)
+ (guix narinfo)
+ (guix derivations)
+ ((guix build utils) #:select (dump-port))
+ (nar-herder utils)
+ (nar-herder database)
+ (nar-herder storage)
+ (nar-herder mirror)
+ (nar-herder server))
+
+(define %base-options
+ (list (option '("database") #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'database
+ arg
+ result)))))
+
+(define %base-option-defaults
+ ;; Alist of default option values
+ `((database . ,(string-append (getcwd) "/nar_herder.db"))
+ (database-dump . ,(string-append (getcwd) "/nar_herder_dump.db"))))
+
+(define %server-options
+ (list (option '("port") #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'port
+ (string->number arg)
+ (alist-delete 'port result))))
+ (option '("host") #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'host
+ arg
+ (alist-delete 'host result))))
+ (option '("storage") #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'storage
+ arg
+ (alist-delete 'storage result))))
+ (option '("storage-limit") #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'storage-limit
+ (if (string=? arg "none")
+ "none"
+ (string->number arg))
+ (alist-delete 'storage-limit result))))
+
+ ;; stored-on=https://other-nar-herder-server
+ ;; stored-on=https://other-nar-herder-server&stored-on=https://different-server
+ (option '("storage-nar-removal-criteria") #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'storage-nar-removal-criteria
+ (match (string-split arg #\=)
+ ((sym rest ...)
+ (cons (string->symbol sym) rest)))
+ result)))
+
+ (option '("mirror") #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'mirror
+ arg
+ (alist-delete 'mirror result))))))
+
+(define %server-option-defaults
+ '((port . 8080)
+ (host . "0.0.0.0")
+
+ (storage-limit . "none")))
+
+(define (parse-options options defaults args)
+ (args-fold
+ args options
+ (lambda (opt name arg result)
+ (error "unrecognized option" name))
+ (lambda (arg result)
+ (alist-cons
+ 'arguments
+ (cons arg
+ (or (assoc-ref result 'arguments)
+ '()))
+ (alist-delete 'arguments result)))
+ defaults))
+
+(match (cdr (program-arguments))
+ (("import" rest ...)
+ (let* ((opts (parse-options %base-options
+ %base-option-defaults
+ rest))
+ (database (setup-database
+ (assq-ref opts 'database))))
+ (let* ((narinfos
+ (append-map
+ (lambda (file-or-dir)
+ (let ((s (stat file-or-dir)))
+ (match (stat:type s)
+ ('regular (list file-or-dir))
+ ('directory
+ (let ((dir file-or-dir))
+ (map
+ (lambda (nar-filename)
+ (string-append dir
+ (if (string-suffix? "/" dir)
+ ""
+ "/")
+ nar-filename))
+ (scandir file-or-dir
+ (lambda (name)
+ (string-suffix? ".narinfo" name)))))))))
+ rest))
+ (len (length narinfos))
+ (progress
+ (progress-reporter/bar len
+ (format #f "importing ~a narinfos"
+ len)
+ (current-error-port))))
+
+ (call-with-progress-reporter progress
+ (lambda (report)
+ (for-each (lambda (narinfo)
+ (database-insert-narinfo
+ database
+ (call-with-input-file narinfo
+ (lambda (port)
+ ;; Set url to a dummy value as this doesn't
+ ;; matter
+ (read-narinfo port "https://narherderdummyvalue"))))
+
+ (report))
+ narinfos))))))
+ (("run-server" rest ...)
+ (simple-format (current-error-port) "locale is ~A\n" (check-locale!))
+
+ (let* ((opts (parse-options (append %base-options
+ %server-options)
+ (append %base-option-defaults
+ %server-option-defaults)
+ rest))
+ (unknown-arguments
+ (or (assq-ref opts 'arguments)
+ '()))
+ (lgr (make <logger>))
+ (port-log (make <port-log>
+ #:port (current-output-port)
+ #:formatter
+ (lambda (lvl time str)
+ (format #f "~a (~5a): ~a~%"
+ (strftime "%F %H:%M:%S" (localtime time))
+ lvl
+ str)))))
+
+ (define (download-database)
+ (let ((database-uri
+ (string->uri
+ (string-append (assq-ref opts 'mirror)
+ "/latest-database-dump"))))
+ (call-with-values
+ (lambda ()
+ (http-get database-uri
+ #:decode-body? #f
+ #:streaming? #t))
+ (lambda (response body)
+ (when (not (= (response-code response) 200))
+ (error "unable to fetch database from mirror"))
+
+ (call-with-output-file (assq-ref opts 'database)
+ (lambda (output-port)
+ (dump-port body output-port)))
+
+ (simple-format (current-error-port)
+ "finished downloading the database\n")))))
+
+ (add-handler! lgr port-log)
+ (open-log! lgr)
+ (set-default-logger! lgr)
+
+ (unless (null? unknown-arguments)
+ (simple-format (current-error-port)
+ "unknown arguments: ~A\n"
+ unknown-arguments)
+ (exit 1))
+
+ (unless (or (assq-ref opts 'mirror)
+ (assq-ref opts 'storage))
+ (simple-format (current-error-port)
+ "error: you must specify --mirror or --storage\n")
+ (exit 1))
+
+ (and=>
+ (assq-ref opts 'mirror)
+ (lambda (mirror)
+ (let ((database-file (assq-ref opts 'database)))
+ (if (file-exists? database-file)
+ (begin
+ ;; TODO Open the database, and check if the
+ ;; latest changes in the database are visible on
+ ;; the source to mirror. If they're not, then
+ ;; delete the database and download it to get
+ ;; back in sync
+
+ #f)
+ (download-database)))))
+
+ (let ((database (setup-database (assq-ref opts 'database)))
+ (canonical-storage (and=> (assq-ref opts 'storage)
+ canonicalize-path)))
+ (and=> (assq-ref opts 'pid-file)
+ (lambda (pid-file)
+ (call-with-output-file pid-file
+ (lambda (port)
+ (simple-format port "~A\n" (getpid))))))
+
+ (when (not (file-exists? (assq-ref opts 'database-dump)))
+ (simple-format (current-error-port)
+ "dumping database...\n")
+ (dump-database database (assq-ref opts 'database-dump)))
+
+ (and=> (assq-ref opts 'mirror)
+ (lambda (mirror)
+ (start-fetch-changes-thread database mirror)
+
+ (when (assq-ref opts 'storage)
+ (start-mirroring-thread database
+ mirror
+ (assq-ref opts 'storage-limit)
+ canonical-storage))))
+
+
+ (when (and (assq-ref opts 'storage)
+ (number? (assq-ref opts 'storage-limit)))
+ (start-nar-removal-thread database
+ canonical-storage
+ (assq-ref opts 'storage-limit)
+ (filter-map
+ (match-lambda
+ ((key . val)
+ (if (eq? key 'storage-nar-removal-criteria)
+ val
+ #f)))
+ opts)))
+
+ (simple-format (current-error-port)
+ "starting server\n")
+ (run-server
+ (make-request-handler database
+ canonical-storage)
+ #:host (assq-ref opts 'host)
+ #:port (assq-ref opts 'port))))))