diff options
Diffstat (limited to 'nar-herder/recent-changes.scm')
-rw-r--r-- | nar-herder/recent-changes.scm | 156 |
1 files changed, 135 insertions, 21 deletions
diff --git a/nar-herder/recent-changes.scm b/nar-herder/recent-changes.scm index fee63f3..ccfff93 100644 --- a/nar-herder/recent-changes.scm +++ b/nar-herder/recent-changes.scm @@ -18,15 +18,25 @@ (define-module (nar-herder recent-changes) #:use-module (srfi srfi-1) + #:use-module (ice-9 match) #:use-module (ice-9 threads) + #:use-module (fibers) + #:use-module (fibers channels) + #:use-module (logging logger) + #:use-module (prometheus) + #:use-module (web uri) + #:use-module (guix narinfo) #:use-module (nar-herder database) - #:export (start-recent-change-removal-and-database-dump-thread)) + #:export (start-recent-change-removal-and-database-dump-fiber + start-recent-change-listener-fiber)) -(define (start-recent-change-removal-and-database-dump-thread database - database-dump-filename - check-interval - recent-changes-limit) +(define (start-recent-change-removal-and-database-dump-fiber database + metrics-registry + database-dump-filename + check-interval + recent-changes-limit) (define (update-database-dump) + (log-msg 'DEBUG "updating the database dump at " database-dump-filename) (let ((temp-database-dump-filename (string-append database-dump-filename ".tmp"))) @@ -41,23 +51,127 @@ (simple-format (current-error-port) "updated database dump\n"))) - (call-with-new-thread + (define recent-changes-count-metric + (metrics-registry-fetch-metric metrics-registry "recent_changes_count")) + + (spawn-fiber (lambda () (while #t - (let ((recent-changes-id-for-deletion - (database-get-recent-changes-id-for-deletion database - recent-changes-limit))) - (when recent-changes-id-for-deletion - (update-database-dump) - - (let ((deleted-recent-changes - (database-delete-recent-changes-with-id-below - database - recent-changes-id-for-deletion))) - (simple-format (current-error-port) - "deleted ~A recent changes\n" - deleted-recent-changes))) - - (sleep check-interval)))))) + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "exception in recent change removal thread: ~A\n" + exn) + (sleep 120)) + (lambda () + (let ((recent-changes-id-for-deletion + (database-get-recent-changes-id-for-deletion database + recent-changes-limit))) + (when recent-changes-id-for-deletion + (when database-dump-filename + (update-database-dump)) + + (let ((deleted-recent-changes + (database-call-with-transaction + database + (lambda _ + (database-delete-recent-changes-with-id-below + database + recent-changes-id-for-deletion))))) + + (metric-decrement recent-changes-count-metric + #:by deleted-recent-changes) + + (simple-format (current-error-port) + "deleted ~A recent changes\n" + deleted-recent-changes))) + + (sleep check-interval))) + #:unwind? #t))))) + +(define (start-recent-change-listener-fiber database + metrics-registry + addition-channel + removal-channel) + (define recent-changes-count-metric + (metrics-registry-fetch-metric metrics-registry "recent_changes_count")) + + (define (process-addition-change change-details) + (let ((narinfo + (call-with-input-string + (assq-ref change-details 'data) + (lambda (port) + (read-narinfo port + "https://narherderdummyvalue"))))) + (for-each + (lambda (uri) + (log-msg 'DEBUG "processing recent addition of " (uri-path uri)) + (put-message addition-channel (list 'addition (uri-path uri)))) + (narinfo-uris narinfo)))) + + (define (process-removal-change change-details) + (log-msg 'DEBUG "processing recent change triggered removal of " + (assq-ref change-details 'data)) + (put-message removal-channel + (list 'remove (assq-ref change-details 'data)))) + + (spawn-fiber + (lambda () + (let ((recent-changes-count + (database-count-recent-changes database))) + (metric-set recent-changes-count-metric recent-changes-count) + (log-msg 'DEBUG recent-changes-count " recent changes in the database")) + + (log-msg 'DEBUG "starting to listen for recent changes") + (let ((after-initial + (database-select-latest-recent-change-datetime database))) + (let loop ((after after-initial) + (last-processed-recent-changes + (database-select-recent-changes database after-initial))) + (sleep 10) + + (match + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "exception in recent change listener " exn) + #f) + (lambda () + (let* ((recent-changes + (database-select-recent-changes database after)) + (unprocessed-recent-changes + (remove + (lambda (change-details) + (member change-details last-processed-recent-changes)) + recent-changes))) + + (unless (null? unprocessed-recent-changes) + (log-msg 'INFO "processing " (length unprocessed-recent-changes) + " recent changes") + + (metric-increment recent-changes-count-metric + #:by (length unprocessed-recent-changes)) + + (for-each + (lambda (change-details) + (let ((change (assq-ref change-details 'change))) + (cond + ((string=? change "addition") + (process-addition-change change-details)) + ((string=? change "removal") + (process-removal-change change-details)) + (else #f)))) + unprocessed-recent-changes)) + ;; Use the unprocessed recent changes here to carry + ;; forward all processed changes to the next pass + unprocessed-recent-changes)) + #:unwind? #t) + (#f (loop after '())) + (recent-changes + (if (null? recent-changes) + (loop after last-processed-recent-changes) + (loop (assq-ref (last recent-changes) + 'datetime) + recent-changes))))))))) |