diff options
-rw-r--r-- | nar-herder/server.scm | 258 | ||||
-rw-r--r-- | scripts/nar-herder.in | 256 |
2 files changed, 261 insertions, 253 deletions
diff --git a/nar-herder/server.scm b/nar-herder/server.scm index 525d70b..7aa4115 100644 --- a/nar-herder/server.scm +++ b/nar-herder/server.scm @@ -28,26 +28,38 @@ #:use-module (ice-9 binary-ports) #:use-module (rnrs bytevectors) #:use-module (web uri) + #:use-module (web client) #:use-module (web response) #:use-module (web request) #:use-module (logging logger) #:use-module (fibers) + #:use-module (fibers channels) + #:use-module (fibers scheduler) + #:use-module (fibers conditions) + #:use-module (fibers operations) #:use-module (prometheus) #:use-module (json) #:use-module ((system foreign) #:select (bytevector->pointer pointer->bytevector)) #:use-module (guix store) #:use-module (guix base32) + #:use-module (guix progress) #:use-module (guix serialization) #:use-module ((guix utils) #:select (decompressed-port)) #:use-module ((guix build utils) #:select (dump-port)) + #:use-module ((guix build syscalls) #:select (set-thread-name)) #:use-module (nar-herder database) #:use-module (nar-herder storage) + #:use-module (nar-herder utils) + #:use-module (nar-herder mirror) + #:use-module (nar-herder recent-changes) + #:use-module (nar-herder cached-compression) #:use-module (ice-9 textual-ports) #:export (%compression-options + run-nar-herder-service make-request-handler)) (define %compression-options @@ -537,3 +549,249 @@ (values (build-response #:code 404) "404"))))) +(define* (run-nar-herder-service opts lgr) + (define (download-database) + (let ((database-uri + (string->uri + (string-append (assq-ref opts 'mirror) + "/latest-database-dump")))) + (with-fibers-port-timeouts + (lambda () + (call-with-values + (lambda () + (simple-format (current-error-port) + "starting downloading the database\n") + (let ((port + socket + (open-socket-for-uri* database-uri))) + (http-get database-uri + #:port port + #:streaming? #t))) + (lambda (response body) + (when (not (= (response-code response) 200)) + (error "unable to fetch database from mirror")) + + (let* ((reporter (progress-reporter/file + (uri->string database-uri) + (response-content-length response) + (current-error-port))) + (port + (progress-report-port + reporter + body + #:download-size (response-content-length response)))) + + (call-with-output-file (assq-ref opts 'database) + (lambda (output-port) + (dump-port port output-port))) + + (close-port port)) + + (simple-format (current-error-port) + "finished downloading the database\n")))) + #:timeout 30))) + + (define metrics-registry + (make-metrics-registry + #:namespace + "narherder")) + + (and=> + (assq-ref opts 'mirror) + (lambda (mirror) + (let ((database-file (assq-ref opts 'database))) + (if (file-exists? database-file) + (begin + ;; TODO Open the database, and check if the + ;; latest changes in the database are visible on + ;; the source to mirror. If they're not, then + ;; delete the database and download it to get + ;; back in sync + + #f) + (download-database))))) + + (let* ((database (setup-database (assq-ref opts 'database) + metrics-registry)) + (canonical-storage (and=> (assq-ref opts 'storage) + canonicalize-path)) + + (enabled-cached-compressions + (let ((explicit-cached-compression-directories + (filter-map + (match-lambda + (('cached-compression-directory . details) details) + (_ #f)) + opts)) + (cached-compression-directories-max-sizes + (filter-map + (match-lambda + (('cached-compression-directory-max-size . details) details) + (_ #f)) + opts))) + (filter-map + (match-lambda + (('cached-compression . details) + (let ((compression + (assq-ref details 'type))) + (cons compression + `(,@(alist-delete 'type details) + (directory + . ,(or (assq-ref explicit-cached-compression-directories + compression) + (simple-format #f "/var/cache/nar-herder/nar/~A" + compression))) + (directory-max-size + . ,(assq-ref cached-compression-directories-max-sizes + compression)))))) + (_ #f)) + opts))) + + (cached-compression-min-uses + (assq-ref opts 'cached-compression-min-uses)) + + (maybe-trigger-creation-of-compressed-nars + (if (null? enabled-cached-compressions) + #f + (make-maybe-trigger-creation-of-compressed-nars + database + metrics-registry + (or (assq-ref opts 'cached-compression-nar-source) + canonical-storage) + enabled-cached-compressions + cached-compression-min-uses + #:cached-compression-workers + (assq-ref opts 'cached-compression-workers))))) + + (if (string=? (assq-ref opts 'database-dump) + "disabled") + (log-msg 'INFO "database dump disabled") + (when (not (file-exists? (assq-ref opts 'database-dump))) + (log-msg 'INFO "dumping database...") + (dump-database database (assq-ref opts 'database-dump)))) + + (let ((finished? (make-condition))) + (call-with-new-thread + (lambda () + (catch 'system-error + (lambda () + (set-thread-name "maintenance")) + (const #t)) + + (run-fibers + (lambda () + (start-recent-change-removal-and-database-dump-fiber + database + (let ((filename (assq-ref opts 'database-dump))) + (if (string=? filename "disabled") + #f + filename)) + (* 24 3600) ; 24 hours + (assq-ref opts 'recent-changes-limit)) + + (let ((mirror-channel + (and=> + (assq-ref opts 'mirror) + (lambda (mirror) + (start-fetch-changes-fiber database + canonical-storage + mirror + metrics-registry) + + (when (assq-ref opts 'storage) + (start-mirroring-fiber database + mirror + (assq-ref opts 'storage-limit) + canonical-storage + metrics-registry))))) + (removal-channel + (let ((nar-removal-criteria + (filter-map + (match-lambda + ((key . val) + (if (eq? key 'storage-nar-removal-criteria) + val + #f))) + opts))) + (when (and (assq-ref opts 'storage) + (number? (assq-ref opts 'storage-limit)) + (not (null? nar-removal-criteria))) + (start-nar-removal-fiber database + canonical-storage + (assq-ref opts 'storage-limit) + metrics-registry + nar-removal-criteria)))) + (addition-channel (make-channel))) + + (spawn-fiber + (lambda () + (while #t + (match (get-message addition-channel) + (('addition file) + (when mirror-channel + (put-message mirror-channel + `(fetch ,file))) + (when removal-channel + (spawn-fiber + (lambda () + (sleep 60) + (put-message removal-channel + `(remove-from-storage ,file)) + (sleep (* 5 60)) + (put-message removal-channel + `(remove-from-storage ,file)) + (sleep (* 15 60)) + (put-message removal-channel + `(remove-from-storage ,file)) + (sleep 3600) + (put-message removal-channel + `(remove-from-storage ,file)))))))))) + + (start-recent-change-listener-fiber + database + addition-channel + removal-channel)) + + (log-msg 'DEBUG "finished maintenance setup") + (wait finished?)) + #:parallelism 1))) + + (call-with-sigint + (lambda () + (run-fibers + (lambda () + (let* ((current (current-scheduler)) + (schedulers + (cons current (scheduler-remote-peers current)))) + (for-each + (lambda (i sched) + (spawn-fiber + (lambda () + (catch 'system-error + (lambda () + (set-thread-name + (string-append "fibers " (number->string i)))) + (const #t))) + sched)) + (iota (length schedulers)) + schedulers)) + + (log-msg 'INFO "starting server, listening on " + (assq-ref opts 'host) ":" (assq-ref opts 'port)) + + (run-server/patched + (make-request-handler database + canonical-storage + #:ttl (assq-ref opts 'narinfo-ttl) + #:negative-ttl (assq-ref opts 'narinfo-negative-ttl) + #:logger lgr + #:metrics-registry metrics-registry + #:maybe-trigger-creation-of-compressed-nars + maybe-trigger-creation-of-compressed-nars) + #:host (assq-ref opts 'host) + #:port (assq-ref opts 'port)) + + (wait finished?)) + #:hz 0 + #:parallelism 4)) + finished?)))) diff --git a/scripts/nar-herder.in b/scripts/nar-herder.in index 6d01f74..cfa4fe7 100644 --- a/scripts/nar-herder.in +++ b/scripts/nar-herder.in @@ -55,24 +55,16 @@ (logging port-log) (prometheus) (fibers) - (fibers channels) - (fibers scheduler) - (fibers conditions) - (fibers web server) ((guix ui) #:select (read/eval string->duration)) (guix store) - (guix progress) (guix narinfo) + (guix progress) (guix derivations) ((guix store) #:select (store-path-hash-part)) ((guix build utils) #:select (dump-port)) - ((guix build syscalls) #:select (set-thread-name)) (nar-herder utils) (nar-herder database) - (nar-herder recent-changes) - (nar-herder cached-compression) (nar-herder storage) - (nar-herder mirror) (nar-herder server)) (install-suspendable-ports!) @@ -468,51 +460,7 @@ (format #f "~a (~5a): ~a~%" (strftime "%F %H:%M:%S" (localtime time)) lvl - str)))) - (metrics-registry (make-metrics-registry - #:namespace - "narherder"))) - - (define (download-database) - (let ((database-uri - (string->uri - (string-append (assq-ref opts 'mirror) - "/latest-database-dump")))) - (with-fibers-port-timeouts - (lambda () - (call-with-values - (lambda () - (simple-format (current-error-port) - "starting downloading the database\n") - (let ((port - socket - (open-socket-for-uri* database-uri))) - (http-get database-uri - #:port port - #:streaming? #t))) - (lambda (response body) - (when (not (= (response-code response) 200)) - (error "unable to fetch database from mirror")) - - (let* ((reporter (progress-reporter/file - (uri->string database-uri) - (response-content-length response) - (current-error-port))) - (port - (progress-report-port - reporter - body - #:download-size (response-content-length response)))) - - (call-with-output-file (assq-ref opts 'database) - (lambda (output-port) - (dump-port port output-port))) - - (close-port port)) - - (simple-format (current-error-port) - "finished downloading the database\n")))) - #:timeout 30))) + str))))) (add-handler! lgr port-log) (open-log! lgr) @@ -543,202 +491,4 @@ (lambda (port) (simple-format port "~A\n" (getpid)))))) - (and=> - (assq-ref opts 'mirror) - (lambda (mirror) - (let ((database-file (assq-ref opts 'database))) - (if (file-exists? database-file) - (begin - ;; TODO Open the database, and check if the - ;; latest changes in the database are visible on - ;; the source to mirror. If they're not, then - ;; delete the database and download it to get - ;; back in sync - - #f) - (download-database))))) - - (let* ((database (setup-database (assq-ref opts 'database) - metrics-registry)) - (canonical-storage (and=> (assq-ref opts 'storage) - canonicalize-path)) - - (enabled-cached-compressions - (let ((explicit-cached-compression-directories - (filter-map - (match-lambda - (('cached-compression-directory . details) details) - (_ #f)) - opts)) - (cached-compression-directories-max-sizes - (filter-map - (match-lambda - (('cached-compression-directory-max-size . details) details) - (_ #f)) - opts))) - (filter-map - (match-lambda - (('cached-compression . details) - (let ((compression - (assq-ref details 'type))) - (cons compression - `(,@(alist-delete 'type details) - (directory - . ,(or (assq-ref explicit-cached-compression-directories - compression) - (simple-format #f "/var/cache/nar-herder/nar/~A" - compression))) - (directory-max-size - . ,(assq-ref cached-compression-directories-max-sizes - compression)))))) - (_ #f)) - opts))) - - (cached-compression-min-uses - (assq-ref opts 'cached-compression-min-uses)) - - (maybe-trigger-creation-of-compressed-nars - (if (null? enabled-cached-compressions) - #f - (make-maybe-trigger-creation-of-compressed-nars - database - metrics-registry - (or (assq-ref opts 'cached-compression-nar-source) - canonical-storage) - enabled-cached-compressions - cached-compression-min-uses - #:cached-compression-workers - (assq-ref opts 'cached-compression-workers))))) - - (if (string=? (assq-ref opts 'database-dump) - "disabled") - (log-msg 'INFO "database dump disabled") - (when (not (file-exists? (assq-ref opts 'database-dump))) - (log-msg 'INFO "dumping database...") - (dump-database database (assq-ref opts 'database-dump)))) - - (let ((finished? (make-condition))) - (call-with-new-thread - (lambda () - (catch 'system-error - (lambda () - (set-thread-name "maintenance")) - (const #t)) - - (run-fibers - (lambda () - (start-recent-change-removal-and-database-dump-fiber - database - (let ((filename (assq-ref opts 'database-dump))) - (if (string=? filename "disabled") - #f - filename)) - (* 24 3600) ; 24 hours - (assq-ref opts 'recent-changes-limit)) - - (let ((mirror-channel - (and=> - (assq-ref opts 'mirror) - (lambda (mirror) - (start-fetch-changes-fiber database - canonical-storage - mirror - metrics-registry) - - (when (assq-ref opts 'storage) - (start-mirroring-fiber database - mirror - (assq-ref opts 'storage-limit) - canonical-storage - metrics-registry))))) - (removal-channel - (let ((nar-removal-criteria - (filter-map - (match-lambda - ((key . val) - (if (eq? key 'storage-nar-removal-criteria) - val - #f))) - opts))) - (when (and (assq-ref opts 'storage) - (number? (assq-ref opts 'storage-limit)) - (not (null? nar-removal-criteria))) - (start-nar-removal-fiber database - canonical-storage - (assq-ref opts 'storage-limit) - metrics-registry - nar-removal-criteria)))) - (addition-channel (make-channel))) - - (spawn-fiber - (lambda () - (while #t - (match (get-message addition-channel) - (('addition file) - (when mirror-channel - (put-message mirror-channel - `(fetch ,file))) - (when removal-channel - (spawn-fiber - (lambda () - (sleep 60) - (put-message removal-channel - `(remove-from-storage ,file)) - (sleep (* 5 60)) - (put-message removal-channel - `(remove-from-storage ,file)) - (sleep (* 15 60)) - (put-message removal-channel - `(remove-from-storage ,file)) - (sleep 3600) - (put-message removal-channel - `(remove-from-storage ,file)))))))))) - - (start-recent-change-listener-fiber - database - addition-channel - removal-channel)) - - (log-msg 'DEBUG "finished maintenance setup") - (wait finished?)) - #:parallelism 1))) - - (call-with-sigint - (lambda () - (run-fibers - (lambda () - (let* ((current (current-scheduler)) - (schedulers - (cons current (scheduler-remote-peers current)))) - (for-each - (lambda (i sched) - (spawn-fiber - (lambda () - (catch 'system-error - (lambda () - (set-thread-name - (string-append "fibers " (number->string i)))) - (const #t))) - sched)) - (iota (length schedulers)) - schedulers)) - - (log-msg 'INFO "starting server, listening on " - (assq-ref opts 'host) ":" (assq-ref opts 'port)) - - (run-server/patched - (make-request-handler database - canonical-storage - #:ttl (assq-ref opts 'narinfo-ttl) - #:negative-ttl (assq-ref opts 'narinfo-negative-ttl) - #:logger lgr - #:metrics-registry metrics-registry - #:maybe-trigger-creation-of-compressed-nars - maybe-trigger-creation-of-compressed-nars) - #:host (assq-ref opts 'host) - #:port (assq-ref opts 'port)) - - (wait finished?)) - #:hz 0 - #:parallelism 4)) - finished?)))))) + (run-nar-herder-service opts lgr)))) |