Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support checking assertion of Leadership #624

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ type Raft struct {
// to verify we are still the leader
verifyCh chan *verifyFuture

// assertedCh is used to check if the leader has fully asserted leadership
// by committing an entry in this term.
assertedCh chan *assertedFuture

// configurationsCh is used to get the configuration data safely from
// outside of the main thread.
configurationsCh chan *configurationsFuture
Expand Down Expand Up @@ -558,6 +562,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
stable: stable,
trans: trans,
verifyCh: make(chan *verifyFuture, 64),
assertedCh: make(chan *assertedFuture, 64),
configurationsCh: make(chan *configurationsFuture, 8),
bootstrapCh: make(chan *bootstrapFuture),
observers: make(map[uint64]*Observer),
Expand Down Expand Up @@ -883,6 +888,24 @@ func (r *Raft) VerifyLeader() Future {
}
}

// VerifyAssertedLeadership is used to check if the node, if acting as a leader,
// has fully asserted leadership by committing an entry in this term. The returned
// future indicates the term for which leadership has been asserted.
//
// Since this call may be relatively slow, it is not suitable for code paths that requir
// low latency. One suggested way to use it is to call this once per term, storing the
// returned state in a variable. Once leadership has been asserted for a term, the state
// will not change, so there is no need to call this again for the same term.
//
// Must be run on the leader, or it will fail.
func (r *Raft) VerifyAssertedLeadership() AssertedFuture {
metrics.IncrCounter([]string{"raft", "verify_asserted_leadership"}, 1)
assertedFuture := &assertedFuture{}
assertedFuture.init()
r.assertedCh <- assertedFuture
return assertedFuture
}

// GetConfiguration returns the latest configuration. This may not yet be
// committed. The main loop can access this directly.
func (r *Raft) GetConfiguration() ConfigurationFuture {
Expand Down
26 changes: 26 additions & 0 deletions future.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,18 @@ type ApplyFuture interface {
Response() interface{}
}

// AssertedFuture is used to check if the current node has asserted leadership
// for the current term by committing at least one entry in this term.
type AssertedFuture interface {
Future

// Asserted returns true if the current node has asserted leadership.
Asserted() bool

// Term returns the term in which the leadership was asserted.
Term() uint64
}

// ConfigurationFuture is used for GetConfiguration and can return the
// latest configuration in use by Raft.
type ConfigurationFuture interface {
Expand Down Expand Up @@ -243,6 +255,20 @@ type verifyFuture struct {
voteLock sync.Mutex
}

type assertedFuture struct {
deferError
asserted bool
term uint64
}

func (a *assertedFuture) Asserted() bool {
return a.asserted
}

func (a *assertedFuture) Term() uint64 {
return a.term
}

// leadershipTransferFuture is used to track the progress of a leadership
// transfer internally.
type leadershipTransferFuture struct {
Expand Down
21 changes: 21 additions & 0 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ func (r *Raft) runFollower() {
// Reject any operations since we are not the leader
v.respond(ErrNotLeader)

case v := <-r.assertedCh:
r.mainThreadSaturation.working()
// Reject any operations since we are not the leader
v.respond(ErrNotLeader)

case ur := <-r.userRestoreCh:
r.mainThreadSaturation.working()
// Reject any restores since we are not the leader
Expand Down Expand Up @@ -398,6 +403,11 @@ func (r *Raft) runCandidate() {
// Reject any operations since we are not the leader
v.respond(ErrNotLeader)

case v := <-r.assertedCh:
r.mainThreadSaturation.working()
// Reject any operations since we are not the leader
v.respond(ErrNotLeader)

case ur := <-r.userRestoreCh:
r.mainThreadSaturation.working()
// Reject any restores since we are not the leader
Expand Down Expand Up @@ -676,6 +686,8 @@ func (r *Raft) leaderLoop() {
// based on the current config value.
lease := time.After(r.config().LeaderLeaseTimeout)

leadershipAsserted := false

for r.getState() == Leader {
r.mainThreadSaturation.sleeping()

Expand Down Expand Up @@ -788,6 +800,9 @@ func (r *Raft) leaderLoop() {
oldCommitIndex := r.getCommitIndex()
commitIndex := r.leaderState.commitment.getCommitIndex()
r.setCommitIndex(commitIndex)
if !leadershipAsserted {
leadershipAsserted = true
}

// New configuration has been committed, set it as the committed
// value.
Expand Down Expand Up @@ -869,6 +884,12 @@ func (r *Raft) leaderLoop() {
v.respond(nil)
}

case v := <-r.assertedCh:
r.mainThreadSaturation.working()
v.asserted = leadershipAsserted
v.term = r.getCurrentTerm()
v.respond(nil)

case future := <-r.userRestoreCh:
r.mainThreadSaturation.working()
if r.getLeadershipTransferInProgress() {
Expand Down
81 changes: 81 additions & 0 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1971,6 +1971,87 @@ func TestRaft_VerifyLeader_PartialConnect(t *testing.T) {
}
}

func TestRaft_VerifyAssertedLeadership(t *testing.T) {
// Make the cluster
c := MakeCluster(3, t, nil)
defer c.Close()

// Get the leader
leader := c.Leader()

time.Sleep(c.propagateTimeout)

// Check that leadership has been asserted.
asserted := leader.VerifyAssertedLeadership()

// Wait for the asserted to return
if err := asserted.Error(); err != nil {
t.Fatalf("err: %v", err)
}

// Verify that leadership has been asserted for the
// correct term.
if asserted.Asserted() != true {
t.Fatalf("expected leadership to be asserted")
}
if asserted.Term() != leader.getCurrentTerm() {
t.Fatalf("expected term %d, got %d", leader.getCurrentTerm(), asserted.Term())
}
}

func TestRaft_VerifyAssertedLeadership_Single(t *testing.T) {
// Make the cluster
c := MakeCluster(1, t, nil)
defer c.Close()

// Get the leader
leader := c.Leader()

time.Sleep(c.propagateTimeout)

// Check that leadership has been asserted.
asserted := leader.VerifyAssertedLeadership()

// Wait for the asserted to return
if err := asserted.Error(); err != nil {
t.Fatalf("err: %v", err)
}

// Verify that leadership has been asserted for the
// correct term.
if asserted.Asserted() != true {
t.Fatalf("expected leadership to be asserted")
}
if asserted.Term() != leader.getCurrentTerm() {
t.Fatalf("expected term %d, got %d", leader.getCurrentTerm(), asserted.Term())
}
}

func TestRaft_VerifyAssertedLeadership_Fail(t *testing.T) {
// Make the cluster
c := MakeCluster(3, t, nil)
defer c.Close()

// Get the leader
leader := c.Leader()

time.Sleep(c.propagateTimeout)

// Check that leadership has been asserted.
asserted := leader.VerifyAssertedLeadership()

// Wait for the asserted to return
if err := asserted.Error(); err != nil {
t.Fatalf("err: %v", err)
}

// Confirm that an error is received when called on a Follower.
follower := c.Followers()[0]
if err := follower.VerifyAssertedLeadership().Error(); err == nil {
t.Fatal("expected error asserting leadership on a Follower")
}
}

func TestRaft_NotifyCh(t *testing.T) {
ch := make(chan bool, 1)
conf := inmemConfig(t)
Expand Down
Loading