Diffstat (limited to 'vendor/github.com/hashicorp/terraform/vendor/github.com/hashicorp/raft/raft.go')
-rw-r--r--	vendor/github.com/hashicorp/terraform/vendor/github.com/hashicorp/raft/raft.go	1456
1 file changed, 0 insertions(+), 1456 deletions(-)
diff --git a/vendor/github.com/hashicorp/terraform/vendor/github.com/hashicorp/raft/raft.go b/vendor/github.com/hashicorp/terraform/vendor/github.com/hashicorp/raft/raft.go
deleted file mode 100644
index aa8fe820..00000000
--- a/vendor/github.com/hashicorp/terraform/vendor/github.com/hashicorp/raft/raft.go
+++ /dev/null
@@ -1,1456 +0,0 @@
-package raft
-
-import (
- "bytes"
- "container/list"
- "fmt"
- "io"
- "time"
-
- "github.com/armon/go-metrics"
-)
-
-const (
- minCheckInterval = 10 * time.Millisecond
-)
-
-var (
- keyCurrentTerm = []byte("CurrentTerm")
- keyLastVoteTerm = []byte("LastVoteTerm")
- keyLastVoteCand = []byte("LastVoteCand")
-)
-
-// getRPCHeader returns an initialized RPCHeader struct for the given
-// Raft instance. This structure is sent along with RPC requests and
-// responses.
-func (r *Raft) getRPCHeader() RPCHeader {
- return RPCHeader{
- ProtocolVersion: r.conf.ProtocolVersion,
- }
-}
-
-// checkRPCHeader houses logic about whether this instance of Raft can process
-// the given RPC message.
-func (r *Raft) checkRPCHeader(rpc RPC) error {
- // Get the header off the RPC message.
- wh, ok := rpc.Command.(WithRPCHeader)
- if !ok {
- return fmt.Errorf("RPC does not have a header")
- }
- header := wh.GetRPCHeader()
-
- // First check is to just make sure the code can understand the
- // protocol at all.
- if header.ProtocolVersion < ProtocolVersionMin ||
- header.ProtocolVersion > ProtocolVersionMax {
- return ErrUnsupportedProtocol
- }
-
- // Second check is whether we should support this message, given the
- // current protocol we are configured to run. This will drop support
- // for protocol version 0 starting at protocol version 2, which is
- // currently what we want, and in general support one version back. We
- // may need to revisit this policy depending on how future protocol
- // changes evolve.
- if header.ProtocolVersion < r.conf.ProtocolVersion-1 {
- return ErrUnsupportedProtocol
- }
-
- return nil
-}
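
The check above enforces a sliding compatibility window: a message must fall inside the globally supported range, and may be at most one protocol version behind the locally configured one. A minimal standalone sketch of that rule (constant values are hypothetical, not the library's):

package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the library's version bounds.
const (
	protocolVersionMin = 0
	protocolVersionMax = 3
)

var errUnsupportedProtocol = errors.New("unsupported protocol version")

// checkVersion mirrors the two-step test: inside the supported range,
// and no more than one version behind the local configuration.
func checkVersion(msg, local int) error {
	if msg < protocolVersionMin || msg > protocolVersionMax {
		return errUnsupportedProtocol
	}
	if msg < local-1 {
		return errUnsupportedProtocol
	}
	return nil
}

func main() {
	fmt.Println(checkVersion(1, 3)) // rejected: two versions back
	fmt.Println(checkVersion(2, 3)) // <nil>: one version back is allowed
}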
-
-// getSnapshotVersion returns the snapshot version that should be used when
-// creating snapshots, given the protocol version in use.
-func getSnapshotVersion(protocolVersion ProtocolVersion) SnapshotVersion {
- // Right now we only have two versions and they are backwards compatible
- // so we don't need to look at the protocol version.
- return 1
-}
-
-// commitTuple is used to send an index that was committed,
-// with an optional associated future that should be invoked.
-type commitTuple struct {
- log *Log
- future *logFuture
-}
-
-// leaderState is state that is used while we are a leader.
-type leaderState struct {
- commitCh chan struct{}
- commitment *commitment
- inflight *list.List // list of logFuture in log index order
- replState map[ServerID]*followerReplication
- notify map[*verifyFuture]struct{}
- stepDown chan struct{}
-}
-
-// setLeader is used to modify the current leader of the cluster
-func (r *Raft) setLeader(leader ServerAddress) {
- r.leaderLock.Lock()
- r.leader = leader
- r.leaderLock.Unlock()
-}
-
-// requestConfigChange is a helper for the above functions that make
-// configuration change requests. 'req' describes the change. For timeout,
-// see AddVoter.
-func (r *Raft) requestConfigChange(req configurationChangeRequest, timeout time.Duration) IndexFuture {
- var timer <-chan time.Time
- if timeout > 0 {
- timer = time.After(timeout)
- }
- future := &configurationChangeFuture{
- req: req,
- }
- future.init()
- select {
- case <-timer:
- return errorFuture{ErrEnqueueTimeout}
- case r.configurationChangeCh <- future:
- return future
- case <-r.shutdownCh:
- return errorFuture{ErrRaftShutdown}
- }
-}
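
Note the nil-channel idiom: when timeout <= 0, timer stays nil, and receiving from a nil channel blocks forever, so that case can never fire. A self-contained sketch of the same enqueue-with-timeout pattern (names are illustrative, not the library's API):

package main

import (
	"errors"
	"fmt"
	"time"
)

var errEnqueueTimeout = errors.New("timed out enqueuing operation")

// enqueue hands v to a consumer, giving up after timeout; a
// non-positive timeout means wait indefinitely (until shutdown).
func enqueue(work chan<- int, v int, timeout time.Duration, shutdown <-chan struct{}) error {
	var timer <-chan time.Time
	if timeout > 0 {
		timer = time.After(timeout)
	}
	select {
	case work <- v:
		return nil
	case <-timer:
		return errEnqueueTimeout
	case <-shutdown:
		return errors.New("shutdown")
	}
}

func main() {
	work := make(chan int) // unbuffered and never drained
	fmt.Println(enqueue(work, 42, 50*time.Millisecond, nil))
	// Output: timed out enqueuing operation
}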
-
-// run is a long running goroutine that runs the Raft FSM.
-func (r *Raft) run() {
- for {
- // Check if we are doing a shutdown
- select {
- case <-r.shutdownCh:
- // Clear the leader to prevent forwarding
- r.setLeader("")
- return
- default:
- }
-
- // Enter into a sub-FSM
- switch r.getState() {
- case Follower:
- r.runFollower()
- case Candidate:
- r.runCandidate()
- case Leader:
- r.runLeader()
- }
- }
-}
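
Each sub-FSM (runFollower, runCandidate, runLeader) owns the event loop while its state holds and returns as soon as the state changes; run then re-dispatches. A toy driver with the same shape, heavily simplified:

package main

import "fmt"

type state int

const (
	follower state = iota
	candidate
	leader
	stopped
)

// run re-dispatches into one handler per state; each handler returns
// the next state, standing in for the library's setState calls.
func run(s state) {
	for s != stopped {
		switch s {
		case follower:
			fmt.Println("follower: waiting on heartbeats")
			s = candidate // heartbeat timeout
		case candidate:
			fmt.Println("candidate: holding an election")
			s = leader // won the vote
		case leader:
			fmt.Println("leader: replicating entries")
			s = stopped
		}
	}
}

func main() { run(follower) }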
-
-// runFollower runs the FSM for a follower.
-func (r *Raft) runFollower() {
- didWarn := false
- r.logger.Printf("[INFO] raft: %v entering Follower state (Leader: %q)", r, r.Leader())
- metrics.IncrCounter([]string{"raft", "state", "follower"}, 1)
- heartbeatTimer := randomTimeout(r.conf.HeartbeatTimeout)
- for {
- select {
- case rpc := <-r.rpcCh:
- r.processRPC(rpc)
-
- case c := <-r.configurationChangeCh:
- // Reject any operations since we are not the leader
- c.respond(ErrNotLeader)
-
- case a := <-r.applyCh:
- // Reject any operations since we are not the leader
- a.respond(ErrNotLeader)
-
- case v := <-r.verifyCh:
- // Reject any operations since we are not the leader
- v.respond(ErrNotLeader)
-
- case r := <-r.userRestoreCh:
- // Reject any restores since we are not the leader
- r.respond(ErrNotLeader)
-
- case c := <-r.configurationsCh:
- c.configurations = r.configurations.Clone()
- c.respond(nil)
-
- case b := <-r.bootstrapCh:
- b.respond(r.liveBootstrap(b.configuration))
-
- case <-heartbeatTimer:
- // Restart the heartbeat timer
- heartbeatTimer = randomTimeout(r.conf.HeartbeatTimeout)
-
- // Check if we have had a successful contact
- lastContact := r.LastContact()
- if time.Since(lastContact) < r.conf.HeartbeatTimeout {
- continue
- }
-
- // Heartbeat failed! Transition to the candidate state
- lastLeader := r.Leader()
- r.setLeader("")
-
- if r.configurations.latestIndex == 0 {
- if !didWarn {
- r.logger.Printf("[WARN] raft: no known peers, aborting election")
- didWarn = true
- }
- } else if r.configurations.latestIndex == r.configurations.committedIndex &&
- !hasVote(r.configurations.latest, r.localID) {
- if !didWarn {
- r.logger.Printf("[WARN] raft: not part of stable configuration, aborting election")
- didWarn = true
- }
- } else {
- r.logger.Printf(`[WARN] raft: Heartbeat timeout from %q reached, starting election`, lastLeader)
- metrics.IncrCounter([]string{"raft", "transition", "heartbeat_timeout"}, 1)
- r.setState(Candidate)
- return
- }
-
- case <-r.shutdownCh:
- return
- }
- }
-}
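
The heartbeat timer is randomized so that followers do not all become candidates at the same instant, which would repeatedly split the vote. A plausible shape for the randomTimeout helper, shown here as an assumption rather than the library's actual code:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// randomTimeout returns a channel that fires after somewhere between
// 1x and 2x the base duration (assumed behavior of the helper).
func randomTimeout(base time.Duration) <-chan time.Time {
	extra := time.Duration(rand.Int63()) % base
	return time.After(base + extra)
}

func main() {
	start := time.Now()
	<-randomTimeout(100 * time.Millisecond)
	fmt.Printf("fired after %v\n", time.Since(start))
}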
-
-// liveBootstrap attempts to seed an initial configuration for the cluster. See
-// the Raft object's member BootstrapCluster for more details. This must only be
-// called on the main thread, and only makes sense in the follower state.
-func (r *Raft) liveBootstrap(configuration Configuration) error {
- // Use the pre-init API to make the static updates.
- err := BootstrapCluster(&r.conf, r.logs, r.stable, r.snapshots,
- r.trans, configuration)
- if err != nil {
- return err
- }
-
- // Make the configuration live.
- var entry Log
- if err := r.logs.GetLog(1, &entry); err != nil {
- panic(err)
- }
- r.setCurrentTerm(1)
- r.setLastLog(entry.Index, entry.Term)
- r.processConfigurationLogEntry(&entry)
- return nil
-}
-
-// runCandidate runs the FSM for a candidate.
-func (r *Raft) runCandidate() {
- r.logger.Printf("[INFO] raft: %v entering Candidate state in term %v",
- r, r.getCurrentTerm()+1)
- metrics.IncrCounter([]string{"raft", "state", "candidate"}, 1)
-
- // Start vote for us, and set a timeout
- voteCh := r.electSelf()
- electionTimer := randomTimeout(r.conf.ElectionTimeout)
-
- // Tally the votes, need a simple majority
- grantedVotes := 0
- votesNeeded := r.quorumSize()
- r.logger.Printf("[DEBUG] raft: Votes needed: %d", votesNeeded)
-
- for r.getState() == Candidate {
- select {
- case rpc := <-r.rpcCh:
- r.processRPC(rpc)
-
- case vote := <-voteCh:
- // Check if the term is greater than ours, bail
- if vote.Term > r.getCurrentTerm() {
- r.logger.Printf("[DEBUG] raft: Newer term discovered, fallback to follower")
- r.setState(Follower)
- r.setCurrentTerm(vote.Term)
- return
- }
-
- // Check if the vote is granted
- if vote.Granted {
- grantedVotes++
- r.logger.Printf("[DEBUG] raft: Vote granted from %s in term %v. Tally: %d",
- vote.voterID, vote.Term, grantedVotes)
- }
-
- // Check if we've become the leader
- if grantedVotes >= votesNeeded {
- r.logger.Printf("[INFO] raft: Election won. Tally: %d", grantedVotes)
- r.setState(Leader)
- r.setLeader(r.localAddr)
- return
- }
-
- case c := <-r.configurationChangeCh:
- // Reject any operations since we are not the leader
- c.respond(ErrNotLeader)
-
- case a := <-r.applyCh:
- // Reject any operations since we are not the leader
- a.respond(ErrNotLeader)
-
- case v := <-r.verifyCh:
- // Reject any operations since we are not the leader
- v.respond(ErrNotLeader)
-
- case r := <-r.userRestoreCh:
- // Reject any restores since we are not the leader
- r.respond(ErrNotLeader)
-
- case c := <-r.configurationsCh:
- c.configurations = r.configurations.Clone()
- c.respond(nil)
-
- case b := <-r.bootstrapCh:
- b.respond(ErrCantBootstrap)
-
- case <-electionTimer:
- // Election failed! Restart the election. We simply return,
- // which will kick us back into runCandidate
- r.logger.Printf("[WARN] raft: Election timeout reached, restarting election")
- return
-
- case <-r.shutdownCh:
- return
- }
- }
-}
-
-// runLeader runs the FSM for a leader. Do the setup here and drop into
-// the leaderLoop for the hot loop.
-func (r *Raft) runLeader() {
- r.logger.Printf("[INFO] raft: %v entering Leader state", r)
- metrics.IncrCounter([]string{"raft", "state", "leader"}, 1)
-
- // Notify that we are the leader
- asyncNotifyBool(r.leaderCh, true)
-
- // Push to the notify channel if given
- if notify := r.conf.NotifyCh; notify != nil {
- select {
- case notify <- true:
- case <-r.shutdownCh:
- }
- }
-
- // Setup leader state
- r.leaderState.commitCh = make(chan struct{}, 1)
- r.leaderState.commitment = newCommitment(r.leaderState.commitCh,
- r.configurations.latest,
- r.getLastIndex()+1 /* first index that may be committed in this term */)
- r.leaderState.inflight = list.New()
- r.leaderState.replState = make(map[ServerID]*followerReplication)
- r.leaderState.notify = make(map[*verifyFuture]struct{})
- r.leaderState.stepDown = make(chan struct{}, 1)
-
- // Cleanup state on step down
- defer func() {
- // Since we were the leader previously, we update our
- // last contact time when we step down, so that we are not
- // reporting a last contact time from before we were the
- // leader. Otherwise, to a client it would seem our data
- // is extremely stale.
- r.setLastContact()
-
- // Stop replication
- for _, p := range r.leaderState.replState {
- close(p.stopCh)
- }
-
- // Respond to all inflight operations
- for e := r.leaderState.inflight.Front(); e != nil; e = e.Next() {
- e.Value.(*logFuture).respond(ErrLeadershipLost)
- }
-
- // Respond to any pending verify requests
- for future := range r.leaderState.notify {
- future.respond(ErrLeadershipLost)
- }
-
- // Clear all the state
- r.leaderState.commitCh = nil
- r.leaderState.commitment = nil
- r.leaderState.inflight = nil
- r.leaderState.replState = nil
- r.leaderState.notify = nil
- r.leaderState.stepDown = nil
-
- // If we are stepping down for some reason, no known leader.
- // We may have stepped down due to an RPC call, which would
- // provide the leader, so we cannot always blank this out.
- r.leaderLock.Lock()
- if r.leader == r.localAddr {
- r.leader = ""
- }
- r.leaderLock.Unlock()
-
- // Notify that we are not the leader
- asyncNotifyBool(r.leaderCh, false)
-
- // Push to the notify channel if given
- if notify := r.conf.NotifyCh; notify != nil {
- select {
- case notify <- false:
- case <-r.shutdownCh:
- // On shutdown, make a best effort but do not block
- select {
- case notify <- false:
- default:
- }
- }
- }
- }()
-
- // Start a replication routine for each peer
- r.startStopReplication()
-
- // Dispatch a no-op log entry first. This gets this leader up to the latest
- // possible commit index, even in the absence of client commands. This used
- // to append a configuration entry instead of a noop. However, that permits
- // an unbounded number of uncommitted configurations in the log. We now
- // maintain that there exists at most one uncommitted configuration entry in
- // any log, so we have to do proper no-ops here.
- noop := &logFuture{
- log: Log{
- Type: LogNoop,
- },
- }
- r.dispatchLogs([]*logFuture{noop})
-
- // Sit in the leader loop until we step down
- r.leaderLoop()
-}
-
-// startStopReplication will set up state and start asynchronous replication to
-// new peers, and stop replication to removed peers. Before removing a peer,
-// it'll instruct the replication routines to try to replicate to the current
-// index. This must only be called from the main thread.
-func (r *Raft) startStopReplication() {
- inConfig := make(map[ServerID]bool, len(r.configurations.latest.Servers))
- lastIdx := r.getLastIndex()
-
- // Start replication goroutines that need starting
- for _, server := range r.configurations.latest.Servers {
- if server.ID == r.localID {
- continue
- }
- inConfig[server.ID] = true
- if _, ok := r.leaderState.replState[server.ID]; !ok {
- r.logger.Printf("[INFO] raft: Added peer %v, starting replication", server.ID)
- s := &followerReplication{
- peer: server,
- commitment: r.leaderState.commitment,
- stopCh: make(chan uint64, 1),
- triggerCh: make(chan struct{}, 1),
- currentTerm: r.getCurrentTerm(),
- nextIndex: lastIdx + 1,
- lastContact: time.Now(),
- notifyCh: make(chan struct{}, 1),
- stepDown: r.leaderState.stepDown,
- }
- r.leaderState.replState[server.ID] = s
- r.goFunc(func() { r.replicate(s) })
- asyncNotifyCh(s.triggerCh)
- }
- }
-
- // Stop replication goroutines that need stopping
- for serverID, repl := range r.leaderState.replState {
- if inConfig[serverID] {
- continue
- }
- // Replicate up to lastIdx and stop
- r.logger.Printf("[INFO] raft: Removed peer %v, stopping replication after %v", serverID, lastIdx)
- repl.stopCh <- lastIdx
- close(repl.stopCh)
- delete(r.leaderState.replState, serverID)
- }
-}
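
Stripped down, this is a reconcile-two-sets pattern: compute the desired peer set, start anything missing, stop anything stale. A minimal sketch with plain strings standing in for ServerIDs:

package main

import "fmt"

// reconcile starts replication for IDs newly in the config and stops
// it for IDs that left, mutating the set of running routines.
func reconcile(running map[string]bool, desired []string) {
	inConfig := make(map[string]bool, len(desired))
	for _, id := range desired {
		inConfig[id] = true
		if !running[id] {
			running[id] = true
			fmt.Println("start replication to", id)
		}
	}
	for id := range running {
		if !inConfig[id] {
			delete(running, id)
			fmt.Println("stop replication to", id)
		}
	}
}

func main() {
	running := map[string]bool{"a": true, "b": true}
	reconcile(running, []string{"b", "c"}) // starts c, stops a
}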
-
-// configurationChangeChIfStable returns r.configurationChangeCh if it's safe
-// to process requests from it, or nil otherwise. This must only be called
-// from the main thread.
-//
-// Note that if the conditions here were to change outside of leaderLoop to take
-// this from nil to non-nil, we would need leaderLoop to be kicked.
-func (r *Raft) configurationChangeChIfStable() chan *configurationChangeFuture {
- // Have to wait until:
- // 1. The latest configuration is committed, and
- // 2. This leader has committed some entry (the noop) in this term
- // https://groups.google.com/forum/#!msg/raft-dev/t4xj6dJTP6E/d2D9LrWRza8J
- if r.configurations.latestIndex == r.configurations.committedIndex &&
- r.getCommitIndex() >= r.leaderState.commitment.startIndex {
- return r.configurationChangeCh
- }
- return nil
-}
-
-// leaderLoop is the hot loop for a leader. It is invoked
-// after all the various leader setup is done.
-func (r *Raft) leaderLoop() {
- // stepDown is used to track if there is an inflight log that
- // would cause us to lose leadership (specifically a RemovePeer of
- // ourselves). If this is the case, we must not allow any logs to
- // be processed in parallel, otherwise we are basing commit on
- // only a single peer (ourself) and replicating to an undefined set
- // of peers.
- stepDown := false
-
- lease := time.After(r.conf.LeaderLeaseTimeout)
- for r.getState() == Leader {
- select {
- case rpc := <-r.rpcCh:
- r.processRPC(rpc)
-
- case <-r.leaderState.stepDown:
- r.setState(Follower)
-
- case <-r.leaderState.commitCh:
- // Process the newly committed entries
- oldCommitIndex := r.getCommitIndex()
- commitIndex := r.leaderState.commitment.getCommitIndex()
- r.setCommitIndex(commitIndex)
-
- if r.configurations.latestIndex > oldCommitIndex &&
- r.configurations.latestIndex <= commitIndex {
- r.configurations.committed = r.configurations.latest
- r.configurations.committedIndex = r.configurations.latestIndex
- if !hasVote(r.configurations.committed, r.localID) {
- stepDown = true
- }
- }
-
- for {
- e := r.leaderState.inflight.Front()
- if e == nil {
- break
- }
- commitLog := e.Value.(*logFuture)
- idx := commitLog.log.Index
- if idx > commitIndex {
- break
- }
- // Measure the commit time
- metrics.MeasureSince([]string{"raft", "commitTime"}, commitLog.dispatch)
- r.processLogs(idx, commitLog)
- r.leaderState.inflight.Remove(e)
- }
-
- if stepDown {
- if r.conf.ShutdownOnRemove {
- r.logger.Printf("[INFO] raft: Removed ourself, shutting down")
- r.Shutdown()
- } else {
- r.logger.Printf("[INFO] raft: Removed ourself, transitioning to follower")
- r.setState(Follower)
- }
- }
-
- case v := <-r.verifyCh:
- if v.quorumSize == 0 {
- // Just dispatched, start the verification
- r.verifyLeader(v)
-
- } else if v.votes < v.quorumSize {
- // Early return, means there must be a new leader
- r.logger.Printf("[WARN] raft: New leader elected, stepping down")
- r.setState(Follower)
- delete(r.leaderState.notify, v)
- v.respond(ErrNotLeader)
-
- } else {
- // Quorum of members agree, we are still leader
- delete(r.leaderState.notify, v)
- v.respond(nil)
- }
-
- case future := <-r.userRestoreCh:
- err := r.restoreUserSnapshot(future.meta, future.reader)
- future.respond(err)
-
- case c := <-r.configurationsCh:
- c.configurations = r.configurations.Clone()
- c.respond(nil)
-
- case future := <-r.configurationChangeChIfStable():
- r.appendConfigurationEntry(future)
-
- case b := <-r.bootstrapCh:
- b.respond(ErrCantBootstrap)
-
- case newLog := <-r.applyCh:
- // Group commit, gather all the ready commits
- ready := []*logFuture{newLog}
- GROUP_COMMIT_LOOP:
- for i := 0; i < r.conf.MaxAppendEntries; i++ {
- select {
- case newLog := <-r.applyCh:
- ready = append(ready, newLog)
- default:
- // A bare break would only exit the select; the labeled
- // break stops draining once the channel is empty.
- break GROUP_COMMIT_LOOP
- }
- }
-
- // Dispatch the logs
- if stepDown {
- // we're in the process of stepping down as leader, don't process anything new
- for i := range ready {
- ready[i].respond(ErrNotLeader)
- }
- } else {
- r.dispatchLogs(ready)
- }
-
- case <-lease:
- // Check if we've exceeded the lease, potentially stepping down
- maxDiff := r.checkLeaderLease()
-
- // Next check interval should adjust for the last node we've
- // contacted, without going negative
- checkInterval := r.conf.LeaderLeaseTimeout - maxDiff
- if checkInterval < minCheckInterval {
- checkInterval = minCheckInterval
- }
-
- // Renew the lease timer
- lease = time.After(checkInterval)
-
- case <-r.shutdownCh:
- return
- }
- }
-}
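
The group-commit branch above drains whatever is already queued on applyCh, up to MaxAppendEntries, without blocking; a bare break inside a select only exits the select, hence the labeled break. A runnable distillation of the pattern (hypothetical names):

package main

import "fmt"

// drainUpTo gathers the first item plus whatever is already buffered,
// up to max items total, without ever blocking on the channel.
func drainUpTo(ch chan int, first, max int) []int {
	batch := []int{first}
GROUP:
	for len(batch) < max {
		select {
		case v := <-ch:
			batch = append(batch, v)
		default:
			break GROUP // channel empty: stop draining
		}
	}
	return batch
}

func main() {
	ch := make(chan int, 8)
	for i := 1; i <= 5; i++ {
		ch <- i
	}
	first := <-ch
	fmt.Println(drainUpTo(ch, first, 3)) // [1 2 3]
}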
-
-// verifyLeader must be called from the main thread for safety.
-// Causes the followers to attempt an immediate heartbeat.
-func (r *Raft) verifyLeader(v *verifyFuture) {
- // Current leader always votes for self
- v.votes = 1
-
- // Set the quorum size, hot-path for single node
- v.quorumSize = r.quorumSize()
- if v.quorumSize == 1 {
- v.respond(nil)
- return
- }
-
- // Track this request
- v.notifyCh = r.verifyCh
- r.leaderState.notify[v] = struct{}{}
-
- // Trigger immediate heartbeats
- for _, repl := range r.leaderState.replState {
- repl.notifyLock.Lock()
- repl.notify = append(repl.notify, v)
- repl.notifyLock.Unlock()
- asyncNotifyCh(repl.notifyCh)
- }
-}
-
-// checkLeaderLease is used to check if we can contact a quorum of nodes
-// within the last leader lease interval. If not, we need to step down,
-// as we may have lost connectivity. Returns the maximum duration without
-// contact. This must only be called from the main thread.
-func (r *Raft) checkLeaderLease() time.Duration {
- // Track contacted nodes, we can always contact ourself
- contacted := 1
-
- // Check each follower
- var maxDiff time.Duration
- now := time.Now()
- for peer, f := range r.leaderState.replState {
- diff := now.Sub(f.LastContact())
- if diff <= r.conf.LeaderLeaseTimeout {
- contacted++
- if diff > maxDiff {
- maxDiff = diff
- }
- } else {
- // Log at least once at high value, then debug. Otherwise it gets very verbose.
- if diff <= 3*r.conf.LeaderLeaseTimeout {
- r.logger.Printf("[WARN] raft: Failed to contact %v in %v", peer, diff)
- } else {
- r.logger.Printf("[DEBUG] raft: Failed to contact %v in %v", peer, diff)
- }
- }
- metrics.AddSample([]string{"raft", "leader", "lastContact"}, float32(diff/time.Millisecond))
- }
-
- // Verify we can contact a quorum
- quorum := r.quorumSize()
- if contacted < quorum {
- r.logger.Printf("[WARN] raft: Failed to contact quorum of nodes, stepping down")
- r.setState(Follower)
- metrics.IncrCounter([]string{"raft", "transition", "leader_lease_timeout"}, 1)
- }
- return maxDiff
-}
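
Ignoring logging and metrics, the lease check reduces to counting followers contacted within the window (plus ourselves) and tracking the worst staleness, which sets the next check interval. A sketch under those assumptions:

package main

import (
	"fmt"
	"time"
)

// leaseOK reports whether self plus recently contacted followers form
// a quorum, and the max staleness among the contacted followers.
func leaseOK(lastContact map[string]time.Time, lease time.Duration, quorum int) (bool, time.Duration) {
	contacted := 1 // we can always contact ourself
	var maxDiff time.Duration
	now := time.Now()
	for _, t := range lastContact {
		if diff := now.Sub(t); diff <= lease {
			contacted++
			if diff > maxDiff {
				maxDiff = diff
			}
		}
	}
	return contacted >= quorum, maxDiff
}

func main() {
	now := time.Now()
	ok, stale := leaseOK(map[string]time.Time{
		"b": now.Add(-20 * time.Millisecond),
		"c": now.Add(-2 * time.Second), // partitioned away
	}, 500*time.Millisecond, 2)
	fmt.Println(ok, stale) // true, ~20ms: self plus b make quorum
}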
-
-// quorumSize is used to return the quorum size. This must only be called on
-// the main thread.
-// TODO: revisit usage
-func (r *Raft) quorumSize() int {
- voters := 0
- for _, server := range r.configurations.latest.Servers {
- if server.Suffrage == Voter {
- voters++
- }
- }
- return voters/2 + 1
-}
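
Integer division makes voters/2 + 1 a strict majority for odd and even cluster sizes alike. A quick worked example:

package main

import "fmt"

// majority is quorumSize's arithmetic: a strict majority of n voters.
func majority(voters int) int { return voters/2 + 1 }

func main() {
	for _, n := range []int{1, 2, 3, 4, 5} {
		fmt.Printf("%d voters -> quorum %d\n", n, majority(n))
	}
	// 1->1, 2->2, 3->2, 4->3, 5->3: an even-sized cluster tolerates
	// no more failures than the next smaller odd-sized one.
}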
-
-// restoreUserSnapshot is used to manually consume an external snapshot, such
-// as if restoring from a backup. We will use the current Raft configuration,
-// not the one from the snapshot, so that we can restore into a new cluster. We
-// will also use the higher of the snapshot's index or the current index, plus
-// one, forcing a new state with a hole in the Raft log so that the snapshot
-// will be sent to followers and used for any new joiners. This can only be run
-// on the leader, and returns a future that can be used to block until
-// complete.
-func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.Reader) error {
- defer metrics.MeasureSince([]string{"raft", "restoreUserSnapshot"}, time.Now())
-
- // Sanity check the version.
- version := meta.Version
- if version < SnapshotVersionMin || version > SnapshotVersionMax {
- return fmt.Errorf("unsupported snapshot version %d", version)
- }
-
- // We don't support snapshots while there's a config change
- // outstanding since the snapshot doesn't have a means to
- // represent this state.
- committedIndex := r.configurations.committedIndex
- latestIndex := r.configurations.latestIndex
- if committedIndex != latestIndex {
- return fmt.Errorf("cannot restore snapshot now, wait until the configuration entry at %v has been applied (have applied %v)",
- latestIndex, committedIndex)
- }
-
- // Cancel any inflight requests.
- for {
- e := r.leaderState.inflight.Front()
- if e == nil {
- break
- }
- e.Value.(*logFuture).respond(ErrAbortedByRestore)
- r.leaderState.inflight.Remove(e)
- }
-
- // We will overwrite the snapshot metadata with the current term,
- // an index that's greater than the current index, or the last
- // index in the snapshot. It's important that we leave a hole in
- // the index so we know there's nothing in the Raft log there and
- // replication will fault and send the snapshot.
- term := r.getCurrentTerm()
- lastIndex := r.getLastIndex()
- if meta.Index > lastIndex {
- lastIndex = meta.Index
- }
- lastIndex++
-
- // Dump the snapshot. Note that we use the latest configuration,
- // not the one that came with the snapshot.
- sink, err := r.snapshots.Create(version, lastIndex, term,
- r.configurations.latest, r.configurations.latestIndex, r.trans)
- if err != nil {
- return fmt.Errorf("failed to create snapshot: %v", err)
- }
- n, err := io.Copy(sink, reader)
- if err != nil {
- sink.Cancel()
- return fmt.Errorf("failed to write snapshot: %v", err)
- }
- if n != meta.Size {
- sink.Cancel()
- return fmt.Errorf("failed to write snapshot, size didn't match (%d != %d)", n, meta.Size)
- }
- if err := sink.Close(); err != nil {
- return fmt.Errorf("failed to close snapshot: %v", err)
- }
- r.logger.Printf("[INFO] raft: Copied %d bytes to local snapshot", n)
-
- // Restore the snapshot into the FSM. If this fails we are in a
- // bad state so we panic to take ourselves out.
- fsm := &restoreFuture{ID: sink.ID()}
- fsm.init()
- select {
- case r.fsmMutateCh <- fsm:
- case <-r.shutdownCh:
- return ErrRaftShutdown
- }
- if err := fsm.Error(); err != nil {
- panic(fmt.Errorf("failed to restore snapshot: %v", err))
- }
-
- // We set the last log so it looks like we've stored the empty
- // index we burned. The last applied is set because we made the
- // FSM take the snapshot state, and we store the last snapshot
- // in the stable store since we created a snapshot as part of
- // this process.
- r.setLastLog(lastIndex, term)
- r.setLastApplied(lastIndex)
- r.setLastSnapshot(lastIndex, term)
-
- r.logger.Printf("[INFO] raft: Restored user snapshot (index %d)", lastIndex)
- return nil
-}
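
The subtle part is the index bookkeeping: the restored snapshot is stamped one past the larger of its own index and the current last index, deliberately leaving a hole in the log so replication faults and ships the snapshot. Just that computation:

package main

import "fmt"

// restoreIndex returns the index assigned to an externally restored
// snapshot: max(snapshot index, last index) + 1, leaving a log hole.
func restoreIndex(snapshotIndex, lastIndex uint64) uint64 {
	if snapshotIndex > lastIndex {
		lastIndex = snapshotIndex
	}
	return lastIndex + 1
}

func main() {
	fmt.Println(restoreIndex(90, 100))  // 101
	fmt.Println(restoreIndex(120, 100)) // 121
}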
-
-// appendConfigurationEntry changes the configuration and adds a new
-// configuration entry to the log. This must only be called from the
-// main thread.
-func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) {
- configuration, err := nextConfiguration(r.configurations.latest, r.configurations.latestIndex, future.req)
- if err != nil {
- future.respond(err)
- return
- }
-
- r.logger.Printf("[INFO] raft: Updating configuration with %s (%v, %v) to %+v",
- future.req.command, future.req.serverID, future.req.serverAddress, configuration.Servers)
-
- // In pre-ID compatibility mode we translate all configuration changes
- // in to an old remove peer message, which can handle all supported
- // cases for peer changes in the pre-ID world (adding and removing
- // voters). Both add peer and remove peer log entries are handled
- // similarly on old Raft servers, but remove peer does extra checks to
- // see if a leader needs to step down. Since they both assert the full
- // configuration, we can safely call remove peer for everything.
- if r.protocolVersion < 2 {
- future.log = Log{
- Type: LogRemovePeerDeprecated,
- Data: encodePeers(configuration, r.trans),
- }
- } else {
- future.log = Log{
- Type: LogConfiguration,
- Data: encodeConfiguration(configuration),
- }
- }
-
- r.dispatchLogs([]*logFuture{&future.logFuture})
- index := future.Index()
- r.configurations.latest = configuration
- r.configurations.latestIndex = index
- r.leaderState.commitment.setConfiguration(configuration)
- r.startStopReplication()
-}
-
-// dispatchLogs is called on the leader to push a group of logs to disk,
-// mark them as inflight and begin replicating them.
-func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
- now := time.Now()
- defer metrics.MeasureSince([]string{"raft", "leader", "dispatchLog"}, now)
-
- term := r.getCurrentTerm()
- lastIndex := r.getLastIndex()
- logs := make([]*Log, len(applyLogs))
-
- for idx, applyLog := range applyLogs {
- applyLog.dispatch = now
- lastIndex++
- applyLog.log.Index = lastIndex
- applyLog.log.Term = term
- logs[idx] = &applyLog.log
- r.leaderState.inflight.PushBack(applyLog)
- }
-
- // Write the log entry locally
- if err := r.logs.StoreLogs(logs); err != nil {
- r.logger.Printf("[ERR] raft: Failed to commit logs: %v", err)
- for _, applyLog := range applyLogs {
- applyLog.respond(err)
- }
- r.setState(Follower)
- return
- }
- r.leaderState.commitment.match(r.localID, lastIndex)
-
- // Update the last log since it's on disk now
- r.setLastLog(lastIndex, term)
-
- // Notify the replicators of the new log
- for _, f := range r.leaderState.replState {
- asyncNotifyCh(f.triggerCh)
- }
-}
-
-// processLogs is used to apply all the committed entries that haven't been
-// applied up to the given index limit.
-// This can be called from both leaders and followers.
-// Followers call this from AppendEntries, for n entries at a time, and always
-// pass future=nil.
-// Leaders call this once per inflight when entries are committed. They pass
-// the future from inflights.
-func (r *Raft) processLogs(index uint64, future *logFuture) {
- // Reject logs we've applied already
- lastApplied := r.getLastApplied()
- if index <= lastApplied {
- r.logger.Printf("[WARN] raft: Skipping application of old log: %d", index)
- return
- }
-
- // Apply all the preceding logs
- for idx := r.getLastApplied() + 1; idx <= index; idx++ {
- // Get the log, either from the future or from our log store
- if future != nil && future.log.Index == idx {
- r.processLog(&future.log, future)
-
- } else {
- l := new(Log)
- if err := r.logs.GetLog(idx, l); err != nil {
- r.logger.Printf("[ERR] raft: Failed to get log at %d: %v", idx, err)
- panic(err)
- }
- r.processLog(l, nil)
- }
-
- // Update the lastApplied index and term
- r.setLastApplied(idx)
- }
-}
-
-// processLog is invoked to process the application of a single committed log entry.
-func (r *Raft) processLog(l *Log, future *logFuture) {
- switch l.Type {
- case LogBarrier:
- // Barrier is handled by the FSM
- fallthrough
-
- case LogCommand:
- // Forward to the fsm handler
- select {
- case r.fsmMutateCh <- &commitTuple{l, future}:
- case <-r.shutdownCh:
- if future != nil {
- future.respond(ErrRaftShutdown)
- }
- }
-
- // Return so that the future is only responded to
- // by the FSM handler when the application is done
- return
-
- case LogConfiguration:
- case LogAddPeerDeprecated:
- case LogRemovePeerDeprecated:
- case LogNoop:
- // Ignore the no-op
-
- default:
- panic(fmt.Errorf("unrecognized log type: %#v", l))
- }
-
- // Invoke the future if given
- if future != nil {
- future.respond(nil)
- }
-}
-
-// processRPC is called to handle an incoming RPC request. This must only be
-// called from the main thread.
-func (r *Raft) processRPC(rpc RPC) {
- if err := r.checkRPCHeader(rpc); err != nil {
- rpc.Respond(nil, err)
- return
- }
-
- switch cmd := rpc.Command.(type) {
- case *AppendEntriesRequest:
- r.appendEntries(rpc, cmd)
- case *RequestVoteRequest:
- r.requestVote(rpc, cmd)
- case *InstallSnapshotRequest:
- r.installSnapshot(rpc, cmd)
- default:
- r.logger.Printf("[ERR] raft: Got unexpected command: %#v", rpc.Command)
- rpc.Respond(nil, fmt.Errorf("unexpected command"))
- }
-}
-
-// processHeartbeat is a special handler used just for heartbeat requests
-// so that they can be fast-pathed if a transport supports it. This must only
-// be called from the main thread.
-func (r *Raft) processHeartbeat(rpc RPC) {
- defer metrics.MeasureSince([]string{"raft", "rpc", "processHeartbeat"}, time.Now())
-
- // Check if we are shutdown, just ignore the RPC
- select {
- case <-r.shutdownCh:
- return
- default:
- }
-
- // Ensure we are only handling a heartbeat
- switch cmd := rpc.Command.(type) {
- case *AppendEntriesRequest:
- r.appendEntries(rpc, cmd)
- default:
- r.logger.Printf("[ERR] raft: Expected heartbeat, got command: %#v", rpc.Command)
- rpc.Respond(nil, fmt.Errorf("unexpected command"))
- }
-}
-
-// appendEntries is invoked when we get an append entries RPC call. This must
-// only be called from the main thread.
-func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
- defer metrics.MeasureSince([]string{"raft", "rpc", "appendEntries"}, time.Now())
- // Setup a response
- resp := &AppendEntriesResponse{
- RPCHeader: r.getRPCHeader(),
- Term: r.getCurrentTerm(),
- LastLog: r.getLastIndex(),
- Success: false,
- NoRetryBackoff: false,
- }
- var rpcErr error
- defer func() {
- rpc.Respond(resp, rpcErr)
- }()
-
- // Ignore an older term
- if a.Term < r.getCurrentTerm() {
- return
- }
-
- // Increase the term if we see a newer one, also transition to follower
- // if we ever get an appendEntries call
- if a.Term > r.getCurrentTerm() || r.getState() != Follower {
- // Ensure transition to follower
- r.setState(Follower)
- r.setCurrentTerm(a.Term)
- resp.Term = a.Term
- }
-
- // Save the current leader
- r.setLeader(ServerAddress(r.trans.DecodePeer(a.Leader)))
-
- // Verify the last log entry
- if a.PrevLogEntry > 0 {
- lastIdx, lastTerm := r.getLastEntry()
-
- var prevLogTerm uint64
- if a.PrevLogEntry == lastIdx {
- prevLogTerm = lastTerm
-
- } else {
- var prevLog Log
- if err := r.logs.GetLog(a.PrevLogEntry, &prevLog); err != nil {
- r.logger.Printf("[WARN] raft: Failed to get previous log: %d %v (last: %d)",
- a.PrevLogEntry, err, lastIdx)
- resp.NoRetryBackoff = true
- return
- }
- prevLogTerm = prevLog.Term
- }
-
- if a.PrevLogTerm != prevLogTerm {
- r.logger.Printf("[WARN] raft: Previous log term mis-match: ours: %d remote: %d",
- prevLogTerm, a.PrevLogTerm)
- resp.NoRetryBackoff = true
- return
- }
- }
-
- // Process any new entries
- if len(a.Entries) > 0 {
- start := time.Now()
-
- // Delete any conflicting entries, skip any duplicates
- lastLogIdx, _ := r.getLastLog()
- var newEntries []*Log
- for i, entry := range a.Entries {
- if entry.Index > lastLogIdx {
- newEntries = a.Entries[i:]
- break
- }
- var storeEntry Log
- if err := r.logs.GetLog(entry.Index, &storeEntry); err != nil {
- r.logger.Printf("[WARN] raft: Failed to get log entry %d: %v",
- entry.Index, err)
- return
- }
- if entry.Term != storeEntry.Term {
- r.logger.Printf("[WARN] raft: Clearing log suffix from %d to %d", entry.Index, lastLogIdx)
- if err := r.logs.DeleteRange(entry.Index, lastLogIdx); err != nil {
- r.logger.Printf("[ERR] raft: Failed to clear log suffix: %v", err)
- return
- }
- if entry.Index <= r.configurations.latestIndex {
- r.configurations.latest = r.configurations.committed
- r.configurations.latestIndex = r.configurations.committedIndex
- }
- newEntries = a.Entries[i:]
- break
- }
- }
-
- if n := len(newEntries); n > 0 {
- // Append the new entries
- if err := r.logs.StoreLogs(newEntries); err != nil {
- r.logger.Printf("[ERR] raft: Failed to append to logs: %v", err)
- // TODO: leaving r.getLastLog() in the wrong
- // state if there was a truncation above
- return
- }
-
- // Handle any new configuration changes
- for _, newEntry := range newEntries {
- r.processConfigurationLogEntry(newEntry)
- }
-
- // Update the lastLog
- last := newEntries[n-1]
- r.setLastLog(last.Index, last.Term)
- }
-
- metrics.MeasureSince([]string{"raft", "rpc", "appendEntries", "storeLogs"}, start)
- }
-
- // Update the commit index
- if a.LeaderCommitIndex > 0 && a.LeaderCommitIndex > r.getCommitIndex() {
- start := time.Now()
- idx := min(a.LeaderCommitIndex, r.getLastIndex())
- r.setCommitIndex(idx)
- if r.configurations.latestIndex <= idx {
- r.configurations.committed = r.configurations.latest
- r.configurations.committedIndex = r.configurations.latestIndex
- }
- r.processLogs(idx, nil)
- metrics.MeasureSince([]string{"raft", "rpc", "appendEntries", "processLogs"}, start)
- }
-
- // Everything went well, set success
- resp.Success = true
- r.setLastContact()
- return
-}
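
The PrevLogEntry/PrevLogTerm test above is Raft's log-matching rule: an append is accepted only if the follower holds the leader's previous entry with the same term; otherwise the leader must back up and retry. A standalone illustration with simplified types (not the library's):

package main

import "fmt"

type entry struct{ index, term uint64 }

// consistent applies the rule: prevIndex 0 means start of log and
// always matches; otherwise the entry must exist with prevTerm.
func consistent(log []entry, prevIndex, prevTerm uint64) bool {
	if prevIndex == 0 {
		return true
	}
	for _, e := range log {
		if e.index == prevIndex {
			return e.term == prevTerm
		}
	}
	return false // hole in the log: leader must back off
}

func main() {
	log := []entry{{1, 1}, {2, 1}, {3, 2}}
	fmt.Println(consistent(log, 3, 2)) // true
	fmt.Println(consistent(log, 3, 1)) // false: term conflict
	fmt.Println(consistent(log, 5, 2)) // false: entry missing
}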
-
-// processConfigurationLogEntry takes a log entry and updates the latest
-// configuration if the entry results in a new configuration. This must only be
-// called from the main thread, or from NewRaft() before any threads have begun.
-func (r *Raft) processConfigurationLogEntry(entry *Log) {
- if entry.Type == LogConfiguration {
- r.configurations.committed = r.configurations.latest
- r.configurations.committedIndex = r.configurations.latestIndex
- r.configurations.latest = decodeConfiguration(entry.Data)
- r.configurations.latestIndex = entry.Index
- } else if entry.Type == LogAddPeerDeprecated || entry.Type == LogRemovePeerDeprecated {
- r.configurations.committed = r.configurations.latest
- r.configurations.committedIndex = r.configurations.latestIndex
- r.configurations.latest = decodePeers(entry.Data, r.trans)
- r.configurations.latestIndex = entry.Index
- }
-}
-
-// requestVote is invoked when we get a RequestVote RPC call.
-func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {
- defer metrics.MeasureSince([]string{"raft", "rpc", "requestVote"}, time.Now())
- r.observe(*req)
-
- // Setup a response
- resp := &RequestVoteResponse{
- RPCHeader: r.getRPCHeader(),
- Term: r.getCurrentTerm(),
- Granted: false,
- }
- var rpcErr error
- defer func() {
- rpc.Respond(resp, rpcErr)
- }()
-
- // Version 0 servers will panic unless the peers list is present. It's only
- // used on them to produce a warning message.
- if r.protocolVersion < 2 {
- resp.Peers = encodePeers(r.configurations.latest, r.trans)
- }
-
- // Check if we have an existing leader [who's not the candidate]
- candidate := r.trans.DecodePeer(req.Candidate)
- if leader := r.Leader(); leader != "" && leader != candidate {
- r.logger.Printf("[WARN] raft: Rejecting vote request from %v since we have a leader: %v",
- candidate, leader)
- return
- }
-
- // Ignore an older term
- if req.Term < r.getCurrentTerm() {
- return
- }
-
- // Increase the term if we see a newer one
- if req.Term > r.getCurrentTerm() {
- // Ensure transition to follower
- r.setState(Follower)
- r.setCurrentTerm(req.Term)
- resp.Term = req.Term
- }
-
- // Check if we have voted yet
- lastVoteTerm, err := r.stable.GetUint64(keyLastVoteTerm)
- if err != nil && err.Error() != "not found" {
- r.logger.Printf("[ERR] raft: Failed to get last vote term: %v", err)
- return
- }
- lastVoteCandBytes, err := r.stable.Get(keyLastVoteCand)
- if err != nil && err.Error() != "not found" {
- r.logger.Printf("[ERR] raft: Failed to get last vote candidate: %v", err)
- return
- }
-
- // Check if we've voted in this election before
- if lastVoteTerm == req.Term && lastVoteCandBytes != nil {
- r.logger.Printf("[INFO] raft: Duplicate RequestVote for same term: %d", req.Term)
- if bytes.Equal(lastVoteCandBytes, req.Candidate) {
- r.logger.Printf("[WARN] raft: Duplicate RequestVote from candidate: %s", req.Candidate)
- resp.Granted = true
- }
- return
- }
-
- // Reject if their term is older
- lastIdx, lastTerm := r.getLastEntry()
- if lastTerm > req.LastLogTerm {
- r.logger.Printf("[WARN] raft: Rejecting vote request from %v since our last term is greater (%d, %d)",
- candidate, lastTerm, req.LastLogTerm)
- return
- }
-
- if lastTerm == req.LastLogTerm && lastIdx > req.LastLogIndex {
- r.logger.Printf("[WARN] raft: Rejecting vote request from %v since our last index is greater (%d, %d)",
- candidate, lastIdx, req.LastLogIndex)
- return
- }
-
- // Persist a vote for safety
- if err := r.persistVote(req.Term, req.Candidate); err != nil {
- r.logger.Printf("[ERR] raft: Failed to persist vote: %v", err)
- return
- }
-
- resp.Granted = true
- r.setLastContact()
- return
-}
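
The final two rejections implement Raft's election restriction: grant a vote only to a candidate whose log is at least as up to date, comparing last term first and last index second. Distilled:

package main

import "fmt"

// upToDate reports whether the candidate's log is at least as current
// as ours, per the two comparisons in requestVote above.
func upToDate(candTerm, candIdx, ourTerm, ourIdx uint64) bool {
	if candTerm != ourTerm {
		return candTerm > ourTerm
	}
	return candIdx >= ourIdx
}

func main() {
	fmt.Println(upToDate(3, 5, 2, 9)) // true: higher last term wins
	fmt.Println(upToDate(2, 8, 2, 9)) // false: same term, shorter log
}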
-
-// installSnapshot is invoked when we get an InstallSnapshot RPC call.
-// We must be in the follower state for this, since it means we are
-// too far behind a leader for log replay. This must only be called
-// from the main thread.
-func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
- defer metrics.MeasureSince([]string{"raft", "rpc", "installSnapshot"}, time.Now())
- // Setup a response
- resp := &InstallSnapshotResponse{
- Term: r.getCurrentTerm(),
- Success: false,
- }
- var rpcErr error
- defer func() {
- rpc.Respond(resp, rpcErr)
- }()
-
- // Sanity check the version
- if req.SnapshotVersion < SnapshotVersionMin ||
- req.SnapshotVersion > SnapshotVersionMax {
- rpcErr = fmt.Errorf("unsupported snapshot version %d", req.SnapshotVersion)
- return
- }
-
- // Ignore an older term
- if req.Term < r.getCurrentTerm() {
- return
- }
-
- // Increase the term if we see a newer one
- if req.Term > r.getCurrentTerm() {
- // Ensure transition to follower
- r.setState(Follower)
- r.setCurrentTerm(req.Term)
- resp.Term = req.Term
- }
-
- // Save the current leader
- r.setLeader(ServerAddress(r.trans.DecodePeer(req.Leader)))
-
- // Create a new snapshot
- var reqConfiguration Configuration
- var reqConfigurationIndex uint64
- if req.SnapshotVersion > 0 {
- reqConfiguration = decodeConfiguration(req.Configuration)
- reqConfigurationIndex = req.ConfigurationIndex
- } else {
- reqConfiguration = decodePeers(req.Peers, r.trans)
- reqConfigurationIndex = req.LastLogIndex
- }
- version := getSnapshotVersion(r.protocolVersion)
- sink, err := r.snapshots.Create(version, req.LastLogIndex, req.LastLogTerm,
- reqConfiguration, reqConfigurationIndex, r.trans)
- if err != nil {
- r.logger.Printf("[ERR] raft: Failed to create snapshot to install: %v", err)
- rpcErr = fmt.Errorf("failed to create snapshot: %v", err)
- return
- }
-
- // Spill the remote snapshot to disk
- n, err := io.Copy(sink, rpc.Reader)
- if err != nil {
- sink.Cancel()
- r.logger.Printf("[ERR] raft: Failed to copy snapshot: %v", err)
- rpcErr = err
- return
- }
-
- // Check that we received it all
- if n != req.Size {
- sink.Cancel()
- r.logger.Printf("[ERR] raft: Failed to receive whole snapshot: %d / %d", n, req.Size)
- rpcErr = fmt.Errorf("short read")
- return
- }
-
- // Finalize the snapshot
- if err := sink.Close(); err != nil {
- r.logger.Printf("[ERR] raft: Failed to finalize snapshot: %v", err)
- rpcErr = err
- return
- }
- r.logger.Printf("[INFO] raft: Copied %d bytes to local snapshot", n)
-
- // Restore snapshot
- future := &restoreFuture{ID: sink.ID()}
- future.init()
- select {
- case r.fsmMutateCh <- future:
- case <-r.shutdownCh:
- future.respond(ErrRaftShutdown)
- return
- }
-
- // Wait for the restore to happen
- if err := future.Error(); err != nil {
- r.logger.Printf("[ERR] raft: Failed to restore snapshot: %v", err)
- rpcErr = err
- return
- }
-
- // Update the lastApplied so we don't replay old logs
- r.setLastApplied(req.LastLogIndex)
-
- // Update the last stable snapshot info
- r.setLastSnapshot(req.LastLogIndex, req.LastLogTerm)
-
- // Restore the peer set
- r.configurations.latest = reqConfiguration
- r.configurations.latestIndex = reqConfigurationIndex
- r.configurations.committed = reqConfiguration
- r.configurations.committedIndex = reqConfigurationIndex
-
- // Compact logs, continue even if this fails
- if err := r.compactLogs(req.LastLogIndex); err != nil {
- r.logger.Printf("[ERR] raft: Failed to compact logs: %v", err)
- }
-
- r.logger.Printf("[INFO] raft: Installed remote snapshot")
- resp.Success = true
- r.setLastContact()
- return
-}
-
-// setLastContact is used to set the last contact time to now
-func (r *Raft) setLastContact() {
- r.lastContactLock.Lock()
- r.lastContact = time.Now()
- r.lastContactLock.Unlock()
-}
-
-type voteResult struct {
- RequestVoteResponse
- voterID ServerID
-}
-
-// electSelf is used to send a RequestVote RPC to all peers, and vote for
-// ourself. This has the side effect of incrementing the current term. The
-// response channel returned is used to wait for all the responses (including a
-// vote for ourself). This must only be called from the main thread.
-func (r *Raft) electSelf() <-chan *voteResult {
- // Create a response channel
- respCh := make(chan *voteResult, len(r.configurations.latest.Servers))
-
- // Increment the term
- r.setCurrentTerm(r.getCurrentTerm() + 1)
-
- // Construct the request
- lastIdx, lastTerm := r.getLastEntry()
- req := &RequestVoteRequest{
- RPCHeader: r.getRPCHeader(),
- Term: r.getCurrentTerm(),
- Candidate: r.trans.EncodePeer(r.localAddr),
- LastLogIndex: lastIdx,
- LastLogTerm: lastTerm,
- }
-
- // Construct a function to ask for a vote
- askPeer := func(peer Server) {
- r.goFunc(func() {
- defer metrics.MeasureSince([]string{"raft", "candidate", "electSelf"}, time.Now())
- resp := &voteResult{voterID: peer.ID}
- err := r.trans.RequestVote(peer.Address, req, &resp.RequestVoteResponse)
- if err != nil {
- r.logger.Printf("[ERR] raft: Failed to make RequestVote RPC to %v: %v", peer, err)
- resp.Term = req.Term
- resp.Granted = false
- }
- respCh <- resp
- })
- }
-
- // For each peer, request a vote
- for _, server := range r.configurations.latest.Servers {
- if server.Suffrage == Voter {
- if server.ID == r.localID {
- // Persist a vote for ourselves
- if err := r.persistVote(req.Term, req.Candidate); err != nil {
- r.logger.Printf("[ERR] raft: Failed to persist vote : %v", err)
- return nil
- }
- // Include our own vote
- respCh <- &voteResult{
- RequestVoteResponse: RequestVoteResponse{
- RPCHeader: r.getRPCHeader(),
- Term: req.Term,
- Granted: true,
- },
- voterID: r.localID,
- }
- } else {
- askPeer(server)
- }
- }
- }
-
- return respCh
-}
-
-// persistVote is used to persist our vote for safety.
-func (r *Raft) persistVote(term uint64, candidate []byte) error {
- if err := r.stable.SetUint64(keyLastVoteTerm, term); err != nil {
- return err
- }
- if err := r.stable.Set(keyLastVoteCand, candidate); err != nil {
- return err
- }
- return nil
-}
-
-// setCurrentTerm is used to set the current term in a durable manner.
-func (r *Raft) setCurrentTerm(t uint64) {
- // Persist to disk first
- if err := r.stable.SetUint64(keyCurrentTerm, t); err != nil {
- panic(fmt.Errorf("failed to save current term: %v", err))
- }
- r.raftState.setCurrentTerm(t)
-}
-
-// setState is used to update the current state. Any state
-// transition causes the known leader to be cleared. This means
-// that leader should be set only after updating the state.
-func (r *Raft) setState(state RaftState) {
- r.setLeader("")
- oldState := r.raftState.getState()
- r.raftState.setState(state)
- if oldState != state {
- r.observe(state)
- }
-}