;;; Nar Herder
;;;
;;; Copyright © 2021 Christopher Baines
;;;
;;; This program is free software: you can redistribute it and/or
;;; modify it under the terms of the GNU Affero General Public License
;;; as published by the Free Software Foundation, either version 3 of
;;; the License, or (at your option) any later version.
;;;
;;; This program is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;;; Affero General Public License for more details.
;;;
;;; You should have received a copy of the GNU Affero General Public
;;; License along with this program.  If not, see
;;; <http://www.gnu.org/licenses/>.

;;; Commentary:
;;;
;;; HTTP request handling (make-request-handler) and service startup
;;; (run-nar-herder-service) for the Nar Herder.
;;;
;;; Code:

(define-module (nar-herder server)
  #:use-module (srfi srfi-1)
  #:use-module (srfi srfi-11)
  #:use-module (srfi srfi-19)
  #:use-module (srfi srfi-34)
  #:use-module (srfi srfi-71)
  #:use-module (ice-9 ftw)
  #:use-module (ice-9 iconv)
  #:use-module (ice-9 match)
  #:use-module (ice-9 threads)
  #:use-module (ice-9 binary-ports)
  #:use-module (rnrs bytevectors)
  #:use-module (web uri)
  #:use-module (web client)
  #:use-module (web response)
  #:use-module (web request)
  #:use-module (logging logger)
  #:use-module (fibers)
  #:use-module (fibers channels)
  #:use-module (fibers scheduler)
  #:use-module (fibers conditions)
  #:use-module (fibers operations)
  #:use-module (prometheus)
  #:use-module (json)
  #:use-module ((system foreign)
                #:select (bytevector->pointer pointer->bytevector))
  #:use-module (guix store)
  #:use-module (guix base32)
  #:use-module (guix progress)
  #:use-module (guix serialization)
  #:use-module ((guix utils) #:select (decompressed-port))
  #:use-module ((guix build utils) #:select (dump-port))
  #:use-module ((guix build syscalls) #:select (set-thread-name))
  #:use-module (nar-herder database)
  #:use-module (nar-herder storage)
  #:use-module (nar-herder utils)
  #:use-module (nar-herder mirror)
  #:use-module (nar-herder recent-changes)
  #:use-module (nar-herder cached-compression)
  #:use-module (ice-9 textual-ports)
  #:export (%compression-options
            run-nar-herder-service
            make-request-handler))

;; Compression names recognised in /nar/COMPRESSION/FILENAME requests when
;; looking for a cached compression of a nar.
(define %compression-options
  '(gzip lzip zstd none))

;; Return a response for JSON (any value scm->json accepts), encoded as
;; UTF-8, with status CODE.  EXTRA-HEADERS are placed before the default
;; content-type and vary headers.
(define* (render-json json #:key (extra-headers '())
                      (code 200))
  (values (build-response
           #:code code
           #:headers (append extra-headers
                             '((content-type . (application/json
                                                (charset . "utf-8")))
                               (vary . (accept)))))
          (call-with-encoded-output-string
           "utf-8"
           (lambda (port)
             (scm->json json port)))))

