diff options
author | Joey Hess <joey@kodama.kitenet.net> | 2008-02-03 16:48:26 -0500 |
---|---|---|
committer | Joey Hess <joey@kodama.kitenet.net> | 2008-02-03 16:48:26 -0500 |
commit | 9d54cc4659248f9820f47a021b694405d75404a8 (patch) | |
tree | 6d2b775006630ace819299997312f10b979e7ff4 /IkiWiki | |
parent | 38affb0c1c4e2b89beb63d6f8dc3f172eee7bd02 (diff) | |
download | ikiwiki-9d54cc4659248f9820f47a021b694405d75404a8.tar ikiwiki-9d54cc4659248f9820f47a021b694405d75404a8.tar.gz |
implement aggregate_locking design
Now aggregation will not lock the wiki. Any changes made during aggregation are
merged in with the changed state accumulated while aggregating. A separate
lock file prevents multiple concurrent aggregators. Garbage collection
of orphaned guids is much improved. loadstate() is only called once
per process, so tricky support for reloading wiki state is not needed.
(Tested fairly thoroughly.)
Diffstat (limited to 'IkiWiki')
-rw-r--r-- | IkiWiki/Plugin/aggregate.pm | 181 |
1 files changed, 124 insertions, 57 deletions
diff --git a/IkiWiki/Plugin/aggregate.pm b/IkiWiki/Plugin/aggregate.pm index cfc4ec955..ba40ee6bc 100644 --- a/IkiWiki/Plugin/aggregate.pm +++ b/IkiWiki/Plugin/aggregate.pm @@ -33,33 +33,62 @@ sub getopt () { #{{{ sub checkconfig () { #{{{ if ($config{aggregate} && ! ($config{post_commit} && IkiWiki::commit_hook_enabled())) { - if (! IkiWiki::lockwiki(0)) { - debug("wiki is locked by another process, not aggregating"); - exit 1; - } - + # See if any feeds need aggregation. loadstate(); - IkiWiki::loadindex(); - aggregate(); - expire(); - savestate(); - clearstate(); + my @feeds=needsaggregate(); + return unless @feeds; + if (! lockaggregate()) { + debug("an aggregation process is already running"); + return; + } + # force a later rebuild of source pages + $IkiWiki::forcerebuild{$_->{sourcepage}}=1 + foreach @feeds; + + # Fork a child process to handle the aggregation. + # The parent process will then handle building the + # result. This avoids messy code to clear state + # accumulated while aggregating. + defined(my $pid = fork) or error("Can't fork: $!"); + if (! $pid) { + IkiWiki::loadindex(); + + # Aggregation happens without the main wiki lock + # being held. This allows editing pages etc while + # aggregation is running. + aggregate(@feeds); + + IkiWiki::lockwiki; + # Merge changes, since aggregation state may have + # changed on disk while the aggregation was happening. + mergestate(); + expire(); + savestate(); + IkiWiki::unlockwiki; + exit 0; + } + waitpid($pid,0); + if ($?) { + error "aggregation failed with code $?"; + } - IkiWiki::unlockwiki(); + clearstate(); + unlockaggregate(); } } #}}} sub needsbuild (@) { #{{{ my $needsbuild=shift; - loadstate(); # if not already loaded + loadstate(); foreach my $feed (values %feeds) { if (exists $pagesources{$feed->{sourcepage}} && grep { $_ eq $pagesources{$feed->{sourcepage}} } @$needsbuild) { - # Mark all feeds originating on this page as removable; - # preprocess will unmark those that still exist. 
- remove_feeds($feed->{sourcepage}); + # Mark all feeds originating on this page as + # not yet seen; preprocess will unmark those that + # still exist. + markunseen($feed->{sourcepage}); } } } # }}} @@ -92,8 +121,7 @@ sub preprocess (@) { #{{{ $feed->{updateinterval}=defined $params{updateinterval} ? $params{updateinterval} * 60 : 15 * 60; $feed->{expireage}=defined $params{expireage} ? $params{expireage} : 0; $feed->{expirecount}=defined $params{expirecount} ? $params{expirecount} : 0; - delete $feed->{remove}; - delete $feed->{expired}; + delete $feed->{unseen}; $feed->{lastupdate}=0 unless defined $feed->{lastupdate}; $feed->{numposts}=0 unless defined $feed->{numposts}; $feed->{newposts}=0 unless defined $feed->{newposts}; @@ -123,16 +151,27 @@ sub delete (@) { #{{{ # Remove feed data for removed pages. foreach my $file (@files) { my $page=pagename($file); - remove_feeds($page); + markunseen($page); + } +} #}}} + +sub markunseen ($) { #{{{ + my $page=shift; + + foreach my $id (keys %feeds) { + if ($feeds{$id}->{sourcepage} eq $page) { + $feeds{$id}->{unseen}=1; + } } } #}}} my $state_loaded=0; + sub loadstate () { #{{{ return if $state_loaded; $state_loaded=1; if (-e "$config{wikistatedir}/aggregate") { - open(IN, "<", "$config{wikistatedir}/aggregate") || + open(IN, "$config{wikistatedir}/aggregate") || die "$config{wikistatedir}/aggregate: $!"; while (<IN>) { $_=IkiWiki::possibly_foolish_untaint($_); @@ -166,32 +205,13 @@ sub loadstate () { #{{{ sub savestate () { #{{{ return unless $state_loaded; + garbage_collect(); eval q{use HTML::Entities}; error($@) if $@; my $newfile="$config{wikistatedir}/aggregate.new"; my $cleanup = sub { unlink($newfile) }; - open (OUT, ">", $newfile) || error("open $newfile: $!", $cleanup); + open (OUT, ">$newfile") || error("open $newfile: $!", $cleanup); foreach my $data (values %feeds, values %guids) { - if ($data->{remove}) { - if ($data->{name}) { - foreach my $guid (values %guids) { - if ($guid->{feed} eq $data->{name}) { - 
$guid->{remove}=1; - } - } - } - else { - unlink pagefile($data->{page}) - if exists $data->{page}; - } - next; - } - elsif ($data->{expired} && exists $data->{page}) { - unlink pagefile($data->{page}); - delete $data->{page}; - delete $data->{md5}; - } - my @line; foreach my $field (keys %$data) { if ($field eq "name" || $field eq "feed" || @@ -212,6 +232,63 @@ sub savestate () { #{{{ error("rename $newfile: $!", $cleanup); } #}}} +sub garbage_collect () { #{{{ + foreach my $name (keys %feeds) { + # remove any feeds that were not seen while building the pages + # that used to contain them + if ($feeds{$name}->{unseen}) { + delete $feeds{$name}; + } + } + + foreach my $guid (values %guids) { + # any guid whose feed is gone should be removed + if (! exists $feeds{$guid->{feed}}) { + unlink pagefile($guid->{page}) + if exists $guid->{page}; + delete $guids{$guid->{guid}}; + } + # handle expired guids + elsif ($guid->{expired} && exists $guid->{page}) { + unlink pagefile($guid->{page}); + delete $guid->{page}; + delete $guid->{md5}; + } + } +} #}}} + +sub mergestate () { #{{{ + # Load the current state in from disk, and merge into it + # values from the state in memory that might have changed + # during aggregation. + my %myfeeds=%feeds; + my %myguids=%guids; + clearstate(); + loadstate(); + + # All that can change in feed state during aggregation is a few + # fields. + foreach my $name (keys %myfeeds) { + if (exists $feeds{$name}) { + foreach my $field (qw{message lastupdate numposts + newposts error}) { + $feeds{$name}->{$field}=$myfeeds{$name}->{$field}; + } + } + } + + # New guids can be created during aggregation. + # It's also possible that guids were removed from the on-disk state + # while the aggregation was in process. That would only happen if + # their feed was also removed, so any removed guids added back here + # will be garbage collected later. + foreach my $guid (keys %myguids) { + if (! 
exists $guids{$guid}) { + $guids{$guid}=$myguids{$guid}; + } + } +} #}}} + sub clearstate () { #{{{ %feeds=(); %guids=(); @@ -249,7 +326,12 @@ sub expire () { #{{{ } } #}}} -sub aggregate () { #{{{ +sub needsaggregate () { #{{{ + return values %feeds if $config{rebuild}; + return grep { time - $_->{lastupdate} >= $_->{updateinterval} } values %feeds; +} #}}} + +sub aggregate (@) { #{{{ eval q{use XML::Feed}; error($@) if $@; eval q{use URI::Fetch}; @@ -257,15 +339,12 @@ sub aggregate () { #{{{ eval q{use HTML::Entities}; error($@) if $@; - foreach my $feed (values %feeds) { - next unless $config{rebuild} || - time - $feed->{lastupdate} >= $feed->{updateinterval}; + foreach my $feed (@_) { $feed->{lastupdate}=time; $feed->{newposts}=0; $feed->{message}=sprintf(gettext("processed ok at %s"), displaytime($feed->{lastupdate})); $feed->{error}=0; - $IkiWiki::forcerebuild{$feed->{sourcepage}}=1; debug(sprintf(gettext("checking feed %s ..."), $feed->{name})); @@ -473,18 +552,6 @@ sub htmlabs ($$) { #{{{ return $ret; } #}}} -sub remove_feeds () { #{{{ - my $page=shift; - - my %removed; - foreach my $id (keys %feeds) { - if ($feeds{$id}->{sourcepage} eq $page) { - $feeds{$id}->{remove}=1; - $removed{$id}=1; - } - } -} #}}} - sub pagefile ($) { #{{{ my $page=shift; |