Enhancement: persist commit index in LogStore to accelerate recovery #613

Open
wants to merge 37 commits into
base: main

lalalalatt

@lalalalatt lalalalatt commented Sep 1, 2024

@lalalalatt lalalalatt requested review from a team as code owners September 1, 2024 08:25
@lalalalatt lalalalatt requested review from rboyer and removed request for a team September 1, 2024 08:25

hashicorp-cla-app bot commented Sep 1, 2024

CLA assistant check
All committers have signed the CLA.

@lalalalatt
Author

lalalalatt commented Sep 1, 2024

@banks This is the first PR for #549.
Thanks for taking the time to review this~

@lalalalatt
Author

lalalalatt commented Sep 1, 2024

Proposal for next PR:
Persist the commit index every time processLogs is called (the commit index only changes there).

raft/raft.go

Lines 1292 to 1359 in 42d3446

func (r *Raft) processLogs(index uint64, futures map[uint64]*logFuture) {
	// Reject logs we've applied already
	lastApplied := r.getLastApplied()
	if index <= lastApplied {
		r.logger.Warn("skipping application of old log", "index", index)
		return
	}
	applyBatch := func(batch []*commitTuple) {
		select {
		case r.fsmMutateCh <- batch:
		case <-r.shutdownCh:
			for _, cl := range batch {
				if cl.future != nil {
					cl.future.respond(ErrRaftShutdown)
				}
			}
		}
	}
	// Store maxAppendEntries for this call in case it ever becomes reloadable. We
	// need to use the same value for all lines here to get the expected result.
	maxAppendEntries := r.config().MaxAppendEntries
	batch := make([]*commitTuple, 0, maxAppendEntries)
	// Apply all the preceding logs
	for idx := lastApplied + 1; idx <= index; idx++ {
		var preparedLog *commitTuple
		// Get the log, either from the future or from our log store
		future, futureOk := futures[idx]
		if futureOk {
			preparedLog = r.prepareLog(&future.log, future)
		} else {
			l := new(Log)
			if err := r.logs.GetLog(idx, l); err != nil {
				r.logger.Error("failed to get log", "index", idx, "error", err)
				panic(err)
			}
			preparedLog = r.prepareLog(l, nil)
		}
		switch {
		case preparedLog != nil:
			// If we have a log ready to send to the FSM add it to the batch.
			// The FSM thread will respond to the future.
			batch = append(batch, preparedLog)
			// If we have filled up a batch, send it to the FSM
			if len(batch) >= maxAppendEntries {
				applyBatch(batch)
				batch = make([]*commitTuple, 0, maxAppendEntries)
			}
		case futureOk:
			// Invoke the future if given.
			future.respond(nil)
		}
	}
	// If there are any remaining logs in the batch apply them
	if len(batch) != 0 {
		applyBatch(batch)
	}
	// Update the lastApplied index and term
	r.setLastApplied(index)
}

func (r *Raft) processLogs(index uint64, futures map[uint64]*logFuture) { 
 	// Reject logs we've applied already 
 	lastApplied := r.getLastApplied() 
 	if index <= lastApplied { 
 		r.logger.Warn("skipping application of old log", "index", index) 
 		return 
 	} 

+       if r.fastRecovery && isCommitTrackingLogStore(r.logs) {
+               store := r.logs.(CommitTrackingLogStore)
+               if err := store.SetCommitIndex(index); err != nil {
+                       // show some error msg
+               }
+       }
  
 	....
  
 	// Update the lastApplied index and term 
 	r.setLastApplied(index) 
 }

@otoolep
Contributor

otoolep commented Sep 1, 2024

As a long-term user of this library, this could be useful. However I would strongly recommend that this functionality be wrapped in a flag, which is settable in the Raft Config object (similar to NoSnapshotRestoreOnStart), and be disabled by default. Systems built on Raft need to be solid, so taking a conservative approach is warranted.
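
A minimal sketch of the opt-in flag suggested above, assuming a trimmed-down `Config` type. `FastRecovery` is the flag name used later in this thread; `DefaultConfig` here and `shouldPersistCommitIndex` are hypothetical helpers written only for illustration, not the library's actual code.

```go
package main

import "fmt"

// Config is a hypothetical, trimmed-down stand-in for the library's Config
// struct. Only the field relevant to this discussion is shown.
type Config struct {
	// FastRecovery is the opt-in flag under discussion. Its zero value is
	// false, so existing users keep today's behaviour unless they set it.
	FastRecovery bool
}

// DefaultConfig returns a config with the feature left disabled.
func DefaultConfig() *Config {
	return &Config{}
}

// shouldPersistCommitIndex gates the new code path on the flag and on the
// store's capabilities (storeSupportsIt is an assumed input for this sketch).
func shouldPersistCommitIndex(conf *Config, storeSupportsIt bool) bool {
	return conf.FastRecovery && storeSupportsIt
}

func main() {
	conf := DefaultConfig()
	fmt.Println(shouldPersistCommitIndex(conf, true)) // false: not opted in

	conf.FastRecovery = true
	fmt.Println(shouldPersistCommitIndex(conf, true)) // true: explicitly enabled
}
```

Relying on the zero value keeps the conservative default even for callers that build a `Config` by hand.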

@lalalalatt
Author

lalalalatt commented Sep 2, 2024

As a long-term user of this library, this could be useful. However I would strongly recommend that this functionality be wrapped in a flag, which is settable in the Raft Config object (similar to NoSnapshotRestoreOnStart), and be disabled by default. Systems built on Raft need to be solid, so taking a conservative approach is warranted.

@otoolep Thanks for the suggestion, I will add r.fastRecovery as the feature flag~

Member

@banks banks left a comment

Thanks for starting on this @lalalalatt.

The interface looks good. I have a comment inline and given that I'm suggesting removing half the lines in this PR, it might make sense to also add the change you proposed for your "next PR" into this one?

I also think it might be worth rebasing this PR on to a feature branch like f/fast-recovery so we can keep PRs small to review but not merge code into main until it's a working feature that can be released. (I realise this would be a no-op, but still if we don't complete the feature it will be unused code that will need later cleanup or completion.)

What do you think of that approach?

@lalalalatt
Author

I also think it might be worth rebasing this PR on to a feature branch like f/fast-recovery so we can keep PRs small to review but not merge code into main until it's a working feature that can be released. (I realise this would be a no-op, but still if we don't complete the feature it will be unused code that will need later cleanup or completion.)

@banks Sure, that sounds like a good plan. Could you help me create the f/fast-recovery branch? Thanks!

@otoolep
Contributor

otoolep commented Sep 3, 2024

Drive-by comment.

@banks Sure, that sounds like a good plan. Could you help me create the f/fast-recovery branch? Thanks!

That's not the right way to think about the development flow (not unless this repo is managed in some unusual way). You create the branch in your own fork of this repo, and then generate a PR from that branch in your fork to the main branch in this repo.

@banks
Member

banks commented Sep 3, 2024

@otoolep in general that is usually how GH works. In this case I wonder if we should have a long running branch here so that we can keep PRs small and review the changes in small parts rather than wait until the entire feature is built and have to review it all in one huge PR from the fork to here.

One way to do that would be for me to create a long-lived feature branch here, another would be for us to review PRs in a fork but that leaves all the interim review in a separate place 🤔 .

On reflection. I'm not sure what is gained by trying to make this many small PRs yet. Let's continue work here and see how large this PR gets as more of the code is built before we worry too much about making feature branches. Sorry for the suggestion that wasn't very well thought through!

@lalalalatt
Author

On reflection. I'm not sure what is gained by trying to make this many small PRs yet. Let's continue work here and see how large this PR gets as more of the code is built before we worry too much about making feature branches. Sorry for the suggestion that wasn't very well thought through!

Ok, looks good!

- Introduced a `fastRecovery` flag in the Raft structure and configuration to enable fast recovery mode.
- Updated `NewRaft` to initialize `fastRecovery` from the configuration.
- Added `persistCommitIndex` function to store the commit index when fast recovery is enabled.
- Modified `processLogs` to persist the commit index before updating `lastApplied`.
- Documented the `FastRecovery` option in the config.
- Implemented `recoverFromCommitedLogs` function to recover the Raft node from committed logs.
- If `fastRecovery` is enabled and the log store implements `CommitTrackingLogStore`, the commit index is read from the store, avoiding the need to replay logs.
- Logs between the last applied and commit index are fed into the FSM for faster recovery.
…ency

- Refactor `ReadCommitIndex` to `GetCommitIndex` across `LogStore` and `InmemCommitTrackingStore`.
- Introduce `InmemCommitTrackingStore` to track commit index in memory for testing purposes.
- Add locking mechanism to safely read/write commit index in `InmemCommitTrackingStore`.
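
For illustration, a minimal sketch of how an in-memory store might guard the commit index with a lock, as the commit notes above describe. The type name `inmemCommitTracker` and the method signatures are assumptions made for this sketch; log storage itself is left out, and this is not the code from the PR.

```go
package main

import (
	"fmt"
	"sync"
)

// inmemCommitTracker is a hypothetical, minimal take on the commit-index part
// of an in-memory store used in tests.
type inmemCommitTracker struct {
	mu          sync.RWMutex
	commitIndex uint64
}

// SetCommitIndex records idx while holding the write lock.
func (s *inmemCommitTracker) SetCommitIndex(idx uint64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.commitIndex = idx
	return nil
}

// GetCommitIndex returns the last recorded index while holding the read lock.
// A zero value means nothing has been recorded yet.
func (s *inmemCommitTracker) GetCommitIndex() (uint64, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.commitIndex, nil
}

func main() {
	s := &inmemCommitTracker{}

	// Concurrent writers are safe because every access takes the lock.
	var wg sync.WaitGroup
	for i := uint64(1); i <= 4; i++ {
		wg.Add(1)
		go func(idx uint64) {
			defer wg.Done()
			_ = s.SetCommitIndex(idx)
		}(i)
	}
	wg.Wait()

	idx, _ := s.GetCommitIndex()
	fmt.Println("commit index:", idx) // one of 1..4, depending on scheduling
}
```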
@lalalalatt lalalalatt changed the title Add CommitTrackingLogStore and its checker in LogStore Enhancement: persist commit index in LogStore to accelerate recovery Sep 6, 2024
Member

@banks banks left a comment

Hey @lalalalatt, thanks for working on this.

I spotted a couple of things that we should tweak here. I've added comments inline - let me know if anything is confusing. I didn't do a "full" final review yet but these should get it more in line with my original design. Feel free to explain if I'm misunderstanding anything though!

@peterxcli

peterxcli commented Sep 10, 2024

tl;dr, we're fine here 😅 (on the correctness issue, the other comments about this PR stand). We should be careful to preserve the behaviour in this PR - even if we only replay up to commitIndex on the FSM, we should still process any config changes in the rest of the log after that!

@banks Thanks for all the comments 😄

With the current change, though, the same configuration can be applied twice. That wouldn't violate correctness, but it is wasteful.
So I would reverse the order of r.recoverFromCommitedLogs() and the step that iterates over the whole log looking for config changes. Then, once the persisted commits have been applied, we only need to iterate over r.logs[max(lastapplied, snapshotIndex) + 1 : ] to look for config changes, which prevents the duplication~
Update: reversed the order.

@peterxcli

I don't think this is the right place for this call. I think we should call it in storeLogs just before the actual call to StoreLogs on the storage since that is the write we want our log stores to be able to add this state to disk in. Calling it here will not do anything until the next StoreLogs call at least in the design I proposed in the issue.

I don't think storing the commit index every time before StoreLogs is a good idea.

For followers, when they append the logs received from the leader, the commit index may not increase, because the leader may not yet have received majority agreement on the corresponding logs.

For the leader, as I mentioned previously, it persists logs by calling log.StoreLogs() when it receives a new log from applyCh at a client's request, but that doesn't mean the logs are committed unless a majority accepts them.

My idea is that the commit index is only updated in two places:

  1. when a follower receives a higher leaderCommit
  2. when the leader's commitment tracking detects that a majority has accepted a higher log index; it then sends a message to c.commitCh, and the leader's main thread receives the new commit index and updates it in Raft's in-memory state.

Both eventually trigger r.processLogs, so we can just update the commit index there.

Maybe I'm misunderstanding something 😅. If anything is wrong, please correct me~

@banks
Member

banks commented Sep 10, 2024

Hi @peterxcli

The idea behind persisting commit index during StoreLogs is to make sure it doesn't cause worse performance. Writing to disk is the slowest part of raft in most implementations.

If we add a new write to disk anywhere then performance will decrease and that's not acceptable for our products at least and probably not to other users of this library.

So the whole design was intended to persist the current known commit index along with the logs every time we write new logs. Yes, it's a few extra bytes, but it's the fsync calls that are slow, so if we can write it in the same write it's virtually "free" for all the LogStore implementations we use currently and doesn't impact performance.

For followers, when they are appending the log received from leader, the commit index may not increase, because leader may not receive majority agreement on the corresponding logs.

This is true, but the AppendEntries RPC is how a follower learns about previous commits. In a given AppendEntries RPC, there will be one or more new logs (ignoring heartbeats for now). Let's say those are logs {1000, 1001}, and the leader will also include CommitIndex = 998, which lets this follower know that everything up to 998 is committed. My proposal is that we call store.SetCommitIndex() with the commit index the leader just sent in each AppendEntries RPC before we call StoreLogs. Then the log store can include that metadata in the same write, and we have always persisted the most up-to-date commit info each follower knows, all without any additional disk fsyncs.

If we only call this during processLogs on the follower as you propose then there are two possibilities depending on how the LogStore implements it:

  1. The LogStore buffers that index in memory until the next call to StoreLogs - that would work and have the same performance as my proposal, but I think it's slightly worse than my proposal for two reasons:
    1. The commitIndex sent by the leader is not actually persisted until the next RPC arrives, which means that whenever the node restarts it can only restore a smaller number of logs, even though it did "know" about and possibly apply more logs than its persisted commitIndex before it restarted. This difference is likely small, but it's easy to avoid.
    2. I find it much more surprising and hard to reason about if we actually don't persist the commitIndex we learned from the leader until the next RPC (where we'll learn about a new commit index most likely).
  2. Or, the LogStore could write the commitIndex to disk synchronously when SetCommitIndex is called. This would be a whole extra fsync (or two for BoltDB or WAL implementations) which would likely cause a significant slow down in the speed we can write to raft. I don't think we should consider this option especially when there's an easy way to avoid it (my proposal).
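
The staging approach described above (hand the store the commit index first, then let it ride along with the next log write) can be sketched as follows. This is a toy model under stated assumptions: `stagingStore`, its fields, and the `syncs` counter are hypothetical, and "disk" is simulated with in-memory fields. Only the method name `StageCommitIndex` comes from later in this thread.

```go
package main

import "fmt"

// Log is a cut-down stand-in for a Raft log entry.
type Log struct {
	Index uint64
	Data  []byte
}

// stagingStore is a hypothetical log store. The persisted* fields simulate
// on-disk state, and syncs counts how many flushes were needed.
type stagingStore struct {
	staged          uint64 // commit index held in memory until the next write
	persistedLogs   []Log
	persistedCommit uint64
	syncs           int
}

// StageCommitIndex only remembers idx; it performs no I/O.
func (s *stagingStore) StageCommitIndex(idx uint64) error {
	s.staged = idx
	return nil
}

// StoreLogs writes the logs and the staged commit index in one simulated flush.
func (s *stagingStore) StoreLogs(logs []Log) error {
	s.persistedLogs = append(s.persistedLogs, logs...)
	s.persistedCommit = s.staged
	s.syncs++ // a single flush covers both the logs and the commit index
	return nil
}

func main() {
	s := &stagingStore{}

	// A follower receives logs {1000, 1001} together with leader commit 998.
	_ = s.StageCommitIndex(998)
	_ = s.StoreLogs([]Log{{Index: 1000}, {Index: 1001}})

	fmt.Println(s.persistedCommit, len(s.persistedLogs), s.syncs) // prints "998 2 1"
}
```

The point of the sketch is the `syncs` counter: persisting the commit index costs no flush beyond the one already needed for the logs.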

For leader, as I mention previously, it persist logs by calling log.StoreLogs() when receiving new log from applyCh which client requested, but that doesn't mean the logs are committed, unless majority accept them.

You're correct that all the logs we store when we call StoreLogs on the leader are not yet committed. In fact, with our current implementation we guarantee that none of them are committed (except in the trivial case of a single node cluster) because we don't replicate to followers until after this. But again, the leader would not be calling SetCommitIndex with the new log index; it would just be calling it with whatever its current in-memory commitIndex is, which is guaranteed to be some way behind.

Again this is so that we can "re-use" the fsync on the LogStore and so avoid more disk IO or waiting on the critical path of writing data.

Most of the same arguments as above apply here: if we wait until commitIndex updates and processLogs is called on the leader we'll either have a more confusing model where the commitIndex persisted is a batch behind the last known state on the leader for any given set of logs, or we'll make it all slower by adding a new sync disk write just to persist this state during processLogs.

As an aside, if we were OK with adding more disk writes, I'd not have proposed this as an extension of LogStore at all - we already have a StableStore that we could just add a new KV pair to. But if we did that and updated it on every commit it would massively impact the throughput of a raft cluster since disk writes (and fsyncs) are the slowest part and we'd effectively double the number of them on both leader and follower for every commit.

Does that make sense?

@peterxcli

@banks oh~~ It seems like I completely misunderstood your design.

Again this is so that we can "re-use" the fsync on the LogStore and so avoid more disk IO or waiting on the critical path of writing data.

Got it, thanks~ 😄

As an aside, if we were OK with adding more disk writes, I'd not have proposed this as an extension of LogStore at all - we already have a StableStore that we could just add a new KV pair to. But if we did that and updated it on every commit it would massively impact the throughput of a raft cluster since disk writes (and fsyncs) are the slowest part and we'd effectively double the number of them on both leader and follower for every commit.

That's true 😅

@banks Thank you very much for the clarification~

This commit adds a warning log message when fast recovery is enabled but
the log store does not support the CommitTrackingLogStore interface. This
provides better visibility into potential configuration issues.

- Add warning log in recoverFromCommittedLogs when log store doesn't
  implement CommitTrackingLogStore
- Include the type of the log store in the warning message for easier debugging
@peterxcli

Would it be safer to just expect log stores to implement a new store log entrypoint since we're having to type assert on every StoreLog call anyway?

Would it be reasonable to add a new field to the Raft struct that caches the CommitTrackingLogStore when Raft starts? Then we could use the store directly, with no more type assertions.
But as the number of store variants grows, it would become a mess 😅...
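
A rough sketch of the caching idea, assuming cut-down `LogStore` and `CommitTrackingLogStore` interfaces. The `node` type, `newNode`, and `stageCommitIndex` are hypothetical names for this example and do not reflect the real Raft struct.

```go
package main

import "fmt"

// LogStore and CommitTrackingLogStore are reduced to the methods this sketch
// needs; the real interfaces have more methods.
type LogStore interface {
	LastIndex() (uint64, error)
}

type CommitTrackingLogStore interface {
	LogStore
	StageCommitIndex(idx uint64) error
}

// plainStore does not track commit indexes.
type plainStore struct{}

func (plainStore) LastIndex() (uint64, error) { return 0, nil }

// trackingStore does.
type trackingStore struct{ staged uint64 }

func (t *trackingStore) LastIndex() (uint64, error) { return 0, nil }
func (t *trackingStore) StageCommitIndex(idx uint64) error {
	t.staged = idx
	return nil
}

// node is a hypothetical stand-in for the Raft struct.
type node struct {
	logs        LogStore
	commitStore CommitTrackingLogStore // nil when the store lacks support
}

// newNode performs the type assertion once, at construction time.
func newNode(logs LogStore) *node {
	n := &node{logs: logs}
	if cs, ok := logs.(CommitTrackingLogStore); ok {
		n.commitStore = cs
	}
	return n
}

// stageCommitIndex uses the cached value; the hot path only does a nil check.
// It reports whether the index was handed to a store.
func (n *node) stageCommitIndex(idx uint64) (bool, error) {
	if n.commitStore == nil {
		return false, nil
	}
	return true, n.commitStore.StageCommitIndex(idx)
}

func main() {
	staged, _ := newNode(plainStore{}).stageCommitIndex(5)
	fmt.Println("plain store staged:", staged)

	staged, _ = newNode(&trackingStore{}).stageCommitIndex(5)
	fmt.Println("tracking store staged:", staged)
}
```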

@dhiaayachi
Contributor

Thank you for sharing your thoughts on the API @banks!

When I was thinking about the API I was too attached to the idea of storing the commit index along with the log. Now I realize that it's just an optimization, so it's an implementation detail that the store needs to care about but not necessarily the raft library.

If I try to think about this from the perspective of the raft library and the requirements it needs in order to operate correctly:

  • The raft library will provide a commit index for the store to persist. This value can be provided at any time and not necessarily with each log (maybe once per batch, once per log, or at any time?)
  • The raft library will read the latest stored value from the store at startup (can we assume this only happens at startup?). The store provided value SHOULD NOT exceed the last index in the persisted logs in the store. The store can decide not to provide that value, because it's not available even if the store supports it, and return an error.

While I understand that the commit index could be persisted independently from the log (for example, a naive implementation could just write a separate file on disk, apart from the logs file, that keeps only the latest value), I think the important requirement here is that the store provided value SHOULD NOT exceed the last index in the persisted logs in the store. IMO the question that needs to be answered to design the right interface is: is it the Raft library's or the LogStore's responsibility to keep that requirement true at all times?

In other words, can a LogStore implementation assume that every time a commit index is provided by the raft library it can be stored right away, or does it need to wait for the next call to StoreLog/StoreLogs to persist it? I think the current API assumes that it needs to wait, which is why we call it Stage, but we don't explicitly say what it needs to wait for.

If we change the API to attach the commit index to StoreLogs as @schmichael suggested, I think it makes the requirement more explicit. It's not perfect, as you pointed out: we still need to either successfully persist the logs before persisting the index or write them in a single transaction for the requirement to hold.

For StoreLogs I don't think we assume today that all the logs need to be stored in a single transaction. That said, in both our LogStore implementations we do it for performance reasons, right?

I think it will be cleaner to go with @schmichael's suggestion for the API and to simplify the requirement by assuming that StoreLogsTx(tx StoreLogsTx) needs to write all the content of StoreLogsTx in a transaction.

I agree with your point on the exploding possibilities for the API if we decide on other options for storing logs. That said, I think StoreLogsAsync could be implemented simply as a StoreLogs-specific implementation and not as its own API, but let's keep that discussion for the async log PR when we get to it 😅

@schmichael
Member

the store provided value SHOULD NOT exceed the last index in the persisted logs in the store

You made me realize something: I think SHOULD is key there. It's not a MUST. If the committed index persisted exceeds the highest index persisted log then the remaining logs must be streamed from a peer. AFAICT the persisted commit index is only needed to ensure uncommitted logs aren't replayed.

This implies the existing API is fine and not really brittle. A misbehaving implementation that writes the commit index on every call distinctly from StoreLogs is a performance issue (since it's doing extra IOPs and those IOPs might be "wasted" if they're for a commit index ahead of what StoreLogs has persisted), not a correctness issue (since on recovery the delta from last-persisted-log to persisted-commit-index will be streamed).

If my understanding is correct then we need to relax the MUSTs in StageCommitIndex to SHOULDs or similar.

That being said I'm not sure if relaxing the atomicity complicates the implementation of Raft or LogStores! If requiring atomicity simplifies the implementation: we're probably best off doing that since that's what we already have in front of us. :)

If it does not complicate implementations, I vote we change StageCommitIndex -> SetCommitIndex or similar and ship this as-is.

My idea to avoid runtime type assertions by turning minimal-logstore-implementations into maximal-logstore-implementations via noop-adapters could even be done as a followup PR with very little-to-no debt incurred by first accepting this approach.
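
One way the noop-adapter idea could look, sketched under assumptions: the interfaces are cut down to a few methods, and `noopCommitTracker` and `asCommitTracking` are hypothetical names invented for this example rather than anything in the library.

```go
package main

import "fmt"

// LogStore is reduced to one method for this sketch.
type LogStore interface {
	LastIndex() (uint64, error)
}

// CommitTrackingLogStore uses the method names from this thread; the exact
// signatures are an assumption.
type CommitTrackingLogStore interface {
	LogStore
	StageCommitIndex(idx uint64) error
	GetCommitIndex() (uint64, error)
}

// noopCommitTracker wraps any LogStore and satisfies the larger interface by
// discarding staged indexes and always reporting "no commit index" (0, nil).
type noopCommitTracker struct {
	LogStore
}

func (noopCommitTracker) StageCommitIndex(uint64) error   { return nil }
func (noopCommitTracker) GetCommitIndex() (uint64, error) { return 0, nil }

// asCommitTracking performs the type assertion once. Callers then always hold
// a CommitTrackingLogStore and never need to assert again.
func asCommitTracking(store LogStore) CommitTrackingLogStore {
	if cs, ok := store.(CommitTrackingLogStore); ok {
		return cs
	}
	return noopCommitTracker{LogStore: store}
}

// basicStore is a minimal store without commit tracking.
type basicStore struct{ last uint64 }

func (b basicStore) LastIndex() (uint64, error) { return b.last, nil }

func main() {
	cs := asCommitTracking(basicStore{last: 7})
	_ = cs.StageCommitIndex(5) // silently ignored by the adapter

	commit, _ := cs.GetCommitIndex()
	last, _ := cs.LastIndex()
	fmt.Println(commit, last) // prints "0 7"
}
```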


Regardless I am quite pleased with the implementation and options here. There's not an option mentioned I would oppose, so as soon as CI is green I'm happy to 👍 and have 1 less cook in the kitchen.

@dhiaayachi
Contributor

I agree. If the commit index is higher we can always fall back to the current implementation, but in this case we can apply the same logic to any error mode specific to this feature and choose to return an error from NewRaft and let the library user decide what to do with it.

Also it will be hard to determine that error without processing most of the logs, as we read them in sequence from the oldest to the newest.

@dhiaayachi
Contributor

@banks @lalalalatt Same here, I don't have a strong opinion about the API and I'm ok to ✅ this as is. The only blocking items for me are:

  • relaxing the panic when restoring to return an error instead
  • document the expected behaviour when GetCommitIndex is called and no commit index is found
  • add some error mode tests

api.go Outdated
Comment on lines 714 to 720
// If the store implements CommitTrackingLogStore, we can read the commit index from the store.
// This is useful when the store is able to track the commit index and we can avoid replaying logs.
store, ok := r.logs.(CommitTrackingLogStore)
if !ok {
	r.logger.Warn("fast recovery enabled but log store does not support it", "log_store", fmt.Sprintf("%T", r.logs))
	return
}
Member

As long as we're considering returning an error below instead of panicking, I think we should consider doing so here as well. This is going to be a "programmer error" rather than a runtime error -- the consumer of the library should be ensuring they're passing a compatible combination of log store and FastRecovery configuration.
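
A sketch of what returning an error for this "programmer error" might look like. `ErrIncompatibleLogStore` is the error name that appears later in this thread; its definition here, the cut-down interfaces, and the `validateLogStore` helper are assumptions for illustration only.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrIncompatibleLogStore: the name comes from this thread, the message and
// definition are assumed.
var ErrIncompatibleLogStore = errors.New("log store does not implement CommitTrackingLogStore")

// The interfaces are reduced to the methods this sketch needs.
type LogStore interface {
	LastIndex() (uint64, error)
}

type CommitTrackingLogStore interface {
	LogStore
	GetCommitIndex() (uint64, error)
}

// Config is a hypothetical, trimmed-down config.
type Config struct {
	FastRecovery bool
}

// validateLogStore is a hypothetical startup check: enabling the feature with
// a store that cannot support it is reported to the caller instead of logged.
func validateLogStore(conf Config, logs LogStore) error {
	if !conf.FastRecovery {
		return nil
	}
	if _, ok := logs.(CommitTrackingLogStore); !ok {
		return fmt.Errorf("%w: got %T", ErrIncompatibleLogStore, logs)
	}
	return nil
}

type plainStore struct{}

func (plainStore) LastIndex() (uint64, error) { return 0, nil }

type trackingStore struct{}

func (trackingStore) LastIndex() (uint64, error)      { return 0, nil }
func (trackingStore) GetCommitIndex() (uint64, error) { return 0, nil }

func main() {
	err := validateLogStore(Config{FastRecovery: true}, plainStore{})
	fmt.Println(errors.Is(err, ErrIncompatibleLogStore)) // true

	err = validateLogStore(Config{FastRecovery: true}, trackingStore{})
	fmt.Println(err == nil) // true
}
```

Wrapping with `%w` lets callers match the sentinel with `errors.Is` while still seeing the concrete store type in the message.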

This commit renames the FastRecovery feature to RestoreCommittedLogs for
better clarity and consistency. This is a breaking change as it modifies
public API elements.

- Rename Config.FastRecovery to Config.RestoreCommittedLogs
- Rename Raft.fastRecovery to Raft.RestoreCommittedLogs
- Update all references to fastRecovery in the codebase
- Rename TestRaft_FastRecovery to TestRaft_RestoreCommittedLogs
- Update comments to reflect the new terminology
- Refactor tryStageCommitIndex to accept commitIndex as a parameter

BREAKING CHANGE: Config.FastRecovery has been renamed to Config.RestoreCommittedLogs
- Add a notice in the Config struct documentation for RestoreCommittedLogs
- Specify that Raft will fail to start with ErrIncompatibleLogStore if the
  requirement is not met
- Update makeCluster to return (*cluster, error)
- Modify MakeCluster, MakeClusterNoBootstrap, and MakeClusterCustom to return error
- Update all test cases to handle potential errors from cluster creation
- Replace t.Fatalf() calls with t.Logf() and error returns in makeCluster

BREAKING CHANGE: MakeCluster, MakeClusterNoBootstrap, and MakeClusterCustom
now return an additional error value, which needs to be handled in existing tests.
@peterxcli

peterxcli commented Oct 11, 2024

To support asserting on the NewRaft error when making a cluster, I added an error return value to makeCluster, but that requires a lot of changes to existing tests to handle the returned error.
I think it is necessary because I expect more and more log store variants to appear, and they will need error mode tests.


Document fix commits:


Error mode testing:

schmichael previously approved these changes Oct 11, 2024
Member

@schmichael schmichael left a comment

🎉

}

// makeCluster will return a cluster with the given config and number of peers.
// If bootstrap is true, the servers will know about each other before starting,
// otherwise their transports will be wired up but they won't yet have configured
// each other.
-func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster {
+func makeCluster(t *testing.T, opts *MakeClusterOpts) (*cluster, error) {
Member

AFAICT this was added to support 1 test that asserts a specific error is returned. Since this causes tons of other test churn I have a slight preference for reverting the error return here and special casing the 1 test that needs to check the error return of NewRaft. Not a blocker though.

- Add PropagateError option to MakeClusterOpts
- Update makeCluster to return (*cluster, error)
- Modify MakeCluster, MakeClusterNo
Contributor

@dhiaayachi dhiaayachi left a comment

Thank you again for the prompt update @lalalalatt!
I left a minor suggestion for a comment. Other than that, I think from a testing perspective we can add more tests that cover the following cases:

  • the log store returns 0, nil all the time (no commit index is in the store).
  • GetCommitIndex returns a value that is bigger than the last index

log.go Outdated

// GetCommitIndex returns the latest persisted commit index from the latest log entry
// in the store at startup.
// It is ok to return a value higher than the last index in the log (But it should never happen).
Contributor

Suggested change
-// It is ok to return a value higher than the last index in the log (But it should never happen).
+// GetCommitIndex should not return a value higher than the last index in the log. If that happens, the last index in the log will be used.

Contributor

I think we should also document here that GetCommitIndex needs to return 0, nil when no commit index is found in the log.

…face

- Specify that GetCommitIndex should not return a value higher than the last
  index in the log
- Clarify that if a higher value is returned, the last index in the log will
  be used instead
- Add instruction to return (0, nil) when no commit index is found in the
  log store
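
The documented rules above (0 means no commit index was found, and a value above the last log index is replaced by the last log index) can be sketched as a small helper. `effectiveCommitIndex` is a hypothetical function written for illustration and is not taken from the PR.

```go
package main

import "fmt"

// effectiveCommitIndex applies the rules from the GetCommitIndex docs:
//   - stored == 0 means no commit index was found, so nothing is restored;
//   - a stored value above lastIndex is clamped down to lastIndex.
func effectiveCommitIndex(stored, lastIndex uint64) uint64 {
	if stored > lastIndex {
		return lastIndex
	}
	return stored
}

func main() {
	fmt.Println(
		effectiveCommitIndex(0, 10),  // no commit index persisted
		effectiveCommitIndex(8, 10),  // normal case
		effectiveCommitIndex(15, 10), // store is ahead of the log: clamp
	) // prints "0 8 10"
}
```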
@lalalalatt
Author

Hi, @banks, any new progress on this?

7 participants