aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--nar-herder/recent-changes.scm59
-rw-r--r--nar-herder/utils.scm15
-rw-r--r--scripts/nar-herder.in78
3 files changed, 90 insertions, 62 deletions
diff --git a/nar-herder/recent-changes.scm b/nar-herder/recent-changes.scm
index ccfff93..62bd604 100644
--- a/nar-herder/recent-changes.scm
+++ b/nar-herder/recent-changes.scm
@@ -137,35 +137,38 @@
(log-msg 'ERROR "exception in recent change listener " exn)
#f)
(lambda ()
- (let* ((recent-changes
- (database-select-recent-changes database after))
- (unprocessed-recent-changes
- (remove
+ (with-throw-handler #t
+ (lambda ()
+ (let* ((recent-changes
+ (database-select-recent-changes database after))
+ (unprocessed-recent-changes
+ (remove
+ (lambda (change-details)
+ (member change-details last-processed-recent-changes))
+ recent-changes)))
+
+ (unless (null? unprocessed-recent-changes)
+ (log-msg 'INFO "processing " (length unprocessed-recent-changes)
+ " recent changes")
+
+ (for-each
(lambda (change-details)
- (member change-details last-processed-recent-changes))
- recent-changes)))
-
- (unless (null? unprocessed-recent-changes)
- (log-msg 'INFO "processing " (length unprocessed-recent-changes)
- " recent changes")
-
- (metric-increment recent-changes-count-metric
- #:by (length unprocessed-recent-changes))
-
- (for-each
- (lambda (change-details)
- (let ((change (assq-ref change-details 'change)))
- (cond
- ((string=? change "addition")
- (process-addition-change change-details))
- ((string=? change "removal")
- (process-removal-change change-details))
- (else #f))))
- unprocessed-recent-changes))
-
- ;; Use the unprocessed recent changes here to carry
- ;; forward all processed changes to the next pass
- unprocessed-recent-changes))
+ (let ((change (assq-ref change-details 'change)))
+ (cond
+ ((string=? change "addition")
+ (process-addition-change change-details))
+ ((string=? change "removal")
+ (process-removal-change change-details))
+ (else #f))))
+ unprocessed-recent-changes)
+
+ (metric-increment recent-changes-count-metric
+ #:by (length unprocessed-recent-changes)))
+ ;; Use the unprocessed recent changes here to carry
+ ;; forward all processed changes to the next pass
+ unprocessed-recent-changes))
+ (lambda _
+ (backtrace))))
#:unwind? #t)
(#f (loop after '()))
(recent-changes
diff --git a/nar-herder/utils.scm b/nar-herder/utils.scm
index 4755d33..4261b05 100644
--- a/nar-herder/utils.scm
+++ b/nar-herder/utils.scm
@@ -701,7 +701,7 @@ If already in the worker thread, call PROC immediately."
(define &port-timeout
(make-exception-type '&port-timeout
&external-error
- '(port)))
+ '(thunk port)))
(define make-port-timeout-error
(record-constructor &port-timeout))
@@ -735,7 +735,7 @@ If already in the worker thread, call PROC immediately."
#:key timeout
(read-timeout timeout)
(write-timeout timeout))
- (define (no-fibers-wait port mode timeout)
+ (define (no-fibers-wait thunk port mode timeout)
(define poll-timeout-ms 200)
;; When the GC runs, it restarts the poll syscall, but the timeout
@@ -746,8 +746,7 @@ If already in the worker thread, call PROC immediately."
;; timed out overall.
(let ((timeout-internal
(+ (get-internal-real-time)
- (* internal-time-units-per-second
- (/ timeout 1000)))))
+ (* timeout internal-time-units-per-second))))
(let loop ((poll-value
(port-poll port mode poll-timeout-ms)))
(if (= poll-value 0)
@@ -755,8 +754,8 @@ If already in the worker thread, call PROC immediately."
timeout-internal)
(raise-exception
(if (string=? mode "r")
- (make-port-read-timeout-error port)
- (make-port-write-timeout-error port)))
+ (make-port-read-timeout-error thunk port)
+ (make-port-write-timeout-error thunk port)))
(loop (port-poll port mode poll-timeout-ms)))
poll-value))))
@@ -772,7 +771,7 @@ If already in the worker thread, call PROC immediately."
(lambda ()
(raise-exception
(make-port-read-timeout-error thunk port))))))
- (no-fibers-wait port "r" read-timeout))))
+ (no-fibers-wait thunk port "r" read-timeout))))
(current-write-waiter
(lambda (port)
(if (current-scheduler)
@@ -784,5 +783,5 @@ If already in the worker thread, call PROC immediately."
(lambda ()
(raise-exception
(make-port-write-timeout-error thunk port))))))
- (no-fibers-wait port "w" write-timeout)))))
+ (no-fibers-wait thunk port "w" write-timeout)))))
(thunk)))
diff --git a/scripts/nar-herder.in b/scripts/nar-herder.in
index 515b98a..7f5db5c 100644
--- a/scripts/nar-herder.in
+++ b/scripts/nar-herder.in
@@ -391,37 +391,61 @@
(assq-ref opts 'arguments)))
(len (length narinfos))
(progress
- (progress-reporter/bar len
- (format #f "importing ~a narinfos"
- len)
- (current-error-port))))
+ (if (= 1 len)
+ progress-reporter/silent
+ (progress-reporter/bar len
+ (format #f "importing ~a narinfos"
+ len)
+ (current-error-port)))))
(call-with-progress-reporter progress
(lambda (report)
(database-call-with-transaction
database
(lambda (db)
- (let ((read-narinfos
- (map
- (lambda (narinfo-file)
- (let ((narinfo
- (call-with-input-file narinfo-file
- (lambda (port)
- ;; Set url to a dummy value as this doesn't
- ;; matter
- (read-narinfo port
- "https://narherderdummyvalue")))))
-
- (database-insert-narinfo
- database
- narinfo
- #:tags (or (assq-ref opts 'tags)
- '()))
-
- (report)
-
- narinfo))
- narinfos)))
+ (let* ((canonical-storage
+ (and=> (assq-ref opts 'storage)
+ canonicalize-path))
+ (read-narinfos
+ (map
+ (lambda (narinfo-file)
+ (let ((narinfo
+ (call-with-input-file narinfo-file
+ (lambda (port)
+ ;; Set url to a dummy value as this doesn't
+ ;; matter
+ (read-narinfo port
+ "https://narherderdummyvalue")))))
+
+ (when canonical-storage
+ (for-each
+ (lambda (uri)
+ (unless (string=? canonical-storage
+ (dirname narinfo-file))
+ (let ((source
+ (string-append
+ (dirname narinfo-file)
+ "/" (uri-decode (uri-path uri))))
+ (dest
+ (string-append
+ canonical-storage
+ "/" (uri-decode (uri-path uri)))))
+ (simple-format (current-error-port)
+ "moving ~A to ~A\n"
+ source dest)
+ (rename-file source dest))))
+ (narinfo-uris narinfo)))
+
+ (database-insert-narinfo
+ database
+ narinfo
+ #:tags (or (assq-ref opts 'tags)
+ '()))
+
+ (report)
+
+ narinfo))
+ narinfos)))
(when (assq-ref opts 'ensure-references-exist)
(for-each
@@ -443,7 +467,9 @@
"missing reference to ~A\n"
reference))))
(narinfo-references narinfo))))
- read-narinfos))))))))))
+ read-narinfos)))))))
+ (when (= 1 len)
+ (simple-format (current-error-port) "imported narinfo\n")))))
(("remove" rest ...)
(let* ((opts (parse-options %base-options
%base-option-defaults