#!@GUILE@ --no-auto-compile
-*- scheme -*-
-*- geiser-scheme-implementation: guile -*-
!#
;;; Nar Herder
;;;
;;; Copyright © 2020 Christopher Baines
;;;
;;; This file is part of the nar-herder.
;;;
;;; The Nar Herder is free software; you can redistribute it and/or
;;; modify it under the terms of the GNU General Public License as
;;; published by the Free Software Foundation; either version 3 of the
;;; License, or (at your option) any later version.
;;;
;;; The Nar Herder is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;;; General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with the Nar Herder.  If not, see
;;; <http://www.gnu.org/licenses/>.

;;; Commentary:
;;;
;;; Command line entry point for the Nar Herder.  The statements below
;;; adjust the process environment before the program's modules are
;;; loaded and the sub-command is dispatched.

;; On aarch64 hosts, when GUILE_JIT_THRESHOLD is not already set, set it
;; to -1 (which disables Guile's JIT compiler) and re-execute this same
;; command line.  Because the variable is then set, the re-exec happens
;; at most once.  NOTE(review): the reason for disabling the JIT on
;; aarch64 is not recorded here -- presumably a JIT problem on that
;; architecture; confirm.
(when (and (string-prefix? "aarch64-" %host-type)
           (not (getenv "GUILE_JIT_THRESHOLD")))
  (setenv "GUILE_JIT_THRESHOLD" "-1")
  (apply execlp (car (command-line)) (command-line)))

;; Make stdout and stderr line buffered, so output is flushed at the end
;; of every line.
(setvbuf (current-output-port) 'line)
(setvbuf (current-error-port) 'line)

;; Force COLUMNS to be at least 256, using 256 when it is unset or not a
;; number.  NOTE(review): presumably this stops Guile truncating
;; backtraces and error output to a narrow terminal width -- confirm.
(let ((columns (string->number (or (getenv "COLUMNS") ""))))
  (setenv "COLUMNS"
          (number->string
           (if columns
               (max 256 columns)
               256))))

(use-modules (srfi srfi-1)
             (srfi srfi-19)
             (srfi srfi-37)
             (srfi srfi-43)
             (srfi srfi-71)
             (ice-9 ftw)
             (ice-9 match)
             (ice-9 format)
             (ice-9 threads)
             (ice-9 suspendable-ports)
             (web uri)
             (web client)
             (web response)
             (oop goops)
             (logging logger)
             (logging port-log)
             (prometheus)
             (fibers)
             (fibers channels)
             (fibers scheduler)
             (fibers conditions)
             (fibers web server)
             ((guix ui) #:select (read/eval string->duration))
             (guix store)
             (guix progress)
             (guix narinfo)
             (guix derivations)
             ((guix store) #:select (store-path-hash-part))
             ((guix build utils) #:select (dump-port))
             ((guix build syscalls) #:select (set-thread-name))
             (nar-herder utils)
             (nar-herder database)
             (nar-herder recent-changes)
             (nar-herder cached-compression)
             (nar-herder storage)
             (nar-herder mirror)
             (nar-herder server))
(install-suspendable-ports!)

;; Log levels accepted by --log-level, ordered from most to least
;; verbose: selecting a level disables every level listed before it.
(define %valid-log-levels
  '(DEBUG INFO WARN ERROR))

(define (split-option-argument arg char)
  "Split the option argument ARG at the first occurrence of CHAR.
Return a two element list of the text before and after CHAR, or a one
element list containing just ARG when CHAR does not occur.  Unlike
string-split, this allows the second part to contain CHAR itself."
  (let ((index (string-index arg char)))
    (if index
        (list (substring arg 0 index)
              (substring arg (+ index 1)))
        (list arg))))

(define (duration-argument->seconds arg)
  "Parse ARG with string->duration and return the duration in seconds.
If ARG is not a valid duration, report it on the current error port and
exit with status 1."
  (let ((duration (string->duration arg)))
    (unless duration
      (simple-format (current-error-port)
                     "~A: invalid duration\n"
                     arg)
      (exit 1))
    (time-second duration)))

;; Options shared by every sub-command.
(define %base-options
  (list (option '("database") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'database
                              arg
                              result)))
        (option '("database-dump") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'database-dump
                              arg
                              result)))
        (option '("storage") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'storage
                              arg
                              (alist-delete 'storage result))))
        (option '("log-level") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'log-level
                              (let ((level (string->symbol
                                            (string-upcase arg))))
                                (if (member level %valid-log-levels)
                                    level
                                    (error
                                     (simple-format #f "unknown log level ~A\nvalid levels are: ~A\n"
                                                    level
                                                    %valid-log-levels))))
                              (alist-delete 'log-level result))))))

(define %base-option-defaults
  ;; Alist of default option values
  `((database . ,(string-append (getcwd) "/nar_herder.db"))
    (database-dump . ,(string-append (getcwd) "/nar_herder_dump.db"))
    (log-level . DEBUG)))

;; Options for the import sub-command.
(define %import-options
  (list (option '("tag") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'tags
                              (cons (match (split-option-argument arg #\=)
                                      ((key value) (cons key value))
                                      (_ (error "--tag expects KEY=VALUE, got:" arg)))
                                    (or (assq-ref result 'tags)
                                        '()))
                              (alist-delete 'tags result))))
        (option '("ensure-references-exist") #f #f
                (lambda (opt name _ result)
                  (alist-cons 'ensure-references-exist
                              #t
                              result)))))

(define %import-options-defaults
  '())

;; Options for the run-server sub-command.
(define %server-options
  (list (option '("port") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'port
                              (string->number arg)
                              (alist-delete 'port result))))
        (option '("host") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'host
                              arg
                              (alist-delete 'host result))))
        (option '("pid-file") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'pid-file
                              arg
                              (alist-delete 'pid-file result))))
        (option '("storage-limit") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'storage-limit
                              (if (string=? arg "none")
                                  "none"
                                  (string->number arg))
                              (alist-delete 'storage-limit result))))
        ;; stored-on=https://other-nar-herder-server
        ;; stored-on=https://other-nar-herder-server&stored-on=https://different-server
        ;;
        ;; NOTE(review): the argument is split on every #\=, so only a
        ;; value containing exactly one #\= (like the first example)
        ;; is accepted; the second example splits into three parts and
        ;; is rejected.  The part after the #\= is parsed with read, so
        ;; a plain URL is read as a symbol -- confirm this is what the
        ;; nar removal fiber expects.
        (option '("storage-nar-removal-criteria") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'storage-nar-removal-criteria
                              (match (string-split arg #\=)
                                ((sym rest)
                                 (list (string->symbol sym)
                                       (call-with-input-string rest read)))
                                (_
                                 (error "invalid --storage-nar-removal-criteria:"
                                        arg)))
                              result)))
        ;; TYPE or TYPE:LEVEL; may be given multiple times.
        (option '("enable-cached-compression") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'cached-compression
                              (match (string-split arg #\:)
                                ((type)
                                 `((type . ,(string->symbol type))))
                                ((type level)
                                 `((type . ,(string->symbol type))
                                   (level . ,(string->number level)))))
                              result)))
        ;; TYPE=DIRECTORY; may be given once per compression type.
        (option '("cached-compression-directory") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'cached-compression-directory
                              (match (split-option-argument arg #\=)
                                ((type directory)
                                 (cons (string->symbol type)
                                       (canonicalize-path directory))))
                              result)))
        ;; TYPE=SIZE; may be given once per compression type.
        (option '("cached-compression-directory-max-size") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'cached-compression-directory-max-size
                              (match (string-split arg #\=)
                                ((type size)
                                 (cons (string->symbol type)
                                       (string->number size))))
                              result)))
        (option '("cached-compression-min-uses") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'cached-compression-min-uses
                              (string->number arg)
                              (alist-delete 'cached-compression-min-uses
                                            result))))
        (option '("cached-compression-workers") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'cached-compression-workers
                              (string->number arg)
                              (alist-delete 'cached-compression-workers
                                            result))))
        (option '("cached-compression-nar-source") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'cached-compression-nar-source
                              arg
                              (alist-delete 'cached-compression-nar-source
                                            result))))
        (option '("ttl") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'narinfo-ttl
                              (duration-argument->seconds arg)
                              result)))
        (option '("negative-ttl") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'narinfo-negative-ttl
                              (duration-argument->seconds arg)
                              result)))
        (option '("recent-changes-limit") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'recent-changes-limit
                              (string->number arg)
                              (alist-delete 'recent-changes-limit result))))
        (option '("mirror") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'mirror
                              arg
                              (alist-delete 'mirror result))))))

(define %server-option-defaults
  '((port . 8080)
    (host . "0.0.0.0")
    (storage-limit . "none")
    (cached-compression-workers . 2)
    (cached-compression-min-uses . 3)
    (recent-changes-limit . 32768)))

;; The check sub-command takes no options beyond %base-options.
(define %check-options
  (list))

(define %check-option-defaults
  '())

(define (parse-options options defaults args)
  "Parse the command line ARGS with the SRFI-37 OPTIONS, starting from the
DEFAULTS alist, and return the resulting alist.  Non-option arguments are
collected under the 'arguments key, in reverse command line order.  An
unrecognized option raises an error."
  (args-fold
   args options
   (lambda (opt name arg result)
     (error "unrecognized option" name))
   (lambda (arg result)
     (alist-cons
      'arguments
      (cons arg
            (or (assq-ref result 'arguments)
                '()))
      (alist-delete 'arguments result)))
   defaults))

(define (setup-command-logging opts)
  "Create a logger that writes timestamped messages to the current output
port, install it as the default logger, and disable every entry of
%valid-log-levels listed before the level stored under 'log-level in
OPTS.  Return the logger."
  (let ((lgr (make <logger>))
        (port-log (make <port-log>
                    #:port (current-output-port)
                    #:formatter
                    (lambda (lvl time str)
                      (format #f "~a (~5a): ~a~%"
                              (strftime "%F %H:%M:%S" (localtime time))
                              lvl
                              str)))))
    (add-handler! lgr port-log)
    (open-log! lgr)
    (set-default-logger! lgr)
    (let ((log-level (assq-ref opts 'log-level)))
      (let loop ((levels %valid-log-levels))
        (when (and (not (null? levels))
                   (not (eq? (car levels) log-level)))
          (disable-log-level! lgr (car levels))
          (loop (cdr levels)))))
    lgr))

;; Dispatch on the sub-command given as the first command line argument.
(match (cdr (program-arguments))
  ;; import [OPTION]... FILE-OR-DIRECTORY...
  ;; Read narinfo files (directories are scanned for *.narinfo) and
  ;; insert them into the database inside a single transaction.
  (("import" rest ...)
   (let* ((opts (parse-options (append %base-options
                                       %import-options)
                               (append %base-option-defaults
                                       %import-options-defaults)
                               rest))
          (metrics-registry (make-metrics-registry
                             #:namespace
                             "narherder"))
          (database (setup-database (assq-ref opts 'database)
                                    metrics-registry))
          (narinfos
           (append-map
            (lambda (file-or-dir)
              (match (stat:type (stat file-or-dir))
                ('regular (list file-or-dir))
                ('directory
                 (map (lambda (narinfo-filename)
                        (string-append file-or-dir
                                       (if (string-suffix? "/" file-or-dir)
                                           ""
                                           "/")
                                       narinfo-filename))
                      (scandir file-or-dir
                               (lambda (name)
                                 (string-suffix? ".narinfo" name)))))
                (type
                 (error "unsupported file type for import:"
                        file-or-dir type))))
            (or (assq-ref opts 'arguments) '())))
          (len (length narinfos))
          (progress
           (progress-reporter/bar len
                                  (format #f "importing ~a narinfos" len)
                                  (current-error-port))))
     (call-with-progress-reporter progress
       (lambda (report)
         ;; NOTE(review): the inserts below use DATABASE rather than the
         ;; DB argument passed to this procedure -- confirm intended.
         (database-call-with-transaction
          database
          (lambda (db)
            (let ((read-narinfos
                   (map
                    (lambda (narinfo-file)
                      (let ((narinfo
                             (call-with-input-file narinfo-file
                               (lambda (port)
                                 ;; Set url to a dummy value as this doesn't
                                 ;; matter
                                 (read-narinfo port
                                               "https://narherderdummyvalue")))))
                        (database-insert-narinfo database
                                                 narinfo
                                                 #:tags (or (assq-ref opts 'tags)
                                                            '()))
                        (report)
                        narinfo))
                    narinfos)))
              ;; Every reference (other than the store item itself) must
              ;; already be known to the database; otherwise raise an
              ;; error from within the transaction.
              (when (assq-ref opts 'ensure-references-exist)
                (for-each
                 (lambda (narinfo)
                   (let ((self-reference
                          (store-path-base (narinfo-path narinfo))))
                     (for-each
                      (lambda (reference)
                        (when (and (not (string=? reference self-reference))
                                   (not (database-select-narinfo-by-hash
                                         database
                                         (string-take reference 32))))
                          (error
                           (simple-format #f "missing reference to ~A"
                                          reference))))
                      (narinfo-references narinfo))))
                 read-narinfos)))))))))

  ;; remove [OPTION]... STORE-PATH...
  ;; Remove narinfos from the database, and their nar files from
  ;; --storage when it is given.
  (("remove" rest ...)
   (let ((opts (parse-options %base-options
                              %base-option-defaults
                              rest)))
     (setup-command-logging opts)

     (let* ((metrics-registry (make-metrics-registry
                               #:namespace
                               "narherder"))
            (database (setup-database (assq-ref opts 'database)
                                      metrics-registry))
            (storage (assq-ref opts 'storage)))
       (for-each
        (lambda (store-path)
          (log-msg 'INFO "removing " store-path)

          (if storage
              (remove-nar-files-by-hash database
                                        storage
                                        metrics-registry
                                        (store-path-hash-part store-path)
                                        #:error-unless-files-to-remove? #f)
              (log-msg 'WARN "no --storage set, so just removing from the database"))

          (unless (database-remove-narinfo database store-path)
            (log-msg 'WARN store-path " not found to remove")))
        (or (assq-ref opts 'arguments) '())))))

  ;; check [OPTION]...
  (("check" rest ...)
   (let ((opts (parse-options (append %base-options
                                      %check-options)
                              (append %base-option-defaults
                                      %check-option-defaults)
                              rest)))
     (setup-command-logging opts)

     (let* ((metrics-registry (make-metrics-registry
                               #:namespace
                               "narherder"))
            (database (setup-database (assq-ref opts 'database)
                                      metrics-registry))
            (canonical-storage (and=> (assq-ref opts 'storage)
                                      canonicalize-path)))
       (check-storage database
                      canonical-storage
                      metrics-registry))))

  ;; run-server [OPTION]...
  (("run-server" rest ...)
   (simple-format (current-error-port) "locale is ~A\n" (check-locale!))

   (let* ((opts (parse-options (append %base-options
                                       %server-options)
                               (append %base-option-defaults
                                       %server-option-defaults)
                               rest))
          (unknown-arguments (or (assq-ref opts 'arguments) '()))
          (lgr (setup-command-logging opts))
          (metrics-registry (make-metrics-registry
                             #:namespace
                             "narherder")))

     (define (download-database)
       ;; Fetch MIRROR/latest-database-dump and store it at the --database
       ;; path.  The dump is written to a temporary file next to the
       ;; database and only renamed into place once complete, so an
       ;; interrupted download does not leave a partial file that would
       ;; be treated as an existing database on the next start.
       (let ((database-uri
              (string->uri
               (string-append (assq-ref opts 'mirror)
                              "/latest-database-dump")))
             (database-file (assq-ref opts 'database)))
         (with-fibers-port-timeouts
          (lambda ()
            (call-with-values
                (lambda ()
                  (simple-format (current-error-port)
                                 "starting downloading the database\n")
                  (let ((port
                         socket
                         (open-socket-for-uri* database-uri)))
                    (http-get database-uri
                              #:port port
                              #:streaming? #t)))
              (lambda (response body)
                (unless (= (response-code response) 200)
                  (error "unable to fetch database from mirror"))

                (let* ((temporary-file (string-append database-file ".tmp"))
                       (reporter (progress-reporter/file
                                  (uri->string database-uri)
                                  (response-content-length response)
                                  (current-error-port)))
                       (port
                        (progress-report-port
                         reporter
                         body
                         #:download-size (response-content-length response))))
                  (call-with-output-file temporary-file
                    (lambda (output-port)
                      (dump-port port output-port)))
                  (close-port port)
                  (rename-file temporary-file database-file))

                (simple-format (current-error-port)
                               "finished downloading the database\n"))))
          #:timeout 30)))

     (unless (null? unknown-arguments)
       (simple-format (current-error-port)
                      "unknown arguments: ~A\n"
                      unknown-arguments)
       (exit 1))

     (unless (or (assq-ref opts 'mirror)
                 (assq-ref opts 'storage))
       (simple-format (current-error-port)
                      "error: you must specify --mirror or --storage\n")
       (exit 1))

     (and=> (assq-ref opts 'pid-file)
            (lambda (pid-file)
              (call-with-output-file pid-file
                (lambda (port)
                  (simple-format port "~A\n" (getpid))))))

     ;; When mirroring, fetch the database from the mirror if there is
     ;; no local copy yet.
     ;;
     ;; TODO When the database already exists, open it and check if the
     ;; latest changes in the database are visible on the source to
     ;; mirror.  If they're not, then delete the database and download it
     ;; to get back in sync.
     (when (and (assq-ref opts 'mirror)
                (not (file-exists? (assq-ref opts 'database))))
       (download-database))

     (let* ((database (setup-database (assq-ref opts 'database)
                                      metrics-registry))
            (canonical-storage (and=> (assq-ref opts 'storage)
                                      canonicalize-path))
            ;; Alist mapping each --enable-cached-compression type to its
            ;; settings, plus its directory (explicit or the default under
            ;; /var/cache/nar-herder/nar/) and optional maximum size.
            (enabled-cached-compressions
             (let ((explicit-cached-compression-directories
                    (filter-map
                     (match-lambda
                       (('cached-compression-directory . details) details)
                       (_ #f))
                     opts))
                   (cached-compression-directories-max-sizes
                    (filter-map
                     (match-lambda
                       (('cached-compression-directory-max-size . details)
                        details)
                       (_ #f))
                     opts)))
               (filter-map
                (match-lambda
                  (('cached-compression . details)
                   (let ((compression (assq-ref details 'type)))
                     (cons compression
                           `(,@(alist-delete 'type details)
                             (directory
                              . ,(or (assq-ref explicit-cached-compression-directories
                                               compression)
                                     (simple-format #f "/var/cache/nar-herder/nar/~A"
                                                    compression)))
                             (directory-max-size
                              . ,(assq-ref cached-compression-directories-max-sizes
                                           compression))))))
                  (_ #f))
                opts)))
            (cached-compression-min-uses
             (assq-ref opts 'cached-compression-min-uses))
            (maybe-trigger-creation-of-compressed-nars
             (if (null? enabled-cached-compressions)
                 #f
                 (make-maybe-trigger-creation-of-compressed-nars
                  database
                  metrics-registry
                  (or (assq-ref opts 'cached-compression-nar-source)
                      canonical-storage)
                  enabled-cached-compressions
                  cached-compression-min-uses
                  #:cached-compression-workers
                  (assq-ref opts 'cached-compression-workers)))))

       (let ((database-dump (assq-ref opts 'database-dump)))
         (if (string=? database-dump "disabled")
             (log-msg 'INFO "database dump disabled")
             (unless (file-exists? database-dump)
               (log-msg 'INFO "dumping database...")
               (dump-database database database-dump))))

       (let ((finished? (make-condition)))
         ;; Background maintenance runs in its own thread with a single
         ;; fibers scheduler, separate from the web server below.
         (call-with-new-thread
          (lambda ()
            (catch 'system-error
              (lambda ()
                (set-thread-name "maintenance"))
              (const #t))

            (run-fibers
             (lambda ()
               (start-recent-change-removal-and-database-dump-fiber
                database
                (let ((filename (assq-ref opts 'database-dump)))
                  (if (string=? filename "disabled")
                      #f
                      filename))
                (* 24 3600) ; 24 hours
                (assq-ref opts 'recent-changes-limit))

               ;; Each channel is #f when the corresponding fiber is not
               ;; started.  `and' is used rather than `when', because in
               ;; Guile a `when' whose test fails returns an unspecified
               ;; value, which counts as true.
               (let ((mirror-channel
                      (and=> (assq-ref opts 'mirror)
                             (lambda (mirror)
                               (start-fetch-changes-fiber database
                                                          canonical-storage
                                                          mirror
                                                          metrics-registry)
                               (and (assq-ref opts 'storage)
                                    (start-mirroring-fiber database
                                                           mirror
                                                           (assq-ref opts 'storage-limit)
                                                           canonical-storage
                                                           metrics-registry)))))
                     (removal-channel
                      (let ((nar-removal-criteria
                             (filter-map
                              (match-lambda
                                ((key . val)
                                 (and (eq? key 'storage-nar-removal-criteria)
                                      val)))
                              opts)))
                        (and (assq-ref opts 'storage)
                             (number? (assq-ref opts 'storage-limit))
                             (not (null? nar-removal-criteria))
                             (start-nar-removal-fiber database
                                                      canonical-storage
                                                      (assq-ref opts 'storage-limit)
                                                      metrics-registry
                                                      nar-removal-criteria))))
                     (addition-channel (make-channel)))
                 ;; Forward each addition to the mirroring fiber, and
                 ;; schedule repeated storage removal checks for it.
                 (spawn-fiber
                  (lambda ()
                    (while #t
                      (match (get-message addition-channel)
                        (('addition file)
                         (when mirror-channel
                           (put-message mirror-channel
                                        `(fetch ,file)))
                         (when removal-channel
                           (spawn-fiber
                            (lambda ()
                              ;; Ask for the file to be removed from
                              ;; storage after 1 minute, then again after
                              ;; a further 5 minutes, 15 minutes and 1
                              ;; hour.
                              (for-each
                               (lambda (seconds)
                                 (sleep seconds)
                                 (put-message removal-channel
                                              `(remove-from-storage ,file)))
                               (list 60 (* 5 60) (* 15 60) 3600))))))))))

                 (start-recent-change-listener-fiber
                  database
                  addition-channel
                  removal-channel))

               (log-msg 'DEBUG "finished maintenance setup")
               (wait finished?))
             #:parallelism 1)))

         (call-with-sigint
          (lambda ()
            (run-fibers
             (lambda ()
               ;; Name each scheduler thread "fibers N".
               (let* ((current (current-scheduler))
                      (schedulers
                       (cons current (scheduler-remote-peers current))))
                 (for-each
                  (lambda (i sched)
                    (spawn-fiber
                     (lambda ()
                       (catch 'system-error
                         (lambda ()
                           (set-thread-name
                            (string-append "fibers " (number->string i))))
                         (const #t)))
                     sched))
                  (iota (length schedulers))
                  schedulers))

               (log-msg 'INFO "starting server, listening on "
                        (assq-ref opts 'host) ":" (assq-ref opts 'port))

               (run-server/patched
                (make-request-handler
                 database
                 canonical-storage
                 #:ttl (assq-ref opts 'narinfo-ttl)
                 #:negative-ttl (assq-ref opts 'narinfo-negative-ttl)
                 #:logger lgr
                 #:metrics-registry metrics-registry
                 #:maybe-trigger-creation-of-compressed-nars
                 maybe-trigger-creation-of-compressed-nars)
                #:host (assq-ref opts 'host)
                #:port (assq-ref opts 'port))

               (wait finished?))
             #:hz 0
             #:parallelism 4))
          finished?)))))

  ;; No sub-command, or an unknown one.
  (_
   (simple-format (current-error-port)
                  "usage: ~A import|remove|check|run-server [OPTION]...\n"
                  (car (program-arguments)))
   (exit 1)))