Enhancement: persist commit index in LogStore to accelerate recovery #613
@@ -217,6 +217,10 @@ type Raft struct {
 	// preVoteDisabled control if the pre-vote feature is activated,
 	// prevote feature is disabled if set to true.
 	preVoteDisabled bool
 
+	// fastRecovery is used to enable fast recovery mode
+	// fast recovery mode is disabled if set to false.
+	fastRecovery bool
 }
 
 // BootstrapCluster initializes a server's storage with the given cluster
@@ -566,6 +570,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
 		followerNotifyCh:     make(chan struct{}, 1),
 		mainThreadSaturation: newSaturationMetric([]string{"raft", "thread", "main", "saturation"}, 1*time.Second),
 		preVoteDisabled:      conf.PreVoteDisabled || !transportSupportPreVote,
+		fastRecovery:         conf.FastRecovery,
 	}
 	if !transportSupportPreVote && !conf.PreVoteDisabled {
 		r.logger.Warn("pre-vote is disabled because it is not supported by the Transport")
@@ -585,9 +590,12 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
 		return nil, err
 	}
 
+	r.recoverFromCommittedLogs()
+
 	// Scan through the log for any configuration change entries.
 	snapshotIndex, _ := r.getLastSnapshot()
-	for index := snapshotIndex + 1; index <= lastLog.Index; index++ {
+	lastappliedIndex := r.getLastApplied()
+	for index := max(snapshotIndex, lastappliedIndex) + 1; index <= lastLog.Index; index++ {
 		var entry Log
 		if err := r.logs.GetLog(index, &entry); err != nil {
 			r.logger.Error("failed to get log", "index", index, "error", err)
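The new loop bound is the crux of this hunk: with fast recovery enabled, lastApplied can already be ahead of the snapshot index by the time this scan runs, so the configuration scan must start after whichever of the two is greater. A minimal sketch of that computation (configScanStart is an illustrative name, not a function in this PR):

```go
package main

import "fmt"

// configScanStart returns the first log index that still needs to be
// scanned for configuration entries. Without fast recovery, lastApplied
// equals the snapshot index at this point; with fast recovery it may be
// ahead, so starting at max(snapshotIndex, lastApplied)+1 avoids
// re-scanning logs that were already applied during recovery.
func configScanStart(snapshotIndex, lastApplied uint64) uint64 {
	if lastApplied > snapshotIndex {
		return lastApplied + 1
	}
	return snapshotIndex + 1
}

func main() {
	fmt.Println(configScanStart(100, 0))   // no fast recovery: scan from 101
	fmt.Println(configScanStart(100, 180)) // recovery applied ahead: scan from 181
}
```

This mirrors the `max(snapshotIndex, lastappliedIndex) + 1` expression in the diff above.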
@@ -697,6 +705,39 @@ func (r *Raft) tryRestoreSingleSnapshot(snapshot *SnapshotMeta) bool {
 	return true
 }
 
+// recoverFromCommittedLogs recovers the Raft node from committed logs.
+func (r *Raft) recoverFromCommittedLogs() {
+	if !r.fastRecovery {
+		return
+	}
+
+	// If the store implements CommitTrackingLogStore, we can read the commit index from the store.
+	// This is useful when the store is able to track the commit index and we can avoid replaying logs.
+	store, ok := r.logs.(CommitTrackingLogStore)
+	if !ok {
+		r.logger.Warn("fast recovery enabled but log store does not support it", "log_store", fmt.Sprintf("%T", r.logs))
+		return
+	}
+
+	commitIndex, err := store.GetCommitIndex()
+	if err != nil {
+		r.logger.Error("failed to get commit index from store", "error", err)
+		panic(err)
+	}
+
+	lastIndex, err := r.logs.LastIndex()
+	if err != nil {
+		r.logger.Error("failed to get last log index from store", "error", err)
+		panic(err)
+	}
Review comment: Shouldn't we fall back to the non-fast-recovery path instead of panicking?

Review comment (reply): 🤔 Great question. For transient errors it's probably better to hard stop than to silently weaken the expected guarantees. Someone who has enabled this feature (and provided a supported log store) should be able to assume that by the time raft has started (without error), their FSM is at least as up to date as it was before a restart. Silently falling back makes it impossible to really trust that guarantee, and may mean similar bugs to the ones we are trying to prevent are still possible with no way to mitigate them (even if they are rarer). In most use cases where each server uses this library as the core of its functionality (i.e. all our products) and only runs a single raft group initialised during startup, a panic is probably reasonable too: we crash, the supervisor restarts the process, and if the error was transient then great; if not, it's no different from what will happen later when we are unable to read logs. That said, in writing this I realised it's probably too strong a decision to make in a library: a server process could manage multiple raft instances, for example, and a fatal error in one of them shouldn't terminate the whole process. So I think I'd vote for making these cases return a hard error from …

Review comment (reply): I guess returning an error from … I think we should implement this as a specific error type that we publish on the API, to allow calling …
+
+	if commitIndex > lastIndex {
+		commitIndex = lastIndex
+	}
+
+	r.setCommitIndex(commitIndex)
+	r.processLogs(commitIndex, nil)
+}
+
 func (r *Raft) config() Config {
 	return r.conf.Load().(Config)
 }
@@ -235,6 +235,16 @@ type Config struct {
 	// PreVoteDisabled deactivate the pre-vote feature when set to true
 	PreVoteDisabled bool
 
+	// FastRecovery controls if the Raft server should use the fast recovery
+	// mechanism. Fast recovery requires a LogStore implementation that
+	// supports commit tracking. When such a store is used and this config is
+	// enabled, raft nodes will replay all known-committed logs on disk
+	// before completing NewRaft on startup. This is mainly useful where
+	// the application allows relaxed-consistency reads from followers, as it
+	// will reduce how far behind the follower's FSM is when it starts. If all
+	// reads are forwarded to the leader then there won't be an observable
+	// benefit from this feature.
+	FastRecovery bool
+
 	// skipStartup allows NewRaft() to bypass all background work goroutines
 	skipStartup bool
 }

Review comment: Not sure if this is a naming nitpick or just a question: from the perspective of a caller of NewRaft, IIUC the "fast" aspect is due to more logs being replayed locally instead of streamed from a peer. Logs committed while this member was down will still need to be streamed, but presumably that's often a fraction of the total log size. If my understanding is accurate, an alternative name might be …

Review comment (reply): This is a great point actually. I'm not sure "Fast" captures the semantics in any case really: mostly startup will take marginally to a lot longer, but on the plus side the FSM will actually start up in the same state it was in before the node restarted, which is probably what most users of the library assumed was the case already!

Review comment (reply): Yeah, I think you perfectly described the tradeoff. I think …

Review comment (reply): @schmichael @banks How about …?

Review comment (reply): Naming is the worst. 😅 Looking around a bit, we're not totally consistent, but I think generally: … So I think switching … After that I think I prefer …
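Since FastRecovery only takes effect when the supplied LogStore also implements CommitTrackingLogStore (otherwise the library just logs a warning and skips recovery), a caller may want to verify support up front before enabling the flag. A toy sketch of that check, with simplified stand-ins for the library's interfaces:

```go
package main

import "fmt"

// Simplified stand-ins for the library's interfaces (illustrative only).
type LogStore interface{ LastIndex() (uint64, error) }

type CommitTrackingLogStore interface {
	LogStore
	StageCommitIndex(idx uint64) error
	GetCommitIndex() (uint64, error)
}

// supportsFastRecovery mirrors the type assertion the library performs in
// recoverFromCommittedLogs: FastRecovery is only honoured when the store
// also implements CommitTrackingLogStore.
func supportsFastRecovery(s LogStore) bool {
	_, ok := s.(CommitTrackingLogStore)
	return ok
}

// plainStore implements only LogStore.
type plainStore struct{}

func (plainStore) LastIndex() (uint64, error) { return 0, nil }

// trackingStore additionally implements the commit-tracking methods.
type trackingStore struct{ plainStore }

func (trackingStore) StageCommitIndex(uint64) error   { return nil }
func (trackingStore) GetCommitIndex() (uint64, error) { return 0, nil }

func main() {
	fmt.Println(supportsFastRecovery(plainStore{}))    // false
	fmt.Println(supportsFastRecovery(trackingStore{})) // true
}
```

Checking this before setting Config.FastRecovery avoids the surprising silent-warning path discussed later in the review.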
@@ -190,3 +190,24 @@ func emitLogStoreMetrics(s LogStore, prefix []string, interval time.Duration, st
 		}
 	}
 }
+
+type CommitTrackingLogStore interface {
+	LogStore
+
+	// StageCommitIndex stages a new commit index to be persisted.
+	// The staged commit index MUST only be persisted in a manner that is atomic
+	// with the following StoreLogs call in the face of a crash.
+	// This allows the Raft implementation to optimize commit index updates
+	// without risking inconsistency between the commit index and the log entries.
+	//
+	// The implementation MUST NOT persist this value separately from the log entries.
+	// Instead, it should stage the value to be written atomically with the next
+	// StoreLogs call.
+	//
+	// GetCommitIndex MUST never return a value higher than the last index in the log,
+	// even if a higher value has been staged with this method.
+	//
+	// idx is the new commit index to stage.
+	StageCommitIndex(idx uint64) error
+
+	GetCommitIndex() (uint64, error)
+}

Review comment: How would a …?

Review comment (reply): For BoltDB, I imagine the commit index would be a single KV in a separate bucket from the logs, so it would just read that and return it. For WAL I anticipated extending the format slightly so that each commit entry in the log stores the most recently staged commit index, and then re-populating that into memory when we open the log and scan it, like we do with indexes. If there is no commit index stored, we should just return …

Review comment (reply): I agree! I think that should be documented, though, because the API allows erroring on …
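The staging contract above can be illustrated with a toy in-memory store: a staged index becomes visible through GetCommitIndex only after the next StoreLogs call, and is never reported past the end of the log. This is an illustrative sketch, not the library's InmemStore; StoreLogs here takes bare indexes instead of *Log entries for brevity:

```go
package main

import "fmt"

// memStore is a toy illustration of the staging contract: the staged commit
// index is only "persisted" together with the next StoreLogs call, and
// GetCommitIndex never reports beyond the last stored log index.
type memStore struct {
	lastIndex   uint64 // highest log index stored
	staged      uint64 // staged but not yet persisted commit index
	hasStaged   bool
	commitIndex uint64 // persisted commit index
}

func (s *memStore) StageCommitIndex(idx uint64) error {
	s.staged, s.hasStaged = idx, true
	return nil
}

// StoreLogs records the given log indexes and, atomically with them,
// persists any staged commit index.
func (s *memStore) StoreLogs(indexes []uint64) error {
	for _, idx := range indexes {
		if idx > s.lastIndex {
			s.lastIndex = idx
		}
	}
	if s.hasStaged {
		s.commitIndex, s.hasStaged = s.staged, false
	}
	return nil
}

func (s *memStore) GetCommitIndex() (uint64, error) {
	// Never report past the end of the log, even if a higher value was staged.
	if s.commitIndex > s.lastIndex {
		return s.lastIndex, nil
	}
	return s.commitIndex, nil
}

func main() {
	s := &memStore{}
	s.StageCommitIndex(5)
	fmt.Println(s.commitIndex) // still 0: staging alone persists nothing
	s.StoreLogs([]uint64{1, 2, 3, 4, 5})
	idx, _ := s.GetCommitIndex()
	fmt.Println(idx) // 5: persisted atomically with the StoreLogs call
}
```

A real implementation would make the log append and the commit index update a single crash-atomic write (e.g. one storage transaction), which this in-memory version only approximates.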
@@ -1095,6 +1095,184 @@ func TestRaft_RestoreSnapshotOnStartup_Monotonic(t *testing.T) {
 	assert.Equal(t, lastIdx, last)
 }
 
+func TestRaft_RestoreSnapshotOnStartup_CommitTrackingLogs(t *testing.T) {
+	// Make the cluster
+	conf := inmemConfig(t)
+	conf.TrailingLogs = 10
+	opts := &MakeClusterOpts{
+		Peers:              1,
+		Bootstrap:          true,
+		Conf:               conf,
+		CommitTrackingLogs: true,
+	}
+	c := MakeClusterCustom(t, opts)
+	defer c.Close()
+
+	leader := c.Leader()
+
+	// Commit a lot of things
+	var future Future
+	for i := 0; i < 100; i++ {
+		future = leader.Apply([]byte(fmt.Sprintf("test%d", i)), 0)
+	}
+
+	// Wait for the last future to apply
+	if err := future.Error(); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Take a snapshot
+	snapFuture := leader.Snapshot()
+	if err := snapFuture.Error(); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Check for snapshot
+	snaps, _ := leader.snapshots.List()
+	if len(snaps) != 1 {
+		t.Fatalf("should have a snapshot")
+	}
+	snap := snaps[0]
+
+	// Logs should be trimmed
+	firstIdx, err := leader.logs.FirstIndex()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	lastIdx, err := leader.logs.LastIndex()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	if firstIdx != snap.Index-conf.TrailingLogs+1 {
+		t.Fatalf("should trim logs to %d: but is %d", snap.Index-conf.TrailingLogs+1, firstIdx)
+	}
+
+	// Shutdown
+	shutdown := leader.Shutdown()
+	if err := shutdown.Error(); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Restart the Raft
+	r := leader
+	// Can't just reuse the old transport as it will be closed
+	_, trans2 := NewInmemTransport(r.trans.LocalAddr())
+	cfg := r.config()
+	r, err = NewRaft(&cfg, r.fsm, r.logs, r.stable, r.snapshots, trans2)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	c.rafts[0] = r
+
+	// We should have restored from the snapshot!
+	if last := r.getLastApplied(); last != snap.Index {
+		t.Fatalf("bad last index: %d, expecting %d", last, snap.Index)
+	}
+
+	// Verify that logs have not been reset
+	first, _ := r.logs.FirstIndex()
+	last, _ := r.logs.LastIndex()
+	assert.Equal(t, firstIdx, first)
+	assert.Equal(t, lastIdx, last)
+}
|
||
func TestRaft_FastRecovery(t *testing.T) { | ||
// Make the cluster | ||
conf := inmemConfig(t) | ||
conf.TrailingLogs = 10 | ||
conf.FastRecovery = true | ||
opts := &MakeClusterOpts{ | ||
Peers: 1, | ||
Bootstrap: true, | ||
Conf: conf, | ||
CommitTrackingLogs: true, | ||
} | ||
c := MakeClusterCustom(t, opts) | ||
defer c.Close() | ||
|
||
leader := c.Leader() | ||
|
||
// Commit a lot of things | ||
var future Future | ||
for i := 0; i < 100; i++ { | ||
future = leader.Apply([]byte(fmt.Sprintf("test%d", i)), 0) | ||
} | ||
|
||
// Wait for the last future to apply | ||
if err := future.Error(); err != nil { | ||
t.Fatalf("err: %v", err) | ||
} | ||
|
||
// Take a snapshot | ||
snapFuture := leader.Snapshot() | ||
if err := snapFuture.Error(); err != nil { | ||
t.Fatalf("err: %v", err) | ||
} | ||
|
||
// Check for snapshot | ||
snaps, _ := leader.snapshots.List() | ||
if len(snaps) != 1 { | ||
t.Fatalf("should have a snapshot") | ||
} | ||
snap := snaps[0] | ||
|
||
// Logs should be trimmed | ||
firstIdx, err := leader.logs.FirstIndex() | ||
if err != nil { | ||
t.Fatalf("err: %v", err) | ||
} | ||
|
||
if firstIdx != snap.Index-conf.TrailingLogs+1 { | ||
t.Fatalf("should trim logs to %d: but is %d", snap.Index-conf.TrailingLogs+1, firstIdx) | ||
} | ||
|
||
// Commit a lot of things (for fast recovery test) | ||
for i := 0; i < 100; i++ { | ||
future = leader.Apply([]byte(fmt.Sprintf("test%d", i)), 0) | ||
} | ||
|
||
// Wait for the last future to apply | ||
if err := future.Error(); err != nil { | ||
t.Fatalf("err: %v", err) | ||
} | ||
|
||
// Shutdown | ||
shutdown := leader.Shutdown() | ||
if err := shutdown.Error(); err != nil { | ||
t.Fatalf("err: %v", err) | ||
} | ||
|
||
// Restart the Raft | ||
r := leader | ||
// Can't just reuse the old transport as it will be closed | ||
_, trans2 := NewInmemTransport(r.trans.LocalAddr()) | ||
cfg := r.config() | ||
r, err = NewRaft(&cfg, r.fsm, r.logs, r.stable, r.snapshots, trans2) | ||
if err != nil { | ||
t.Fatalf("err: %v", err) | ||
} | ||
c.rafts[0] = r | ||
|
||
store, ok := r.logs.(CommitTrackingLogStore) | ||
if !ok { | ||
t.Fatal("err: raft log store does not implement CommitTrackingLogStore interface") | ||
} | ||
Review comment on lines +1260 to +1263: 🤔 This is fine, but I'd probably have left it like it was before. The behaviour of an unchecked type assert like we had before (i.e. no …) … We can leave it like this for now though; at least it doesn't stop the entire test run, which is arguably nicer, although something that was a programming mistake that would never work is fine to panic on in tests IMO.
+
+	commitIdx, err := store.GetCommitIndex()
+	// We should have applied all committed logs
+	if last := r.getLastApplied(); last != commitIdx {
+		t.Fatalf("bad last index: %d, expecting %d", last, commitIdx)
+	}
+
+	// Expect: snap.Index --- commitIdx --- lastIdx
+	lastIdx, err := r.logs.LastIndex()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	assert.LessOrEqual(t, snap.Index, commitIdx)
+	assert.LessOrEqual(t, commitIdx, lastIdx)
+}
Review comment: I'm assuming these tests are failing right now because the InmemStore wasn't updated to match the new interface, right? If they are passing for you then it might be worth a look!

Review comment (reply): It passes 😱

Review comment (reply): Hi @banks, back to this topic: I think this test case is OK 🤔, because basically the commit index stored in the store will always be lower than the last log index in the store (there is always one …). As the comment "Expect: snap.Index --- commitIdx --- lastIdx" says, I think we can't be sure what the exact position of the … is. What do you think?

Review comment (reply): 🤔 I think the assertion is fine as it is; the real issue seems to be the line above: I'm not sure how that didn't just panic if the log store actually didn't implement the interface 🤷. I don't think we need to change the assertions.
+
 func TestRaft_SnapshotRestore_Progress(t *testing.T) {
 	// Make the cluster
 	conf := inmemConfig(t)
Review comment: As long as we're considering returning an error below instead of panicking, I think we should consider doing so here as well. This is going to be a "programmer error" rather than a runtime error: the consumer of the library should be ensuring they're passing a compatible combination of log store and FastRecovery configuration.