aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2025-03-04 13:47:08 +0000
committerChristopher Baines <mail@cbaines.net>2025-03-11 08:58:03 +0000
commit64d36bef58150697e62ce46c8b952725dd585a9c (patch)
treeb626efaa1bfb8e219d2752ecc6871d60c07e35c8
parent70df5af752ba9ed9dc414d011a1358babc5e40b1 (diff)
downloadnar-herder-64d36bef58150697e62ce46c8b952725dd585a9c.tar
nar-herder-64d36bef58150697e62ce46c8b952725dd585a9c.tar.gz
Improve mirror request handling
Move the fetching of the body inside the retry-on-error and with-port-timeouts.
-rw-r--r--nar-herder/mirror.scm251
1 files changed, 130 insertions, 121 deletions
diff --git a/nar-herder/mirror.scm b/nar-herder/mirror.scm
index 67d4c00..e59f80e 100644
--- a/nar-herder/mirror.scm
+++ b/nar-herder/mirror.scm
@@ -32,6 +32,7 @@
#:use-module (json)
#:use-module (fibers)
#:use-module (fibers channels)
+ #:use-module (knots)
#:use-module (knots timeout)
#:use-module (knots non-blocking)
#:use-module (guix narinfo)
@@ -71,124 +72,126 @@
latest-recent-change))
""))))
- (call-with-values
- (lambda ()
- (retry-on-error
- (lambda ()
- (log-msg 'INFO "querying for recent changes since "
- latest-recent-change)
- (with-port-timeouts
- (lambda ()
- (let ((port (non-blocking-open-socket-for-uri uri)))
- (http-get uri
- #:port port
- #:streaming? #t)))
- #:timeout 30))
- #:times 3
- #:delay 15))
- (lambda (response body)
- (if (= (response-code response) 200)
- (let* ((json-body (json->scm body))
- (recent-changes
- (assoc-ref json-body "recent_changes")))
-
- (log-msg 'INFO "got " (vector-length recent-changes) " changes")
-
- ;; Switch to symbol keys and standardise the key order
- (vector-map!
- (lambda (_ change-details)
- `((datetime . ,(assoc-ref change-details "datetime"))
- (change . ,(assoc-ref change-details "change"))
- (data . ,(assoc-ref change-details "data"))))
- recent-changes)
-
- (vector-for-each
- (lambda (_ change-details)
- ;; Guard against processing changes that have already
- ;; been processed
- (unless (member (strip-change-datetime change-details)
- processed-recent-changes)
- (let ((change (assq-ref change-details 'change)))
- (metric-increment recent-changes-count-metric)
-
- (cond
- ((string=? change "addition")
- (let ((narinfo
- (call-with-input-string
- (assq-ref change-details 'data)
- (lambda (port)
- (read-narinfo port
- "https://narherderdummyvalue")))))
- (log-msg 'INFO "processing addition change for "
- (uri-path (first (narinfo-uris narinfo)))
- " (" (assq-ref change-details 'datetime) ")")
- (database-insert-narinfo database
- narinfo
- #:change-datetime
- (assq-ref change-details
- 'datetime))
-
- (when addition-channel
- (for-each
- (lambda (uri)
- (spawn-fiber
- (lambda ()
- (put-message addition-channel
- `(addition ,(uri-path uri))))))
- (narinfo-uris narinfo)))))
-
- ((string=? change "removal")
- (let ((store-path (assq-ref change-details 'data)))
- ;; TODO Use the nar removal fiber
- (log-msg 'INFO "processing removal change for "
- store-path
- " (" (assq-ref change-details 'datetime) ")")
-
- (let* ((hash (store-path-hash-part store-path))
- (narinfo-details
- (database-select-narinfo-by-hash
- database
- hash)))
-
- (when storage-root
- (remove-nar-files-by-hash
+ (let ((json-body
+ (with-port-timeouts
+ (lambda ()
+ (retry-on-error
+ (lambda ()
+ (call-with-values
+ (lambda ()
+ (log-msg 'INFO "querying for recent changes since "
+ latest-recent-change)
+ (let ((port (non-blocking-open-socket-for-uri uri)))
+ (http-get uri
+ #:port port
+ #:streaming? #t)))
+ (lambda (response body)
+ (if (= (response-code response) 200)
+ (json->scm body)
+ (raise-exception
+ (make-exception-with-message
+ (simple-format #f "unknown response: ~A code: ~A"
+ (uri->string uri)
+ (response-code response))))))))
+ #:times 3
+ #:delay 15))
+ #:timeout 30)))
+
+ (let* ((recent-changes
+ (assoc-ref json-body "recent_changes")))
+
+ (log-msg 'INFO "got " (vector-length recent-changes) " changes")
+
+ ;; Switch to symbol keys and standardise the key order
+ (vector-map!
+ (lambda (_ change-details)
+ `((datetime . ,(assoc-ref change-details "datetime"))
+ (change . ,(assoc-ref change-details "change"))
+ (data . ,(assoc-ref change-details "data"))))
+ recent-changes)
+
+ (vector-for-each
+ (lambda (_ change-details)
+ ;; Guard against processing changes that have already
+ ;; been processed
+ (unless (member (strip-change-datetime change-details)
+ processed-recent-changes)
+ (let ((change (assq-ref change-details 'change)))
+ (metric-increment recent-changes-count-metric)
+
+ (cond
+ ((string=? change "addition")
+ (let ((narinfo
+ (call-with-input-string
+ (assq-ref change-details 'data)
+ (lambda (port)
+ (read-narinfo port
+ "https://narherderdummyvalue")))))
+ (log-msg 'INFO "processing addition change for "
+ (uri-path (first (narinfo-uris narinfo)))
+ " (" (assq-ref change-details 'datetime) ")")
+ (database-insert-narinfo database
+ narinfo
+ #:change-datetime
+ (assq-ref change-details
+ 'datetime))
+
+ (when addition-channel
+ (for-each
+ (lambda (uri)
+ (spawn-fiber
+ (lambda ()
+ (put-message addition-channel
+ `(addition ,(uri-path uri))))))
+ (narinfo-uris narinfo)))))
+
+ ((string=? change "removal")
+ (let ((store-path (assq-ref change-details 'data)))
+ ;; TODO Use the nar removal fiber
+ (log-msg 'INFO "processing removal change for "
+ store-path
+ " (" (assq-ref change-details 'datetime) ")")
+
+ (let* ((hash (store-path-hash-part store-path))
+ (narinfo-details
+ (database-select-narinfo-by-hash
database
- storage-root
- metrics-registry
- hash))
-
- (let ((cached-narinfo-files
- (database-select-cached-narinfo-files-by-narinfo-id
- database
- (assq-ref narinfo-details 'id))))
- (for-each
- (lambda (cached-narinfo-file-details)
- ;; TODO Delete the file as well
-
- (let ((reply (make-channel)))
- (put-message
- cached-compression-management-channel
- (list 'cached-narinfo-removed
- (assq-ref narinfo-details 'id)
- (assq-ref cached-narinfo-files 'compression)
- (assq-ref cached-narinfo-files 'size)
- reply))
- (get-message reply)))
- cached-narinfo-files))
-
- (database-remove-narinfo database
- store-path
- #:change-datetime
- (assq-ref change-details
- 'datetime)))))
- (else
- (error "unimplemented"))))))
- recent-changes))
- (raise-exception
- (make-exception-with-message
- (simple-format #f "unknown response: ~A code: ~A"
- (uri->string uri)
- (response-code response))))))))
+ hash)))
+
+ (when storage-root
+ (remove-nar-files-by-hash
+ database
+ storage-root
+ metrics-registry
+ hash))
+
+ (let ((cached-narinfo-files
+ (database-select-cached-narinfo-files-by-narinfo-id
+ database
+ (assq-ref narinfo-details 'id))))
+ (for-each
+ (lambda (cached-narinfo-file-details)
+ ;; TODO Delete the file as well
+
+ (let ((reply (make-channel)))
+ (put-message
+ cached-compression-management-channel
+ (list 'cached-narinfo-removed
+ (assq-ref narinfo-details 'id)
+ (assq-ref cached-narinfo-files 'compression)
+ (assq-ref cached-narinfo-files 'size)
+ reply))
+ (get-message reply)))
+ cached-narinfo-files))
+
+ (database-remove-narinfo database
+ store-path
+ #:change-datetime
+ (assq-ref change-details
+ 'datetime)))))
+ (else
+ (error "unimplemented"))))))
+ recent-changes))))
(spawn-fiber
(lambda ()
@@ -199,10 +202,16 @@
(while #t
(with-exception-handler
- (lambda (exn)
- (log-msg 'ERROR "fetching changes failed " exn))
- request-recent-changes
+ (lambda _ #f)
+ (lambda ()
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "fetching changes failed")
+ (print-backtrace-and-exception/knots exn)
+ (raise-exception exn))
+ request-recent-changes)
+
+ (log-msg 'DEBUG "finished requesting recent changes, sleeping"))
#:unwind? #t)
- (log-msg 'DEBUG "finished requesting recent changes, sleeping")
(sleep 60)))))