;; Parse QUERY, the query component of a URI such as "a=1&b=2", into an
;; alist of URI-decoded (KEY . VALUE) string pairs, in order.
;;
;; Each &-separated part is split on its first "=", so a value may itself
;; contain "=".  A part without "=" maps to the empty string and empty parts
;; are skipped, so a malformed query never raises an error.
(define (parse-query-string query)
  (filter-map
   (lambda (part)
     (and (not (string-null? part))
          (let ((separator (string-index part #\=)))
            (if separator
                (cons (uri-decode (substring part 0 separator))
                      (uri-decode (substring part (+ separator 1))))
                (cons (uri-decode part) "")))))
   (string-split query #\&)))

;; Serve the single regular file stored in the nar read from INPUT, a port
;; whose contents are compressed with COMPRESSION (a string such as "gzip"
;; or "none", converted to a symbol for decompressed-port).
;;
;; The nar is parsed up to the start of the file contents, then PROC is
;; called as (PROC PORT SIZE CLEANUP):
;;   PORT    - the decompressed port, positioned at the file contents
;;   SIZE    - the file length in bytes, from the nar
;;   CLEANUP - a thunk closing PORT and INPUT and reaping any external
;;             decompressor process; PROC must ensure it gets called once
;;             PORT is no longer needed.
;; Returns whatever PROC returns.  If parsing (or PROC) raises an
;; exception, CLEANUP is called and the exception is re-raised.
(define (serve-fixed-output-file input compression proc)
  ;; TODO It's hard with fold-archive from (guix serialization) to
  ;; read just the singular file from the archive, so the following
  ;; procedures allow to just read the parts prior to the file, which
  ;; includes the file length

  (define (sub-bytevector bv len)
    (define max (bytevector-length bv))
    (cond ((= len max) bv)
          ((< len max)
           ;; Yes, this is safe because the result of each conversion procedure
           ;; has its life cycle synchronized with that of its argument.
           (pointer->bytevector (bytevector->pointer bv) len))
          (else
           (error "sub-bytevector called to get a super bytevector"))))

  ;; Read an 8 byte little-endian unsigned integer.
  (define (read-long-long p)
    (let ((b (get-bytevector-n p 8)))
      (bytevector-u64-ref b 0 (endianness little))))

  ;; Read an 8 byte little-endian integer field, using only its low 32 bits.
  (define (read-int p)
    (let ((b (get-bytevector-n p 8)))
      (bytevector-u32-ref b 0 (endianness little))))

  ;; Read a length-prefixed byte string, consuming (and dropping) the
  ;; padding up to the next multiple of 8 bytes.
  (define (read-byte-string p)
    (let* ((len (read-int p))
           (m (modulo len 8))
           (pad (if (zero? m) 0 (- 8 m)))
           (bv (get-bytevector-n p (+ len pad))))
      (sub-bytevector bv len)))

  (define (read-string p)
    (utf8->string (read-byte-string p)))

  (let ((port pids (decompressed-port (string->symbol compression)
                                      input)))
    ;; The decompressor can be an external program writing into a pipe.
    ;; It must only be waited for once its output has been consumed:
    ;; waiting first deadlocks as soon as the output exceeds the pipe
    ;; buffer.
    (define (cleanup)
      (close-port port)
      (close-port input)
      (for-each waitpid pids))

    (catch #t
      (lambda ()
        ;; Read the header fields one after the other with let*; the
        ;; evaluation order of procedure call operands is unspecified.
        (let* ((magic (read-string port))
               (open-paren (read-string port))
               (type-key (read-string port))
               (type (read-string port))
               (kind (match (read-string port)
                       ("contents" 'regular)
                       ("executable"
                        (let* ((empty (read-string port))
                               (contents (read-string port)))
                          (match (list empty contents)
                            (("" "contents") 'executable))))))
               (size (read-long-long port)))
          (match (list magic open-paren type-key type kind)
            (("nix-archive-1" "(" "type" "regular" _)
             (proc port size cleanup)))))
      (lambda (key . args)
        (cleanup)
        (apply throw key args)))))

;; Return INITIAL-NARINFO-CONTENTS with a URL/Compression/FileSize stanza
;; appended for each alist in CACHED-NARINFO-FILES (read for its
;; compression, store-path and size keys).
;;
;; The stanzas are concatenated with no blank line in between: Guix's
;; narinfo reader treats an empty line as the end of the record, so a blank
;; separator would hide every cached compression after the first.
;; NOTE(review): assumes INITIAL-NARINFO-CONTENTS ends with a newline.
(define (add-cached-compressions-to-narinfo initial-narinfo-contents
                                            cached-narinfo-files)
  (define (cached-nar-string cached-nar-details)
    (let ((compression
           (symbol->string
            (assq-ref cached-nar-details 'compression))))
      (string-append
       "URL: nar/" compression "/"
       ;; Encoded like the stored nar URLs; the request handler decodes
       ;; the path before matching the file name.
       (uri-encode
        (store-path-base
         (assq-ref cached-nar-details 'store-path)))
       "\n"
       "Compression: " compression "\n"
       "FileSize: " (number->string
                     (assq-ref cached-nar-details 'size))
       "\n")))

  (string-append
   initial-narinfo-contents
   (string-concatenate
    (map cached-nar-string cached-narinfo-files))))

;; Return a request handler, a procedure (REQUEST BODY) returning the
;; response and response body as two values.  It serves:
;;   /HASH.narinfo              narinfo, plus any cached compressions
;;   /HASH.narinfo/info         JSON saying if the item is in local storage
;;   /nar/COMPRESSION/FILENAME  X-Accel-Redirect to a stored or cached nar
;;   /file/NAME/sha256/HASH     the file inside a fixed-output item's nar
;;   /recent-changes            JSON recent changes, optional ?since=...
;;   /latest-database-dump      X-Accel-Redirect to the database dump
;;   /metrics                   METRICS-REGISTRY plus GC/process metrics
;; Anything else gets a 404.  Every request increments the
;; server_requests_total counter in METRICS-REGISTRY.
;;
;; BASE-TTL / BASE-CACHED-COMPRESSIONS-TTL: max-age for narinfos without /
;; with cached compressions, lowered to respect scheduled removals.
;; NEGATIVE-TTL: max-age for narinfo 404s, or #f for none.
;; MAYBE-TRIGGER-CREATION-OF-CACHED-NARS: #f or a procedure called with the
;; narinfo id whenever a stored nar is served.
;; CACHED-COMPRESSION-NAR-REQUESTED-HOOK: #f or a procedure called with the
;; compression symbol and file name whenever a cached nar is served.
(define* (make-request-handler database storage-root
                               #:key base-ttl base-cached-compressions-ttl
                               negative-ttl logger
                               metrics-registry
                               maybe-trigger-creation-of-cached-nars
                               cached-compression-nar-requested-hook)
  (define hostname
    (gethostname))

  ;; 32 character hash followed by ".narinfo".
  (define (narinfo? str)
    (and (= (string-length str) 40)
         (string-suffix? ".narinfo" str)))

  ;; Registry without the "narherder" namespace, for generic metrics.
  (define plain-metrics-registry
    (make-metrics-registry))

  (define gc-metrics-updater
    (get-gc-metrics-updater plain-metrics-registry))

  (define process-metrics-updater
    (get-process-metrics-updater plain-metrics-registry))

  (define guile-time-metrics-updater
    (let ((internal-real-time
           (make-gauge-metric plain-metrics-registry "guile_internal_real_time"))
          (internal-run-time
           (make-gauge-metric plain-metrics-registry "guile_internal_run_time")))
      (lambda ()
        (metric-set internal-real-time
                    (get-internal-real-time))
        (metric-set internal-run-time
                    (get-internal-run-time)))))

  (define requests-total-metric
    (make-counter-metric metrics-registry "server_requests_total"))

  (define* (increment-request-metric category response-code #:key (labels '()))
    (metric-increment
     requests-total-metric
     #:label-values `((category . ,category)
                      (response_code . ,response-code)
                      ,@labels)))

  (define %compression-strings
    (map symbol->string %compression-options))

  ;; Seconds from now until SCHEDULED-REMOVAL-TIME (an SRFI-19 time), less
  ;; a 60 second margin, and never negative.  Returns a number, so it can
  ;; be fed straight to min.
  (define (seconds-until-removal scheduled-removal-time)
    (max (- (time-second
             (time-difference scheduled-removal-time (current-time)))
            60)
         0))

  ;; Metric labels recording the system of NARINFO, when it is a string.
  (define (system-labels narinfo)
    (let ((system (assq-ref narinfo 'system)))
      (if (string? system)
          `((system . ,system))
          '())))

  (lambda (request body)
    ;; 404 response, with the body omitted for HEAD requests.
    (define (not-found/omit-body-for-head)
      (values (build-response #:code 404)
              (if (eq? (request-method request) 'HEAD)
                  #f
                  "404")))

    (log-msg logger 'DEBUG (request-method request) " "
             (uri-path (request-uri request)))

    (match (cons (request-method request)
                 (split-and-decode-uri-path
                  (uri-path (request-uri request))))
      (((or 'HEAD 'GET) (? narinfo? narinfo))
       (let ((base-narinfo-contents
              narinfo-id
              (database-select-narinfo-contents-by-hash
               database
               (string-take narinfo 32))))

         (increment-request-metric "narinfo"
                                   (if base-narinfo-contents
                                       "200"
                                       "404"))

         (if base-narinfo-contents
             (let* ((cached-narinfo-files
                     (database-select-cached-narinfo-files-by-narinfo-id
                      database
                      narinfo-id))
                    (narinfo-contents
                     (if (null? cached-narinfo-files)
                         base-narinfo-contents
                         (add-cached-compressions-to-narinfo
                          base-narinfo-contents
                          cached-narinfo-files)))
                    ;; Candidate max-age values; #f entries are dropped
                    ;; and the smallest remaining one is used.
                    (potential-ttls
                     (remove
                      not
                      `(,(if (null? cached-narinfo-files)
                             base-ttl
                             base-cached-compressions-ttl)
                        ,(and=> (database-select-scheduled-narinfo-removal
                                 database
                                 narinfo-id)
                                seconds-until-removal)
                        ,@(map (lambda (details)
                                 (and=> (database-select-scheduled-cached-narinfo-removal
                                         database
                                         (assq-ref details 'id))
                                        seconds-until-removal))
                               cached-narinfo-files))))
                    (ttl
                     (if (null? potential-ttls)
                         #f
                         (apply min potential-ttls))))

               (values `((content-type . (text/plain))
                         ,@(if ttl
                               `((cache-control (max-age . ,ttl)))
                               '()))
                       narinfo-contents))
             (values (build-response #:code 404
                                     #:headers
                                     (if negative-ttl
                                         `((cache-control
                                            (max-age . ,negative-ttl)))
                                         '()))
                     "404"))))

      (((or 'HEAD 'GET) (? narinfo? narinfo) "info")
       (let ((narinfo-contents
              (database-select-narinfo-contents-by-hash
               database
               (string-take narinfo 32))))
         (increment-request-metric "narinfo/info"
                                   (if narinfo-contents
                                       "200"
                                       "404"))
         (if narinfo-contents
             (render-json
              `((stored . ,(store-item-in-local-storage?
                            database
                            storage-root
                            (string-take narinfo 32)))))
             (values (build-response #:code 404)
                     "404"))))

      (((or 'HEAD 'GET) "nar" compression filename)
       (let* ((hash (and (>= (string-length filename) 32)
                         (string-take filename 32)))
              (narinfo
               (and hash
                    (database-select-narinfo-by-hash
                     database
                     hash)))
              ;; NARINFO is #f for short or unknown hashes, and assq-ref
              ;; rejects #f, so only look up the id of a real narinfo.
              (narinfo-files
               (and=> (and narinfo (assq-ref narinfo 'id))
                      (lambda (id)
                        (database-select-narinfo-files-by-narinfo-id
                         database
                         id))))
              (narinfo-file-for-compression
               (find (lambda (file)
                       (and (string=? (assq-ref file 'compression)
                                      compression)
                            (string=?
                             (last (string-split (assq-ref file 'url)
                                                 #\/))
                             (uri-encode filename))))
                     (or narinfo-files '())))
              (compression-symbol
               (if (member compression
                           %compression-strings
                           string=?)
                   (string->symbol compression)
                   #f)))

         (if narinfo-file-for-compression
             ;; A Via entry naming this host means the request has come
             ;; back round to this server.
             (let ((loop? (any
                           (lambda (via)
                             (string=? (last (string-split via #\space))
                                       hostname))
                           (request-via request))))

               (when (and (not loop?)
                          maybe-trigger-creation-of-cached-nars)
                 (maybe-trigger-creation-of-cached-nars
                  (assq-ref narinfo 'id)))

               (when loop?
                 (log-msg logger 'WARN
                          (request-method request) " "
                          (uri-path (request-uri request))
                          ": loop detected (" hostname "): "
                          (string-join (request-via request) ", ")))

               (increment-request-metric
                (string-append "nar/" compression)
                (if loop? "500" "200")
                #:labels (system-labels narinfo))

               (if loop?
                   (values (build-response #:code 500)
                           (simple-format #f "loop detected (~A): ~A\n"
                                          hostname
                                          (request-via request)))
                   (values (build-response
                            #:code 200
                            #:headers `((X-Accel-Redirect
                                         . ,(string-append
                                             "/internal/nar/"
                                             compression "/"
                                             (uri-encode filename)))))
                           #f)))

             (let ((cached-narinfo-file
                    (and compression-symbol
                         narinfo
                         ;; Check that the filename given in the
                         ;; request matches the narinfo store-path
                         (string=? filename
                                   (basename
                                    (assq-ref narinfo 'store-path)))
                         (database-select-cached-narinfo-file-by-hash
                          database
                          hash
                          compression-symbol))))

               (when (or cached-narinfo-file
                         ;; Check for a common compression to avoid lots of
                         ;; metrics being generated if compression is random
                         compression-symbol)
                 (increment-request-metric
                  (string-append "nar/" compression)
                  (if cached-narinfo-file "200" "404")
                  #:labels (if cached-narinfo-file
                               (system-labels narinfo)
                               '())))

               ;; The hook is #f when the service runs without cached
               ;; compressions enabled, so it can't be called blindly.
               (when (and cached-narinfo-file
                          cached-compression-nar-requested-hook)
                 (cached-compression-nar-requested-hook compression-symbol
                                                        filename))

               (if cached-narinfo-file
                   (values (build-response
                            #:code 200
                            #:headers `((X-Accel-Redirect
                                         . ,(string-append
                                             "/internal/cached-nar/"
                                             compression "/"
                                             (uri-encode filename)))))
                           #f)
                   (values (build-response #:code 404)
                           "404"))))))

      (((or 'HEAD 'GET) "file" name algo hash)
       (guard (c ((invalid-base32-character? c)
                  (not-found/omit-body-for-head)))
         (let ((hash-bytevector (nix-base32-string->bytevector hash)))
           (if (and (string=? algo "sha256")
                    (= 32 (bytevector-length hash-bytevector)))
               (let* ((store-path
                       (fixed-output-path name hash-bytevector
                                          #:hash-algo
                                          (string->symbol algo)
                                          #:recursive? #f))
                      (store-path-hash
                       (store-path-hash-part store-path))
                      (narinfo-files
                       (database-select-narinfo-files
                        database
                        store-path-hash))
                      (selected-narinfo-file
                       ;; TODO Select intelligently
                       (if (null? narinfo-files)
                           #f
                           (first narinfo-files)))
                      (filename
                       ;; STORAGE-ROOT is #f when there is no local
                       ;; storage, in which case nothing can be served.
                       (and storage-root
                            selected-narinfo-file
                            (let ((filename
                                   (string-append
                                    storage-root
                                    (uri-decode
                                     (assq-ref selected-narinfo-file 'url)))))
                              (and (file-exists? filename)
                                   filename)))))

                 (increment-request-metric "file"
                                           (if filename "200" "404"))

                 (if filename
                     (serve-fixed-output-file
                      (open-input-file filename)
                      (assq-ref selected-narinfo-file
                                'compression)
                      (lambda (nar-port bytes cleanup)
                        (values `((content-type . (application/octet-stream
                                                   (charset . "ISO-8859-1")))
                                  (content-length . ,bytes))
                                (if (eq? (request-method request) 'HEAD)
                                    (begin
                                      ;; No body will be sent, so release
                                      ;; the ports and processes now.
                                      (cleanup)
                                      #f)
                                    (lambda (output-port)
                                      (catch #t
                                        (lambda ()
                                          (dump-port nar-port
                                                     output-port
                                                     bytes))
                                        (lambda (key . args)
                                          (cleanup)
                                          (apply throw key args)))
                                      (cleanup))))))
                     (not-found/omit-body-for-head)))
               (begin
                 (increment-request-metric "file" "404")
                 (not-found/omit-body-for-head))))))

      (((or 'HEAD 'GET) "recent-changes")
       (let ((query-parameters
              (or (and=> (uri-query (request-uri request))
                         parse-query-string)
                  '())))
         (increment-request-metric "recent-changes" "200")

         (render-json
          `((recent_changes
             . ,(list->vector
                 (database-select-recent-changes
                  database
                  (or (assoc-ref query-parameters "since")
                      "1970-01-01 00:00:01"))))))))

      (((or 'HEAD 'GET) "latest-database-dump")
       (increment-request-metric "latest-database-dump" "200")

       (values (build-response
                #:code 200
                #:headers '((X-Accel-Redirect
                             . "/internal/database/nar_herder_dump.db")))
               #f))

      (((or 'HEAD 'GET) "metrics")
       (gc-metrics-updater)
       (process-metrics-updater)
       (guile-time-metrics-updater)
       (increment-request-metric "metrics" "200")

       (values (build-response
                #:code 200
                #:headers '((content-type . (text/plain))
                            (vary . (accept))))
               (call-with-output-string
                 (lambda (port)
                   (write-metrics metrics-registry port)
                   (write-metrics plain-metrics-registry port)))))

      (_
       (increment-request-metric "unhandled" "404")
       (values (build-response #:code 404)
               "404")))))

;; Run the Nar Herder according to the alist OPTS, logging through LGR.
;;
;; When mirroring and the database file is missing, the database is first
;; downloaded from the mirror.  Maintenance fibers (storage metrics,
;; database dumps, mirroring, nar removal, cached compressions) run on a
;; separate thread; the HTTP server runs on the calling thread until
;; call-with-sigint signals the shared finished? condition.
(define* (run-nar-herder-service opts lgr)
  ;; Values of every OPTS entry whose key is KEY, in order, skipping #f
  ;; values.  Several options may be given more than once.
  (define (option-values key)
    (filter-map (match-lambda
                  ((k . v) (and (eq? k key) v))
                  (_ #f))
                opts))

  ;; Download the latest database dump from the mirror into the database
  ;; file.  The download goes to a temporary file that is only renamed into
  ;; place once complete, so an interrupted download is never mistaken for
  ;; a usable database on the next start.
  (define (download-database)
    (let ((database-uri
           (string->uri
            (string-append (assq-ref opts 'mirror)
                           "/latest-database-dump"))))
      (with-port-timeouts
       (lambda ()
         (call-with-values
             (lambda ()
               (simple-format (current-error-port)
                              "starting downloading the database\n")
               (let ((port
                      socket
                      (open-socket-for-uri* database-uri)))
                 (http-get database-uri
                           #:port port
                           #:streaming? #t)))
           (lambda (response body)
             (when (not (= (response-code response) 200))
               (error "unable to fetch database from mirror"))

             (let* ((database-file (assq-ref opts 'database))
                    (temporary-file (string-append database-file ".tmp"))
                    (reporter (progress-reporter/file
                               (uri->string database-uri)
                               (response-content-length response)
                               (current-error-port)))
                    (port
                     (progress-report-port
                      reporter
                      body
                      #:download-size (response-content-length response))))

               (call-with-output-file temporary-file
                 (lambda (output-port)
                   (dump-port port output-port)))

               (close-port port)

               (rename-file temporary-file database-file))

             (simple-format (current-error-port)
                            "finished downloading the database\n"))))
       #:timeout 30)))

  (define metrics-registry
    (make-metrics-registry
     #:namespace
     "narherder"))

  (and=> (assq-ref opts 'mirror)
         (lambda (mirror)
           (let ((database-file (assq-ref opts 'database)))
             (if (file-exists? database-file)
                 (begin
                   ;; TODO Open the database, and check if the
                   ;; latest changes in the database are visible on
                   ;; the source to mirror.  If they're not, then
                   ;; delete the database and download it to get
                   ;; back in sync

                   #f)
                 (download-database)))))

  ;; Used elsewhere
  (make-gauge-metric metrics-registry "recent_changes_count")

  (let ((recent-changes-metric
         (make-gauge-metric metrics-registry "recent_changes_limit")))
    (metric-set recent-changes-metric
                (assq-ref opts 'recent-changes-limit)))

  (define maintenance-scheduler
    (make-scheduler #:parallelism 1))

  (let* ((database (setup-database (assq-ref opts 'database)
                                   metrics-registry))
         (canonical-storage (and=> (assq-ref opts 'storage)
                                   canonicalize-path))

         ;; Alist from compression symbol to its settings: the details of
         ;; the cached-compression option (minus type) plus the per
         ;; compression directory, max size, TTLs and removal duration.
         (enabled-cached-compressions
          (let ((explicit-cached-compression-directories
                 (option-values 'cached-compression-directory))
                (cached-compression-directories-max-sizes
                 (option-values 'cached-compression-directory-max-size))
                (cached-compression-ttls
                 (option-values 'cached-compression-ttl))
                (cached-compression-new-ttls
                 (option-values 'cached-compression-new-ttl))
                (cached-compression-unused-removal-durations
                 (option-values 'cached-compression-unused-removal-duration)))

            (map (lambda (details)
                   (let ((compression
                          (assq-ref details 'type)))
                     (cons compression
                           `(,@(alist-delete 'type details)
                             (directory
                              . ,(or (assq-ref
                                      explicit-cached-compression-directories
                                      compression)
                                     (simple-format
                                      #f
                                      "/var/cache/nar-herder/nar/~A"
                                      compression)))
                             (directory-max-size
                              . ,(assq-ref
                                  cached-compression-directories-max-sizes
                                  compression))
                             (ttl
                              . ,(assq-ref
                                  cached-compression-ttls
                                  compression))
                             (new-ttl
                              . ,(assq-ref
                                  cached-compression-new-ttls
                                  compression))
                             (unused-removal-duration
                              . ,(assq-ref
                                  cached-compression-unused-removal-durations
                                  compression))))))
                 (option-values 'cached-compression))))

         (cached-compression-min-uses
          (assq-ref opts 'cached-compression-min-uses))

         (cached-compression-management-channel
          (if (null? enabled-cached-compressions)
              #f
              (start-cached-compression-management-fiber
               database
               metrics-registry
               (or (assq-ref opts 'cached-compression-nar-source)
                   canonical-storage)
               enabled-cached-compressions
               cached-compression-min-uses
               #:cached-compression-workers
               (assq-ref opts 'cached-compression-workers)
               #:scheduler maintenance-scheduler)))

         ;; Passed to the request handler; asks the cached compression
         ;; management fiber to consider the narinfo with the given id.
         (maybe-trigger-creation-of-cached-nars
          (if (null? enabled-cached-compressions)
              #f
              (lambda (narinfo-id)
                (spawn-fiber
                 (lambda ()
                   (put-message cached-compression-management-channel
                                (cons 'narinfo-id narinfo-id)))
                 maintenance-scheduler))))

         ;; Passed to the request handler; touches the requested cached
         ;; nar so its modification time reflects the latest use.
         (cached-compression-nar-requested-hook
          (if (null? enabled-cached-compressions)
              #f
              (lambda (compression filename)
                (spawn-fiber
                 (lambda ()
                   (let ((directory
                          (assq-ref (assq-ref enabled-cached-compressions
                                              compression)
                                    'directory)))
                     (utime (string-append directory "/" filename))))
                 maintenance-scheduler)))))

    (if (string=? (assq-ref opts 'database-dump)
                  "disabled")
        (log-msg 'INFO "database dump disabled")
        (when (not (file-exists? (assq-ref opts 'database-dump)))
          (log-msg 'INFO "dumping database...")
          (dump-database database (assq-ref opts 'database-dump))))

    (let ((finished? (make-condition)))
      ;; Maintenance work runs on its own thread and fibers scheduler.
      (call-with-new-thread
       (lambda ()
         (catch 'system-error
           (lambda ()
             (set-thread-name "maintenance"))
           (const #t))

         (run-fibers
          (lambda ()
            (initialise-storage-metrics
             database
             canonical-storage
             metrics-registry)

            (start-recent-change-removal-and-database-dump-fiber
             database
             metrics-registry
             (let ((filename (assq-ref opts 'database-dump)))
               (if (string=? filename "disabled")
                   #f
                   filename))
             (* 24 3600) ; 24 hours
             (assq-ref opts 'recent-changes-limit))

            (let ((mirror-channel
                   (and=>
                    (assq-ref opts 'mirror)
                    (lambda (mirror)
                      (start-fetch-changes-fiber
                       database
                       metrics-registry
                       canonical-storage
                       mirror
                       cached-compression-management-channel)

                      (if (assq-ref opts 'storage)
                          (start-mirroring-fiber database
                                                 mirror
                                                 (assq-ref opts 'storage-limit)
                                                 canonical-storage
                                                 metrics-registry)
                          #f))))
                  (removal-channel
                   (let ((nar-removal-criteria
                          (option-values 'storage-nar-removal-criteria)))
                     (if (and (assq-ref opts 'storage)
                              (number? (assq-ref opts 'storage-limit))
                              (not (null? nar-removal-criteria)))
                         (start-nar-removal-fiber database
                                                  canonical-storage
                                                  (assq-ref opts 'storage-limit)
                                                  metrics-registry
                                                  nar-removal-criteria)
                         #f)))
                  (addition-channel (make-channel)))

              ;; For each added file: request it from the mirror, and ask
              ;; for its removal from storage after 1, 6, 21 and 81 minutes.
              (spawn-fiber
               (lambda ()
                 (while #t
                   (match (get-message addition-channel)
                     (('addition file)
                      (when mirror-channel
                        (put-message mirror-channel
                                     `(fetch ,file)))
                      (when removal-channel
                        (spawn-fiber
                         (lambda ()
                           (for-each
                            (lambda (delay)
                              (sleep delay)
                              (put-message removal-channel
                                           `(remove-from-storage ,file)))
                            (list 60
                                  (* 5 60)
                                  (* 15 60)
                                  3600))))))))))

              (start-recent-change-listener-fiber
               database
               metrics-registry
               addition-channel
               removal-channel))

            (unless (null? enabled-cached-compressions)
              (let ((cached-compression-removal-fiber-wakeup-channel
                     (start-cached-compression-removal-fiber
                      database
                      cached-compression-management-channel
                      enabled-cached-compressions)))
                (start-cached-compression-schedule-removal-fiber
                 database
                 cached-compression-management-channel
                 enabled-cached-compressions
                 cached-compression-removal-fiber-wakeup-channel
                 (or (assq-ref opts 'narinfo-ttl)
                     ;; Default from (guix substitutes)
                     (* 36 3600)))))

            (log-msg 'DEBUG "finished maintenance setup")
            (wait finished?))
          #:scheduler maintenance-scheduler
          #:hz 0
          #:parallelism 1)))

      ;; The HTTP server runs here, until finished? is signalled.
      (call-with-sigint
       (lambda ()
         (run-fibers
          (lambda ()
            (let* ((current (current-scheduler))
                   (schedulers
                    (cons current (scheduler-remote-peers current))))
              (for-each
               (lambda (i sched)
                 (spawn-fiber
                  (lambda ()
                    (catch 'system-error
                      (lambda ()
                        (set-thread-name
                         (string-append "fibers " (number->string i))))
                      (const #t)))
                  sched))
               (iota (length schedulers))
               schedulers))

            (log-msg 'INFO "starting server, listening on "
                     (assq-ref opts 'host) ":" (assq-ref opts 'port))

            (run-server/patched
             (make-request-handler
              database
              canonical-storage
              #:base-ttl (or (assq-ref opts 'narinfo-new-ttl)
                             (assq-ref opts 'narinfo-ttl))
              #:base-cached-compressions-ttl
              (or (assq-ref opts 'new-cached-compressions-narinfo-ttl)
                  (assq-ref opts 'cached-compressions-narinfo-ttl))
              #:negative-ttl (assq-ref opts 'narinfo-negative-ttl)
              #:logger lgr
              #:metrics-registry metrics-registry
              #:maybe-trigger-creation-of-cached-nars
              maybe-trigger-creation-of-cached-nars
              #:cached-compression-nar-requested-hook
              cached-compression-nar-requested-hook)
             #:host (assq-ref opts 'host)
             #:port (assq-ref opts 'port))

            (wait finished?))
          #:hz 0
          #:parallelism 4))
       finished?))))