aboutsummaryrefslogtreecommitdiff
path: root/nar-herder/mirror.scm
diff options
context:
space:
mode:
Diffstat (limited to 'nar-herder/mirror.scm')
-rw-r--r--nar-herder/mirror.scm85
1 files changed, 52 insertions, 33 deletions
diff --git a/nar-herder/mirror.scm b/nar-herder/mirror.scm
index 6b9f4f7..8aae845 100644
--- a/nar-herder/mirror.scm
+++ b/nar-herder/mirror.scm
@@ -19,6 +19,7 @@
(define-module (nar-herder mirror)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-43)
+ #:use-module (srfi srfi-71)
#:use-module (ice-9 match)
#:use-module (ice-9 threads)
#:use-module (ice-9 exceptions)
@@ -29,22 +30,18 @@
#:use-module (prometheus)
#:use-module (logging logger)
#:use-module (json)
+ #:use-module (fibers)
+ #:use-module (fibers channels)
#:use-module (guix narinfo)
#:use-module ((guix store) #:select (store-path-hash-part))
#:use-module (nar-herder utils)
#:use-module (nar-herder database)
#:use-module (nar-herder storage)
- #:export (start-fetch-changes-thread))
-
-(define (start-fetch-changes-thread database storage-root
- mirror metrics-registry)
- (define nar-files-metric
- (or (metrics-registry-fetch-metric metrics-registry
- "nar_files_total")
- (make-gauge-metric metrics-registry
- "nar_files_total"
- #:labels '(stored))))
+ #:export (start-fetch-changes-fiber))
+(define (start-fetch-changes-fiber database metrics-registry
+ storage-root mirror
+ cached-compression-management-channel)
(define (request-recent-changes)
(define latest-recent-change
(database-select-latest-recent-change-datetime database))
@@ -72,18 +69,25 @@
(lambda ()
(retry-on-error
(lambda ()
- (http-get uri #:decode-body? #f))
+ (log-msg 'INFO "querying for recent changes since "
+ latest-recent-change)
+ (with-port-timeouts
+ (lambda ()
+ (let ((port
+ socket
+ (open-socket-for-uri* uri)))
+ (http-get uri
+ #:port port
+ #:streaming? #t)))
+ #:timeout 30))
#:times 3
#:delay 15))
(lambda (response body)
(if (= (response-code response) 200)
- (let* ((json-body (json-string->scm
- (utf8->string body)))
+ (let* ((json-body (json->scm body))
(recent-changes
(assoc-ref json-body "recent_changes")))
- (log-msg 'INFO "queried for recent changes since "
- latest-recent-change)
(log-msg 'INFO "got " (vector-length recent-changes) " changes")
;; Switch to symbol keys and standardise the key order
@@ -116,48 +120,62 @@
narinfo
#:change-datetime
(assq-ref change-details
- 'datetime))
-
- (let ((new-files-count (length (narinfo-uris narinfo))))
- (metric-increment nar-files-metric
- #:by new-files-count
- ;; TODO This should be
- ;; checked, rather than
- ;; assumed to be false
- #:label-values '((stored . "false"))))))
+ 'datetime))))
+
((string=? change "removal")
(let ((store-path (assq-ref change-details 'data)))
(log-msg 'INFO "processing removal change for "
store-path
" (" (assq-ref change-details 'datetime) ")")
+ (let* ((hash (store-path-hash-part store-path))
+ (narinfo-details
+ (database-select-narinfo-by-hash
+ database
+ hash)))
+
(when storage-root
(remove-nar-files-by-hash
database
storage-root
metrics-registry
- (store-path-hash-part store-path)))
+ hash))
+
+ (let ((cached-narinfo-files
+ (database-select-cached-narinfo-files-by-narinfo-id
+ database
+ (assq-ref narinfo-details 'id))))
+ (for-each
+ (lambda (cached-narinfo-file-details)
+ ;; TODO Delete the file as well
+
+ (let ((reply (make-channel)))
+ (put-message
+ cached-compression-management-channel
+ (list 'cached-narinfo-removed
+ (assq-ref narinfo-details 'id)
+ (assq-ref cached-narinfo-files 'compression)
+ (assq-ref cached-narinfo-files 'size)
+ reply))
+ (get-message reply)))
+ cached-narinfo-files))
(database-remove-narinfo database
store-path
#:change-datetime
(assq-ref change-details
- 'datetime))))
+ 'datetime)))))
(else
(error "unimplemented"))))))
recent-changes))
(raise-exception
(make-exception-with-message
- (simple-format #f "unknown response: ~A\n code: ~A response: ~A"
+ (simple-format #f "unknown response: ~A code: ~A"
(uri->string uri)
- (response-code response)
- (utf8->string body))))))))
+ (response-code response))))))))
- (call-with-new-thread
+ (spawn-fiber
(lambda ()
- ;; This will initialise the nar_files_total metric
- (get-nar-files database storage-root metrics-registry)
-
(while #t
(with-exception-handler
(lambda (exn)
@@ -165,4 +183,5 @@
request-recent-changes
#:unwind? #t)
+ (log-msg 'DEBUG "finished requesting recent changes, sleeping")
(sleep 60)))))