;;; Nar Herder
;;;
;;; Copyright © 2021 Christopher Baines
;;;
;;; This program is free software: you can redistribute it and/or
;;; modify it under the terms of the GNU Affero General Public License
;;; as published by the Free Software Foundation, either version 3 of
;;; the License, or (at your option) any later version.
;;;
;;; This program is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;;; Affero General Public License for more details.
;;;
;;; You should have received a copy of the GNU Affero General Public
;;; License along with this program.  If not, see
;;; <http://www.gnu.org/licenses/>.

(define-module (nar-herder recent-changes)
  #:use-module (srfi srfi-1)
  #:use-module (ice-9 match)
  #:use-module (ice-9 threads)
  #:use-module (fibers)
  #:use-module (fibers channels)
  #:use-module (logging logger)
  #:use-module (prometheus)
  #:use-module (web uri)
  #:use-module (guix narinfo)
  #:use-module (nar-herder database)
  #:export (start-recent-change-removal-and-database-dump-fiber
            start-recent-change-listener-fiber))

(define (start-recent-change-removal-and-database-dump-fiber database
                                                             metrics-registry
                                                             database-dump-filename
                                                             check-interval
                                                             recent-changes-limit)
  "Spawn a fiber that loops forever, asking DATABASE (with
RECENT-CHANGES-LIMIT) for a recent change id to delete below.  When one is
returned, the database is first dumped to DATABASE-DUMP-FILENAME (skipped if
that is #f), then the recent changes below that id are deleted inside a
transaction and the recent_changes_count metric from METRICS-REGISTRY is
decremented by the value the deletion returns.  The fiber sleeps for
CHECK-INTERVAL between passes, or for 120 after an exception."

  ;; Write the dump to a temporary file next to the target and rename it
  ;; into place, so the target path is only ever replaced by a finished
  ;; dump.
  (define (update-database-dump)
    (log-msg 'DEBUG "updating the database dump at " database-dump-filename)
    (let ((temp-database-dump-filename
           (string-append database-dump-filename ".tmp")))
      ;; Clear out any leftover temporary file from an earlier attempt.
      (when (file-exists? temp-database-dump-filename)
        (delete-file temp-database-dump-filename))
      (dump-database database temp-database-dump-filename)
      (rename-file temp-database-dump-filename
                   database-dump-filename)
      (simple-format (current-error-port)
                     "updated database dump\n")))

  (define recent-changes-count-metric
    (metrics-registry-fetch-metric metrics-registry "recent_changes_count"))

  (spawn-fiber
   (lambda ()
     (while #t
       (with-exception-handler
           (lambda (exn)
             (simple-format (current-error-port)
                            "exception in recent change removal thread: ~A\n"
                            exn)
             ;; Back off before the enclosing loop tries again.
             (sleep 120))
         (lambda ()
           (let ((recent-changes-id-for-deletion
                  (database-get-recent-changes-id-for-deletion
                   database
                   recent-changes-limit)))
             (when recent-changes-id-for-deletion
               ;; Refresh the dump before removing anything.
               (when database-dump-filename
                 (update-database-dump))

               (let ((deleted-recent-changes
                      (database-call-with-transaction
                       database
                       (lambda _
                         (database-delete-recent-changes-with-id-below
                          database
                          recent-changes-id-for-deletion)))))
                 (metric-decrement recent-changes-count-metric
                                   #:by deleted-recent-changes)
                 (simple-format (current-error-port)
                                "deleted ~A recent changes\n"
                                deleted-recent-changes)))

             (sleep check-interval)))
         ;; Unwind so that returning from the handler leaves
         ;; with-exception-handler and the while loop carries on.
         #:unwind? #t)))))

(define (start-recent-change-listener-fiber database
                                            metrics-registry
                                            addition-channel
                                            removal-channel)
  "Spawn a fiber that sets the recent_changes_count metric from
METRICS-REGISTRY to the current count in DATABASE, then polls DATABASE every
10 seconds for recent changes.  For each change not seen in the previous
pass, \"addition\" changes have their data parsed as a narinfo and one
(addition PATH) message per narinfo URI is put on ADDITION-CHANNEL, while
\"removal\" changes result in a (remove DATA) message on REMOVAL-CHANNEL.
Changes of any other kind are ignored."

  (define recent-changes-count-metric
    (metrics-registry-fetch-metric metrics-registry "recent_changes_count"))

  ;; The 'data field of an addition is narinfo text; announce the path of
  ;; every URI it lists.
  (define (process-addition-change change-details)
    (let ((narinfo
           (call-with-input-string
               (assq-ref change-details 'data)
             (lambda (port)
               ;; read-narinfo takes a second argument; only the URI paths
               ;; are used below, so a placeholder value is passed.
               (read-narinfo port
                             "https://narherderdummyvalue")))))
      (for-each
       (lambda (uri)
         (log-msg 'DEBUG "processing recent addition of " (uri-path uri))
         (put-message addition-channel
                      (list 'addition (uri-path uri))))
       (narinfo-uris narinfo))))

  ;; The 'data field of a removal is forwarded as is.
  (define (process-removal-change change-details)
    (log-msg 'DEBUG "processing recent change triggered removal of "
             (assq-ref change-details 'data))
    (put-message removal-channel
                 (list 'remove (assq-ref change-details 'data))))

  (spawn-fiber
   (lambda ()
     (let ((recent-changes-count
            (database-count-recent-changes database)))
       (metric-set recent-changes-count-metric recent-changes-count)
       (log-msg 'DEBUG recent-changes-count " recent changes in the database"))

     (log-msg 'DEBUG "starting to listen for recent changes")

     (let ((after-initial
            (database-select-latest-recent-change-datetime database)))
       ;; AFTER is the datetime cursor handed to the next query, and
       ;; LAST-PROCESSED-RECENT-CHANGES is what the previous query returned,
       ;; used to skip rows that come back again.  Whatever the initial
       ;; query returns is treated as already handled.
       (let loop ((after after-initial)
                  (last-processed-recent-changes
                   (database-select-recent-changes database after-initial)))
         (sleep 10)

         (match (with-exception-handler
                    (lambda (exn)
                      (log-msg 'ERROR "exception in recent change listener " exn)
                      ;; #f tells the match below that this pass failed.
                      #f)
                  (lambda ()
                    (let* ((recent-changes
                            (database-select-recent-changes database after))
                           (unprocessed-recent-changes
                            (remove
                             (lambda (change-details)
                               (member change-details
                                       last-processed-recent-changes))
                             recent-changes)))
                      (unless (null? unprocessed-recent-changes)
                        (log-msg 'INFO "processing "
                                 (length unprocessed-recent-changes)
                                 " recent changes")
                        (for-each
                         (lambda (change-details)
                           (let ((change (assq-ref change-details 'change)))
                             (cond
                              ((string=? change "addition")
                               (process-addition-change change-details))
                              ((string=? change "removal")
                               (process-removal-change change-details))
                              (else #f))))
                         unprocessed-recent-changes)
                        ;; Count the rows only once the whole batch has been
                        ;; handled, so a pass that fails part way through
                        ;; and is retried does not count them twice.
                        (metric-increment recent-changes-count-metric
                                          #:by (length unprocessed-recent-changes)))

                      ;; Hand back everything this query returned, not just
                      ;; the newly processed rows.  Every row here has now
                      ;; been handled, either in this pass or an earlier
                      ;; one, and carrying only the new rows forward would
                      ;; forget older rows sharing the cursor datetime,
                      ;; which would then be processed again.
                      recent-changes))
                  #:unwind? #t)
           (#f
            ;; The pass failed; retry from the same position and keep
            ;; the record of what was already handled.
            (loop after last-processed-recent-changes))
           (recent-changes
            (if (null? recent-changes)
                (loop after last-processed-recent-changes)
                ;; Assumes the rows are ordered by datetime, so the last
                ;; one holds the newest datetime -- TODO confirm against
                ;; database-select-recent-changes.
                (loop (assq-ref (last recent-changes) 'datetime)
                      recent-changes)))))))))