diff --git a/integ_test.go b/integ_test.go index 61557a35..f5bb6512 100644 --- a/integ_test.go +++ b/integ_test.go @@ -5,6 +5,7 @@ package raft import ( "bytes" + "context" "fmt" "os" "sync/atomic" @@ -490,3 +491,98 @@ func TestRaft_RestartFollower_LongInitialHeartbeat(t *testing.T) { }) } } + +// TestRaft_PreVote_LeaderSpam test that when a leader spam the followers +// with pre-vote requests they can still transition to candidate. +// The reason this test need to live in here is that we need the transport heartbeat fast-path +// to use as a trick to avoid heartbeat keeping the cluster stable. +// That fast-path only exists in the net transport. +func TestRaft_PreVote_LeaderSpam(t *testing.T) { + CheckInteg(t) + conf := DefaultConfig() + conf.LocalID = ServerID("first") + conf.HeartbeatTimeout = 50 * time.Millisecond + conf.ElectionTimeout = 50 * time.Millisecond + conf.LeaderLeaseTimeout = 50 * time.Millisecond + conf.CommitTimeout = 5 * time.Second + conf.SnapshotThreshold = 100 + conf.TrailingLogs = 10 + + // Create a single node + leader := MakeRaft(t, conf, true) + NoErr(WaitFor(leader, Leader), t) + + // Join a few nodes! + var followers []*RaftEnv + for i := 0; i < 2; i++ { + conf.LocalID = ServerID(fmt.Sprintf("next-batch-%d", i)) + env := MakeRaft(t, conf, false) + addr := env.trans.LocalAddr() + NoErr(WaitFuture(leader.raft.AddVoter(conf.LocalID, addr, 0, 0)), t) + followers = append(followers, env) + } + + // Wait for a leader + _, err := WaitForAny(Leader, append([]*RaftEnv{leader}, followers...)) + NoErr(err, t) + + CheckConsistent(append([]*RaftEnv{leader}, followers...), t) + + leaderT := leader.raft.trans + + // spam all the followers with pre-vote requests from the leader + // those requests should be granted as long as the leader haven't changed. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + for { + ticker := time.NewTicker(conf.HeartbeatTimeout / 2) + for _, f := range followers { + rsp := RequestPreVoteResponse{} + reqPreVote := RequestPreVoteRequest{ + RPCHeader: leader.raft.getRPCHeader(), + Term: leader.raft.getCurrentTerm() + 1, + LastLogIndex: leader.raft.getLastIndex(), + LastLogTerm: leader.raft.getCurrentTerm(), + } + // We don't need to check the error here because when leader change + // it will start failing with "rejecting pre-vote request since we have a leader" + _ = leaderT.(WithPreVote).RequestPreVote(f.raft.localID, f.raft.localAddr, &reqPreVote, &rsp) + } + select { + case <-ticker.C: + case <-ctx.Done(): + return + } + } + }() + time.Sleep(time.Second) + + // for all followers ignore heartbeat from current leader, so we can transition to candidate state. + // the purpose of this test is to verify that spamming nodes with pre-votes don't cause them to never + // transition to Candidates. + for _, f := range followers { + //copy f to avoid data race + f1 := f + f1.trans.SetHeartbeatHandler(func(rpc RPC) { + if a, ok := rpc.Command.(*AppendEntriesRequest); ok { + if ServerID(a.GetRPCHeader().ID) == leader.raft.localID { + resp := &AppendEntriesResponse{ + RPCHeader: f1.raft.getRPCHeader(), + Term: f1.raft.getCurrentTerm(), + LastLog: f1.raft.getLastIndex(), + Success: false, + NoRetryBackoff: false, + } + rpc.Respond(resp, nil) + } else { + f.raft.processHeartbeat(rpc) + } + } + }) + } + time.Sleep(1 * time.Second) + // New leader should be one of the former followers. + _, err = WaitForAny(Leader, followers) + NoErr(err, t) +} diff --git a/raft.go b/raft.go index 1ebcef1a..cbc9a59a 100644 --- a/raft.go +++ b/raft.go @@ -1805,7 +1805,6 @@ func (r *Raft) requestPreVote(rpc RPC, req *RequestPreVoteRequest) { } resp.Granted = true - r.setLastContact() } // installSnapshot is invoked when we get a InstallSnapshot RPC call.