aboutsummaryrefslogtreecommitdiff
path: root/nar-herder/recent-changes.scm
diff options
context:
space:
mode:
Diffstat (limited to 'nar-herder/recent-changes.scm')
-rw-r--r--nar-herder/recent-changes.scm156
1 files changed, 135 insertions, 21 deletions
diff --git a/nar-herder/recent-changes.scm b/nar-herder/recent-changes.scm
index fee63f3..ccfff93 100644
--- a/nar-herder/recent-changes.scm
+++ b/nar-herder/recent-changes.scm
@@ -18,15 +18,25 @@
(define-module (nar-herder recent-changes)
#:use-module (srfi srfi-1)
+ #:use-module (ice-9 match)
#:use-module (ice-9 threads)
+ #:use-module (fibers)
+ #:use-module (fibers channels)
+ #:use-module (logging logger)
+ #:use-module (prometheus)
+ #:use-module (web uri)
+ #:use-module (guix narinfo)
#:use-module (nar-herder database)
- #:export (start-recent-change-removal-and-database-dump-thread))
+ #:export (start-recent-change-removal-and-database-dump-fiber
+ start-recent-change-listener-fiber))
-(define (start-recent-change-removal-and-database-dump-thread database
- database-dump-filename
- check-interval
- recent-changes-limit)
+(define (start-recent-change-removal-and-database-dump-fiber database
+ metrics-registry
+ database-dump-filename
+ check-interval
+ recent-changes-limit)
(define (update-database-dump)
+ (log-msg 'DEBUG "updating the database dump at " database-dump-filename)
(let ((temp-database-dump-filename
(string-append database-dump-filename ".tmp")))
@@ -41,23 +51,127 @@
(simple-format (current-error-port)
"updated database dump\n")))
- (call-with-new-thread
+ (define recent-changes-count-metric
+ (metrics-registry-fetch-metric metrics-registry "recent_changes_count"))
+
+ (spawn-fiber
(lambda ()
(while #t
- (let ((recent-changes-id-for-deletion
- (database-get-recent-changes-id-for-deletion database
- recent-changes-limit)))
- (when recent-changes-id-for-deletion
- (update-database-dump)
-
- (let ((deleted-recent-changes
- (database-delete-recent-changes-with-id-below
- database
- recent-changes-id-for-deletion)))
- (simple-format (current-error-port)
- "deleted ~A recent changes\n"
- deleted-recent-changes)))
-
- (sleep check-interval))))))
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "exception in recent change removal thread: ~A\n"
+ exn)
+ (sleep 120))
+ (lambda ()
+ (let ((recent-changes-id-for-deletion
+ (database-get-recent-changes-id-for-deletion database
+ recent-changes-limit)))
+ (when recent-changes-id-for-deletion
+ (when database-dump-filename
+ (update-database-dump))
+
+ (let ((deleted-recent-changes
+ (database-call-with-transaction
+ database
+ (lambda _
+ (database-delete-recent-changes-with-id-below
+ database
+ recent-changes-id-for-deletion)))))
+
+ (metric-decrement recent-changes-count-metric
+ #:by deleted-recent-changes)
+
+ (simple-format (current-error-port)
+ "deleted ~A recent changes\n"
+ deleted-recent-changes)))
+
+ (sleep check-interval)))
+ #:unwind? #t)))))
+
+(define (start-recent-change-listener-fiber database
+ metrics-registry
+ addition-channel
+ removal-channel)
+ (define recent-changes-count-metric
+ (metrics-registry-fetch-metric metrics-registry "recent_changes_count"))
+
+ (define (process-addition-change change-details)
+ (let ((narinfo
+ (call-with-input-string
+ (assq-ref change-details 'data)
+ (lambda (port)
+ (read-narinfo port
+ "https://narherderdummyvalue")))))
+ (for-each
+ (lambda (uri)
+ (log-msg 'DEBUG "processing recent addition of " (uri-path uri))
+ (put-message addition-channel (list 'addition (uri-path uri))))
+ (narinfo-uris narinfo))))
+
+ (define (process-removal-change change-details)
+ (log-msg 'DEBUG "processing recent change triggered removal of "
+ (assq-ref change-details 'data))
+ (put-message removal-channel
+ (list 'remove (assq-ref change-details 'data))))
+
+ (spawn-fiber
+ (lambda ()
+ (let ((recent-changes-count
+ (database-count-recent-changes database)))
+ (metric-set recent-changes-count-metric recent-changes-count)
+ (log-msg 'DEBUG recent-changes-count " recent changes in the database"))
+
+ (log-msg 'DEBUG "starting to listen for recent changes")
+ (let ((after-initial
+ (database-select-latest-recent-change-datetime database)))
+ (let loop ((after after-initial)
+ (last-processed-recent-changes
+ (database-select-recent-changes database after-initial)))
+ (sleep 10)
+
+ (match
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "exception in recent change listener " exn)
+ #f)
+ (lambda ()
+ (let* ((recent-changes
+ (database-select-recent-changes database after))
+ (unprocessed-recent-changes
+ (remove
+ (lambda (change-details)
+ (member change-details last-processed-recent-changes))
+ recent-changes)))
+
+ (unless (null? unprocessed-recent-changes)
+ (log-msg 'INFO "processing " (length unprocessed-recent-changes)
+ " recent changes")
+
+ (metric-increment recent-changes-count-metric
+ #:by (length unprocessed-recent-changes))
+
+ (for-each
+ (lambda (change-details)
+ (let ((change (assq-ref change-details 'change)))
+ (cond
+ ((string=? change "addition")
+ (process-addition-change change-details))
+ ((string=? change "removal")
+ (process-removal-change change-details))
+ (else #f))))
+ unprocessed-recent-changes))
+ ;; Use the unprocessed recent changes here to carry
+ ;; forward all processed changes to the next pass
+ unprocessed-recent-changes))
+ #:unwind? #t)
+ (#f (loop after '()))
+ (recent-changes
+ (if (null? recent-changes)
+ (loop after last-processed-recent-changes)
+ (loop (assq-ref (last recent-changes)
+ 'datetime)
+ recent-changes)))))))))