diff options
author | Christopher Baines <mail@cbaines.net> | 2025-03-04 13:47:08 +0000 |
---|---|---|
committer | Christopher Baines <mail@cbaines.net> | 2025-03-11 08:58:03 +0000 |
commit | 64d36bef58150697e62ce46c8b952725dd585a9c (patch) | |
tree | b626efaa1bfb8e219d2752ecc6871d60c07e35c8 | |
parent | 70df5af752ba9ed9dc414d011a1358babc5e40b1 (diff) | |
download | nar-herder-64d36bef58150697e62ce46c8b952725dd585a9c.tar nar-herder-64d36bef58150697e62ce46c8b952725dd585a9c.tar.gz |
Improve mirror request handling
Move the fetching of the body inside the retry-on-error and
with-port-timeouts.
-rw-r--r-- | nar-herder/mirror.scm | 251 |
1 files changed, 130 insertions, 121 deletions
diff --git a/nar-herder/mirror.scm b/nar-herder/mirror.scm index 67d4c00..e59f80e 100644 --- a/nar-herder/mirror.scm +++ b/nar-herder/mirror.scm @@ -32,6 +32,7 @@ #:use-module (json) #:use-module (fibers) #:use-module (fibers channels) + #:use-module (knots) #:use-module (knots timeout) #:use-module (knots non-blocking) #:use-module (guix narinfo) @@ -71,124 +72,126 @@ latest-recent-change)) "")))) - (call-with-values - (lambda () - (retry-on-error - (lambda () - (log-msg 'INFO "querying for recent changes since " - latest-recent-change) - (with-port-timeouts - (lambda () - (let ((port (non-blocking-open-socket-for-uri uri))) - (http-get uri - #:port port - #:streaming? #t))) - #:timeout 30)) - #:times 3 - #:delay 15)) - (lambda (response body) - (if (= (response-code response) 200) - (let* ((json-body (json->scm body)) - (recent-changes - (assoc-ref json-body "recent_changes"))) - - (log-msg 'INFO "got " (vector-length recent-changes) " changes") - - ;; Switch to symbol keys and standardise the key order - (vector-map! - (lambda (_ change-details) - `((datetime . ,(assoc-ref change-details "datetime")) - (change . ,(assoc-ref change-details "change")) - (data . ,(assoc-ref change-details "data")))) - recent-changes) - - (vector-for-each - (lambda (_ change-details) - ;; Guard against processing changes that have already - ;; been processed - (unless (member (strip-change-datetime change-details) - processed-recent-changes) - (let ((change (assq-ref change-details 'change))) - (metric-increment recent-changes-count-metric) - - (cond - ((string=? change "addition") - (let ((narinfo - (call-with-input-string - (assq-ref change-details 'data) - (lambda (port) - (read-narinfo port - "https://narherderdummyvalue"))))) - (log-msg 'INFO "processing addition change for " - (uri-path (first (narinfo-uris narinfo))) - " (" (assq-ref change-details 'datetime) ")") - (database-insert-narinfo database - narinfo - #:change-datetime - (assq-ref change-details - 'datetime)) - - (when addition-channel - (for-each - (lambda (uri) - (spawn-fiber - (lambda () - (put-message addition-channel - `(addition ,(uri-path uri)))))) - (narinfo-uris narinfo))))) - - ((string=? change "removal") - (let ((store-path (assq-ref change-details 'data))) - ;; TODO Use the nar removal fiber - (log-msg 'INFO "processing removal change for " - store-path - " (" (assq-ref change-details 'datetime) ")") - - (let* ((hash (store-path-hash-part store-path)) - (narinfo-details - (database-select-narinfo-by-hash - database - hash))) - - (when storage-root - (remove-nar-files-by-hash + (let ((json-body + (with-port-timeouts + (lambda () + (retry-on-error + (lambda () + (call-with-values + (lambda () + (log-msg 'INFO "querying for recent changes since " + latest-recent-change) + (let ((port (non-blocking-open-socket-for-uri uri))) + (http-get uri + #:port port + #:streaming? #t))) + (lambda (response body) + (if (= (response-code response) 200) + (json->scm body) + (raise-exception + (make-exception-with-message + (simple-format #f "unknown response: ~A code: ~A" + (uri->string uri) + (response-code response)))))))) + #:times 3 + #:delay 15)) + #:timeout 30))) + + (let* ((recent-changes + (assoc-ref json-body "recent_changes"))) + + (log-msg 'INFO "got " (vector-length recent-changes) " changes") + + ;; Switch to symbol keys and standardise the key order + (vector-map! + (lambda (_ change-details) + `((datetime . ,(assoc-ref change-details "datetime")) + (change . ,(assoc-ref change-details "change")) + (data . ,(assoc-ref change-details "data")))) + recent-changes) + + (vector-for-each + (lambda (_ change-details) + ;; Guard against processing changes that have already + ;; been processed + (unless (member (strip-change-datetime change-details) + processed-recent-changes) + (let ((change (assq-ref change-details 'change))) + (metric-increment recent-changes-count-metric) + + (cond + ((string=? change "addition") + (let ((narinfo + (call-with-input-string + (assq-ref change-details 'data) + (lambda (port) + (read-narinfo port + "https://narherderdummyvalue"))))) + (log-msg 'INFO "processing addition change for " + (uri-path (first (narinfo-uris narinfo))) + " (" (assq-ref change-details 'datetime) ")") + (database-insert-narinfo database + narinfo + #:change-datetime + (assq-ref change-details + 'datetime)) + + (when addition-channel + (for-each + (lambda (uri) + (spawn-fiber + (lambda () + (put-message addition-channel + `(addition ,(uri-path uri)))))) + (narinfo-uris narinfo))))) + + ((string=? change "removal") + (let ((store-path (assq-ref change-details 'data))) + ;; TODO Use the nar removal fiber + (log-msg 'INFO "processing removal change for " + store-path + " (" (assq-ref change-details 'datetime) ")") + + (let* ((hash (store-path-hash-part store-path)) + (narinfo-details + (database-select-narinfo-by-hash database - storage-root - metrics-registry - hash)) - - (let ((cached-narinfo-files - (database-select-cached-narinfo-files-by-narinfo-id - database - (assq-ref narinfo-details 'id)))) - (for-each - (lambda (cached-narinfo-file-details) - ;; TODO Delete the file as well - - (let ((reply (make-channel))) - (put-message - cached-compression-management-channel - (list 'cached-narinfo-removed - (assq-ref narinfo-details 'id) - (assq-ref cached-narinfo-files 'compression) - (assq-ref cached-narinfo-files 'size) - reply)) - (get-message reply))) - cached-narinfo-files)) - - (database-remove-narinfo database - store-path - #:change-datetime - (assq-ref change-details - 'datetime))))) - (else - (error "unimplemented")))))) - recent-changes)) - (raise-exception - (make-exception-with-message - (simple-format #f "unknown response: ~A code: ~A" - (uri->string uri) - (response-code response)))))))) + hash))) + + (when storage-root + (remove-nar-files-by-hash + database + storage-root + metrics-registry + hash)) + + (let ((cached-narinfo-files + (database-select-cached-narinfo-files-by-narinfo-id + database + (assq-ref narinfo-details 'id)))) + (for-each + (lambda (cached-narinfo-file-details) + ;; TODO Delete the file as well + + (let ((reply (make-channel))) + (put-message + cached-compression-management-channel + (list 'cached-narinfo-removed + (assq-ref narinfo-details 'id) + (assq-ref cached-narinfo-files 'compression) + (assq-ref cached-narinfo-files 'size) + reply)) + (get-message reply))) + cached-narinfo-files)) + + (database-remove-narinfo database + store-path + #:change-datetime + (assq-ref change-details + 'datetime))))) + (else + (error "unimplemented")))))) + recent-changes)))) (spawn-fiber (lambda () @@ -199,10 +202,16 @@ (while #t (with-exception-handler - (lambda (exn) - (log-msg 'ERROR "fetching changes failed " exn)) - request-recent-changes + (lambda _ #f) + (lambda () + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "fetching changes failed") + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) + request-recent-changes) + + (log-msg 'DEBUG "finished requesting recent changes, sleeping")) #:unwind? #t) - (log-msg 'DEBUG "finished requesting recent changes, sleeping") (sleep 60))))) |