aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2023-09-12 14:48:04 +0100
committerChristopher Baines <mail@cbaines.net>2023-09-12 14:48:04 +0100
commitbf539aa08edfe8010606a31c00e0296c3d400319 (patch)
treee10db5e2ac90a2b596410bd0eead1d71613ea88e
parent5415a220bed4c84108caf35480d4b97764b006d3 (diff)
downloadnar-herder-bf539aa08edfe8010606a31c00e0296c3d400319.tar
nar-herder-bf539aa08edfe8010606a31c00e0296c3d400319.tar.gz
Move most functionality out of the start script
As this can make debugging easier.
-rw-r--r--nar-herder/server.scm258
-rw-r--r--scripts/nar-herder.in256
2 files changed, 261 insertions, 253 deletions
diff --git a/nar-herder/server.scm b/nar-herder/server.scm
index 525d70b..7aa4115 100644
--- a/nar-herder/server.scm
+++ b/nar-herder/server.scm
@@ -28,26 +28,38 @@
#:use-module (ice-9 binary-ports)
#:use-module (rnrs bytevectors)
#:use-module (web uri)
+ #:use-module (web client)
#:use-module (web response)
#:use-module (web request)
#:use-module (logging logger)
#:use-module (fibers)
+ #:use-module (fibers channels)
+ #:use-module (fibers scheduler)
+ #:use-module (fibers conditions)
+ #:use-module (fibers operations)
#:use-module (prometheus)
#:use-module (json)
#:use-module ((system foreign)
#:select (bytevector->pointer pointer->bytevector))
#:use-module (guix store)
#:use-module (guix base32)
+ #:use-module (guix progress)
#:use-module (guix serialization)
#:use-module ((guix utils)
#:select (decompressed-port))
#:use-module ((guix build utils)
#:select (dump-port))
+ #:use-module ((guix build syscalls) #:select (set-thread-name))
#:use-module (nar-herder database)
#:use-module (nar-herder storage)
+ #:use-module (nar-herder utils)
+ #:use-module (nar-herder mirror)
+ #:use-module (nar-herder recent-changes)
+ #:use-module (nar-herder cached-compression)
#:use-module (ice-9 textual-ports)
#:export (%compression-options
+ run-nar-herder-service
make-request-handler))
(define %compression-options
@@ -537,3 +549,249 @@
(values (build-response #:code 404)
"404")))))
+(define* (run-nar-herder-service opts lgr)
+ (define (download-database)
+ (let ((database-uri
+ (string->uri
+ (string-append (assq-ref opts 'mirror)
+ "/latest-database-dump"))))
+ (with-fibers-port-timeouts
+ (lambda ()
+ (call-with-values
+ (lambda ()
+ (simple-format (current-error-port)
+ "starting downloading the database\n")
+ (let ((port
+ socket
+ (open-socket-for-uri* database-uri)))
+ (http-get database-uri
+ #:port port
+ #:streaming? #t)))
+ (lambda (response body)
+ (when (not (= (response-code response) 200))
+ (error "unable to fetch database from mirror"))
+
+ (let* ((reporter (progress-reporter/file
+ (uri->string database-uri)
+ (response-content-length response)
+ (current-error-port)))
+ (port
+ (progress-report-port
+ reporter
+ body
+ #:download-size (response-content-length response))))
+
+ (call-with-output-file (assq-ref opts 'database)
+ (lambda (output-port)
+ (dump-port port output-port)))
+
+ (close-port port))
+
+ (simple-format (current-error-port)
+ "finished downloading the database\n"))))
+ #:timeout 30)))
+
+ (define metrics-registry
+ (make-metrics-registry
+ #:namespace
+ "narherder"))
+
+ (and=>
+ (assq-ref opts 'mirror)
+ (lambda (mirror)
+ (let ((database-file (assq-ref opts 'database)))
+ (if (file-exists? database-file)
+ (begin
+ ;; TODO Open the database, and check if the
+ ;; latest changes in the database are visible on
+ ;; the source to mirror. If they're not, then
+ ;; delete the database and download it to get
+ ;; back in sync
+
+ #f)
+ (download-database)))))
+
+ (let* ((database (setup-database (assq-ref opts 'database)
+ metrics-registry))
+ (canonical-storage (and=> (assq-ref opts 'storage)
+ canonicalize-path))
+
+ (enabled-cached-compressions
+ (let ((explicit-cached-compression-directories
+ (filter-map
+ (match-lambda
+ (('cached-compression-directory . details) details)
+ (_ #f))
+ opts))
+ (cached-compression-directories-max-sizes
+ (filter-map
+ (match-lambda
+ (('cached-compression-directory-max-size . details) details)
+ (_ #f))
+ opts)))
+ (filter-map
+ (match-lambda
+ (('cached-compression . details)
+ (let ((compression
+ (assq-ref details 'type)))
+ (cons compression
+ `(,@(alist-delete 'type details)
+ (directory
+ . ,(or (assq-ref explicit-cached-compression-directories
+ compression)
+ (simple-format #f "/var/cache/nar-herder/nar/~A"
+ compression)))
+ (directory-max-size
+ . ,(assq-ref cached-compression-directories-max-sizes
+ compression))))))
+ (_ #f))
+ opts)))
+
+ (cached-compression-min-uses
+ (assq-ref opts 'cached-compression-min-uses))
+
+ (maybe-trigger-creation-of-compressed-nars
+ (if (null? enabled-cached-compressions)
+ #f
+ (make-maybe-trigger-creation-of-compressed-nars
+ database
+ metrics-registry
+ (or (assq-ref opts 'cached-compression-nar-source)
+ canonical-storage)
+ enabled-cached-compressions
+ cached-compression-min-uses
+ #:cached-compression-workers
+ (assq-ref opts 'cached-compression-workers)))))
+
+ (if (string=? (assq-ref opts 'database-dump)
+ "disabled")
+ (log-msg 'INFO "database dump disabled")
+ (when (not (file-exists? (assq-ref opts 'database-dump)))
+ (log-msg 'INFO "dumping database...")
+ (dump-database database (assq-ref opts 'database-dump))))
+
+ (let ((finished? (make-condition)))
+ (call-with-new-thread
+ (lambda ()
+ (catch 'system-error
+ (lambda ()
+ (set-thread-name "maintenance"))
+ (const #t))
+
+ (run-fibers
+ (lambda ()
+ (start-recent-change-removal-and-database-dump-fiber
+ database
+ (let ((filename (assq-ref opts 'database-dump)))
+ (if (string=? filename "disabled")
+ #f
+ filename))
+ (* 24 3600) ; 24 hours
+ (assq-ref opts 'recent-changes-limit))
+
+ (let ((mirror-channel
+ (and=>
+ (assq-ref opts 'mirror)
+ (lambda (mirror)
+ (start-fetch-changes-fiber database
+ canonical-storage
+ mirror
+ metrics-registry)
+
+ (when (assq-ref opts 'storage)
+ (start-mirroring-fiber database
+ mirror
+ (assq-ref opts 'storage-limit)
+ canonical-storage
+ metrics-registry)))))
+ (removal-channel
+ (let ((nar-removal-criteria
+ (filter-map
+ (match-lambda
+ ((key . val)
+ (if (eq? key 'storage-nar-removal-criteria)
+ val
+ #f)))
+ opts)))
+ (when (and (assq-ref opts 'storage)
+ (number? (assq-ref opts 'storage-limit))
+ (not (null? nar-removal-criteria)))
+ (start-nar-removal-fiber database
+ canonical-storage
+ (assq-ref opts 'storage-limit)
+ metrics-registry
+ nar-removal-criteria))))
+ (addition-channel (make-channel)))
+
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (match (get-message addition-channel)
+ (('addition file)
+ (when mirror-channel
+ (put-message mirror-channel
+ `(fetch ,file)))
+ (when removal-channel
+ (spawn-fiber
+ (lambda ()
+ (sleep 60)
+ (put-message removal-channel
+ `(remove-from-storage ,file))
+ (sleep (* 5 60))
+ (put-message removal-channel
+ `(remove-from-storage ,file))
+ (sleep (* 15 60))
+ (put-message removal-channel
+ `(remove-from-storage ,file))
+ (sleep 3600)
+ (put-message removal-channel
+ `(remove-from-storage ,file))))))))))
+
+ (start-recent-change-listener-fiber
+ database
+ addition-channel
+ removal-channel))
+
+ (log-msg 'DEBUG "finished maintenance setup")
+ (wait finished?))
+ #:parallelism 1)))
+
+ (call-with-sigint
+ (lambda ()
+ (run-fibers
+ (lambda ()
+ (let* ((current (current-scheduler))
+ (schedulers
+ (cons current (scheduler-remote-peers current))))
+ (for-each
+ (lambda (i sched)
+ (spawn-fiber
+ (lambda ()
+ (catch 'system-error
+ (lambda ()
+ (set-thread-name
+ (string-append "fibers " (number->string i))))
+ (const #t)))
+ sched))
+ (iota (length schedulers))
+ schedulers))
+
+ (log-msg 'INFO "starting server, listening on "
+ (assq-ref opts 'host) ":" (assq-ref opts 'port))
+
+ (run-server/patched
+ (make-request-handler database
+ canonical-storage
+ #:ttl (assq-ref opts 'narinfo-ttl)
+ #:negative-ttl (assq-ref opts 'narinfo-negative-ttl)
+ #:logger lgr
+ #:metrics-registry metrics-registry
+ #:maybe-trigger-creation-of-compressed-nars
+ maybe-trigger-creation-of-compressed-nars)
+ #:host (assq-ref opts 'host)
+ #:port (assq-ref opts 'port))
+
+ (wait finished?))
+ #:hz 0
+ #:parallelism 4))
+ finished?))))
diff --git a/scripts/nar-herder.in b/scripts/nar-herder.in
index 6d01f74..cfa4fe7 100644
--- a/scripts/nar-herder.in
+++ b/scripts/nar-herder.in
@@ -55,24 +55,16 @@
(logging port-log)
(prometheus)
(fibers)
- (fibers channels)
- (fibers scheduler)
- (fibers conditions)
- (fibers web server)
((guix ui) #:select (read/eval string->duration))
(guix store)
- (guix progress)
(guix narinfo)
+ (guix progress)
(guix derivations)
((guix store) #:select (store-path-hash-part))
((guix build utils) #:select (dump-port))
- ((guix build syscalls) #:select (set-thread-name))
(nar-herder utils)
(nar-herder database)
- (nar-herder recent-changes)
- (nar-herder cached-compression)
(nar-herder storage)
- (nar-herder mirror)
(nar-herder server))
(install-suspendable-ports!)
@@ -468,51 +460,7 @@
(format #f "~a (~5a): ~a~%"
(strftime "%F %H:%M:%S" (localtime time))
lvl
- str))))
- (metrics-registry (make-metrics-registry
- #:namespace
- "narherder")))
-
- (define (download-database)
- (let ((database-uri
- (string->uri
- (string-append (assq-ref opts 'mirror)
- "/latest-database-dump"))))
- (with-fibers-port-timeouts
- (lambda ()
- (call-with-values
- (lambda ()
- (simple-format (current-error-port)
- "starting downloading the database\n")
- (let ((port
- socket
- (open-socket-for-uri* database-uri)))
- (http-get database-uri
- #:port port
- #:streaming? #t)))
- (lambda (response body)
- (when (not (= (response-code response) 200))
- (error "unable to fetch database from mirror"))
-
- (let* ((reporter (progress-reporter/file
- (uri->string database-uri)
- (response-content-length response)
- (current-error-port)))
- (port
- (progress-report-port
- reporter
- body
- #:download-size (response-content-length response))))
-
- (call-with-output-file (assq-ref opts 'database)
- (lambda (output-port)
- (dump-port port output-port)))
-
- (close-port port))
-
- (simple-format (current-error-port)
- "finished downloading the database\n"))))
- #:timeout 30)))
+ str)))))
(add-handler! lgr port-log)
(open-log! lgr)
@@ -543,202 +491,4 @@
(lambda (port)
(simple-format port "~A\n" (getpid))))))
- (and=>
- (assq-ref opts 'mirror)
- (lambda (mirror)
- (let ((database-file (assq-ref opts 'database)))
- (if (file-exists? database-file)
- (begin
- ;; TODO Open the database, and check if the
- ;; latest changes in the database are visible on
- ;; the source to mirror. If they're not, then
- ;; delete the database and download it to get
- ;; back in sync
-
- #f)
- (download-database)))))
-
- (let* ((database (setup-database (assq-ref opts 'database)
- metrics-registry))
- (canonical-storage (and=> (assq-ref opts 'storage)
- canonicalize-path))
-
- (enabled-cached-compressions
- (let ((explicit-cached-compression-directories
- (filter-map
- (match-lambda
- (('cached-compression-directory . details) details)
- (_ #f))
- opts))
- (cached-compression-directories-max-sizes
- (filter-map
- (match-lambda
- (('cached-compression-directory-max-size . details) details)
- (_ #f))
- opts)))
- (filter-map
- (match-lambda
- (('cached-compression . details)
- (let ((compression
- (assq-ref details 'type)))
- (cons compression
- `(,@(alist-delete 'type details)
- (directory
- . ,(or (assq-ref explicit-cached-compression-directories
- compression)
- (simple-format #f "/var/cache/nar-herder/nar/~A"
- compression)))
- (directory-max-size
- . ,(assq-ref cached-compression-directories-max-sizes
- compression))))))
- (_ #f))
- opts)))
-
- (cached-compression-min-uses
- (assq-ref opts 'cached-compression-min-uses))
-
- (maybe-trigger-creation-of-compressed-nars
- (if (null? enabled-cached-compressions)
- #f
- (make-maybe-trigger-creation-of-compressed-nars
- database
- metrics-registry
- (or (assq-ref opts 'cached-compression-nar-source)
- canonical-storage)
- enabled-cached-compressions
- cached-compression-min-uses
- #:cached-compression-workers
- (assq-ref opts 'cached-compression-workers)))))
-
- (if (string=? (assq-ref opts 'database-dump)
- "disabled")
- (log-msg 'INFO "database dump disabled")
- (when (not (file-exists? (assq-ref opts 'database-dump)))
- (log-msg 'INFO "dumping database...")
- (dump-database database (assq-ref opts 'database-dump))))
-
- (let ((finished? (make-condition)))
- (call-with-new-thread
- (lambda ()
- (catch 'system-error
- (lambda ()
- (set-thread-name "maintenance"))
- (const #t))
-
- (run-fibers
- (lambda ()
- (start-recent-change-removal-and-database-dump-fiber
- database
- (let ((filename (assq-ref opts 'database-dump)))
- (if (string=? filename "disabled")
- #f
- filename))
- (* 24 3600) ; 24 hours
- (assq-ref opts 'recent-changes-limit))
-
- (let ((mirror-channel
- (and=>
- (assq-ref opts 'mirror)
- (lambda (mirror)
- (start-fetch-changes-fiber database
- canonical-storage
- mirror
- metrics-registry)
-
- (when (assq-ref opts 'storage)
- (start-mirroring-fiber database
- mirror
- (assq-ref opts 'storage-limit)
- canonical-storage
- metrics-registry)))))
- (removal-channel
- (let ((nar-removal-criteria
- (filter-map
- (match-lambda
- ((key . val)
- (if (eq? key 'storage-nar-removal-criteria)
- val
- #f)))
- opts)))
- (when (and (assq-ref opts 'storage)
- (number? (assq-ref opts 'storage-limit))
- (not (null? nar-removal-criteria)))
- (start-nar-removal-fiber database
- canonical-storage
- (assq-ref opts 'storage-limit)
- metrics-registry
- nar-removal-criteria))))
- (addition-channel (make-channel)))
-
- (spawn-fiber
- (lambda ()
- (while #t
- (match (get-message addition-channel)
- (('addition file)
- (when mirror-channel
- (put-message mirror-channel
- `(fetch ,file)))
- (when removal-channel
- (spawn-fiber
- (lambda ()
- (sleep 60)
- (put-message removal-channel
- `(remove-from-storage ,file))
- (sleep (* 5 60))
- (put-message removal-channel
- `(remove-from-storage ,file))
- (sleep (* 15 60))
- (put-message removal-channel
- `(remove-from-storage ,file))
- (sleep 3600)
- (put-message removal-channel
- `(remove-from-storage ,file))))))))))
-
- (start-recent-change-listener-fiber
- database
- addition-channel
- removal-channel))
-
- (log-msg 'DEBUG "finished maintenance setup")
- (wait finished?))
- #:parallelism 1)))
-
- (call-with-sigint
- (lambda ()
- (run-fibers
- (lambda ()
- (let* ((current (current-scheduler))
- (schedulers
- (cons current (scheduler-remote-peers current))))
- (for-each
- (lambda (i sched)
- (spawn-fiber
- (lambda ()
- (catch 'system-error
- (lambda ()
- (set-thread-name
- (string-append "fibers " (number->string i))))
- (const #t)))
- sched))
- (iota (length schedulers))
- schedulers))
-
- (log-msg 'INFO "starting server, listening on "
- (assq-ref opts 'host) ":" (assq-ref opts 'port))
-
- (run-server/patched
- (make-request-handler database
- canonical-storage
- #:ttl (assq-ref opts 'narinfo-ttl)
- #:negative-ttl (assq-ref opts 'narinfo-negative-ttl)
- #:logger lgr
- #:metrics-registry metrics-registry
- #:maybe-trigger-creation-of-compressed-nars
- maybe-trigger-creation-of-compressed-nars)
- #:host (assq-ref opts 'host)
- #:port (assq-ref opts 'port))
-
- (wait finished?))
- #:hz 0
- #:parallelism 4))
- finished?))))))
+ (run-nar-herder-service opts lgr))))