diff options
Diffstat (limited to 'nar-herder/server.scm')
-rw-r--r-- | nar-herder/server.scm | 790 |
1 files changed, 680 insertions, 110 deletions
diff --git a/nar-herder/server.scm b/nar-herder/server.scm index 522ff3f..f9a8c32 100644 --- a/nar-herder/server.scm +++ b/nar-herder/server.scm @@ -19,39 +19,65 @@ (define-module (nar-herder server) #:use-module (srfi srfi-1) #:use-module (srfi srfi-11) + #:use-module (srfi srfi-19) #:use-module (srfi srfi-34) + #:use-module (srfi srfi-71) + #:use-module (ice-9 ftw) + #:use-module (ice-9 iconv) #:use-module (ice-9 match) + #:use-module (ice-9 threads) #:use-module (ice-9 binary-ports) #:use-module (rnrs bytevectors) #:use-module (web uri) + #:use-module (web client) #:use-module (web response) #:use-module (web request) #:use-module (logging logger) + #:use-module (fibers) + #:use-module (fibers channels) + #:use-module (fibers scheduler) + #:use-module (fibers conditions) + #:use-module (fibers operations) #:use-module (prometheus) #:use-module (json) #:use-module ((system foreign) #:select (bytevector->pointer pointer->bytevector)) #:use-module (guix store) #:use-module (guix base32) + #:use-module (guix progress) #:use-module (guix serialization) #:use-module ((guix utils) #:select (decompressed-port)) #:use-module ((guix build utils) #:select (dump-port)) + #:use-module ((guix build syscalls) #:select (set-thread-name)) #:use-module (nar-herder database) #:use-module (nar-herder storage) + #:use-module (nar-herder utils) + #:use-module (nar-herder mirror) + #:use-module (nar-herder recent-changes) + #:use-module (nar-herder cached-compression) #:use-module (ice-9 textual-ports) - #:export (make-request-handler)) + #:export (%compression-options + + run-nar-herder-service + make-request-handler)) + +(define %compression-options + '(gzip lzip zstd none)) (define* (render-json json #:key (extra-headers '()) (code 200)) (values (build-response #:code code #:headers (append extra-headers - '((content-type . (application/json)) + '((content-type . (application/json + (charset . "utf-8"))) (vary . (accept))))) - (lambda (port) - (scm->json json port)))) + (call-with-encoded-output-string + "utf-8" + (lambda (port) + (scm->json json port))))) (define (parse-query-string query) (let lp ((lst (map uri-decode (string-split query (char-set #\& #\=))))) @@ -61,32 +87,6 @@ (("") '()) (() '())))) -(define (get-gc-metrics-updater registry) - (define metrics - `((gc-time-taken - . ,(make-gauge-metric registry "guile_gc_time_taken")) - (heap-size - . ,(make-gauge-metric registry "guile_heap_size")) - (heap-free-size - . ,(make-gauge-metric registry "guile_heap_free_size")) - (heap-total-allocated - . ,(make-gauge-metric registry "guile_heap_total_allocated")) - (heap-allocated-since-gc - . ,(make-gauge-metric registry "guile_allocated_since_gc")) - (protected-objects - . ,(make-gauge-metric registry "guile_gc_protected_objects")) - (gc-times - . ,(make-gauge-metric registry "guile_gc_times")))) - - (lambda () - (let ((stats (gc-stats))) - (for-each - (match-lambda - ((name . metric) - (let ((value (assq-ref stats name))) - (metric-set metric value)))) - metrics)))) - (define (serve-fixed-output-file input compression proc) ;; TODO It's hard with fold-archive from (guix serialization) to ;; read just the singular file from the archive, so the following @@ -121,10 +121,11 @@ (define (read-string p) (utf8->string (read-byte-string p))) - (let*-values (((port pids) - (decompressed-port - (string->symbol compression) - input))) + (let ((port + pids + (decompressed-port + (string->symbol compression) + input))) ;; The decompressor can be an external program, so wait for it to ;; exit @@ -147,26 +148,78 @@ (proc port size))))) +(define (add-cached-compressions-to-narinfo initial-narinfo-contents + cached-narinfo-files) + (let ((cached-nar-strings + (map (lambda (cached-nar-details) + (let ((compression + (symbol->string + (assq-ref cached-nar-details 'compression)))) + (string-append + "URL: nar/" compression "/" + (uri-encode + (store-path-base + (assq-ref cached-nar-details 'store-path))) + "\n" + "Compression: " compression "\n" + "FileSize: " (number->string + (assq-ref cached-nar-details 'size)) + "\n"))) + cached-narinfo-files))) + (string-append + initial-narinfo-contents + (string-join + cached-nar-strings + "\n")))) + (define* (make-request-handler database storage-root - #:key ttl negative-ttl logger - metrics-registry) + #:key base-ttl base-cached-compressions-ttl + negative-ttl logger + metrics-registry + maybe-trigger-creation-of-cached-nars + cached-compression-nar-requested-hook) + (define hostname + (gethostname)) + (define (narinfo? str) (and (= (string-length str) 40) (string-suffix? ".narinfo" str))) + (define plain-metrics-registry + (make-metrics-registry)) + (define gc-metrics-updater - (get-gc-metrics-updater metrics-registry)) + (get-gc-metrics-updater plain-metrics-registry)) + + (define process-metrics-updater + (get-process-metrics-updater plain-metrics-registry)) + + (define guile-time-metrics-updater + (let ((internal-real-time + (make-gauge-metric plain-metrics-registry "guile_internal_real_time")) + (internal-run-time + (make-gauge-metric plain-metrics-registry "guile_internal_run_time"))) + (lambda () + (metric-set internal-real-time + (get-internal-real-time)) + (metric-set internal-run-time + (get-internal-run-time))))) (define requests-total-metric (make-counter-metric metrics-registry "server_requests_total")) - (define (increment-request-metric category response-code) + (define* (increment-request-metric category response-code #:key (labels '())) (metric-increment requests-total-metric #:label-values `((category . ,category) - (response_code . ,response-code)))) + (response_code . ,response-code) + ,@labels))) + + (define %compression-strings + (map symbol->string + %compression-options)) (lambda (request body) (log-msg logger @@ -178,30 +231,81 @@ (match (cons (request-method request) (split-and-decode-uri-path (uri-path (request-uri request)))) - (('GET (? narinfo? narinfo)) - (let ((narinfo-contents + (((or 'HEAD 'GET) (? narinfo? narinfo)) + (let ((base-narinfo-contents + narinfo-id (database-select-narinfo-contents-by-hash database (string-take narinfo 32)))) (increment-request-metric "narinfo" - (if narinfo-contents + (if base-narinfo-contents "200" "404")) - (if narinfo-contents - (values `((content-type . (text/plain)) - ,@(if ttl - `((cache-control (max-age . ,ttl))) - '())) - narinfo-contents) + (if base-narinfo-contents + (let* ((cached-narinfo-files + (database-select-cached-narinfo-files-by-narinfo-id + database + narinfo-id)) + (narinfo-contents + (if (null? cached-narinfo-files) + base-narinfo-contents + (add-cached-compressions-to-narinfo + base-narinfo-contents + cached-narinfo-files))) + (potential-ttls + (remove + not + `(,(if (null? cached-narinfo-files) + base-ttl + base-cached-compressions-ttl) + + ,(and=> (database-select-scheduled-narinfo-removal + database + narinfo-id) + (lambda (scheduled-removal-time) + (list + (max + (- (time-second + (time-difference scheduled-removal-time + (current-time))) + 60) + 0)))) + + ,@(if (null? cached-narinfo-files) + '() + (map + (lambda (details) + (and=> + (database-select-scheduled-cached-narinfo-removal + database + (assq-ref details 'id)) + (lambda (scheduled-removal-time) + (max + (- (time-second + (time-difference scheduled-removal-time + (current-time))) + 60) + 0)))) + cached-narinfo-files))))) + (ttl + (cond + ((null? potential-ttls) #f) + (else (apply min potential-ttls))))) + + (values `((content-type . (text/plain)) + ,@(if ttl + `((cache-control (max-age . ,ttl))) + '())) + narinfo-contents)) (values (build-response #:code 404 #:headers (if negative-ttl `((cache-control (max-age . ,negative-ttl))) '())) "404")))) - (('GET (? narinfo? narinfo) "info") + (((or 'HEAD 'GET) (? narinfo? narinfo) "info") (let ((narinfo-contents (database-select-narinfo-contents-by-hash database @@ -220,42 +324,133 @@ (string-take narinfo 32))))) (values (build-response #:code 404) "404")))) - (('GET "nar" compression filename) - (let* ((hash (string-take filename 32)) + (((or 'HEAD 'GET) "nar" compression filename) + (let* ((hash (and (>= (string-length filename) 32) + (string-take filename 32))) + (narinfo + (and hash + (database-select-narinfo-by-hash + database + hash))) (narinfo-files - (database-select-narinfo-files - database - hash)) + (and=> (assq-ref narinfo 'id) + (lambda (id) + (database-select-narinfo-files-by-narinfo-id + database + id)))) (narinfo-file-for-compression (find (lambda (file) - (string=? (assq-ref file 'compression) - compression)) - narinfo-files))) - - (when (or narinfo-file-for-compression - ;; Check for a common compression to avoid lots of - ;; metrics being generated if compression is random - (member compression '("gzip" "lzip" "zstd"))) - (increment-request-metric - (string-append "nar/" - compression) - (if narinfo-file-for-compression "200" "404"))) + (and (string=? (assq-ref file 'compression) + compression) + (string=? + (last (string-split (assq-ref file 'url) + #\/)) + (uri-encode filename)))) + (or narinfo-files '()))) + (compression-symbol + (if (member + compression + %compression-strings + string=?) + (string->symbol compression) + #f))) (if narinfo-file-for-compression - (values (build-response - #:code 200 - #:headers `((X-Accel-Redirect - . ,(string-append - "/internal/nar/" - compression "/" - (uri-encode filename))))) - #f) - (values (build-response #:code 404) - "404")))) - (('GET "file" name algo hash) + (let ((loop? + (any + (lambda (via) + (string=? (last (string-split via #\space)) + hostname)) + (request-via request)))) + + (when (and (not loop?) + maybe-trigger-creation-of-cached-nars) + (maybe-trigger-creation-of-cached-nars + (assq-ref narinfo 'id))) + + (when loop? + (log-msg logger 'WARN + (request-method request) + " " + (uri-path (request-uri request)) + ": loop detected (" hostname "): " + (string-join (request-via request) ", "))) + + (increment-request-metric + (string-append "nar/" + compression) + (if loop? + "500" + "200") + #:labels + (let ((system (assq-ref narinfo 'system))) + (if (string? system) + `((system . ,system)) + '()))) + + (if loop? + (values (build-response #:code 500) + (simple-format #f "loop detected (~A): ~A\n" + hostname + (request-via request))) + (values (build-response + #:code 200 + #:headers `((X-Accel-Redirect + . ,(string-append + "/internal/nar/" + compression "/" + (uri-encode filename))))) + #f))) + (let ((cached-narinfo-file + (and narinfo ; must be a known hash + compression-symbol ; must be a known compression + ;; Check that the filename given in the + ;; request matches the narinfo store-path + (string=? filename + (basename + (assq-ref narinfo 'store-path))) + (database-select-cached-narinfo-file-by-hash + database + hash + compression-symbol)))) + + (when (or cached-narinfo-file + ;; Check for a common compression to avoid lots of + ;; metrics being generated if compression is random + compression-symbol) + (increment-request-metric + (string-append "nar/" + compression) + (if cached-narinfo-file "200" "404") + #:labels + (if cached-narinfo-file + (let ((system (assq-ref narinfo 'system))) + (if (string? system) + `((system . ,system)) + '())) + '()))) + + (when cached-narinfo-file + (cached-compression-nar-requested-hook compression-symbol + filename)) + + (if cached-narinfo-file + (values (build-response + #:code 200 + #:headers `((X-Accel-Redirect + . ,(string-append + "/internal/cached-nar/" + compression "/" + (uri-encode filename))))) + #f) + (values (build-response #:code 404) + "404")))))) + (((or 'HEAD 'GET) "file" name algo hash) (guard (c ((invalid-base32-character? c) (values (build-response #:code 404) - "404"))) + (if (eq? (request-method request) 'HEAD) + #f + "404")))) (let ((hash-bytevector (nix-base32-string->bytevector hash))) (if (and (string=? algo "sha256") (= 32 (bytevector-length hash-bytevector))) @@ -272,43 +467,54 @@ store-path-hash)) (selected-narinfo-file ;; TODO Select intelligently - (first narinfo-files))) + (if (null? narinfo-files) + #f + (first narinfo-files))) + (filename + (and selected-narinfo-file + (let ((filename + (string-append + storage-root + (uri-decode + (assq-ref selected-narinfo-file 'url))))) + (and (file-exists? filename) + filename))))) (increment-request-metric "file" - (if selected-narinfo-file "200" "404")) - - (if selected-narinfo-file - (let* ((url - (assq-ref selected-narinfo-file 'url)) - (filename - (string-append storage-root - (uri-decode url)))) - - (serve-fixed-output-file - (open-input-file filename) - (assq-ref selected-narinfo-file - 'compression) - (lambda (nar-port bytes) - (values `((content-type . (application/octet-stream - (charset . "ISO-8859-1"))) - (content-length . ,bytes)) - (lambda (output-port) - (dump-port nar-port - output-port - bytes) - - (close-port output-port)))))) + (if filename "200" "404")) + + (if filename + (serve-fixed-output-file + (open-input-file filename) + (assq-ref selected-narinfo-file + 'compression) + (lambda (nar-port bytes) + (values `((content-type . (application/octet-stream + (charset . "ISO-8859-1"))) + (content-length . ,bytes)) + (if (eq? (request-method request) 'HEAD) + #f + (lambda (output-port) + (dump-port nar-port + output-port + bytes) + + (close-port nar-port)))))) (values (build-response #:code 404) - "404"))) + (if (eq? (request-method request) 'HEAD) + #f + "404")))) (begin (increment-request-metric "file" "404") (values (build-response #:code 404) - "404")))))) + (if (eq? (request-method request) 'HEAD) + #f + "404"))))))) - (('GET "recent-changes") + (((or 'HEAD 'GET) "recent-changes") (let ((query-parameters (or (and=> (uri-query (request-uri request)) parse-query-string) @@ -323,7 +529,7 @@ (or (assoc-ref query-parameters "since") "1970-01-01 00:00:01")))))))) - (('GET "latest-database-dump") + (((or 'HEAD 'GET) "latest-database-dump") (increment-request-metric "latest-database-dump" "200") @@ -331,20 +537,384 @@ #:code 200 #:headers '((X-Accel-Redirect . "/internal/database/nar_herder_dump.db"))) #f)) - (('GET "metrics") + (((or 'HEAD 'GET) "metrics") (gc-metrics-updater) + (process-metrics-updater) + (guile-time-metrics-updater) + (update-database-metrics! database) (increment-request-metric "metrics" "200") (values (build-response #:code 200 #:headers '((content-type . (text/plain)) (vary . (accept)))) - (lambda (port) - (write-metrics metrics-registry - port)))) + (call-with-output-string + (lambda (port) + (write-metrics metrics-registry port) + (write-metrics plain-metrics-registry port))))) (_ (increment-request-metric "unhandled" "404") (values (build-response #:code 404) "404"))))) +(define* (run-nar-herder-service opts lgr) + (define (download-database) + (let ((database-uri + (string->uri + (string-append (assq-ref opts 'mirror) + "/latest-database-dump")))) + (with-port-timeouts + (lambda () + (call-with-values + (lambda () + (simple-format (current-error-port) + "starting downloading the database\n") + (let ((port + socket + (open-socket-for-uri* database-uri))) + (http-get database-uri + #:port port + #:streaming? #t))) + (lambda (response body) + (when (not (= (response-code response) 200)) + (error "unable to fetch database from mirror")) + + (let* ((reporter (progress-reporter/file + (uri->string database-uri) + (response-content-length response) + (current-error-port))) + (port + (progress-report-port + reporter + body + #:download-size (response-content-length response)))) + + (call-with-output-file (assq-ref opts 'database) + (lambda (output-port) + (dump-port port output-port))) + + (close-port port)) + + (simple-format (current-error-port) + "finished downloading the database\n")))) + #:timeout 30))) + + (define metrics-registry + (make-metrics-registry + #:namespace + "narherder")) + + (and=> + (assq-ref opts 'mirror) + (lambda (mirror) + (let ((database-file (assq-ref opts 'database))) + (if (file-exists? database-file) + (begin + ;; TODO Open the database, and check if the + ;; latest changes in the database are visible on + ;; the source to mirror. If they're not, then + ;; delete the database and download it to get + ;; back in sync + + #f) + (download-database))))) + + ;; Used elsewhere + (make-gauge-metric metrics-registry "recent_changes_count") + + (let ((recent-changes-metric + (make-gauge-metric metrics-registry "recent_changes_limit"))) + (metric-set recent-changes-metric (assq-ref opts 'recent-changes-limit))) + + (define maintenance-scheduler + (make-scheduler #:parallelism 1)) + + (let* ((database (setup-database (assq-ref opts 'database) + metrics-registry + #:reader-threads + (assq-ref opts 'database-reader-threads))) + (canonical-storage (and=> (assq-ref opts 'storage) + canonicalize-path)) + + (enabled-cached-compressions + (let ((explicit-cached-compression-directories + (filter-map + (match-lambda + (('cached-compression-directory . details) details) + (_ #f)) + opts)) + (cached-compression-directories-max-sizes + (filter-map + (match-lambda + (('cached-compression-directory-max-size . details) details) + (_ #f)) + opts)) + (cached-compression-ttls + (filter-map + (match-lambda + (('cached-compression-ttl . details) details) + (_ #f)) + opts)) + (cached-compression-new-ttls + (filter-map + (match-lambda + (('cached-compression-new-ttl . details) details) + (_ #f)) + opts)) + (cached-compression-unused-removal-durations + (filter-map + (match-lambda + (('cached-compression-unused-removal-duration . details) + details) + (_ #f)) + opts))) + + (filter-map + (match-lambda + (('cached-compression . details) + (let ((compression + (assq-ref details 'type))) + (cons compression + `(,@(alist-delete 'type details) + (directory + . ,(or (assq-ref explicit-cached-compression-directories + compression) + (simple-format #f "/var/cache/nar-herder/nar/~A" + compression))) + (directory-max-size + . ,(assq-ref cached-compression-directories-max-sizes + compression)) + (ttl + . ,(assq-ref cached-compression-ttls + compression)) + (new-ttl + . ,(assq-ref cached-compression-new-ttls + compression)) + (unused-removal-duration + . ,(assq-ref cached-compression-unused-removal-durations + compression)))))) + (_ #f)) + opts))) + + (cached-compression-min-uses + (assq-ref opts 'cached-compression-min-uses)) + + (cached-compression-management-channel + (if (null? enabled-cached-compressions) + #f + (start-cached-compression-management-fiber + database + metrics-registry + (or (assq-ref opts 'cached-compression-nar-source) + canonical-storage) + enabled-cached-compressions + cached-compression-min-uses + #:cached-compression-workers + (assq-ref opts 'cached-compression-workers) + #:scheduler maintenance-scheduler))) + + (maybe-trigger-creation-of-cached-nars + (if (null? enabled-cached-compressions) + #f + (lambda (narinfo-id) + (spawn-fiber + (lambda () + (put-message cached-compression-management-channel + (cons 'narinfo-id narinfo-id))) + maintenance-scheduler)))) + + (cached-compression-nar-requested-hook + (if (null? enabled-cached-compressions) + #f + (lambda (compression filename) + (spawn-fiber + (lambda () + (let* ((directory + (assq-ref (assq-ref enabled-cached-compressions + compression) + 'directory))) + (utime (string-append directory "/" filename)))) + maintenance-scheduler))))) + + (if (string=? (assq-ref opts 'database-dump) + "disabled") + (log-msg 'INFO "database dump disabled") + (when (not (file-exists? (assq-ref opts 'database-dump))) + (log-msg 'INFO "dumping database...") + (dump-database database (assq-ref opts 'database-dump)))) + + (let ((finished? (make-condition))) + (call-with-new-thread + (lambda () + (catch 'system-error + (lambda () + (set-thread-name "maintenance")) + (const #t)) + + (run-fibers + (lambda () + (initialise-storage-metrics + database + canonical-storage + metrics-registry) + + (start-recent-change-removal-and-database-dump-fiber + database + metrics-registry + (let ((filename (assq-ref opts 'database-dump))) + (if (string=? filename "disabled") + #f + filename)) + (* 24 3600) ; 24 hours + (assq-ref opts 'recent-changes-limit)) + + (let ((mirror-channel + (and=> + (assq-ref opts 'mirror) + (lambda (mirror) + (start-fetch-changes-fiber + database + metrics-registry + canonical-storage ; might be #f, but that's fine here + mirror + cached-compression-management-channel) + + (if (assq-ref opts 'storage) + (start-mirroring-fiber database + mirror + (assq-ref opts 'storage-limit) + canonical-storage + metrics-registry) + #f)))) + (removal-channel + (let ((nar-removal-criteria + (filter-map + (match-lambda + ((key . val) + (if (eq? key 'storage-nar-removal-criteria) + val + #f))) + opts))) + (if (and (assq-ref opts 'storage) + (number? (assq-ref opts 'storage-limit)) + (not (null? nar-removal-criteria))) + (start-nar-removal-fiber database + canonical-storage + (assq-ref opts 'storage-limit) + metrics-registry + nar-removal-criteria) + #f))) + (addition-channel (make-channel))) + + (spawn-fiber + (lambda () + (while #t + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "exception processing addition-channel: " + exn)) + (lambda () + (match (get-message addition-channel) + (('addition file) + ;; TODO Check if the file is actually not stored + (update-nar-files-metric metrics-registry + '() + #:not-stored-addition-count 1) + + (when mirror-channel + (spawn-fiber + (lambda () + (put-message mirror-channel + `(fetch ,file))))) + (when removal-channel + (spawn-fiber + (lambda () + (sleep 60) + (removal-channel-remove-nar-from-storage removal-channel + file) + (sleep (* 5 60)) + (removal-channel-remove-nar-from-storage removal-channel + file) + (sleep (* 15 60)) + (removal-channel-remove-nar-from-storage removal-channel + file) + (sleep 3600) + (removal-channel-remove-nar-from-storage removal-channel + file))))))) + #:unwind? #t)))) + + (start-recent-change-listener-fiber + database + metrics-registry + addition-channel + removal-channel)) + + (unless (null? enabled-cached-compressions) + (let ((cached-compression-removal-fiber-wakeup-channel + (start-cached-compression-removal-fiber + database + cached-compression-management-channel + enabled-cached-compressions))) + (start-cached-compression-schedule-removal-fiber + database + metrics-registry + cached-compression-management-channel + enabled-cached-compressions + cached-compression-removal-fiber-wakeup-channel + (or (assq-ref opts 'narinfo-ttl) + ;; Default from (guix substitutes) + (* 36 3600))))) + + (log-msg 'DEBUG "finished maintenance setup") + (wait finished?)) + #:scheduler maintenance-scheduler + #:hz 0 + #:parallelism 1))) + + (call-with-sigint + (lambda () + (run-fibers + (lambda () + (let* ((current (current-scheduler)) + (schedulers + (cons current (scheduler-remote-peers current)))) + (for-each + (lambda (i sched) + (spawn-fiber + (lambda () + (catch 'system-error + (lambda () + (set-thread-name + (string-append "fibers " (number->string i)))) + (const #t))) + sched)) + (iota (length schedulers)) + schedulers)) + + (log-msg 'INFO "starting server, listening on " + (assq-ref opts 'host) ":" (assq-ref opts 'port)) + + (run-server/patched + (make-request-handler + database + canonical-storage + #:base-ttl (or (assq-ref opts 'new-narinfo-ttl) + (assq-ref opts 'narinfo-ttl)) + #:base-cached-compressions-ttl + (or (assq-ref opts 'new-cached-compressions-narinfo-ttl) + (assq-ref opts 'cached-compressions-narinfo-ttl)) + #:negative-ttl (assq-ref opts 'narinfo-negative-ttl) + #:logger lgr + #:metrics-registry metrics-registry + #:maybe-trigger-creation-of-cached-nars + maybe-trigger-creation-of-cached-nars + #:cached-compression-nar-requested-hook + cached-compression-nar-requested-hook) + #:host (assq-ref opts 'host) + #:port (assq-ref opts 'port)) + + (wait finished?)) + #:hz 0 + #:parallelism (assq-ref opts 'parallelism))) + finished?)))) |