diff options
Diffstat (limited to 'nar-herder/mirror.scm')
-rw-r--r-- | nar-herder/mirror.scm | 85 |
1 files changed, 52 insertions, 33 deletions
diff --git a/nar-herder/mirror.scm b/nar-herder/mirror.scm index 6b9f4f7..8aae845 100644 --- a/nar-herder/mirror.scm +++ b/nar-herder/mirror.scm @@ -19,6 +19,7 @@ (define-module (nar-herder mirror) #:use-module (srfi srfi-1) #:use-module (srfi srfi-43) + #:use-module (srfi srfi-71) #:use-module (ice-9 match) #:use-module (ice-9 threads) #:use-module (ice-9 exceptions) @@ -29,22 +30,18 @@ #:use-module (prometheus) #:use-module (logging logger) #:use-module (json) + #:use-module (fibers) + #:use-module (fibers channels) #:use-module (guix narinfo) #:use-module ((guix store) #:select (store-path-hash-part)) #:use-module (nar-herder utils) #:use-module (nar-herder database) #:use-module (nar-herder storage) - #:export (start-fetch-changes-thread)) - -(define (start-fetch-changes-thread database storage-root - mirror metrics-registry) - (define nar-files-metric - (or (metrics-registry-fetch-metric metrics-registry - "nar_files_total") - (make-gauge-metric metrics-registry - "nar_files_total" - #:labels '(stored)))) + #:export (start-fetch-changes-fiber)) +(define (start-fetch-changes-fiber database metrics-registry + storage-root mirror + cached-compression-management-channel) (define (request-recent-changes) (define latest-recent-change (database-select-latest-recent-change-datetime database)) @@ -72,18 +69,25 @@ (lambda () (retry-on-error (lambda () - (http-get uri #:decode-body? #f)) + (log-msg 'INFO "querying for recent changes since " + latest-recent-change) + (with-port-timeouts + (lambda () + (let ((port + socket + (open-socket-for-uri* uri))) + (http-get uri + #:port port + #:streaming? #t))) + #:timeout 30)) #:times 3 #:delay 15)) (lambda (response body) (if (= (response-code response) 200) - (let* ((json-body (json-string->scm - (utf8->string body))) + (let* ((json-body (json->scm body)) (recent-changes (assoc-ref json-body "recent_changes"))) - (log-msg 'INFO "queried for recent changes since " - latest-recent-change) (log-msg 'INFO "got " (vector-length recent-changes) " changes") ;; Switch to symbol keys and standardise the key order @@ -116,48 +120,62 @@ narinfo #:change-datetime (assq-ref change-details - 'datetime)) - - (let ((new-files-count (length (narinfo-uris narinfo)))) - (metric-increment nar-files-metric - #:by new-files-count - ;; TODO This should be - ;; checked, rather than - ;; assumed to be false - #:label-values '((stored . "false")))))) + 'datetime)))) + ((string=? change "removal") (let ((store-path (assq-ref change-details 'data))) (log-msg 'INFO "processing removal change for " store-path " (" (assq-ref change-details 'datetime) ")") + (let* ((hash (store-path-hash-part store-path)) + (narinfo-details + (database-select-narinfo-by-hash + database + hash))) + (when storage-root (remove-nar-files-by-hash database storage-root metrics-registry - (store-path-hash-part store-path))) + hash)) + + (let ((cached-narinfo-files + (database-select-cached-narinfo-files-by-narinfo-id + database + (assq-ref narinfo-details 'id)))) + (for-each + (lambda (cached-narinfo-file-details) + ;; TODO Delete the file as well + + (let ((reply (make-channel))) + (put-message + cached-compression-management-channel + (list 'cached-narinfo-removed + (assq-ref narinfo-details 'id) + (assq-ref cached-narinfo-files 'compression) + (assq-ref cached-narinfo-files 'size) + reply)) + (get-message reply))) + cached-narinfo-files)) (database-remove-narinfo database store-path #:change-datetime (assq-ref change-details - 'datetime)))) + 'datetime))))) (else (error "unimplemented")))))) recent-changes)) (raise-exception (make-exception-with-message - (simple-format #f "unknown response: ~A\n code: ~A response: ~A" + (simple-format #f "unknown response: ~A code: ~A" (uri->string uri) - (response-code response) - (utf8->string body)))))))) + (response-code response)))))))) - (call-with-new-thread + (spawn-fiber (lambda () - ;; This will initialise the nar_files_total metric - (get-nar-files database storage-root metrics-registry) - (while #t (with-exception-handler (lambda (exn) @@ -165,4 +183,5 @@ request-recent-changes #:unwind? #t) + (log-msg 'DEBUG "finished requesting recent changes, sleeping") (sleep 60))))) |