;;; Nar Herder
;;;
;;; Copyright © 2021 Christopher Baines
;;;
;;; This program is free software: you can redistribute it and/or
;;; modify it under the terms of the GNU Affero General Public License
;;; as published by the Free Software Foundation, either version 3 of
;;; the License, or (at your option) any later version.
;;;
;;; This program is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;;; Affero General Public License for more details.
;;;
;;; You should have received a copy of the GNU Affero General Public
;;; License along with this program.  If not, see
;;; <http://www.gnu.org/licenses/>.

(define-module (nar-herder mirror)
  #:use-module (srfi srfi-1)
  #:use-module (srfi srfi-43)
  #:use-module (srfi srfi-71)
  #:use-module (ice-9 match)
  #:use-module (ice-9 threads)
  #:use-module (ice-9 exceptions)
  #:use-module (rnrs bytevectors)
  #:use-module (web uri)
  #:use-module (web client)
  #:use-module (web response)
  #:use-module (prometheus)
  #:use-module (logging logger)
  #:use-module (json)
  #:use-module (fibers)
  #:use-module (fibers channels)
  #:use-module (guix narinfo)
  #:use-module ((guix store) #:select (store-path-hash-part))
  #:use-module (nar-herder utils)
  #:use-module (nar-herder database)
  #:use-module (nar-herder storage)
  #:export (start-fetch-changes-fiber))

(define (start-fetch-changes-fiber database metrics-registry
                                   storage-root mirror
                                   cached-compression-management-channel)
  "Spawn a fiber that loops forever: it requests MIRROR's /recent-changes
endpoint, applies every change that is not already recorded in DATABASE,
then sleeps for 60 seconds.

An addition change inserts the narinfo carried in the change data into
DATABASE.  A removal change removes the nar files for that store path
(only when STORAGE-ROOT is true, passing METRICS-REGISTRY along), sends
one cached-narinfo-removed message per cached narinfo file to
CACHED-COMPRESSION-MANAGEMENT-CHANNEL, waiting for each reply, and then
removes the narinfo from DATABASE.

Exceptions raised during a round are logged and the loop carries on.
Returns the value of spawn-fiber."

  (define (request-recent-changes)
    ;; Defined before PROCESSED-RECENT-CHANGES, whose initialiser calls
    ;; it, so that the body does not depend on the compiler reordering
    ;; the internal definitions.
    (define (strip-change-datetime change)
      `((change . ,(assq-ref change 'change))
        (data . ,(assq-ref change 'data))))

    (define latest-recent-change
      (database-select-latest-recent-change-datetime database))

    (define processed-recent-changes
      ;; Strip datetimes, as these could differ from the mirror's
      ;; datetimes (a mirror will often record different change
      ;; datetimes, since it's delayed in making changes)
      (map strip-change-datetime
           (database-select-recent-changes database latest-recent-change)))

    ;; Only pass ?since= when there is a latest change datetime
    (define uri
      (string->uri
       (string-append mirror
                      "/recent-changes"
                      (if latest-recent-change
                          (string-append "?since="
                                         (uri-encode latest-recent-change))
                          ""))))

    (call-with-values
        (lambda ()
          ;; Up to 3 attempts, 15 apart, each request under a timeout
          ;; of 30
          (retry-on-error
           (lambda ()
             (log-msg 'INFO "querying for recent changes since "
                      latest-recent-change)
             (with-port-timeouts
              (lambda ()
                ;; SRFI-71 let: PORT and SOCKET are bound to the two
                ;; values returned by open-socket-for-uri*
                (let ((port
                       socket
                       (open-socket-for-uri* uri)))
                  (http-get uri
                            #:port port
                            #:streaming? #t)))
              #:timeout 30))
           #:times 3
           #:delay 15))
      (lambda (response body)
        (if (= (response-code response) 200)
            (let* ((json-body (json->scm body))
                   (recent-changes (assoc-ref json-body "recent_changes")))
              (log-msg 'INFO "got " (vector-length recent-changes) " changes")

              ;; Switch to symbol keys and standardise the key order
              (vector-map!
               (lambda (_ change-details)
                 `((datetime . ,(assoc-ref change-details "datetime"))
                   (change . ,(assoc-ref change-details "change"))
                   (data . ,(assoc-ref change-details "data"))))
               recent-changes)

              (vector-for-each
               (lambda (_ change-details)
                 ;; Guard against processing changes that have already
                 ;; been processed
                 (unless (member (strip-change-datetime change-details)
                                 processed-recent-changes)
                   (let ((change (assq-ref change-details 'change)))
                     (cond
                      ((string=? change "addition")
                       (let ((narinfo
                              (call-with-input-string
                                  (assq-ref change-details 'data)
                                (lambda (port)
                                  (read-narinfo
                                   port
                                   "https://narherderdummyvalue")))))
                         (log-msg 'INFO "processing addition change for "
                                  (uri-path (first (narinfo-uris narinfo)))
                                  " (" (assq-ref change-details 'datetime) ")")
                         (database-insert-narinfo
                          database
                          narinfo
                          #:change-datetime
                          (assq-ref change-details 'datetime))))

                      ((string=? change "removal")
                       (let ((store-path (assq-ref change-details 'data)))
                         (log-msg 'INFO "processing removal change for "
                                  store-path
                                  " (" (assq-ref change-details 'datetime) ")")

                         (let* ((hash (store-path-hash-part store-path))
                                (narinfo-details
                                 (database-select-narinfo-by-hash
                                  database
                                  hash)))
                           (when storage-root
                             (remove-nar-files-by-hash
                              database
                              storage-root
                              metrics-registry
                              hash))

                           (let ((cached-narinfo-files
                                  (database-select-cached-narinfo-files-by-narinfo-id
                                   database
                                   (assq-ref narinfo-details 'id))))
                             (for-each
                              (lambda (cached-narinfo-file-details)
                                ;; TODO Delete the file as well

                                ;; The compression and size come from
                                ;; this file's own details, not from
                                ;; the enclosing list of files.
                                (let ((reply (make-channel)))
                                  (put-message
                                   cached-compression-management-channel
                                   (list 'cached-narinfo-removed
                                         (assq-ref narinfo-details 'id)
                                         (assq-ref cached-narinfo-file-details
                                                   'compression)
                                         (assq-ref cached-narinfo-file-details
                                                   'size)
                                         reply))
                                  ;; Wait for the reply before moving on
                                  (get-message reply)))
                              cached-narinfo-files))

                           (database-remove-narinfo
                            database
                            store-path
                            #:change-datetime
                            (assq-ref change-details 'datetime)))))

                      (else
                       (error "unimplemented"))))))
               recent-changes))
            (raise-exception
             (make-exception-with-message
              (simple-format #f "unknown response: ~A code: ~A"
                             (uri->string uri)
                             (response-code response))))))))

  (spawn-fiber
   (lambda ()
     (while #t
       ;; With #:unwind? #t the handler runs after unwinding, so a
       ;; failed round is logged and the loop continues.
       (with-exception-handler
           (lambda (exn)
             (log-msg 'ERROR "fetching changes failed " exn))
         request-recent-changes
         #:unwind? #t)

       (log-msg 'DEBUG "finished requesting recent changes, sleeping")
       (sleep 60)))))