diff options
author | Christopher Baines <mail@cbaines.net> | 2021-12-11 10:27:24 +0000 |
---|---|---|
committer | Christopher Baines <mail@cbaines.net> | 2021-12-12 16:35:38 +0000 |
commit | f9ff69e1c79f024ed188ad51642cca443aedfee2 (patch) | |
tree | 609b37ff8d6fc3d557d339a67ba6641522b0a977 | |
parent | 7e280ca951e8ffa7c86224843075e65266911617 (diff) | |
download | nar-herder-f9ff69e1c79f024ed188ad51642cca443aedfee2.tar nar-herder-f9ff69e1c79f024ed188ad51642cca443aedfee2.tar.gz |
Get most of the functionality sort of working
At least working enough to start trying this out, and finding the
problems.
-rw-r--r-- | .envrc | 13 | ||||
-rw-r--r-- | .gitignore | 23 | ||||
-rw-r--r-- | Makefile.am | 15 | ||||
-rw-r--r-- | VERSION | 1 | ||||
-rwxr-xr-x | bootstrap.sh | 3 | ||||
-rw-r--r-- | configure.ac | 35 | ||||
-rw-r--r-- | guile.am | 22 | ||||
-rw-r--r-- | guix-dev.scm | 71 | ||||
-rw-r--r-- | nar-herder/database.scm | 532 | ||||
-rw-r--r-- | nar-herder/mirror.scm | 82 | ||||
-rw-r--r-- | nar-herder/server.scm | 99 | ||||
-rw-r--r-- | nar-herder/storage.scm | 250 | ||||
-rw-r--r-- | nar-herder/utils.scm | 651 | ||||
-rw-r--r-- | nginx/conf/nginx.conf | 33 | ||||
-rw-r--r-- | pre-inst-env.in | 13 | ||||
-rw-r--r-- | scripts/nar-herder.in | 297 |
16 files changed, 2140 insertions, 0 deletions
@@ -0,0 +1,13 @@ +# Unset the Guile paths to avoid mixing Guile major versions +export GUILE_LOAD_PATH="" +export GUILE_LOAD_COMPILED_PATH="" + +use guix --no-grafts -l guix-dev.scm + +export GUILE_LOAD_COMPILED_PATH="$PWD:$PWD/tests:$GUILE_LOAD_COMPILED_PATH" +export GUILE_LOAD_PATH="$PWD:$GUILE_LOAD_PATH" +export PATH="$PWD/scripts:$PATH" + +if [ -f .local.envrc ]; then + source_env .local.envrc +fi diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a112683 --- /dev/null +++ b/.gitignore @@ -0,0 +1,23 @@ +*.go +Makefile.in +Makefile +aclocal.m4 +autom4te.cache +config.log +config.status +configure + +build-aux/install-sh +build-aux/missing + +pre-inst-env + +.local.envrc + +*.db +*.db-shm +*.db-wal + +scripts/nar-herder + +nar-herder/config.scm diff --git a/Makefile.am b/Makefile.am new file mode 100644 index 0000000..4438ba3 --- /dev/null +++ b/Makefile.am @@ -0,0 +1,15 @@ +include guile.am + +bin_SCRIPTS = \ + scripts/nar-herder + +SOURCES = \ + nar-herder/database.scm \ + nar-herder/server.scm \ + nar-herder/storage.scm \ + nar-herder/mirror.scm \ + nar-herder/utils.scm + +install-data-local: + mkdir -p "$(DESTDIR)$(docdir)"; + cp README.org "$(DESTDIR)$(docdir)/README.org" @@ -0,0 +1 @@ +0.0 diff --git a/bootstrap.sh b/bootstrap.sh new file mode 100755 index 0000000..5af6611 --- /dev/null +++ b/bootstrap.sh @@ -0,0 +1,3 @@ +#! 
/bin/sh + +autoreconf --verbose --install --force diff --git a/configure.ac b/configure.ac new file mode 100644 index 0000000..8954c8c --- /dev/null +++ b/configure.ac @@ -0,0 +1,35 @@ +AC_INIT([nar-herder], [m4_translit(m4_esyscmd([cat VERSION]),m4_newline)]) +AC_CONFIG_AUX_DIR([build-aux]) +AM_INIT_AUTOMAKE([gnu color-tests -Wall -Wno-portability foreign]) + +GUILE_PKG([3.0]) +GUILE_PROGS +if test "x$GUILD" = "x"; then + AC_MSG_ERROR(['guild' binary not found; please check your guile-3.x installation.]) +fi + +if test "$cross_compiling" != no; then + GUILE_TARGET="--target=$host_alias" + AC_SUBST([GUILE_TARGET]) +fi + +dnl Check for Guile-lzlib. +GUILE_MODULE_AVAILABLE([have_guile_lzlib], [(lzlib)]) +if test "x$have_guile_lzlib" != "xyes"; then + AC_MSG_ERROR([Guile-lzlib is missing; please install it.]) +fi + +dnl Check for Guile-lib. +GUILE_MODULE_AVAILABLE([have_guile_lib], [(logging logger)]) +if test "x$have_guile_lib" != "xyes"; then + AC_MSG_ERROR([Guile-lib is missing; please install it.]) +fi + +AC_CONFIG_FILES([Makefile]) +AC_CONFIG_FILES([pre-inst-env], [chmod +x pre-inst-env]) +AC_CONFIG_FILES( + [scripts/nar-herder], + [chmod +x scripts/nar-herder] +) + +AC_OUTPUT diff --git a/guile.am b/guile.am new file mode 100644 index 0000000..7f07ca2 --- /dev/null +++ b/guile.am @@ -0,0 +1,22 @@ +moddir=$(datadir)/guile/site/$(GUILE_EFFECTIVE_VERSION) +godir=$(libdir)/guile/$(GUILE_EFFECTIVE_VERSION)/site-ccache + +GOBJECTS = $(SOURCES:%.scm=%.go) + +nobase_mod_DATA = $(SOURCES) $(NOCOMP_SOURCES) +nobase_go_DATA = $(GOBJECTS) + +# Make sure source files are installed first, so that the mtime of +# installed compiled files is greater than that of installed source +# files. See +# <http://lists.gnu.org/archive/html/guile-devel/2010-07/msg00125.html> +# for details. 
+guile_install_go_files = install-nobase_goDATA +$(guile_install_go_files): install-nobase_modDATA + +CLEANFILES = $(GOBJECTS) +EXTRA_DIST = $(SOURCES) $(NOCOMP_SOURCES) +GUILE_WARNINGS = -Wunbound-variable -Warity-mismatch -Wformat +SUFFIXES = .scm .go +.scm.go: + $(AM_V_GEN)$(top_builddir)/pre-inst-env $(GUILE_TOOLS) compile $(GUILE_TARGET) $(GUILE_WARNINGS) -o "$@" "$<" diff --git a/guix-dev.scm b/guix-dev.scm new file mode 100644 index 0000000..b4c9aba --- /dev/null +++ b/guix-dev.scm @@ -0,0 +1,71 @@ +;;; nar-herder +;;; +;;; Copyright © 2021 Christopher Baines <mail@cbaines.net> +;;; +;;; This file is part of the nar-herder. +;;; +;;; nar-herder is free software; you can redistribute it and/or modify it +;;; under the terms of the GNU General Public License as published by +;;; the Free Software Foundation; either version 3 of the License, or +;;; (at your option) any later version. +;;; +;;; nar-herder is distributed in the hope that it will be useful, but +;;; WITHOUT ANY WARRANTY; without even the implied warranty of +;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;;; General Public License for more details. +;;; +;;; You should have received a copy of the GNU General Public License +;;; along with the nar-herder. If not, see +;;; <http://www.gnu.org/licenses/>. 
+ +;;; Run the following command to enter a development environment for +;;; the nar-herder: +;;; +;;; $ guix environment -l guix-dev.scm + +(use-modules ((guix licenses) #:prefix license:) + (guix packages) + (guix download) + (guix git-download) + (guix utils) + (guix build-system gnu) + (gnu packages) + (gnu packages autotools) + (gnu packages compression) + (gnu packages databases) + (gnu packages gnupg) + (gnu packages guile) + (gnu packages guile-xyz) + (gnu packages package-management) + (gnu packages perl) + (gnu packages pkg-config) + (gnu packages ruby) + (gnu packages sqlite) + (gnu packages texinfo) + (gnu packages web) + (srfi srfi-1)) + +(package + (name "nar-herder") + (version "0") + (source #f) + (build-system gnu-build-system) + (inputs + `(("guix" ,guix) + ("guile-json" ,guile-json-4) + ("guile-fibers" ,guile-fibers) + ("guile-gcrypt" ,guile-gcrypt) + ("guile-readline" ,guile-readline) + ("guile-lzlib" ,guile-lzlib) + ("guile" ,@(assoc-ref (package-native-inputs guix) "guile")) + ("guile-sqlite3" ,guile-sqlite3) + ("sqlite" ,sqlite))) + (native-inputs + `(("autoconf" ,autoconf) + ("automake" ,automake) + ("pkg-config" ,pkg-config) + ("nginx" ,nginx))) + (synopsis "TODO") + (description "TODO") + (home-page "TODO") + (license license:gpl3+)) diff --git a/nar-herder/database.scm b/nar-herder/database.scm new file mode 100644 index 0000000..9cb04e2 --- /dev/null +++ b/nar-herder/database.scm @@ -0,0 +1,532 @@ +;;; Nar Herder +;;; +;;; Copyright © 2021 Christopher Baines <mail@cbaines.net> +;;; +;;; This program is free software: you can redistribute it and/or +;;; modify it under the terms of the GNU Affero General Public License +;;; as published by the Free Software Foundation, either version 3 of +;;; the License, or (at your option) any later version. 
+;;; +;;; This program is distributed in the hope that it will be useful, +;;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;;; Affero General Public License for more details. +;;; +;;; You should have received a copy of the GNU Affero General Public +;;; License along with this program. If not, see +;;; <http://www.gnu.org/licenses/>. + +(define-module (nar-herder database) + #:use-module (srfi srfi-1) + #:use-module (srfi srfi-9) + #:use-module (srfi srfi-19) + #:use-module (ice-9 match) + #:use-module (ice-9 format) + #:use-module (ice-9 threads) + #:use-module (ice-9 exceptions) + #:use-module (web uri) + #:use-module (sqlite3) + #:use-module (fibers) + #:use-module (guix narinfo) + #:use-module (guix derivations) + #:use-module (nar-herder utils) + #:export (setup-database + + database-optimize + database-spawn-fibers + + database-call-with-transaction + + dump-database + + database-insert-narinfo + database-select-narinfo-contents-by-hash + + database-select-recent-changes + database-select-latest-recent-change-datetime + + database-select-narinfo-files + + database-map-all-narinfo-files)) + +(define-record-type <database> + (make-database database-file reader-thread-channel writer-thread-channel) + database? + (database-file database-file) + (reader-thread-channel database-reader-thread-channel) + (writer-thread-channel database-writer-thread-channel)) + +(define* (db-open database + #:key (write? #t)) + (define flags + `(,@(if write? 
+ (list SQLITE_OPEN_READWRITE + SQLITE_OPEN_CREATE) + + (list SQLITE_OPEN_READONLY)) + ,SQLITE_OPEN_NOMUTEX)) + + (sqlite-open database (apply logior flags))) + +(define (perform-initial-database-setup db) + (define schema + " +CREATE TABLE narinfos ( + id INTEGER PRIMARY KEY ASC, + store_path TEXT NOT NULL, + nar_hash TEXT NOT NULL, + nar_size INTEGER NOT NULL, + deriver TEXT, + system TEXT, + contents NOT NULL +); + +CREATE UNIQUE INDEX narinfos_store_hash ON narinfos (substr(store_path, 12, 32)); + +CREATE TABLE narinfo_files ( + narinfo_id INTEGER NOT NULL REFERENCES narinfos (id), + size INTEGER NOT NULL, + compression TEXT, + url TEXT NOT NULL +); + +CREATE TABLE narinfo_references ( + narinfo_id INTEGER NOT NULL REFERENCES narinfos (id), + store_path TEXT NOT NULL +); + +CREATE TABLE tags ( + id INTEGER PRIMARY KEY ASC, + key TEXT NOT NULL, + value TEXT NOT NULL +); + +CREATE UNIQUE INDEX tags_index ON tags (key, value); + +CREATE TABLE narinfo_tags ( + narinfo_id INTEGER NOT NULL REFERENCES narinfos (id), + tag_id INTEGER NOT NULL REFERENCES tags (id) +); + +CREATE TABLE recent_changes ( + id INTEGER PRIMARY KEY ASC, + datetime TEXT NOT NULL, + change TEXT NOT NULl, + data TEXT NOT NULL +);") + + (sqlite-exec db schema)) + +(define (update-schema db) + (let ((statement + (sqlite-prepare + db + " +SELECT name FROM sqlite_master WHERE type = 'table' AND name = :name"))) + + (sqlite-bind-arguments + statement + #:name "narinfos") + + (match (sqlite-step statement) + (#f (perform-initial-database-setup db)) + (_ #f)) + + (sqlite-finalize statement))) + +(define (setup-database database-file) + (let ((db (db-open database-file))) + (sqlite-exec db "PRAGMA journal_mode=WAL;") + (sqlite-exec db "PRAGMA optimize;") + (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);") + + (update-schema db) + + (sqlite-close db)) + + (let ((reader-thread-channel + (make-worker-thread-channel + (lambda () + (let ((db + (db-open database-file #:write? 
#f))) + (sqlite-exec db "PRAGMA busy_timeout = 5000;") + (list db))) + #:destructor + (lambda (db) + (sqlite-close db)) + #:lifetime 50000 + + ;; Use a minimum of 2 and a maximum of 8 threads + #:parallelism + (min (max (current-processor-count) + 2) + 8))) + + (writer-thread-channel + (make-worker-thread-channel + (lambda () + (let ((db + (db-open database-file))) + (sqlite-exec db "PRAGMA busy_timeout = 5000;") + (sqlite-exec db "PRAGMA foreign_keys = ON;") + (list db))) + #:destructor + (lambda (db) + (db-optimize db + database-file) + + (sqlite-close db)) + #:lifetime 500 + + ;; SQLite doesn't support parallel writes + #:parallelism 1))) + + (make-database database-file + reader-thread-channel + writer-thread-channel))) + +(define (db-optimize db db-filename) + (define (wal-size) + (let ((db-wal-filename + (string-append db-filename "-wal"))) + + (stat:size (stat db-wal-filename)))) + + (define MiB (* (expt 2 20) 1.)) + (define wal-size-threshold + (* 5 MiB)) + + (when (> (wal-size) wal-size-threshold) + (sqlite-exec db "PRAGMA wal_checkpoint(TRUNCATE);") + + (sqlite-exec db + " +PRAGMA analysis_limit=1000; +PRAGMA optimize;"))) + +(define (database-optimize database) + (retry-on-error + (lambda () + (call-with-worker-thread + (database-writer-thread-channel database) + (lambda (db) + (db-optimize + db + (database-file database))))) + #:times 5 + #:delay 5)) + +(define (database-spawn-fibers database) + (spawn-fiber + (lambda () + (while #t + (sleep (* 60 5)) ; 5 minutes + (with-exception-handler + (lambda (exn) + (simple-format (current-error-port) + "exception when performing WAL checkpoint: ~A\n" + exn)) + (lambda () + (database-optimize database)) + #:unwind? #t))) + #:parallel? #t)) + +(define %current-transaction-proc + (make-parameter #f)) + +(define* (database-call-with-transaction database proc + #:key + readonly?) 
+ (define (run-proc-within-transaction db) + (if (%current-transaction-proc) + (proc db) ; already in transaction + (begin + (sqlite-exec db "BEGIN TRANSACTION;") + (with-exception-handler + (lambda (exn) + (simple-format (current-error-port) + "error: sqlite rolling back transaction\n") + (sqlite-exec db "ROLLBACK TRANSACTION;") + (raise-exception exn)) + (lambda () + (call-with-values + (lambda () + (parameterize ((%current-transaction-proc proc)) + (proc db))) + (lambda vals + (sqlite-exec db "COMMIT TRANSACTION;") + (apply values vals)))))))) + + (match (call-with-worker-thread + ((if readonly? + database-reader-thread-channel + database-writer-thread-channel) + database) + (lambda (db) + (let ((start-time (get-internal-real-time))) + (call-with-values + (lambda () + (run-proc-within-transaction db)) + (lambda vals + (let ((duration-seconds + (/ (- (get-internal-real-time) start-time) + internal-time-units-per-second))) + (when (and (not readonly?) + (> duration-seconds 2)) + (display + (format + #f + "warning: ~a:\n took ~4f seconds in transaction\n" + proc + duration-seconds) + (current-error-port))) + + (cons duration-seconds vals))))))) + ((duration vals ...) + (apply values vals)))) + +(define (dump-database database name) + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (sqlite-exec + db + (string-append "VACUUM INTO '" name "';"))))) + +(define (last-insert-rowid db) + (let ((statement + (sqlite-prepare + db + "SELECT last_insert_rowid();" + #:cache? #t))) + (let ((id + (vector-ref (sqlite-step statement) + 0))) + + (sqlite-reset statement) + + id))) + +(define (database-insert-narinfo database narinfo) + (define (insert-narinfo-record db) + (let ((statement + (sqlite-prepare + db + " +INSERT INTO narinfos ( + store_path, nar_hash, nar_size, deriver, system, contents +) VALUES ( + :store_path, :nar_hash, :nar_size, :deriver, :system, :contents +)" + #:cache? 
#t))) + (sqlite-bind-arguments + statement + #:store_path (narinfo-path narinfo) + #:nar_hash (narinfo-hash narinfo) + #:nar_size (narinfo-size narinfo) + #:deriver (narinfo-deriver narinfo) + #:system (narinfo-system narinfo) + #:contents (narinfo-contents narinfo)) + + (sqlite-step statement) + (sqlite-reset statement) + + (last-insert-rowid db))) + + (define (insert-narinfo-file db narinfo-id size compression url) + (let ((statement + (sqlite-prepare + db + " +INSERT INTO narinfo_files ( + narinfo_id, size, compression, url +) VALUES ( + :narinfo_id, :size, :compression, :url +)" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:narinfo_id narinfo-id + #:size size + #:compression compression + #:url url) + + (sqlite-step statement) + (sqlite-reset statement))) + + (define (insert-narinfo-references db narinfo-id store-path) + (let ((statement + (sqlite-prepare + db + " +INSERT INTO narinfo_references ( + narinfo_id, store_path +) VALUES ( + :narinfo_id, :store_path +)" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:narinfo_id narinfo-id + #:store_path store-path) + + (sqlite-step statement) + (sqlite-reset statement))) + + (define (insert-change db contents) + (let ((statement + (sqlite-prepare + db + " +INSERT INTO recent_changes ( + datetime, change, data +) VALUES ( + datetime('now'), 'addition', :contents +)" + #:cache? 
#t))) + (sqlite-bind-arguments + statement + #:contents contents) + + (sqlite-step statement) + (sqlite-reset statement))) + + (database-call-with-transaction + database + (lambda (db) + (let ((narinfo-id (insert-narinfo-record db))) + (let ((len (length (narinfo-uris narinfo)))) + (for-each insert-narinfo-file + (make-list len db) + (make-list len narinfo-id) + (narinfo-file-sizes narinfo) + (narinfo-compressions narinfo) + (map uri-path (narinfo-uris narinfo)))) + + (let ((references (narinfo-references narinfo))) + (for-each insert-narinfo-references + (make-list (length references) db) + (make-list (length references) narinfo-id) + references)) + + (insert-change db (narinfo-contents narinfo)) + + narinfo-id)))) + +(define (database-select-narinfo-contents-by-hash database hash) + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT contents FROM narinfos WHERE substr(store_path, 12, 32) = :hash" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:hash hash) + + (match (let ((result (sqlite-step statement))) + (sqlite-reset statement) + result) + (#(contents) contents) + (_ #f)))))) + +(define (database-select-recent-changes database after-date) + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT datetime, change, data FROM recent_changes WHERE datetime >= :datetime" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:datetime after-date) + + (let loop ((row (sqlite-step statement)) + (result '())) + (match row + (#(datetime change data) + (loop (sqlite-step statement) + (cons `((datetime . ,datetime) + (change . ,change) + (data . 
,data)) + result))) + (#f + (sqlite-reset statement) + + (reverse result)))))))) + +(define (database-select-latest-recent-change-datetime database) + "Return the datetime of the newest row in recent_changes, or #f when the table has no rows." + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + ;; Newest first: rows are written with SQLite's datetime('now'), + ;; whose YYYY-MM-DD HH:MM:SS text sorts chronologically. + " +SELECT datetime FROM recent_changes ORDER BY datetime DESC LIMIT 1" + #:cache? #t))) + + (let ((result (match (sqlite-step statement) + (#(date) date) + (#f #f)))) + (sqlite-reset statement) + + result))))) + +(define (database-select-narinfo-files database hash) + "Return a list of alists with size, compression and url keys, one per narinfo_files row joined to the narinfo whose substr(store_path, 12, 32) equals HASH." + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT narinfo_files.size, narinfo_files.compression, narinfo_files.url +FROM narinfos +INNER JOIN narinfo_files + ON narinfos.id = narinfo_files.narinfo_id +WHERE substr(narinfos.store_path, 12, 32) = :hash" + #:cache? #t))) + ;; Bind :hash before stepping; left unbound it is NULL and the + ;; WHERE clause matches no rows. + (sqlite-bind-arguments + statement + #:hash hash) + + (let ((result + (sqlite-map + (match-lambda + (#(size compression url) + `((size . ,size) + (compression . ,compression) + (url . ,url)))) + statement))) + (sqlite-reset statement) + + result))))) + +(define (database-map-all-narinfo-files database proc) + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT size, compression, url +FROM narinfo_files" + #:cache? #t))) + (let ((result-list + (sqlite-fold + (lambda (row result) + (match row + (#(size compression url) + (cons (proc `((size . ,size) + (compression . ,compression) + (url . 
,url))) + result)))) + '() + statement))) + (sqlite-reset statement) + + result-list))))) diff --git a/nar-herder/mirror.scm b/nar-herder/mirror.scm new file mode 100644 index 0000000..01da2ba --- /dev/null +++ b/nar-herder/mirror.scm @@ -0,0 +1,82 @@ +;;; Nar Herder +;;; +;;; Copyright © 2021 Christopher Baines <mail@cbaines.net> +;;; +;;; This program is free software: you can redistribute it and/or +;;; modify it under the terms of the GNU Affero General Public License +;;; as published by the Free Software Foundation, either version 3 of +;;; the License, or (at your option) any later version. +;;; +;;; This program is distributed in the hope that it will be useful, +;;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;;; Affero General Public License for more details. +;;; +;;; You should have received a copy of the GNU Affero General Public +;;; License along with this program. If not, see +;;; <http://www.gnu.org/licenses/>. + +(define-module (nar-herder mirror) + #:use-module (srfi srfi-1) + #:use-module (srfi srfi-43) + #:use-module (ice-9 threads) + #:use-module (rnrs bytevectors) + #:use-module (web uri) + #:use-module (web client) + #:use-module (web response) + #:use-module (logging logger) + #:use-module (json) + #:use-module (guix narinfo) + #:use-module (nar-herder database) + #:export (start-fetch-changes-thread)) + +(define (start-fetch-changes-thread database mirror) + "Start a thread that requests MIRROR's /recent-changes endpoint, inserts the narinfo from each \"addition\" change into DATABASE, then sleeps 60 seconds before repeating." + (define (request-recent-changes) + (define latest-recent-change + (database-select-latest-recent-change-datetime database)) + + ;; Changes already recorded locally at or after the latest datetime; + ;; used below to skip changes the mirror sends again. Empty when + ;; nothing has been recorded yet. + (define processed-recent-changes + (if latest-recent-change + (database-select-recent-changes database latest-recent-change) + '())) + + (call-with-values + (lambda () + (http-get + (string->uri + (string-append mirror "/recent-changes" + (if latest-recent-change + ;; Percent-encode the datetime, since its text may + ;; contain characters (e.g. a space) that are not + ;; valid in a query string. + (string-append "?since=" + (uri-encode latest-recent-change)) + ""))) + #:decode-body? 
#f)) + (lambda (response body) + (if (= (response-code response) 200) + (let ((json-body (json-string->scm + (utf8->string body)))) + (vector-for-each + (lambda (_ change-details) + ;; Guard against processing changes that have already + ;; been processed. The local records are alists with + ;; symbol keys while CHANGE-DETAILS is decoded JSON with + ;; string keys, so compare the fields individually. + (unless (any (lambda (processed) + (and (equal? (assq-ref processed 'change) + (assoc-ref change-details "change")) + (equal? (assq-ref processed 'data) + (assoc-ref change-details "data")))) + processed-recent-changes) + (let ((change (assoc-ref change-details "change"))) + (cond + ((string=? change "addition") + (let ((narinfo + (call-with-input-string + (assoc-ref change-details "data") + (lambda (port) + (read-narinfo port + "https://narherderdummyvalue"))))) + (log-msg 'INFO "processing addition change for " + (first (narinfo-uris narinfo))) + (database-insert-narinfo database + narinfo))) + (else + (error "unimplemented")))))) + (assoc-ref json-body "recent_changes"))) + (error "unknown response code"))))) + + (call-with-new-thread + (lambda () + (while #t + (request-recent-changes) + + (sleep 60))))) diff --git a/nar-herder/server.scm b/nar-herder/server.scm new file mode 100644 index 0000000..6a77645 --- /dev/null +++ b/nar-herder/server.scm @@ -0,0 +1,99 @@ +;;; Nar Herder +;;; +;;; Copyright © 2021 Christopher Baines <mail@cbaines.net> +;;; +;;; This program is free software: you can redistribute it and/or +;;; modify it under the terms of the GNU Affero General Public License +;;; as published by the Free Software Foundation, either version 3 of +;;; the License, or (at your option) any later version. +;;; +;;; This program is distributed in the hope that it will be useful, +;;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;;; Affero General Public License for more details. +;;; +;;; You should have received a copy of the GNU Affero General Public +;;; License along with this program. If not, see +;;; <http://www.gnu.org/licenses/>. 
+
+(define-module (nar-herder server)
+  #:use-module (ice-9 match)
+  #:use-module (web uri)
+  #:use-module (web response)
+  #:use-module (web request)
+  #:use-module (json)
+  #:use-module (nar-herder database)
+  #:use-module (nar-herder storage)
+  #:export (make-request-handler))
+
+(define* (render-json json #:key (extra-headers '())
+                      (code 200))
+  "Return two values for a web server handler: a response with status
+CODE, EXTRA-HEADERS and a JSON content type, and a procedure which writes
+JSON to the response port."
+  (values (build-response
+           #:code code
+           #:headers (append extra-headers
+                             '((content-type . (application/json))
+                               (vary . (accept)))))
+          (lambda (port)
+            (scm->json json port))))
+
+(define (parse-query-string query)
+  "Parse QUERY, the query component of a URI, in to an alist of decoded
+(key . value) string pairs.  A field with no = gets the empty string as
+its value, and only the first = in a field separates key from value."
+  ;; Split in to fields on & first, then split each field on its first =.
+  ;; Splitting on both characters at once raised a match error for a key
+  ;; without a value or a value containing =.
+  (let lp ((fields (string-split query #\&)))
+    (match fields
+      (() '())
+      (("" . rest)
+       (lp rest))
+      ((field . rest)
+       (let ((separator (string-index field #\=)))
+         (cons (if separator
+                   (cons (uri-decode (string-take field separator))
+                         (uri-decode (string-drop field (+ separator 1))))
+                   (cons (uri-decode field) ""))
+               (lp rest)))))))
+
+(define (make-request-handler database storage-root)
+  "Return a handler procedure, taking a request and body, which serves
+narinfos and recent changes from DATABASE, reports whether the nars for
+a narinfo are stored below STORAGE-ROOT, and redirects requests for the
+database dump to an internal location."
+  (define (narinfo? str)
+    ;; A 32 character hash followed by .narinfo
+    (and
+     (= (string-length str) 40)
+     (string-suffix? ".narinfo" str)))
+
+  (lambda (request body)
+    (simple-format (current-error-port)
+                   "~A ~A\n"
+                   (request-method request)
+                   (uri-path (request-uri request)))
+
+    (match (cons (request-method request)
+                 (split-and-decode-uri-path
+                  (uri-path (request-uri request))))
+      (('GET (? narinfo? narinfo))
+       (let ((narinfo-contents
+              (database-select-narinfo-contents-by-hash
+               database
+               (string-take narinfo 32))))
+         (if narinfo-contents
+             (values '((content-type . (text/plain)))
+                     narinfo-contents)
+             (values (build-response #:code 404)
+                     "404"))))
+      (('GET (? narinfo? narinfo) "info")
+       ;; TODO Check if narinfo exists?
+       (render-json
+        `((stored . ,(store-item-in-local-storage?
+                      database
+                      storage-root
+                      (string-take narinfo 32))))))
+      (('GET "recent-changes")
+       (let ((query-parameters
+              (or (and=> (uri-query (request-uri request))
+                         parse-query-string)
+                  '())))
+         (render-json
+          `((recent_changes . ,(list->vector
+                                (database-select-recent-changes
+                                 database
+                                 (or
+                                  (assoc-ref query-parameters "since")
+                                  "1970-01-01 00:00:01"))))))))
+      (('GET "latest-database-dump")
+       ;; No body is sent; the X-Accel-Redirect header hands the request
+       ;; over to the internal location in the nginx configuration.
+       (values (build-response
+                #:code 200
+                #:headers '((X-Accel-Redirect . "/internal/database/nar_herder_dump.db")))
+               #f))
+      (_
+       (values (build-response #:code 404)
+               "404")))))
+
diff --git a/nar-herder/storage.scm b/nar-herder/storage.scm
new file mode 100644
index 0000000..96c84ed
--- /dev/null
+++ b/nar-herder/storage.scm
@@ -0,0 +1,250 @@
+;;; Nar Herder
+;;;
+;;; Copyright © 2021 Christopher Baines <mail@cbaines.net>
+;;;
+;;; This program is free software: you can redistribute it and/or
+;;; modify it under the terms of the GNU Affero General Public License
+;;; as published by the Free Software Foundation, either version 3 of
+;;; the License, or (at your option) any later version.
+;;;
+;;; This program is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;; Affero General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU Affero General Public
+;;; License along with this program. If not, see
+;;; <http://www.gnu.org/licenses/>.
+
+(define-module (nar-herder storage)
+  #:use-module (srfi srfi-1)
+  #:use-module (ice-9 ftw)
+  #:use-module (ice-9 match)
+  #:use-module (ice-9 threads)
+  #:use-module (rnrs bytevectors)
+  #:use-module (web uri)
+  #:use-module (web client)
+  #:use-module (web response)
+  #:use-module (logging logger)
+  #:use-module (logging port-log)
+  #:use-module (json)
+  #:use-module ((guix build utils) #:select (dump-port mkdir-p))
+  #:use-module (nar-herder database)
+  #:export (store-item-in-local-storage?
+
+            start-nar-removal-thread
+            start-mirroring-thread))
+
+(define (store-item-in-local-storage? database storage-root hash)
+  "Return #t if every nar file recorded in DATABASE for the store item
+identified by HASH exists below STORAGE-ROOT.  This is also #t when
+DATABASE has no nar files recorded for HASH."
+  (let ((narinfo-files (database-select-narinfo-files database hash)))
+    (every (lambda (file)
+             ;; FILE is an alist, so take the url from it rather than
+             ;; passing the alist itself to string-append.
+             (file-exists?
+              (string-append storage-root "/" (assq-ref file 'url))))
+           narinfo-files)))
+
+(define (get-storage-size storage-root)
+  "Return the number of bytes used by the files below STORAGE-ROOT.  The
+block count from the stat result, multiplied by 512, is used when there
+is one, and the file size otherwise.  Files which cannot be accessed are
+skipped with a warning."
+  (define enter? (const #t))
+  (define (leaf name stat result)
+    (+ result
+       (or (and=> (stat:blocks stat)
+                  (lambda (blocks)
+                    (* blocks 512)))
+           (stat:size stat))))
+
+  (define (down name stat result) result)
+  (define (up name stat result) result)
+  (define (skip name stat result) result)
+
+  (define (error name stat errno result)
+    (format (current-error-port) "warning: ~a: ~a~%"
+            name (strerror errno))
+    result)
+
+  (file-system-fold enter? leaf down up skip error
+                    0 ; Start counting at 0
+                    storage-root))
+
+(define (remove-nar-from-storage storage-root nar-file-url)
+  "Delete the file for NAR-FILE-URL, a string, from below STORAGE-ROOT."
+  (delete-file
+   (string-append storage-root "/" nar-file-url)))
+
+(define (index-storage database storage-root)
+  "Compare the files below STORAGE-ROOT with the nar files recorded in
+DATABASE.  Return an alist with two entries: narinfo-files, the alist
+for each recorded nar file extended with a stored? entry, and
+unrecognised-files, the files present which DATABASE does not know."
+  (define (get-files-hash)
+    (define (get-file-strings prefix children)
+      (append-map
+       (match-lambda
+         ((name stat)
+          (list (string-append prefix "/" name)))
+         ((name stat children ...)
+          (get-file-strings (string-append prefix "/" name)
+                            children)))
+       children))
+
+    ;; Walk every entry below the storage root.  Taking just the third
+    ;; element of the tree raised an error for an empty storage root, and
+    ;; ignored all but the first top-level directory.  The resulting
+    ;; names are relative to STORAGE-ROOT and start with /.
+    (let* ((lst
+            (match (file-system-tree storage-root)
+              ((_ _ children ...)
+               (get-file-strings "" children))
+              (_ '())))
+           (hash-table
+            (make-hash-table (length lst))))
+
+      (for-each (lambda (s)
+                  (hash-set! hash-table s #t))
+                lst)
+
+      hash-table))
+
+  (let* ((files-hash
+          (get-files-hash))
+         (narinfo-files
+          (database-map-all-narinfo-files
+           database
+           (lambda (file)
+             (let* ((url (assq-ref file 'url))
+                    (stored? (hash-ref files-hash url)))
+               (when stored?
+                 ;; Delete the hash entry, so
+                 ;; that the hash at the end will
+                 ;; just contain the unrecognised
+                 ;; files
+                 (hash-remove! files-hash url))
+
+               `(,@file
+                 (stored? . ,stored?)))))))
+    `((narinfo-files . ,narinfo-files)
+      (unrecognised-files . ,(hash-map->list (lambda (key _) key)
+                                             files-hash)))))
+
+(define (start-nar-removal-thread database
+                                  storage-root storage-limit
+                                  nar-removal-criteria)
+  "Start a thread which, every 300 seconds, removes stored nars meeting
+one of NAR-REMOVAL-CRITERIA from STORAGE-ROOT while the storage size is
+above STORAGE-LIMIT.  Nothing is removed if STORAGE-LIMIT is not an
+integer."
+
+  (define (check-removal-criteria nar criteria)
+    (match criteria
+      (('and and-criteria ...)
+       ;; Every sub-criteria has to hold for this nar.
+       (every (lambda (sub-criteria)
+                (check-removal-criteria nar sub-criteria))
+              and-criteria))
+      (('stored-on url)
+       ;; NOTE(review): this requests the nar url from the other server,
+       ;; whereas the handler in (nar-herder server) answers the stored
+       ;; query at /<hash>.narinfo/info - TODO confirm the intended URL.
+       (let ((uri (string->uri
+                   (string-append url (assq-ref nar 'url)))))
+         (call-with-values
+             ;; call-with-values needs a thunk as the producer
+             (lambda ()
+               (http-get uri #:decode-body? #f))
+           (lambda (response body)
+             (and (= (response-code response)
+                     200)
+
+                  (let ((json-body (json-string->scm
+                                    (utf8->string body))))
+                    (eq? (assoc-ref json-body "stored")
+                         #t)))))))))
+
+  (define (nar-can-be-removed? nar)
+    (any (lambda (criteria)
+           (check-removal-criteria nar criteria))
+         nar-removal-criteria))
+
+  (define (get-stored-nar-files)
+    (let ((index (index-storage database storage-root)))
+      (filter
+       (lambda (file)
+         (assq-ref file 'stored?))
+       (assq-ref index 'narinfo-files))))
+
+  (define (run-removal-pass)
+    ;; The storage limit is not always a number (the command line
+    ;; default is the string "none"), in which case there is nothing to
+    ;; enforce.
+    (when (integer? storage-limit)
+      (let loop ((storage-size (get-storage-size storage-root))
+                 (stored-nar-files (get-stored-nar-files)))
+        ;; While over the limit, look through the items in local storage,
+        ;; check if the removal criteria have been met, and if so,
+        ;; delete the file
+        (when (and (> storage-size storage-limit)
+                   (not (null? stored-nar-files)))
+          (let ((nar-to-consider (car stored-nar-files)))
+            (if (nar-can-be-removed? nar-to-consider)
+                (begin
+                  (remove-nar-from-storage
+                   storage-root
+                   (assq-ref nar-to-consider 'url))
+
+                  ;; Estimate the new size from the recorded size of the
+                  ;; nar, rather than walking the storage again
+                  (loop (- storage-size
+                           (or (assq-ref nar-to-consider 'size) 0))
+                        (cdr stored-nar-files)))
+                (loop storage-size
+                      (cdr stored-nar-files))))))))
+
+  (call-with-new-thread
+   (lambda ()
+     (while #t
+       (run-removal-pass)
+
+       (sleep 300)))))
+
+(define (start-mirroring-thread database mirror storage-limit storage-root)
+  "Start a thread which, every 300 seconds, fetches from MIRROR the nar
+files recorded in DATABASE which are missing from STORAGE-ROOT, for as
+long as they fit within STORAGE-LIMIT.  There is no limit if
+STORAGE-LIMIT is not an integer."
+  (define (get-missing-nar-files)
+    (let ((index (index-storage database storage-root)))
+      (filter
+       (lambda (file)
+         (not (assq-ref file 'stored?)))
+       (assq-ref index 'narinfo-files))))
+
+  (define (fetch-file file)
+    (let* ((string-url
+            (string-append mirror file))
+           (uri
+            (string->uri string-url))
+           (destination-file-name
+            (string-append storage-root file))
+           (tmp-file-name
+            (string-append destination-file-name "-tmp")))
+      (log-msg 'INFO "fetching " string-url)
+
+      (mkdir-p (dirname destination-file-name))
+
+      (when (file-exists? tmp-file-name)
+        (delete-file tmp-file-name))
+
+      (call-with-values
+          (lambda ()
+            (http-get uri
+                      #:decode-body? #f
+                      #:streaming? #t))
+        (lambda (response body)
+          (unless (= (response-code response)
+                     200)
+            (when (port? body)
+              (close-port body))
+            (error "unknown response code"))
+
+          ;; Download to a temporary file, and rename it once complete,
+          ;; so that a partial download is never treated as stored
+          (call-with-output-file tmp-file-name
+            (lambda (output-port)
+              (dump-port body output-port)))
+          ;; The body is a port when streaming; close it so that the
+          ;; connection is not leaked
+          (when (port? body)
+            (close-port body))
+          (rename-file tmp-file-name
+                       destination-file-name)))))
+
+  (define (run-mirror-pass)
+    (define no-storage-limit?
+      (not (integer? storage-limit)))
+
+    (log-msg 'DEBUG "running mirror pass")
+    (let ((initial-storage-size (get-storage-size storage-root)))
+      ;; If there's free space, then consider downloading missing nars
+      (when (or no-storage-limit?
+                (< initial-storage-size storage-limit))
+        (let loop ((storage-size initial-storage-size)
+                   (missing-nar-files (get-missing-nar-files)))
+          (unless (null? missing-nar-files)
+            (let ((file (car missing-nar-files)))
+              (log-msg 'DEBUG "considering "
+                       (assq-ref file 'url))
+              (let ((file-bytes (assq-ref file 'size)))
+                (if (or no-storage-limit?
+                        (< (+ storage-size file-bytes)
+                           storage-limit))
+                    (begin
+                      (fetch-file (assq-ref file 'url))
+                      (loop (+ storage-size file-bytes)
+                            (cdr missing-nar-files)))
+                    ;; This file won't fit, so try the next one
+                    (loop storage-size
+                          (cdr missing-nar-files))))))))))
+
+  (call-with-new-thread
+   (lambda ()
+     (while #t
+       (run-mirror-pass)
+       (log-msg 'DEBUG "finished mirror pass")
+
+       (sleep 300)))))
diff --git a/nar-herder/utils.scm b/nar-herder/utils.scm
new file mode 100644
index 0000000..a0a5171
--- /dev/null
+++ b/nar-herder/utils.scm
@@ -0,0 +1,651 @@
+;;; Nar Herder
+;;;
+;;; Copyright © 2021 Christopher Baines <mail@cbaines.net>
+;;;
+;;; This program is free software: you can redistribute it and/or
+;;; modify it under the terms of the GNU Affero General Public License
+;;; as published by the Free Software Foundation, either version 3 of
+;;; the License, or (at your option) any later version.
+;;;
+;;; This program is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;; Affero General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU Affero General Public
+;;; License along with this program. If not, see
+;;; <http://www.gnu.org/licenses/>.
+ +(define-module (nar-herder utils) + #:use-module (srfi srfi-1) + #:use-module (srfi srfi-19) ; time + #:use-module (ice-9 q) + ;; #:use-module (ice-9 ftw) + ;; #:use-module (ice-9 popen) + #:use-module (ice-9 iconv) + #:use-module (ice-9 match) + #:use-module (ice-9 format) + #:use-module (ice-9 threads) + #:use-module (ice-9 textual-ports) + #:use-module (ice-9 rdelim) + #:use-module (ice-9 binary-ports) + #:use-module (ice-9 exceptions) + #:use-module (rnrs bytevectors) + #:use-module (web uri) + #:use-module (web http) + #:use-module (web client) + #:use-module (web request) + #:use-module (web response) + #:use-module (fibers) + #:use-module (fibers channels) + #:use-module (fibers conditions) + ;; #:use-module (gcrypt pk-crypto) + ;; #:use-module (gcrypt hash) + ;; #:use-module (gcrypt random) + ;; #:use-module (json) + ;; #:use-module (guix pki) + ;; #:use-module (guix utils) + ;; #:use-module (guix config) + ;; #:use-module (guix store) + ;; #:use-module (guix status) + ;; #:use-module (guix base64) + ;; #:use-module (guix scripts substitute) + #:export (call-with-streaming-http-request + &chunked-input-ended-prematurely + chunked-input-ended-prematurely-error? + make-chunked-input-port* + + make-worker-thread-channel + call-with-worker-thread + + retry-on-error + + create-work-queue + + check-locale!)) + +;; Chunked Responses +(define (read-chunk-header port) + "Read a chunk header from PORT and return the size in bytes of the +upcoming chunk." + (match (read-line port) + ((? eof-object?) + ;; Connection closed prematurely: there's nothing left to read. + (error "chunked input ended prematurely")) + (str + (let ((extension-start (string-index str + (lambda (c) + (or (char=? c #\;) + (char=? c #\return)))))) + (string->number (if extension-start ; unnecessary? 
+ (substring str 0 extension-start) + str) + 16))))) + +(define &chunked-input-ended-prematurely + (make-exception-type '&chunked-input-error-prematurely + &external-error + '())) + +(define make-chunked-input-ended-prematurely-error + (record-constructor &chunked-input-ended-prematurely)) + +(define chunked-input-ended-prematurely-error? + (record-predicate &chunked-input-ended-prematurely)) + +(define* (make-chunked-input-port* port #:key (keep-alive? #f)) + (define (close) + (unless keep-alive? + (close-port port))) + + (define chunk-size 0) ;size of the current chunk + (define remaining 0) ;number of bytes left from the current chunk + (define finished? #f) ;did we get all the chunks? + + (define (read! bv idx to-read) + (define (loop to-read num-read) + (cond ((or finished? (zero? to-read)) + num-read) + ((zero? remaining) ;get a new chunk + (let ((size (read-chunk-header port))) + (set! chunk-size size) + (set! remaining size) + (cond + ((zero? size) + (set! finished? #t) + (get-bytevector-n port 2) ; \r\n follows the last chunk + num-read) + (else + (loop to-read num-read))))) + (else ;read from the current chunk + (let* ((ask-for (min to-read remaining)) + (read (get-bytevector-n! port bv (+ idx num-read) + ask-for))) + (cond + ((eof-object? read) ;premature termination + (raise-exception + (make-chunked-input-ended-prematurely-error))) + (else + (let ((left (- remaining read))) + (set! remaining left) + (when (zero? left) + ;; We're done with this chunk; read CR and LF. + (get-u8 port) (get-u8 port)) + (loop (- to-read read) + (+ num-read read))))))))) + (loop to-read 0)) + + (make-custom-binary-input-port "chunked input port" read! #f #f close)) + +(define* (make-chunked-output-port* port #:key (keep-alive? #f) + (buffering 1200) + report-bytes-sent) + (define heap-allocated-limit + (expt 2 20)) ;; 1MiB + + (define (%put-string s) + (unless (string-null? 
s) + (let* ((bv (string->bytevector s "ISO-8859-1")) + (length (bytevector-length bv))) + (put-string port (number->string length 16)) + (put-string port "\r\n") + (put-bytevector port bv) + (put-string port "\r\n") + + (when report-bytes-sent + (report-bytes-sent length)) + (let* ((stats (gc-stats)) + (initial-gc-times + (assq-ref stats 'gc-times))) + (when (> (assq-ref stats 'heap-allocated-since-gc) + heap-allocated-limit) + (while (let ((updated-stats (gc-stats))) + (= (assq-ref updated-stats 'gc-times) + initial-gc-times)) + (gc) + (usleep 50))))))) + + (define (%put-char c) + (%put-string (list->string (list c)))) + + (define (flush) #t) + (define (close) + (put-string port "0\r\n\r\n") + (force-output port) + (unless keep-alive? + (close-port port))) + (let ((ret (make-soft-port + (vector %put-char %put-string flush #f close) "w"))) + (setvbuf ret 'block buffering) + ret)) + +(define* (call-with-streaming-http-request uri callback + #:key (headers '()) + (method 'PUT) + report-bytes-sent) + (let* ((port (open-socket-for-uri uri)) + (request + (build-request + uri + #:method method + #:version '(1 . 1) + #:headers `((connection close) + (Transfer-Encoding . "chunked") + (Content-Type . "application/octet-stream") + ,@headers) + #:port port))) + + (set-port-encoding! port "ISO-8859-1") + (setvbuf port 'block (expt 2 13)) + (with-exception-handler + (lambda (exp) + (simple-format #t "error: PUT ~A: ~A\n" (uri-path uri) exp) + (close-port port) + (raise-exception exp)) + (lambda () + (let ((request (write-request request port))) + (let* ((chunked-output-port + (make-chunked-output-port* + port + #:buffering (expt 2 12) + #:keep-alive? #t + #:report-bytes-sent report-bytes-sent))) + + ;; A SIGPIPE will kill Guile, so ignore it + (sigaction SIGPIPE + (lambda (arg) + (simple-format (current-error-port) "warning: SIGPIPE\n"))) + + (set-port-encoding! 
chunked-output-port "ISO-8859-1") + (callback chunked-output-port) + (close-port chunked-output-port) + + (let ((response (read-response port))) + (let ((body (read-response-body response))) + (close-port port) + (values response + body))))))))) + +(define* (retry-on-error f #:key times delay ignore) + (let loop ((attempt 1)) + (match (with-exception-handler + (lambda (exn) + (when (cond + ((list? ignore) + (any (lambda (test) + (test exn)) + ignore)) + ((procedure? ignore) + (ignore exn)) + (else #f)) + (raise-exception exn)) + + (cons #f exn)) + (lambda () + (call-with-values f + (lambda vals + (cons #t vals)))) + #:unwind? #t) + ((#t . return-values) + (when (> attempt 1) + (simple-format + (current-error-port) + "retry success: ~A\n on attempt ~A of ~A\n" + f + attempt + times)) + (apply values return-values)) + ((#f . exn) + (if (>= attempt times) + (begin + (simple-format + (current-error-port) + "error: ~A:\n ~A,\n giving up after ~A attempts\n" + f + exn + times) + (raise-exception exn)) + (begin + (simple-format + (current-error-port) + "error: ~A:\n ~A,\n attempt ~A of ~A, retrying in ~A\n" + f + exn + attempt + times + delay) + (sleep delay) + (loop (+ 1 attempt)))))))) + +(define delay-logging-fluid + (make-thread-local-fluid)) +(define delay-logging-depth-fluid + (make-thread-local-fluid 0)) + +(define (log-delay proc duration) + (and=> (fluid-ref delay-logging-fluid) + (lambda (recorder) + (recorder proc duration)))) + +(define* (call-with-delay-logging proc #:key (threshold 1) (args '())) + (let ((start (get-internal-real-time)) + (trace '()) + (root-logger? (eq? #f (fluid-ref delay-logging-fluid)))) + + (define (format-seconds seconds) + (format #f "~4f" seconds)) + + (call-with-values + (lambda () + (with-fluid* delay-logging-depth-fluid + (+ 1 (fluid-ref delay-logging-depth-fluid)) + (lambda () + (if root-logger? + (with-fluid* delay-logging-fluid + (lambda (proc duration) + (set! 
trace + (cons (list proc + duration + (fluid-ref delay-logging-depth-fluid)) + trace)) + #t) + (lambda () + (apply proc args))) + (apply proc args))))) + (lambda vals + (let ((elapsed-seconds + (/ (- (get-internal-real-time) + start) + internal-time-units-per-second))) + (if (and (> elapsed-seconds threshold) + root-logger?) + (let ((lines + (cons + (simple-format #f "warning: delay of ~A seconds: ~A" + (format-seconds elapsed-seconds) + proc) + (map (match-lambda + ((proc duration depth) + (string-append + (make-string (* 2 depth) #\space) + (simple-format #f "~A: ~A" + (format-seconds duration) + proc)))) + trace)))) + (display (string-append + (string-join lines "\n") + "\n"))) + (unless root-logger? + ((fluid-ref delay-logging-fluid) proc elapsed-seconds)))) + (apply values vals))))) + +(define (call-with-time-logging name thunk) + (let ((start (current-time time-utc))) + (call-with-values + thunk + (lambda vals + (let* ((end (current-time time-utc)) + (elapsed (time-difference end start))) + (display + (format #f + "~a took ~f seconds~%" + name + (+ (time-second elapsed) + (/ (time-nanosecond elapsed) 1e9)))) + (apply values vals)))))) + +(define-syntax-rule (with-time-logging name exp ...) + "Log under NAME the time taken to evaluate EXP." + (call-with-time-logging name (lambda () exp ...))) + +(define* (create-work-queue thread-count-parameter proc + #:key thread-start-delay + (thread-stop-delay + (make-time time-duration 0 0))) + (let ((queue (make-q)) + (queue-mutex (make-mutex)) + (job-available (make-condition-variable)) + (running-job-args (make-hash-table))) + + (define get-thread-count + (cond + ((number? thread-count-parameter) + (const thread-count-parameter)) + ((eq? thread-count-parameter #f) + ;; Run one thread per job + (lambda () + (+ (q-length queue) + (hash-count (lambda (index val) + (list? val)) + running-job-args)))) + (else + thread-count-parameter))) + + (define (process-job . args) + (with-mutex queue-mutex + (enq! 
queue args) + (start-new-threads-if-necessary (get-thread-count)) + (signal-condition-variable job-available))) + + (define (count-threads) + (with-mutex queue-mutex + (hash-count (const #t) running-job-args))) + + (define (count-jobs) + (with-mutex queue-mutex + (+ (q-length queue) + (hash-count (lambda (index val) + (list? val)) + running-job-args)))) + + (define (list-jobs) + (with-mutex queue-mutex + (append (list-copy + (car queue)) + (hash-fold (lambda (key val result) + (or (and val + (cons val result)) + result)) + '() + running-job-args)))) + + (define (thread-process-job job-args) + (with-exception-handler + (lambda (exn) + (simple-format (current-error-port) + "job raised exception: ~A\n" + job-args)) + (lambda () + (with-throw-handler #t + (lambda () + (apply proc job-args)) + (lambda (key . args) + (simple-format (current-error-port) + "exception when handling job: ~A ~A\n" + key args) + (backtrace)))) + #:unwind? #t)) + + (define (start-thread thread-index) + (define (too-many-threads?) + (let ((running-jobs-count + (hash-count (lambda (index val) + (list? val)) + running-job-args)) + (desired-thread-count (get-thread-count))) + + (>= running-jobs-count + desired-thread-count))) + + (define (thread-idle-for-too-long? last-job-finished-at) + (time>=? + (time-difference (current-time time-monotonic) + last-job-finished-at) + thread-stop-delay)) + + (define (stop-thread) + (hash-remove! running-job-args + thread-index) + (unlock-mutex queue-mutex)) + + (call-with-new-thread + (lambda () + (let loop ((last-job-finished-at (current-time time-monotonic))) + (lock-mutex queue-mutex) + + (if (too-many-threads?) + (stop-thread) + (let ((job-args + (if (q-empty? queue) + ;; #f from wait-condition-variable indicates a timeout + (if (wait-condition-variable + job-available + queue-mutex + (+ 9 (time-second (current-time)))) + ;; Another thread could have taken + ;; the job in the mean time + (if (q-empty? queue) + #f + (deq! queue)) + #f) + (deq! 
queue)))) + + (if job-args + (begin + (hash-set! running-job-args + thread-index + job-args) + + (unlock-mutex queue-mutex) + (thread-process-job job-args) + + (with-mutex queue-mutex + (hash-set! running-job-args + thread-index + #f)) + + (loop (current-time time-monotonic))) + (if (thread-idle-for-too-long? last-job-finished-at) + (stop-thread) + (begin + (unlock-mutex queue-mutex) + + (loop last-job-finished-at)))))))))) + + + (define start-new-threads-if-necessary + (let ((previous-thread-started-at (make-time time-monotonic 0 0))) + (lambda (desired-count) + (let* ((thread-count + (hash-count (const #t) running-job-args)) + (threads-to-start + (- desired-count thread-count))) + (when (> threads-to-start 0) + (for-each + (lambda (thread-index) + (when (eq? (hash-ref running-job-args + thread-index + 'slot-free) + 'slot-free) + (let* ((now (current-time time-monotonic)) + (elapsed (time-difference now + previous-thread-started-at))) + (when (or (eq? #f thread-start-delay) + (time>=? elapsed thread-start-delay)) + (set! previous-thread-started-at now) + (hash-set! running-job-args + thread-index + #f) + (start-thread thread-index))))) + (iota desired-count))))))) + + (if (procedure? thread-count-parameter) + (call-with-new-thread + (lambda () + (while #t + (sleep 15) + (with-mutex queue-mutex + (let ((idle-threads (hash-count (lambda (index val) + (eq? #f val)) + running-job-args))) + (when (= 0 idle-threads) + (start-new-threads-if-necessary (get-thread-count)))))))) + (start-new-threads-if-necessary (get-thread-count))) + + (values process-job count-jobs count-threads list-jobs))) + +(define (check-locale!) 
+ (with-exception-handler + (lambda (exn) + (display + (simple-format + #f + "exception when calling setlocale: ~A +falling back to en_US.utf8\n" + exn) + (current-error-port)) + + (with-exception-handler + (lambda (exn) + (display + (simple-format + #f + "exception when calling setlocale with en_US.utf8: ~A\n" + exn) + (current-error-port)) + + (exit 1)) + (lambda _ + (setlocale LC_ALL "en_US.utf8")) + #:unwind? #t)) + (lambda _ + (setlocale LC_ALL "")) + #:unwind? #t)) + +(define %worker-thread-args + (make-parameter #f)) + +(define* (make-worker-thread-channel initializer + #:key (parallelism 1) + (delay-logger (lambda _ #f)) + destructor + lifetime + (log-exception? (const #t))) + "Return a channel used to offload work to a dedicated thread. ARGS are the +arguments of the worker thread procedure." + (parameterize (((@@ (fibers internal) current-fiber) #f)) + (let ((channel (make-channel))) + (for-each + (lambda _ + (call-with-new-thread + (lambda () + (let init ((args (initializer))) + (parameterize ((%worker-thread-args args)) + (let loop ((current-lifetime lifetime)) + (match (get-message channel) + (((? channel? reply) sent-time (? procedure? proc)) + (let ((time-delay + (- (get-internal-real-time) + sent-time))) + (delay-logger (/ time-delay + internal-time-units-per-second)) + (put-message + reply + (let ((start-time (get-internal-real-time))) + (with-exception-handler + (lambda (exn) + (list 'worker-thread-error + (/ (- (get-internal-real-time) + start-time) + internal-time-units-per-second) + exn)) + (lambda () + (with-throw-handler #t + (lambda () + (call-with-values + (lambda () + (apply proc args)) + (lambda vals + (cons (/ (- (get-internal-real-time) + start-time) + internal-time-units-per-second) + vals)))) + (lambda args + (when (match args + (('%exception exn) + (log-exception? exn)) + (_ #t)) + (simple-format + (current-error-port) + "worker-thread: exception: ~A\n" args) + (backtrace))))) + #:unwind? #t)))))) + (if (number? 
current-lifetime) + (unless (< current-lifetime 0) + (loop (if current-lifetime + (- current-lifetime 1) + #f))) + (loop #f)))) + (when destructor + (apply destructor args)) + (init (initializer)))))) + (iota parallelism)) + channel))) + +(define* (call-with-worker-thread channel proc #:key duration-logger) + "Send PROC to the worker thread through CHANNEL. Return the result of PROC. +If already in the worker thread, call PROC immediately." + (let ((args (%worker-thread-args))) + (if args + (apply proc args) + (let ((reply (make-channel))) + (put-message channel (list reply (get-internal-real-time) proc)) + (match (get-message reply) + (('worker-thread-error duration exn) + (when duration-logger + (duration-logger duration)) + (raise-exception exn)) + ((duration . result) + (when duration-logger + (duration-logger duration)) + (apply values result))))))) diff --git a/nginx/conf/nginx.conf b/nginx/conf/nginx.conf new file mode 100644 index 0000000..2dffe12 --- /dev/null +++ b/nginx/conf/nginx.conf @@ -0,0 +1,33 @@ +daemon off; +error_log /dev/stdout info; + +events { +} + +http { + access_log /dev/stdout; + + upstream nar-herder { + server 127.0.0.1:8080; + } + + server { + listen 8081; + + location ~ \.narinfo$ { + proxy_pass http://nar-herder; + } + + location ~ ^/nar/(.*)$ { + alias /home/chris/Projects/Guix/nar-herder/data/nar/$1; + } + + location = /latest-database-dump { + proxy_pass http://nar-herder; + } + location ~ ^/internal/database/(.*)$ { + internal; + alias /home/chris/Projects/Guix/nar-herder/$1; + } + } +} diff --git a/pre-inst-env.in b/pre-inst-env.in new file mode 100644 index 0000000..ebf1a05 --- /dev/null +++ b/pre-inst-env.in @@ -0,0 +1,13 @@ +#!/bin/sh + +abs_top_srcdir="`cd "@abs_top_srcdir@" > /dev/null; pwd`" +abs_top_builddir="`cd "@abs_top_builddir@" > /dev/null; pwd`" + +GUILE_LOAD_COMPILED_PATH="$abs_top_builddir${GUILE_LOAD_COMPILED_PATH:+:}$GUILE_LOAD_COMPILED_PATH" 
+GUILE_LOAD_PATH="$abs_top_builddir:$abs_top_srcdir${GUILE_LOAD_PATH:+:}:$GUILE_LOAD_PATH" +export GUILE_LOAD_COMPILED_PATH GUILE_LOAD_PATH + +PATH="$abs_top_builddir:$PATH" +export PATH + +exec "$@" diff --git a/scripts/nar-herder.in b/scripts/nar-herder.in new file mode 100644 index 0000000..372dd39 --- /dev/null +++ b/scripts/nar-herder.in @@ -0,0 +1,297 @@ +#!@GUILE@ --no-auto-compile +-*- scheme -*- +-*- geiser-scheme-implementation: guile -*- +!# +;;; Nar Herder +;;; +;;; Copyright © 2020 Christopher Baines <mail@cbaines.net> +;;; +;;; This file is part of the nar-herder. +;;; +;;; The Nar Herder is free software; you can redistribute it and/or +;;; modify it under the terms of the GNU General Public License as +;;; published by the Free Software Foundation; either version 3 of the +;;; License, or (at your option) any later version. +;;; +;;; The Nar Herder is distributed in the hope that it will be useful, +;;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;;; General Public License for more details. +;;; +;;; You should have received a copy of the GNU General Public License +;;; along with the guix-data-service. If not, see +;;; <http://www.gnu.org/licenses/>. 

;; Use line buffering for both standard output and standard error.
(setvbuf (current-output-port) 'line)
(setvbuf (current-error-port) 'line)

;; Make sure COLUMNS is set to at least 256, keeping any larger value already
;; present in the environment.
;; NOTE(review): presumably this avoids narrow wrapping/truncation of output
;; produced by libraries that consult COLUMNS -- confirm.
(let ((columns (string->number (or (getenv "COLUMNS") ""))))
  (setenv "COLUMNS"
          (number->string
           (if columns
               (max 256 columns)
               256))))

(use-modules (srfi srfi-1)
             (srfi srfi-19)
             (srfi srfi-37)
             (srfi srfi-43)
             (ice-9 ftw)
             (ice-9 match)
             (ice-9 format)
             (web uri)
             (web client)
             (web response)
             (oop goops)
             (logging logger)
             (logging port-log)
             (fibers)
             (fibers conditions)
             (fibers web server)
             ((guix ui) #:select (read/eval))
             (guix progress)
             (guix narinfo)
             (guix derivations)
             ((guix build utils) #:select (dump-port))
             (nar-herder utils)
             (nar-herder database)
             (nar-herder storage)
             (nar-herder mirror)
             (nar-herder server))

(define (option-string->number name arg)
  "Return ARG parsed as a number.  Raise an error mentioning the option NAME
when ARG is not a valid number, rather than silently storing #f."
  (or (string->number arg)
      (error "invalid numeric value for option" name arg)))

(define %base-options
  ;; Options accepted by every sub-command.
  (list (option '("database") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'database
                              arg
                              ;; Drop the previous value (e.g. the default),
                              ;; like the other options do.
                              (alist-delete 'database result))))))

(define %base-option-defaults
  ;; Alist of default option values
  `((database . ,(string-append (getcwd) "/nar_herder.db"))
    (database-dump . ,(string-append (getcwd) "/nar_herder_dump.db"))))

(define %server-options
  ;; Options accepted by the "run-server" sub-command, in addition to
  ;; %base-options.
  (list (option '("port") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'port
                              (option-string->number name arg)
                              (alist-delete 'port result))))
        (option '("host") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'host
                              arg
                              (alist-delete 'host result))))
        (option '("storage") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'storage
                              arg
                              (alist-delete 'storage result))))
        ;; Either the literal string "none", or a number.
        (option '("storage-limit") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'storage-limit
                              (if (string=? arg "none")
                                  "none"
                                  (option-string->number name arg))
                              (alist-delete 'storage-limit result))))

        ;; stored-on=https://other-nar-herder-server
        ;; stored-on=https://other-nar-herder-server&stored-on=https://different-server
        ;;
        ;; This option may be given several times; each occurrence adds a
        ;; (SYMBOL . REST) entry, where the argument is split on "=".
        (option '("storage-nar-removal-criteria") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'storage-nar-removal-criteria
                              (match (string-split arg #\=)
                                ((sym rest ...)
                                 (cons (string->symbol sym) rest)))
                              result)))

        (option '("mirror") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'mirror
                              arg
                              (alist-delete 'mirror result))))))

(define %server-option-defaults
  ;; Alist of default values for %server-options.
  '((port . 8080)
    (host . "0.0.0.0")

    (storage-limit . "none")))

(define (parse-options options defaults args)
  "Parse the list of strings ARGS according to OPTIONS (a list of SRFI-37
options), starting from the alist DEFAULTS.  Return the resulting alist.
Positional arguments are collected under the 'arguments key, most recent
first.  An unrecognized option raises an error."
  (args-fold
   args options
   (lambda (opt name arg result)
     (error "unrecognized option" name))
   (lambda (arg result)
     (alist-cons
      'arguments
      (cons arg
            (or (assoc-ref result 'arguments)
                '()))
      (alist-delete 'arguments result)))
   defaults))

;; Dispatch on the sub-command given as the first command line argument.
(match (cdr (program-arguments))
  ;; import [--database=FILE] FILE-OR-DIRECTORY...
  ;;
  ;; Insert the given narinfo files, and the *.narinfo files found directly
  ;; inside the given directories, into the database.
  (("import" rest ...)
   (let* ((opts (parse-options %base-options
                               %base-option-defaults
                               rest))
          (database (setup-database
                     (assq-ref opts 'database)))
          ;; Only use the positional arguments here: REST also contains
          ;; options such as --database=..., which are not files.
          ;; parse-options accumulates them in reverse, so restore the
          ;; command line order.
          (files-or-dirs (reverse
                          (or (assq-ref opts 'arguments)
                              '())))
          (narinfos
           (append-map
            (lambda (file-or-dir)
              (let ((s (stat file-or-dir)))
                (match (stat:type s)
                  ('regular (list file-or-dir))
                  ('directory
                   (let ((dir file-or-dir))
                     (map
                      (lambda (nar-filename)
                        (string-append dir
                                       (if (string-suffix? "/" dir)
                                           ""
                                           "/")
                                       nar-filename))
                      (scandir file-or-dir
                               (lambda (name)
                                 (string-suffix? ".narinfo" name))))))
                  (type
                   (error "not a regular file or directory"
                          file-or-dir type)))))
            files-or-dirs))
          (len (length narinfos))
          (progress
           (progress-reporter/bar len
                                  (format #f "importing ~a narinfos"
                                          len)
                                  (current-error-port))))

     (call-with-progress-reporter progress
       (lambda (report)
         (for-each (lambda (narinfo)
                     (database-insert-narinfo
                      database
                      (call-with-input-file narinfo
                        (lambda (port)
                          ;; Set url to a dummy value as this doesn't
                          ;; matter
                          (read-narinfo port "https://narherderdummyvalue"))))

                     (report))
                   narinfos)))))

  ;; run-server [OPTIONS]
  ;;
  ;; Optionally fetch the database from a mirror, start the background
  ;; threads relevant to the given options, then serve requests.
  (("run-server" rest ...)
   (simple-format (current-error-port) "locale is ~A\n" (check-locale!))

   (let* ((opts (parse-options (append %base-options
                                       %server-options)
                               (append %base-option-defaults
                                       %server-option-defaults)
                               rest))
          (unknown-arguments
           (or (assq-ref opts 'arguments)
               '()))
          (lgr (make <logger>))
          (port-log (make <port-log>
                      #:port (current-output-port)
                      #:formatter
                      (lambda (lvl time str)
                        (format #f "~a (~5a): ~a~%"
                                (strftime "%F %H:%M:%S" (localtime time))
                                lvl
                                str)))))

     (define (download-database)
       ;; Fetch MIRROR/latest-database-dump and store it as the database
       ;; file.  Raise an error unless the response code is 200.
       (let* ((database-file (assq-ref opts 'database))
              ;; Download to a temporary name and rename once complete, so
              ;; that an interrupted download doesn't leave a partial file
              ;; at DATABASE-FILE, which a later start would use as is.
              (tmp-database-file (string-append database-file ".tmp"))
              (database-uri
               (string->uri
                (string-append (assq-ref opts 'mirror)
                               "/latest-database-dump"))))
         (call-with-values
             (lambda ()
               (http-get database-uri
                         #:decode-body? #f
                         #:streaming? #t))
           (lambda (response body)
             (unless (= (response-code response) 200)
               (error "unable to fetch database from mirror"))

             (call-with-output-file tmp-database-file
               (lambda (output-port)
                 (dump-port body output-port)))

             (rename-file tmp-database-file database-file)

             (simple-format (current-error-port)
                            "finished downloading the database\n")))))

     (add-handler! lgr port-log)
     (open-log! lgr)
     (set-default-logger! lgr)

     ;; run-server takes no positional arguments.
     (unless (null? unknown-arguments)
       (simple-format (current-error-port)
                      "unknown arguments: ~A\n"
                      unknown-arguments)
       (exit 1))

     (unless (or (assq-ref opts 'mirror)
                 (assq-ref opts 'storage))
       (simple-format (current-error-port)
                      "error: you must specify --mirror or --storage\n")
       (exit 1))

     ;; When mirroring and there's no database file yet, start from the
     ;; mirror's database.
     (and=>
      (assq-ref opts 'mirror)
      (lambda (mirror)
        (let ((database-file (assq-ref opts 'database)))
          (if (file-exists? database-file)
              (begin
                ;; TODO Open the database, and check if the
                ;; latest changes in the database are visible on
                ;; the source to mirror. If they're not, then
                ;; delete the database and download it to get
                ;; back in sync

                #f)
              (download-database)))))

     (let ((database (setup-database (assq-ref opts 'database)))
           (canonical-storage (and=> (assq-ref opts 'storage)
                                     canonicalize-path)))
       ;; Write the process ID to the pid file, if one is set in OPTS.
       (and=> (assq-ref opts 'pid-file)
              (lambda (pid-file)
                (call-with-output-file pid-file
                  (lambda (port)
                    (simple-format port "~A\n" (getpid))))))

       ;; Create the database dump if the file doesn't exist already.
       (unless (file-exists? (assq-ref opts 'database-dump))
         (simple-format (current-error-port)
                        "dumping database...\n")
         (dump-database database (assq-ref opts 'database-dump)))

       (and=> (assq-ref opts 'mirror)
              (lambda (mirror)
                (start-fetch-changes-thread database mirror)

                (when (assq-ref opts 'storage)
                  (start-mirroring-thread database
                                          mirror
                                          (assq-ref opts 'storage-limit)
                                          canonical-storage))))

       ;; Nars are only removed when there's storage with a numeric limit;
       ;; the default limit is the string "none".
       (when (and (assq-ref opts 'storage)
                  (number? (assq-ref opts 'storage-limit)))
         (start-nar-removal-thread database
                                   canonical-storage
                                   (assq-ref opts 'storage-limit)
                                   ;; All the values given with
                                   ;; --storage-nar-removal-criteria
                                   (filter-map
                                    (match-lambda
                                      ((key . val)
                                       (if (eq? key 'storage-nar-removal-criteria)
                                           val
                                           #f)))
                                    opts)))

       (simple-format (current-error-port)
                      "starting server\n")
       (run-server
        (make-request-handler database
                              canonical-storage)
        #:host (assq-ref opts 'host)
        #:port (assq-ref opts 'port))))))