diff options
-rw-r--r-- | nar-herder/recent-changes.scm | 59 | ||||
-rw-r--r-- | nar-herder/utils.scm | 15 | ||||
-rw-r--r-- | scripts/nar-herder.in | 78 |
3 files changed, 90 insertions, 62 deletions
diff --git a/nar-herder/recent-changes.scm b/nar-herder/recent-changes.scm index ccfff93..62bd604 100644 --- a/nar-herder/recent-changes.scm +++ b/nar-herder/recent-changes.scm @@ -137,35 +137,38 @@ (log-msg 'ERROR "exception in recent change listener " exn) #f) (lambda () - (let* ((recent-changes - (database-select-recent-changes database after)) - (unprocessed-recent-changes - (remove + (with-throw-handler #t + (lambda () + (let* ((recent-changes + (database-select-recent-changes database after)) + (unprocessed-recent-changes + (remove + (lambda (change-details) + (member change-details last-processed-recent-changes)) + recent-changes))) + + (unless (null? unprocessed-recent-changes) + (log-msg 'INFO "processing " (length unprocessed-recent-changes) + " recent changes") + + (for-each (lambda (change-details) - (member change-details last-processed-recent-changes)) - recent-changes))) - - (unless (null? unprocessed-recent-changes) - (log-msg 'INFO "processing " (length unprocessed-recent-changes) - " recent changes") - - (metric-increment recent-changes-count-metric - #:by (length unprocessed-recent-changes)) - - (for-each - (lambda (change-details) - (let ((change (assq-ref change-details 'change))) - (cond - ((string=? change "addition") - (process-addition-change change-details)) - ((string=? change "removal") - (process-removal-change change-details)) - (else #f)))) - unprocessed-recent-changes)) - - ;; Use the unprocessed recent changes here to carry - ;; forward all processed changes to the next pass - unprocessed-recent-changes)) + (let ((change (assq-ref change-details 'change))) + (cond + ((string=? change "addition") + (process-addition-change change-details)) + ((string=? change "removal") + (process-removal-change change-details)) + (else #f)))) + unprocessed-recent-changes) + + (metric-increment recent-changes-count-metric + #:by (length unprocessed-recent-changes))) + ;; Use the unprocessed recent changes here to carry + ;; forward all processed changes to the next pass + unprocessed-recent-changes)) + (lambda _ + (backtrace)))) #:unwind? #t) (#f (loop after '())) (recent-changes diff --git a/nar-herder/utils.scm b/nar-herder/utils.scm index 4755d33..4261b05 100644 --- a/nar-herder/utils.scm +++ b/nar-herder/utils.scm @@ -701,7 +701,7 @@ If already in the worker thread, call PROC immediately." (define &port-timeout (make-exception-type '&port-timeout &external-error - '(port))) + '(thunk port))) (define make-port-timeout-error (record-constructor &port-timeout)) @@ -735,7 +735,7 @@ If already in the worker thread, call PROC immediately." #:key timeout (read-timeout timeout) (write-timeout timeout)) - (define (no-fibers-wait port mode timeout) + (define (no-fibers-wait thunk port mode timeout) (define poll-timeout-ms 200) ;; When the GC runs, it restarts the poll syscall, but the timeout @@ -746,8 +746,7 @@ If already in the worker thread, call PROC immediately." ;; timed out overall. (let ((timeout-internal (+ (get-internal-real-time) - (* internal-time-units-per-second - (/ timeout 1000))))) + (* timeout internal-time-units-per-second)))) (let loop ((poll-value (port-poll port mode poll-timeout-ms))) (if (= poll-value 0) @@ -755,8 +754,8 @@ If already in the worker thread, call PROC immediately." timeout-internal) (raise-exception (if (string=? mode "r") - (make-port-read-timeout-error port) - (make-port-write-timeout-error port))) + (make-port-read-timeout-error thunk port) + (make-port-write-timeout-error thunk port))) (loop (port-poll port mode poll-timeout-ms))) poll-value)))) @@ -772,7 +771,7 @@ If already in the worker thread, call PROC immediately." (lambda () (raise-exception (make-port-read-timeout-error thunk port)))))) - (no-fibers-wait port "r" read-timeout)))) + (no-fibers-wait thunk port "r" read-timeout)))) (current-write-waiter (lambda (port) (if (current-scheduler) @@ -784,5 +783,5 @@ If already in the worker thread, call PROC immediately." (lambda () (raise-exception (make-port-write-timeout-error thunk port)))))) - (no-fibers-wait port "w" write-timeout))))) + (no-fibers-wait thunk port "w" write-timeout))))) (thunk))) diff --git a/scripts/nar-herder.in b/scripts/nar-herder.in index 515b98a..7f5db5c 100644 --- a/scripts/nar-herder.in +++ b/scripts/nar-herder.in @@ -391,37 +391,61 @@ (assq-ref opts 'arguments))) (len (length narinfos)) (progress - (progress-reporter/bar len - (format #f "importing ~a narinfos" - len) - (current-error-port)))) + (if (= 1 len) + progress-reporter/silent + (progress-reporter/bar len + (format #f "importing ~a narinfos" + len) + (current-error-port))))) (call-with-progress-reporter progress (lambda (report) (database-call-with-transaction database (lambda (db) - (let ((read-narinfos - (map - (lambda (narinfo-file) - (let ((narinfo - (call-with-input-file narinfo-file - (lambda (port) - ;; Set url to a dummy value as this doesn't - ;; matter - (read-narinfo port - "https://narherderdummyvalue"))))) - - (database-insert-narinfo - database - narinfo - #:tags (or (assq-ref opts 'tags) - '())) - - (report) - - narinfo)) - narinfos))) + (let* ((canonical-storage + (and=> (assq-ref opts 'storage) + canonicalize-path)) + (read-narinfos + (map + (lambda (narinfo-file) + (let ((narinfo + (call-with-input-file narinfo-file + (lambda (port) + ;; Set url to a dummy value as this doesn't + ;; matter + (read-narinfo port + "https://narherderdummyvalue"))))) + + (when canonical-storage + (for-each + (lambda (uri) + (unless (string=? canonical-storage + (dirname narinfo-file)) + (let ((source + (string-append + (dirname narinfo-file) + "/" (uri-decode (uri-path uri)))) + (dest + (string-append + canonical-storage + "/" (uri-decode (uri-path uri))))) + (simple-format (current-error-port) + "moving ~A to ~A\n" + source dest) + (rename-file source dest)))) + (narinfo-uris narinfo))) + + (database-insert-narinfo + database + narinfo + #:tags (or (assq-ref opts 'tags) + '())) + + (report) + + narinfo)) + narinfos))) (when (assq-ref opts 'ensure-references-exist) (for-each @@ -443,7 +467,9 @@ "missing reference to ~A\n" reference)))) (narinfo-references narinfo)))) - read-narinfos)))))))))) + read-narinfos))))))) + (when (= 1 len) + (simple-format (current-error-port) "imported narinfo\n"))))) (("remove" rest ...) (let* ((opts (parse-options %base-options %base-option-defaults |