From 2e5a8a0bbaa2cc5ecba1f2a68a3391d730f33b28 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Sun, 1 Sep 2024 16:21:56 +0800 Subject: [PATCH 01/37] feat: add CommitTrackingLogStore interface for commit index management --- log.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/log.go b/log.go index 4ae21932..7de07348 100644 --- a/log.go +++ b/log.go @@ -190,3 +190,13 @@ func emitLogStoreMetrics(s LogStore, prefix []string, interval time.Duration, st } } } + +type CommitTrackingLogStore interface { + SetCommitIndex(idx uint64) error + ReadCommitIndex() (uint64, error) +} + +func isCommitTrackingLogStore(s LogStore) bool { + _, ok := s.(CommitTrackingLogStore) + return ok +} From ffc6b3b1ddea562dc1db4752349ead9b3a6b08eb Mon Sep 17 00:00:00 2001 From: peterxcli Date: Tue, 3 Sep 2024 23:52:34 +0800 Subject: [PATCH 02/37] chore: remove non-idiomatic type assert func --- log.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/log.go b/log.go index 7de07348..5cef65de 100644 --- a/log.go +++ b/log.go @@ -195,8 +195,3 @@ type CommitTrackingLogStore interface { SetCommitIndex(idx uint64) error ReadCommitIndex() (uint64, error) } - -func isCommitTrackingLogStore(s LogStore) bool { - _, ok := s.(CommitTrackingLogStore) - return ok -} From 7383d96db9464c05402ae06df8e4123e0a7ce938 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Wed, 4 Sep 2024 23:13:05 +0800 Subject: [PATCH 03/37] feat(raft): add fast recovery mode for quicker log application - Introduced a `fastRecovery` flag in the Raft structure and configuration to enable fast recovery mode. - Updated `NewRaft` to initialize `fastRecovery` from the configuration. - Added `persistCommitIndex` function to store the commit index when fast recovery is enabled. - Modified `processLogs` to persist the commit index before updating `lastApplied`. - Documented the `FastRecovery` option in the config. --- api.go | 8 ++++++++ config.go | 5 +++++ raft.go | 16 ++++++++++++++++ 3 files changed, 29 insertions(+) diff --git a/api.go b/api.go index cff2eaac..1ad8a24c 100644 --- a/api.go +++ b/api.go @@ -217,6 +217,10 @@ type Raft struct { // preVoteDisabled control if the pre-vote feature is activated, // prevote feature is disabled if set to true. preVoteDisabled bool + + // fastRecovery is used to enable fast recovery mode + // fast recovery mode is disabled if set to false. + fastRecovery bool } // BootstrapCluster initializes a server's storage with the given cluster @@ -566,6 +570,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna followerNotifyCh: make(chan struct{}, 1), mainThreadSaturation: newSaturationMetric([]string{"raft", "thread", "main", "saturation"}, 1*time.Second), preVoteDisabled: conf.PreVoteDisabled || !transportSupportPreVote, + fastRecovery: conf.FastRecovery, } if !transportSupportPreVote && !conf.PreVoteDisabled { r.logger.Warn("pre-vote is disabled because it is not supported by the Transport") @@ -606,6 +611,9 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna // to be called concurrently with a blocking RPC. trans.SetHeartbeatHandler(r.processHeartbeat) + // TODO: if `fastRecovery` is enabled and the store also implements `CommitTrackingLogStore`, + // we should read the commit index from the store and set it here. Then feed [lastapplied, commitindex] logs to the FSM. + if conf.skipStartup { return r, nil } diff --git a/config.go b/config.go index d14392fc..5cf4bc56 100644 --- a/config.go +++ b/config.go @@ -235,6 +235,11 @@ type Config struct { // PreVoteDisabled deactivate the pre-vote feature when set to true PreVoteDisabled bool + // FastRecovery controls if the Raft server should use the fast recovery + // mechanism. This mechanism allows a server to apply logs to the FSM till + // the last committed log + FastRecovery bool + // skipStartup allows NewRaft() to bypass all background work goroutines skipStartup bool } diff --git a/raft.go b/raft.go index cbc9a59a..a4db3271 100644 --- a/raft.go +++ b/raft.go @@ -1354,6 +1354,8 @@ func (r *Raft) processLogs(index uint64, futures map[uint64]*logFuture) { applyBatch(batch) } + r.persistCommitIndex(index) + // Update the lastApplied index and term r.setLastApplied(index) } @@ -1385,6 +1387,20 @@ func (r *Raft) prepareLog(l *Log, future *logFuture) *commitTuple { return nil } +// persistCommitIndex updates the commit index in persist store if fast recovery is enabled. +func (r *Raft) persistCommitIndex(index uint64) { + if !r.fastRecovery { + return + } + store, ok := r.logs.(CommitTrackingLogStore) + if !ok { + return + } + if err := store.SetCommitIndex(index); err != nil { + r.logger.Error("failed to set commit index in commit tracking log store", "index", index, "error", err) + } +} + // processRPC is called to handle an incoming RPC request. This must only be // called from the main thread. func (r *Raft) processRPC(rpc RPC) { From f6295e04a993c3eb4273060290c4f8bd80a2692d Mon Sep 17 00:00:00 2001 From: peterxcli Date: Wed, 4 Sep 2024 23:49:31 +0800 Subject: [PATCH 04/37] feat(raft): add recovery from committed logs during startup - Implemented `recoverFromCommitedLogs` function to recover the Raft node from committed logs. - If `fastRecovery` is enabled and the log store implements `CommitTrackingLogStore`, the commit index is read from the store, avoiding the need to replay logs. - Logs between the last applied and commit index are fed into the FSM for faster recovery. --- api.go | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/api.go b/api.go index 1ad8a24c..166562f3 100644 --- a/api.go +++ b/api.go @@ -611,8 +611,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna // to be called concurrently with a blocking RPC. trans.SetHeartbeatHandler(r.processHeartbeat) - // TODO: if `fastRecovery` is enabled and the store also implements `CommitTrackingLogStore`, - // we should read the commit index from the store and set it here. Then feed [lastapplied, commitindex] logs to the FSM. + r.recoverFromCommitedLogs() if conf.skipStartup { return r, nil @@ -705,6 +704,29 @@ func (r *Raft) tryRestoreSingleSnapshot(snapshot *SnapshotMeta) bool { return true } +// recoverFromCommitedLogs recovers the Raft node from committed logs. +func (r *Raft) recoverFromCommitedLogs() error { + if !r.fastRecovery { + return nil + } + // If the store implements CommitTrackingLogStore, we can read the commit index from the store. + // This is useful when the store is able to track the commit index and we can avoid replaying logs. + store, ok := r.logs.(CommitTrackingLogStore) + if !ok { + return nil + } + commitIndex, err := store.ReadCommitIndex() + if err != nil { + return fmt.Errorf("failed to read commit index from store: %w", err) + } + if commitIndex == 0 { + return nil + } + + r.processLogs(commitIndex, nil) + return nil +} + func (r *Raft) config() Config { return r.conf.Load().(Config) } From f2ae7a92f310350e77f894dd08fc035aa12279cb Mon Sep 17 00:00:00 2001 From: peterxcli Date: Fri, 6 Sep 2024 22:07:52 +0800 Subject: [PATCH 05/37] refactor(store): rename ReadCommitIndex to GetCommitIndex for consistency - Refactor `ReadCommitIndex` to `GetCommitIndex` across `LogStore` and `InmemCommitTrackingStore`. - Introduce `InmemCommitTrackingStore` to track commit index in memory for testing purposes. - Add locking mechanism to safely read/write commit index in `InmemCommitTrackingStore`. --- api.go | 2 +- inmem_store.go | 28 ++++++++++++++++++++++++++++ log.go | 2 +- 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/api.go b/api.go index 166562f3..e1451666 100644 --- a/api.go +++ b/api.go @@ -715,7 +715,7 @@ func (r *Raft) recoverFromCommitedLogs() error { if !ok { return nil } - commitIndex, err := store.ReadCommitIndex() + commitIndex, err := store.GetCommitIndex() if err != nil { return fmt.Errorf("failed to read commit index from store: %w", err) } diff --git a/inmem_store.go b/inmem_store.go index 730d03f2..ce4b8cef 100644 --- a/inmem_store.go +++ b/inmem_store.go @@ -131,3 +131,31 @@ func (i *InmemStore) GetUint64(key []byte) (uint64, error) { defer i.l.RUnlock() return i.kvInt[string(key)], nil } + +type InmemCommitTrackingStore struct { + *InmemStore + commitIndexLock sync.RWMutex + commitIndex uint64 +} + +// NewInmemCommitTrackingStore returns a new in-memory backend that tracks the commit index. Do not ever +// use for production. Only for testing. +func NewInmemCommitTrackingStore() *InmemCommitTrackingStore { + i := &InmemCommitTrackingStore{ + InmemStore: NewInmemStore(), + } + return i +} + +func (i *InmemCommitTrackingStore) SetCommitIndex(index uint64) error { + i.commitIndexLock.Lock() + defer i.commitIndexLock.Unlock() + i.commitIndex = index + return nil +} + +func (i *InmemCommitTrackingStore) GetCommitIndex() (uint64, error) { + i.commitIndexLock.RLock() + defer i.commitIndexLock.RUnlock() + return i.commitIndex, nil +} diff --git a/log.go b/log.go index 5cef65de..be39299b 100644 --- a/log.go +++ b/log.go @@ -193,5 +193,5 @@ func emitLogStoreMetrics(s LogStore, prefix []string, interval time.Duration, st type CommitTrackingLogStore interface { SetCommitIndex(idx uint64) error - ReadCommitIndex() (uint64, error) + GetCommitIndex() (uint64, error) } From ce1895c644ac99c29ffc8deb71ac6bf67332e10e Mon Sep 17 00:00:00 2001 From: peterxcli Date: Tue, 10 Sep 2024 15:06:41 +0800 Subject: [PATCH 06/37] fix: also set inmem commit index when revocer log commit progress from persist store --- api.go | 1 + 1 file changed, 1 insertion(+) diff --git a/api.go b/api.go index e1451666..3e071df5 100644 --- a/api.go +++ b/api.go @@ -723,6 +723,7 @@ func (r *Raft) recoverFromCommitedLogs() error { return nil } + r.setCommitIndex(commitIndex) r.processLogs(commitIndex, nil) return nil } From ab50a58f37dbdb3da2b0f4b3f617e5520a90c3ec Mon Sep 17 00:00:00 2001 From: peterxcli Date: Tue, 10 Sep 2024 17:05:25 +0800 Subject: [PATCH 07/37] perf: optimize startup recovery by skipping duplicated log replay --- api.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/api.go b/api.go index 3e071df5..fc1b2b3e 100644 --- a/api.go +++ b/api.go @@ -590,9 +590,12 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna return nil, err } + r.recoverFromCommitedLogs() + // Scan through the log for any configuration change entries. snapshotIndex, _ := r.getLastSnapshot() - for index := snapshotIndex + 1; index <= lastLog.Index; index++ { + lastappliedIndex := r.getLastApplied() + for index := max(snapshotIndex, lastappliedIndex) + 1; index <= lastLog.Index; index++ { var entry Log if err := r.logs.GetLog(index, &entry); err != nil { r.logger.Error("failed to get log", "index", index, "error", err) @@ -611,8 +614,6 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna // to be called concurrently with a blocking RPC. trans.SetHeartbeatHandler(r.processHeartbeat) - r.recoverFromCommitedLogs() - if conf.skipStartup { return r, nil } From 4e7e04b17f4c2d342482f7af90ae317d2c8e1c7e Mon Sep 17 00:00:00 2001 From: peterxcli Date: Fri, 13 Sep 2024 10:25:58 +0800 Subject: [PATCH 08/37] refactor(inmem-commit-tracking-store): store commit index in memory until the next StoreLogs call and then write it out on disk along with the log data. --- inmem_store.go | 127 +++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 122 insertions(+), 5 deletions(-) diff --git a/inmem_store.go b/inmem_store.go index ce4b8cef..f3fb3fc8 100644 --- a/inmem_store.go +++ b/inmem_store.go @@ -132,8 +132,18 @@ func (i *InmemStore) GetUint64(key []byte) (uint64, error) { return i.kvInt[string(key)], nil } +type commitIndexTrackingLog struct { + log *Log + CommitIndex uint64 +} type InmemCommitTrackingStore struct { - *InmemStore + l sync.RWMutex + lowIndex uint64 + highIndex uint64 + logs map[uint64]*commitIndexTrackingLog + kv map[string][]byte + kvInt map[string]uint64 + commitIndexLock sync.RWMutex commitIndex uint64 } @@ -142,11 +152,114 @@ type InmemCommitTrackingStore struct { // use for production. Only for testing. func NewInmemCommitTrackingStore() *InmemCommitTrackingStore { i := &InmemCommitTrackingStore{ - InmemStore: NewInmemStore(), + logs: make(map[uint64]*commitIndexTrackingLog), + kv: make(map[string][]byte), + kvInt: make(map[string]uint64), } return i } +// FirstIndex implements the CommitTrackingLogStore interface. +func (i *InmemCommitTrackingStore) FirstIndex() (uint64, error) { + i.l.RLock() + defer i.l.RUnlock() + return i.lowIndex, nil +} + +// LastIndex implements the CommitTrackingLogStore interface. +func (i *InmemCommitTrackingStore) LastIndex() (uint64, error) { + i.l.RLock() + defer i.l.RUnlock() + return i.highIndex, nil +} + +// GetLog implements the CommitTrackingLogStore interface. +func (i *InmemCommitTrackingStore) GetLog(index uint64, log *Log) error { + i.l.RLock() + defer i.l.RUnlock() + l, ok := i.logs[index] + if !ok { + return ErrLogNotFound + } + *log = *l.log + return nil +} + +// StoreLog implements the LogStore interface. +func (i *InmemCommitTrackingStore) StoreLog(log *Log) error { + return i.StoreLogs([]*Log{log}) +} + +// StoreLogs implements the LogStore interface. +func (i *InmemCommitTrackingStore) StoreLogs(logs []*Log) error { + i.l.Lock() + defer i.l.Unlock() + for _, l := range logs { + i.logs[l.Index] = &commitIndexTrackingLog{log: l, CommitIndex: i.commitIndex} + if i.lowIndex == 0 { + i.lowIndex = l.Index + } + if l.Index > i.highIndex { + i.highIndex = l.Index + } + } + return nil +} + +// DeleteRange implements the LogStore interface. +func (i *InmemCommitTrackingStore) DeleteRange(min, max uint64) error { + i.l.Lock() + defer i.l.Unlock() + for j := min; j <= max; j++ { + delete(i.logs, j) + } + if min <= i.lowIndex { + i.lowIndex = max + 1 + } + if max >= i.highIndex { + i.highIndex = min - 1 + } + if i.lowIndex > i.highIndex { + i.lowIndex = 0 + i.highIndex = 0 + } + return nil +} + +// Set implements the StableStore interface. +func (i *InmemCommitTrackingStore) Set(key []byte, val []byte) error { + i.l.Lock() + defer i.l.Unlock() + i.kv[string(key)] = val + return nil +} + +// Get implements the StableStore interface. +func (i *InmemCommitTrackingStore) Get(key []byte) ([]byte, error) { + i.l.RLock() + defer i.l.RUnlock() + val := i.kv[string(key)] + if val == nil { + return nil, errors.New("not found") + } + return val, nil +} + +// SetUint64 implements the StableStore interface. +func (i *InmemCommitTrackingStore) SetUint64(key []byte, val uint64) error { + i.l.Lock() + defer i.l.Unlock() + i.kvInt[string(key)] = val + return nil +} + +// GetUint64 implements the StableStore interface. +func (i *InmemCommitTrackingStore) GetUint64(key []byte) (uint64, error) { + i.l.RLock() + defer i.l.RUnlock() + return i.kvInt[string(key)], nil +} + func (i *InmemCommitTrackingStore) SetCommitIndex(index uint64) error { i.commitIndexLock.Lock() defer i.commitIndexLock.Unlock() @@ -155,7 +268,11 @@ func (i *InmemCommitTrackingStore) SetCommitIndex(index uint64) error { } func (i *InmemCommitTrackingStore) GetCommitIndex() (uint64, error) { - i.commitIndexLock.RLock() - defer i.commitIndexLock.RUnlock() - return i.commitIndex, nil + i.l.RLock() + defer i.l.RUnlock() + log, ok := i.logs[i.highIndex] + if !ok { + return 0, ErrLogNotFound + } + return log.CommitIndex, nil } From 41df55ec11ff539e1220e9adc0daddfa7a16c570 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Fri, 13 Sep 2024 14:22:30 +0800 Subject: [PATCH 09/37] chore: fix typo in recoverFromCommittedLogs function name --- api.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/api.go b/api.go index fc1b2b3e..592b2fa5 100644 --- a/api.go +++ b/api.go @@ -590,7 +590,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna return nil, err } - r.recoverFromCommitedLogs() + r.recoverFromCommittedLogs() // Scan through the log for any configuration change entries. snapshotIndex, _ := r.getLastSnapshot() @@ -705,8 +705,8 @@ func (r *Raft) tryRestoreSingleSnapshot(snapshot *SnapshotMeta) bool { return true } -// recoverFromCommitedLogs recovers the Raft node from committed logs. -func (r *Raft) recoverFromCommitedLogs() error { +// recoverFromCommittedLogs recovers the Raft node from committed logs. +func (r *Raft) recoverFromCommittedLogs() error { if !r.fastRecovery { return nil } From 400a27deff9fee3a71dfb241b78f9b8a94eec5da Mon Sep 17 00:00:00 2001 From: peterxcli Date: Fri, 13 Sep 2024 15:14:11 +0800 Subject: [PATCH 10/37] refactor(raft): update parameter name in persistCommitIndex function --- raft.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/raft.go b/raft.go index a4db3271..8e959a6c 100644 --- a/raft.go +++ b/raft.go @@ -1388,7 +1388,7 @@ func (r *Raft) prepareLog(l *Log, future *logFuture) *commitTuple { } // persistCommitIndex updates the commit index in persist store if fast recovery is enabled. -func (r *Raft) persistCommitIndex(index uint64) { +func (r *Raft) persistCommitIndex(commitIndex uint64) { if !r.fastRecovery { return } @@ -1396,8 +1396,8 @@ func (r *Raft) persistCommitIndex(index uint64) { if !ok { return } - if err := store.SetCommitIndex(index); err != nil { - r.logger.Error("failed to set commit index in commit tracking log store", "index", index, "error", err) + if err := store.SetCommitIndex(commitIndex); err != nil { + r.logger.Error("failed to set commit index in commit tracking log store", "index", commitIndex, "error", err) } } From e2617e8d3162b6da7116012001ddab71bbae15a8 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Fri, 13 Sep 2024 15:14:51 +0800 Subject: [PATCH 11/37] refactor(raft): set commit index in memory before `StoreLogs` --- raft.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/raft.go b/raft.go index 8e959a6c..faf276c9 100644 --- a/raft.go +++ b/raft.go @@ -1262,6 +1262,9 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) { r.leaderState.inflight.PushBack(applyLog) } + commitIndex := r.getCommitIndex() + r.persistCommitIndex(commitIndex) + // Write the log entry locally if err := r.logs.StoreLogs(logs); err != nil { r.logger.Error("failed to commit logs", "error", err) @@ -1354,8 +1357,6 @@ func (r *Raft) processLogs(index uint64, futures map[uint64]*logFuture) { applyBatch(batch) } - r.persistCommitIndex(index) - // Update the lastApplied index and term r.setLastApplied(index) } @@ -1551,6 +1552,9 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { } if n := len(newEntries); n > 0 { + commitIndex := r.getCommitIndex() + r.persistCommitIndex(commitIndex) + // Append the new entries if err := r.logs.StoreLogs(newEntries); err != nil { r.logger.Error("failed to append to logs", "error", err) From 6daca47df09a50a7f1c4aa897aa6fa9754c4b686 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Wed, 18 Sep 2024 10:04:21 +0800 Subject: [PATCH 12/37] refactor(raft): fix condition for skipping recovery in `recoverFromCommittedLogs` --- api.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/api.go b/api.go index 592b2fa5..3e98d9f3 100644 --- a/api.go +++ b/api.go @@ -720,7 +720,8 @@ func (r *Raft) recoverFromCommittedLogs() error { if err != nil { return fmt.Errorf("failed to read commit index from store: %w", err) } - if commitIndex == 0 { + lastApplied := r.getLastApplied() + if commitIndex <= lastApplied { return nil } From cc09317d49490bf18854572358c77c9c19db70bb Mon Sep 17 00:00:00 2001 From: peterxcli Date: Wed, 18 Sep 2024 13:44:15 +0800 Subject: [PATCH 13/37] feat(raft): add commit tracking logs and fast recovery tests - Implemented commit tracking logs in the Raft cluster configuration. - Added tests for restoring snapshots on startup with commit tracking logs. - Introduced a fast recovery test to ensure logs are applied correctly after restart. - Updated `MakeClusterOpts` to include `CommitTrackingLogs` option. --- raft_test.go | 174 +++++++++++++++++++++++++++++++++++++++++++++++++++ testing.go | 17 ++--- 2 files changed, 184 insertions(+), 7 deletions(-) diff --git a/raft_test.go b/raft_test.go index 2db115b6..2908f9c7 100644 --- a/raft_test.go +++ b/raft_test.go @@ -1095,6 +1095,180 @@ func TestRaft_RestoreSnapshotOnStartup_Monotonic(t *testing.T) { assert.Equal(t, lastIdx, last) } +func TestRaft_RestoreSnapshotOnStartup_CommitTrackingLogs(t *testing.T) { + // Make the cluster + conf := inmemConfig(t) + conf.TrailingLogs = 10 + opts := &MakeClusterOpts{ + Peers: 1, + Bootstrap: true, + Conf: conf, + CommitTrackingLogs: true, + } + c := MakeClusterCustom(t, opts) + defer c.Close() + + leader := c.Leader() + + // Commit a lot of things + var future Future + for i := 0; i < 100; i++ { + future = leader.Apply([]byte(fmt.Sprintf("test%d", i)), 0) + } + + // Wait for the last future to apply + if err := future.Error(); err != nil { + t.Fatalf("err: %v", err) + } + + // Take a snapshot + snapFuture := leader.Snapshot() + if err := snapFuture.Error(); err != nil { + t.Fatalf("err: %v", err) + } + + // Check for snapshot + snaps, _ := leader.snapshots.List() + if len(snaps) != 1 { + t.Fatalf("should have a snapshot") + } + snap := snaps[0] + + // Logs should be trimmed + firstIdx, err := leader.logs.FirstIndex() + if err != nil { + t.Fatalf("err: %v", err) + } + lastIdx, err := leader.logs.LastIndex() + if err != nil { + t.Fatalf("err: %v", err) + } + + if firstIdx != snap.Index-conf.TrailingLogs+1 { + t.Fatalf("should trim logs to %d: but is %d", snap.Index-conf.TrailingLogs+1, firstIdx) + } + + // Shutdown + shutdown := leader.Shutdown() + if err := shutdown.Error(); err != nil { + t.Fatalf("err: %v", err) + } + + // Restart the Raft + r := leader + // Can't just reuse the old transport as it will be closed + _, trans2 := NewInmemTransport(r.trans.LocalAddr()) + cfg := r.config() + r, err = NewRaft(&cfg, r.fsm, r.logs, r.stable, r.snapshots, trans2) + if err != nil { + t.Fatalf("err: %v", err) + } + c.rafts[0] = r + + // We should have restored from the snapshot! + if last := r.getLastApplied(); last != snap.Index { + t.Fatalf("bad last index: %d, expecting %d", last, snap.Index) + } + + // Verify that logs have not been reset + first, _ := r.logs.FirstIndex() + last, _ := r.logs.LastIndex() + assert.Equal(t, firstIdx, first) + assert.Equal(t, lastIdx, last) +} + +func TestRaft_FastRecovery(t *testing.T) { + // Make the cluster + conf := inmemConfig(t) + conf.TrailingLogs = 10 + conf.FastRecovery = true + opts := &MakeClusterOpts{ + Peers: 1, + Bootstrap: true, + Conf: conf, + CommitTrackingLogs: true, + } + c := MakeClusterCustom(t, opts) + defer c.Close() + + leader := c.Leader() + + // Commit a lot of things + var future Future + for i := 0; i < 100; i++ { + future = leader.Apply([]byte(fmt.Sprintf("test%d", i)), 0) + } + + // Wait for the last future to apply + if err := future.Error(); err != nil { + t.Fatalf("err: %v", err) + } + + // Take a snapshot + snapFuture := leader.Snapshot() + if err := snapFuture.Error(); err != nil { + t.Fatalf("err: %v", err) + } + + // Check for snapshot + snaps, _ := leader.snapshots.List() + if len(snaps) != 1 { + t.Fatalf("should have a snapshot") + } + snap := snaps[0] + + // Logs should be trimmed + firstIdx, err := leader.logs.FirstIndex() + if err != nil { + t.Fatalf("err: %v", err) + } + + if firstIdx != snap.Index-conf.TrailingLogs+1 { + t.Fatalf("should trim logs to %d: but is %d", snap.Index-conf.TrailingLogs+1, firstIdx) + } + + // Commit a lot of things (for fast recovery test) + for i := 0; i < 100; i++ { + future = leader.Apply([]byte(fmt.Sprintf("test%d", i)), 0) + } + + // Wait for the last future to apply + if err := future.Error(); err != nil { + t.Fatalf("err: %v", err) + } + + // Shutdown + shutdown := leader.Shutdown() + if err := shutdown.Error(); err != nil { + t.Fatalf("err: %v", err) + } + + // Restart the Raft + r := leader + // Can't just reuse the old transport as it will be closed + _, trans2 := NewInmemTransport(r.trans.LocalAddr()) + cfg := r.config() + r, err = NewRaft(&cfg, r.fsm, r.logs, r.stable, r.snapshots, trans2) + if err != nil { + t.Fatalf("err: %v", err) + } + c.rafts[0] = r + + commitIdx, err := r.logs.(CommitTrackingLogStore).GetCommitIndex() + // We should have applied all committed logs + if last := r.getLastApplied(); last != commitIdx { + t.Fatalf("bad last index: %d, expecting %d", last, commitIdx) + } + + // Expect: snap.Index --- commitIdx --- lastIdx + lastIdx, err := r.logs.LastIndex() + if err != nil { + t.Fatalf("err: %v", err) + } + assert.LessOrEqual(t, snap.Index, commitIdx) + assert.LessOrEqual(t, commitIdx, lastIdx) +} + func TestRaft_SnapshotRestore_Progress(t *testing.T) { // Make the cluster conf := inmemConfig(t) diff --git a/testing.go b/testing.go index 351a9aba..b8647e75 100644 --- a/testing.go +++ b/testing.go @@ -717,13 +717,14 @@ WAIT: // NOTE: This is exposed for middleware testing purposes and is not a stable API type MakeClusterOpts struct { - Peers int - Bootstrap bool - Conf *Config - ConfigStoreFSM bool - MakeFSMFunc func() FSM - LongstopTimeout time.Duration - MonotonicLogs bool + Peers int + Bootstrap bool + Conf *Config + ConfigStoreFSM bool + MakeFSMFunc func() FSM + LongstopTimeout time.Duration + MonotonicLogs bool + CommitTrackingLogs bool } // makeCluster will return a cluster with the given config and number of peers. @@ -807,6 +808,8 @@ func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster { if opts.MonotonicLogs { logs = &MockMonotonicLogStore{s: logs} + } else if opts.CommitTrackingLogs { + logs = NewInmemCommitTrackingStore() } peerConf := opts.Conf From fe57b32ed74ea6baa9bace56705f7922825449db Mon Sep 17 00:00:00 2001 From: peterxcli Date: Thu, 19 Sep 2024 09:56:33 +0800 Subject: [PATCH 14/37] docs(config): update comments for FastRecovery mechanism --- config.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/config.go b/config.go index 5cf4bc56..50ca188c 100644 --- a/config.go +++ b/config.go @@ -236,8 +236,13 @@ type Config struct { PreVoteDisabled bool // FastRecovery controls if the Raft server should use the fast recovery - // mechanism. This mechanism allows a server to apply logs to the FSM till - // the last committed log + // mechanism. Fast recovery requires a LogStore implementation that + // support commit tracking. When such a store is used and this config + // enabled, raft nodes will replay all known-committed logs on disk + // before completing `NewRaft` on startup. This is mainly useful where + // the application allows relaxed-consistency reads from followers as it + // will reduce how far behind the follower's FSM is when it starts. If all reads + // are forwarded to the leader then there won't be observable benefit from this feature. FastRecovery bool // skipStartup allows NewRaft() to bypass all background work goroutines From 20e8701025259190bd96627d6ecfc91792f6a675 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Thu, 19 Sep 2024 10:01:45 +0800 Subject: [PATCH 15/37] refactor(inmem-commit-tracking-store): simplify in-mem log tracking store implement --- inmem_store.go | 129 +++---------------------------------------------- 1 file changed, 6 insertions(+), 123 deletions(-) diff --git a/inmem_store.go b/inmem_store.go index f3fb3fc8..a7088f5e 100644 --- a/inmem_store.go +++ b/inmem_store.go @@ -6,6 +6,7 @@ package raft import ( "errors" "sync" + "sync/atomic" ) // InmemStore implements the LogStore and StableStore interface. @@ -137,142 +138,24 @@ type commitIndexTrackingLog struct { CommitIndex uint64 } type InmemCommitTrackingStore struct { - l sync.RWMutex - lowIndex uint64 - highIndex uint64 - logs map[uint64]*commitIndexTrackingLog - kv map[string][]byte - kvInt map[string]uint64 - - commitIndexLock sync.RWMutex - commitIndex uint64 + InmemStore + commitIndex atomic.Uint64 } // NewInmemCommitTrackingStore returns a new in-memory backend that tracks the commit index. Do not ever // use for production. Only for testing. func NewInmemCommitTrackingStore() *InmemCommitTrackingStore { i := &InmemCommitTrackingStore{ - logs: make(map[uint64]*commitIndexTrackingLog), - kv: make(map[string][]byte), - kvInt: make(map[string]uint64), + InmemStore: *NewInmemStore(), } return i } -// FirstIndex implements the CommitTrackingLogStore interface. -func (i *InmemCommitTrackingStore) FirstIndex() (uint64, error) { - i.l.RLock() - defer i.l.RUnlock() - return i.lowIndex, nil -} - -// LastIndex implements the CommitTrackingLogStore interface. -func (i *InmemCommitTrackingStore) LastIndex() (uint64, error) { - i.l.RLock() - defer i.l.RUnlock() - return i.highIndex, nil -} - -// GetLog implements the CommitTrackingLogStore interface. -func (i *InmemCommitTrackingStore) GetLog(index uint64, log *Log) error { - i.l.RLock() - defer i.l.RUnlock() - l, ok := i.logs[index] - if !ok { - return ErrLogNotFound - } - *log = *l.log - return nil -} - -// StoreLog implements the LogStore interface. -func (i *InmemCommitTrackingStore) StoreLog(log *Log) error { - return i.StoreLogs([]*Log{log}) -} - -// StoreLogs implements the LogStore interface. -func (i *InmemCommitTrackingStore) StoreLogs(logs []*Log) error { - i.l.Lock() - defer i.l.Unlock() - for _, l := range logs { - i.logs[l.Index] = &commitIndexTrackingLog{log: l, CommitIndex: i.commitIndex} - if i.lowIndex == 0 { - i.lowIndex = l.Index - } - if l.Index > i.highIndex { - i.highIndex = l.Index - } - } - return nil -} - -// DeleteRange implements the LogStore interface. -func (i *InmemCommitTrackingStore) DeleteRange(min, max uint64) error { - i.l.Lock() - defer i.l.Unlock() - for j := min; j <= max; j++ { - delete(i.logs, j) - } - if min <= i.lowIndex { - i.lowIndex = max + 1 - } - if max >= i.highIndex { - i.highIndex = min - 1 - } - if i.lowIndex > i.highIndex { - i.lowIndex = 0 - i.highIndex = 0 - } - return nil -} - -// Set implements the StableStore interface. -func (i *InmemCommitTrackingStore) Set(key []byte, val []byte) error { - i.l.Lock() - defer i.l.Unlock() - i.kv[string(key)] = val - return nil -} - -// Get implements the StableStore interface. -func (i *InmemCommitTrackingStore) Get(key []byte) ([]byte, error) { - i.l.RLock() - defer i.l.RUnlock() - val := i.kv[string(key)] - if val == nil { - return nil, errors.New("not found") - } - return val, nil -} - -// SetUint64 implements the StableStore interface. -func (i *InmemCommitTrackingStore) SetUint64(key []byte, val uint64) error { - i.l.Lock() - defer i.l.Unlock() - i.kvInt[string(key)] = val - return nil -} - -// GetUint64 implements the StableStore interface. -func (i *InmemCommitTrackingStore) GetUint64(key []byte) (uint64, error) { - i.l.RLock() - defer i.l.RUnlock() - return i.kvInt[string(key)], nil -} - func (i *InmemCommitTrackingStore) SetCommitIndex(index uint64) error { - i.commitIndexLock.Lock() - defer i.commitIndexLock.Unlock() - i.commitIndex = index + i.commitIndex.Store(index) return nil } func (i *InmemCommitTrackingStore) GetCommitIndex() (uint64, error) { - i.l.RLock() - defer i.l.RUnlock() - log, ok := i.logs[i.highIndex] - if !ok { - return 0, ErrLogNotFound - } - return log.CommitIndex, nil + return i.commitIndex.Load(), nil } From 6f146e19517ab619a53b88bc0964fa11922e6cce Mon Sep 17 00:00:00 2001 From: peterxcli Date: Thu, 19 Sep 2024 10:05:48 +0800 Subject: [PATCH 16/37] fix: rename persistCommitIndex to tryPersistCommitIndex Updated the function name from `persistCommitIndex` to `tryPersistCommitIndex` to better reflect its behavior. This function now updates the commit index in the persistent store only if fast recovery is enabled and if the log store implements `CommitTrackingLogStore`. Adjusted references to this function in `dispatchLogs` and `appendEntries`. --- raft.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/raft.go b/raft.go index faf276c9..cb22d863 100644 --- a/raft.go +++ b/raft.go @@ -1263,7 +1263,7 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) { } commitIndex := r.getCommitIndex() - r.persistCommitIndex(commitIndex) + r.tryPersistCommitIndex(commitIndex) // Write the log entry locally if err := r.logs.StoreLogs(logs); err != nil { @@ -1388,8 +1388,8 @@ func (r *Raft) prepareLog(l *Log, future *logFuture) *commitTuple { return nil } -// persistCommitIndex updates the commit index in persist store if fast recovery is enabled. -func (r *Raft) persistCommitIndex(commitIndex uint64) { +// tryPersistCommitIndex updates the commit index in persist store if fast recovery is enabled and log store implements CommitTrackingLogStore. +func (r *Raft) tryPersistCommitIndex(commitIndex uint64) { if !r.fastRecovery { return } @@ -1553,7 +1553,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { if n := len(newEntries); n > 0 { commitIndex := r.getCommitIndex() - r.persistCommitIndex(commitIndex) + r.tryPersistCommitIndex(commitIndex) // Append the new entries if err := r.logs.StoreLogs(newEntries); err != nil { From a8438b04caac69555b1709c6cdb22c7436da5cea Mon Sep 17 00:00:00 2001 From: peterxcli Date: Fri, 20 Sep 2024 13:43:03 +0800 Subject: [PATCH 17/37] chore(raft): rename tryPersistCommitIndex to tryStageCommitIndex for clarity --- raft.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/raft.go b/raft.go index cb22d863..d306083d 100644 --- a/raft.go +++ b/raft.go @@ -1263,7 +1263,7 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) { } commitIndex := r.getCommitIndex() - r.tryPersistCommitIndex(commitIndex) + r.tryStageCommitIndex(commitIndex) // Write the log entry locally if err := r.logs.StoreLogs(logs); err != nil { @@ -1388,8 +1388,8 @@ func (r *Raft) prepareLog(l *Log, future *logFuture) *commitTuple { return nil } -// tryPersistCommitIndex updates the commit index in persist store if fast recovery is enabled and log store implements CommitTrackingLogStore. -func (r *Raft) tryPersistCommitIndex(commitIndex uint64) { +// tryStageCommitIndex updates the commit index in persist store if fast recovery is enabled and log store implements CommitTrackingLogStore. +func (r *Raft) tryStageCommitIndex(commitIndex uint64) { if !r.fastRecovery { return } From 5e6d8a471bb004e945cf7375ece2d1065613d425 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Fri, 20 Sep 2024 13:45:54 +0800 Subject: [PATCH 18/37] refactor(log): introduce StagCommitIndex for optimized atomic persistence - Rename SetCommitIndex to StagCommitIndex in CommitTrackingLogStore interface - Add detailed documentation for StagCommitIndex method - Update raft.go to use StagCommitIndex instead of SetCommitIndex - Optimize commit index staging in appendEntries using min(lastNewIndex, leaderCommitIndex) This change ensures commit index updates are only persisted atomically with the following StoreLogs call, preventing inconsistencies between the commit index and log entries in case of crashes. It also optimizes the staged commit index to be as up-to-date as possible without exceeding the last new entry index, reducing commit index lag in the CommitTrackingStore. The contract for implementations is clarified, specifying that GetCommitIndex must never return a value higher than the last index in the log. --- log.go | 16 +++++++++++++++- raft.go | 8 +++++--- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/log.go b/log.go index be39299b..18a917e4 100644 --- a/log.go +++ b/log.go @@ -192,6 +192,20 @@ func emitLogStoreMetrics(s LogStore, prefix []string, interval time.Duration, st } type CommitTrackingLogStore interface { - SetCommitIndex(idx uint64) error + // StageCommitIndex stages a new commit index to be persisted. + // The staged commit index MUST only be persisted in a manner that is atomic + // with the following StoreLogs call in the face of a crash. + // This allows the Raft implementation to optimize commit index updates + // without risking inconsistency between the commit index and the log entries. + // + // The implementation MUST NOT persist this value separately from the log entries. + // Instead, it should stage the value to be written atomically with the next + // StoreLogs call. + // + // GetCommitIndex MUST never return a value higher than the last index in the log, + // even if a higher value has been staged with this method. + // + // idx is the new commit index to stage. + StagCommitIndex(idx uint64) error GetCommitIndex() (uint64, error) } diff --git a/raft.go b/raft.go index d306083d..524d929d 100644 --- a/raft.go +++ b/raft.go @@ -1397,7 +1397,7 @@ func (r *Raft) tryStageCommitIndex(commitIndex uint64) { if !ok { return } - if err := store.SetCommitIndex(commitIndex); err != nil { + if err := store.StagCommitIndex(commitIndex); err != nil { r.logger.Error("failed to set commit index in commit tracking log store", "index", commitIndex, "error", err) } } @@ -1552,8 +1552,10 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { } if n := len(newEntries); n > 0 { - commitIndex := r.getCommitIndex() - r.tryPersistCommitIndex(commitIndex) + // Stage the future commit index if possible + lastNewIndex := newEntries[len(newEntries)-1].Index + commitIndex := min(a.LeaderCommitIndex, lastNewIndex) + r.tryStageCommitIndex(commitIndex) // Append the new entries if err := r.logs.StoreLogs(newEntries); err != nil { From e248f004727311c04cf4331d889241432f43750e Mon Sep 17 00:00:00 2001 From: peterxcli Date: Tue, 24 Sep 2024 09:56:16 +0800 Subject: [PATCH 19/37] fix(raft): correct CommitTrackingLogStore implementation - Rename SetCommitIndex to StageCommitIndex in InmemCommitTrackingStore - Fix typo in CommitTrackingLogStore interface (StagCommitIndex to StageCommitIndex) - Update error message in tryStageCommitIndex for consistency - Ensure CommitTrackingLogStore extends LogStore interface --- inmem_store.go | 4 ++-- log.go | 4 +++- raft.go | 4 ++-- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/inmem_store.go b/inmem_store.go index a7088f5e..02eee4ad 100644 --- a/inmem_store.go +++ b/inmem_store.go @@ -144,14 +144,14 @@ type InmemCommitTrackingStore struct { // NewInmemCommitTrackingStore returns a new in-memory backend that tracks the commit index. Do not ever // use for production. Only for testing. -func NewInmemCommitTrackingStore() *InmemCommitTrackingStore { +func NewInmemCommitTrackingStore() CommitTrackingLogStore { i := &InmemCommitTrackingStore{ InmemStore: *NewInmemStore(), } return i } -func (i *InmemCommitTrackingStore) SetCommitIndex(index uint64) error { +func (i *InmemCommitTrackingStore) StageCommitIndex(index uint64) error { i.commitIndex.Store(index) return nil } diff --git a/log.go b/log.go index 18a917e4..a11b26af 100644 --- a/log.go +++ b/log.go @@ -192,6 +192,8 @@ func emitLogStoreMetrics(s LogStore, prefix []string, interval time.Duration, st } type CommitTrackingLogStore interface { + LogStore + // StageCommitIndex stages a new commit index to be persisted. // The staged commit index MUST only be persisted in a manner that is atomic // with the following StoreLogs call in the face of a crash. @@ -206,6 +208,6 @@ type CommitTrackingLogStore interface { // even if a higher value has been staged with this method. // // idx is the new commit index to stage. - StagCommitIndex(idx uint64) error + StageCommitIndex(idx uint64) error GetCommitIndex() (uint64, error) } diff --git a/raft.go b/raft.go index 524d929d..304c5aee 100644 --- a/raft.go +++ b/raft.go @@ -1397,8 +1397,8 @@ func (r *Raft) tryStageCommitIndex(commitIndex uint64) { if !ok { return } - if err := store.StagCommitIndex(commitIndex); err != nil { - r.logger.Error("failed to set commit index in commit tracking log store", "index", commitIndex, "error", err) + if err := store.StageCommitIndex(commitIndex); err != nil { + r.logger.Error("failed to stage commit index in commit tracking log store", "index", commitIndex, "error", err) } } From 2a913ab5406ae4fdbbbd86c54e2a0f854ecbf42d Mon Sep 17 00:00:00 2001 From: peterxcli Date: Tue, 24 Sep 2024 10:03:25 +0800 Subject: [PATCH 20/37] feat(raft): improve fast recovery error handling and commit index validation - Add error logging and panic for critical errors during recovery - Replace lastApplied check with lastIndex comparison - Ensure commitIndex does not exceed lastIndex - Improve code structure and readability --- api.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/api.go b/api.go index 3e98d9f3..77a4800a 100644 --- a/api.go +++ b/api.go @@ -710,19 +710,27 @@ func (r *Raft) recoverFromCommittedLogs() error { if !r.fastRecovery { return nil } + // If the store implements CommitTrackingLogStore, we can read the commit index from the store. // This is useful when the store is able to track the commit index and we can avoid replaying logs. store, ok := r.logs.(CommitTrackingLogStore) if !ok { return nil } + commitIndex, err := store.GetCommitIndex() if err != nil { - return fmt.Errorf("failed to read commit index from store: %w", err) + r.logger.Error("failed to get commit index from store", "error", err) + panic(err) } - lastApplied := r.getLastApplied() - if commitIndex <= lastApplied { - return nil + + lastIndex, err := r.logs.LastIndex() + if err != nil { + r.logger.Error("failed to get last log index from store", "error", err) + panic(err) + } + if commitIndex > lastIndex { + commitIndex = lastIndex } r.setCommitIndex(commitIndex) From 7cd673285b466f04c9d2215bab514f04a97756db Mon Sep 17 00:00:00 2001 From: peterxcli Date: Wed, 9 Oct 2024 16:02:22 +0800 Subject: [PATCH 21/37] feat: add `CommitTrackingLogStore` interface check and adjust return type This commit adds a compile-time interface check for InmemCommitTrackingStore to ensure it implements the CommitTrackingLogStore interface. It also changes the return type of NewInmemCommitTrackingStore to be more specific. - Add interface check: var _ CommitTrackingLogStore = &InmemCommitTrackingStore{} - Change NewInmemCommitTrackingStore return type from CommitTrackingLogStore to *InmemCommitTrackingStore --- inmem_store.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/inmem_store.go b/inmem_store.go index 02eee4ad..14cc5c5e 100644 --- a/inmem_store.go +++ b/inmem_store.go @@ -9,6 +9,8 @@ import ( "sync/atomic" ) +var _ CommitTrackingLogStore = &InmemCommitTrackingStore{} + // InmemStore implements the LogStore and StableStore interface. // It should NOT EVER be used for production. It is used only for // unit tests. Use the MDBStore implementation instead. @@ -144,7 +146,7 @@ type InmemCommitTrackingStore struct { // NewInmemCommitTrackingStore returns a new in-memory backend that tracks the commit index. Do not ever // use for production. Only for testing. -func NewInmemCommitTrackingStore() CommitTrackingLogStore { +func NewInmemCommitTrackingStore() *InmemCommitTrackingStore { i := &InmemCommitTrackingStore{ InmemStore: *NewInmemStore(), } From 92c04a03757ba571de476b4467cd1704a647f9dd Mon Sep 17 00:00:00 2001 From: peterxcli Date: Wed, 9 Oct 2024 16:03:06 +0800 Subject: [PATCH 22/37] refactor: improve type assertion for log store in TestRaft_FastRecovery This commit enhances the type assertion for CommitTrackingLogStore in the TestRaft_FastRecovery function, adding explicit error handling for better test reliability and clarity. - Add explicit type assertion check for CommitTrackingLogStore interface - Introduce new variable 'store' to hold the asserted CommitTrackingLogStore - Add error handling to fail the test if the type assertion is unsuccessful --- raft_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/raft_test.go b/raft_test.go index 2908f9c7..88715b4a 100644 --- a/raft_test.go +++ b/raft_test.go @@ -1254,7 +1254,11 @@ func TestRaft_FastRecovery(t *testing.T) { } c.rafts[0] = r - commitIdx, err := r.logs.(CommitTrackingLogStore).GetCommitIndex() + store, ok := r.logs.(CommitTrackingLogStore) + if !ok { + t.Fatal("err: raft log store does not implement CommitTrackingLogStore interface") + } + commitIdx, err := store.GetCommitIndex() // We should have applied all committed logs if last := r.getLastApplied(); last != commitIdx { t.Fatalf("bad last index: %d, expecting %d", last, commitIdx) From 8e8ba071fd7483d32c833d1df7921d3af2fb92bc Mon Sep 17 00:00:00 2001 From: peterxcli Date: Thu, 10 Oct 2024 20:37:04 +0800 Subject: [PATCH 23/37] feat: add warning log for unsupported fast recovery This commit adds a warning log message when fast recovery is enabled but the log store does not support the CommitTrackingLogStore interface. This provides better visibility into potential configuration issues. - Add warning log in recoverFromCommittedLogs when log store doesn't implement CommitTrackingLogStore - Include the type of the log store in the warning message for easier debugging --- api.go | 1 + 1 file changed, 1 insertion(+) diff --git a/api.go b/api.go index 77a4800a..9941dec8 100644 --- a/api.go +++ b/api.go @@ -715,6 +715,7 @@ func (r *Raft) recoverFromCommittedLogs() error { // This is useful when the store is able to track the commit index and we can avoid replaying logs. store, ok := r.logs.(CommitTrackingLogStore) if !ok { + r.logger.Warn("fast recovery enabled but log store does not support it", "log_store", fmt.Sprintf("%T", r.logs)) return nil } From 2020cab1ddbe20fcda6b276e1aafe5fcc9fcfc6d Mon Sep 17 00:00:00 2001 From: peterxcli Date: Thu, 10 Oct 2024 20:41:12 +0800 Subject: [PATCH 24/37] refactor: move commitIndex retrieve into `tryStageCommitIndex` --- raft.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/raft.go b/raft.go index 304c5aee..913b3ab8 100644 --- a/raft.go +++ b/raft.go @@ -1262,8 +1262,7 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) { r.leaderState.inflight.PushBack(applyLog) } - commitIndex := r.getCommitIndex() - r.tryStageCommitIndex(commitIndex) + r.tryStageCommitIndex() // Write the log entry locally if err := r.logs.StoreLogs(logs); err != nil { @@ -1389,7 +1388,8 @@ func (r *Raft) prepareLog(l *Log, future *logFuture) *commitTuple { } // tryStageCommitIndex updates the commit index in persist store if fast recovery is enabled and log store implements CommitTrackingLogStore. -func (r *Raft) tryStageCommitIndex(commitIndex uint64) { +func (r *Raft) tryStageCommitIndex() { + commitIndex := r.getCommitIndex() if !r.fastRecovery { return } From 2a7d584e15e44a58c4cc94abfd731bad57d6648f Mon Sep 17 00:00:00 2001 From: peterxcli Date: Thu, 10 Oct 2024 20:43:16 +0800 Subject: [PATCH 25/37] refactor: remove error from return field of recoverFromCommittedLogs --- api.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/api.go b/api.go index 9941dec8..c7d28a6b 100644 --- a/api.go +++ b/api.go @@ -706,9 +706,9 @@ func (r *Raft) tryRestoreSingleSnapshot(snapshot *SnapshotMeta) bool { } // recoverFromCommittedLogs recovers the Raft node from committed logs. -func (r *Raft) recoverFromCommittedLogs() error { +func (r *Raft) recoverFromCommittedLogs() { if !r.fastRecovery { - return nil + return } // If the store implements CommitTrackingLogStore, we can read the commit index from the store. @@ -716,7 +716,7 @@ func (r *Raft) recoverFromCommittedLogs() error { store, ok := r.logs.(CommitTrackingLogStore) if !ok { r.logger.Warn("fast recovery enabled but log store does not support it", "log_store", fmt.Sprintf("%T", r.logs)) - return nil + return } commitIndex, err := store.GetCommitIndex() @@ -736,7 +736,6 @@ func (r *Raft) recoverFromCommittedLogs() error { r.setCommitIndex(commitIndex) r.processLogs(commitIndex, nil) - return nil } func (r *Raft) config() Config { From bdac45bbccfdfa13a6e90ad660473a2f16ea13c2 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Fri, 11 Oct 2024 14:37:40 +0800 Subject: [PATCH 26/37] refactor: rename FastRecovery and revert the stageCommittedIdx change This commit renames the FastRecovery feature to RestoreCommittedLogs for better clarity and consistency. This is a breaking change as it modifies public API elements. - Rename Config.FastRecovery to Config.RestoreCommittedLogs - Rename Raft.fastRecovery to Raft.RestoreCommittedLogs - Update all references to fastRecovery in the codebase - Rename TestRaft_FastRecovery to TestRaft_RestoreCommittedLogs - Update comments to reflect the new terminology - Refactor tryStageCommitIndex to accept commitIndex as a parameter BREAKING CHANGE: Config.FastRecovery has been renamed to Config.RestoreCommittedLogs --- api.go | 8 ++++---- config.go | 4 ++-- raft.go | 8 ++++---- raft_test.go | 6 +++--- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/api.go b/api.go index c7d28a6b..008d6e6a 100644 --- a/api.go +++ b/api.go @@ -218,9 +218,9 @@ type Raft struct { // prevote feature is disabled if set to true. preVoteDisabled bool - // fastRecovery is used to enable fast recovery mode + // RestoreCommittedLogs is used to enable fast recovery mode // fast recovery mode is disabled if set to false. - fastRecovery bool + RestoreCommittedLogs bool } // BootstrapCluster initializes a server's storage with the given cluster @@ -570,7 +570,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna followerNotifyCh: make(chan struct{}, 1), mainThreadSaturation: newSaturationMetric([]string{"raft", "thread", "main", "saturation"}, 1*time.Second), preVoteDisabled: conf.PreVoteDisabled || !transportSupportPreVote, - fastRecovery: conf.FastRecovery, + RestoreCommittedLogs: conf.RestoreCommittedLogs, } if !transportSupportPreVote && !conf.PreVoteDisabled { r.logger.Warn("pre-vote is disabled because it is not supported by the Transport") @@ -707,7 +707,7 @@ func (r *Raft) tryRestoreSingleSnapshot(snapshot *SnapshotMeta) bool { // recoverFromCommittedLogs recovers the Raft node from committed logs. func (r *Raft) recoverFromCommittedLogs() { - if !r.fastRecovery { + if !r.RestoreCommittedLogs { return } diff --git a/config.go b/config.go index 50ca188c..975c5287 100644 --- a/config.go +++ b/config.go @@ -235,7 +235,7 @@ type Config struct { // PreVoteDisabled deactivate the pre-vote feature when set to true PreVoteDisabled bool - // FastRecovery controls if the Raft server should use the fast recovery + // RestoreCommittedLogs controls if the Raft server should use the fast recovery // mechanism. Fast recovery requires a LogStore implementation that // support commit tracking. When such a store is used and this config // enabled, raft nodes will replay all known-committed logs on disk @@ -243,7 +243,7 @@ type Config struct { // the application allows relaxed-consistency reads from followers as it // will reduce how far behind the follower's FSM is when it starts. If all reads // are forwarded to the leader then there won't be observable benefit from this feature. - FastRecovery bool + RestoreCommittedLogs bool // skipStartup allows NewRaft() to bypass all background work goroutines skipStartup bool diff --git a/raft.go b/raft.go index 913b3ab8..9554750f 100644 --- a/raft.go +++ b/raft.go @@ -1262,7 +1262,8 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) { r.leaderState.inflight.PushBack(applyLog) } - r.tryStageCommitIndex() + commitIndex := r.getCommitIndex() + r.tryStageCommitIndex(commitIndex) // Write the log entry locally if err := r.logs.StoreLogs(logs); err != nil { @@ -1388,9 +1389,8 @@ func (r *Raft) prepareLog(l *Log, future *logFuture) *commitTuple { } // tryStageCommitIndex updates the commit index in persist store if fast recovery is enabled and log store implements CommitTrackingLogStore. -func (r *Raft) tryStageCommitIndex() { - commitIndex := r.getCommitIndex() - if !r.fastRecovery { +func (r *Raft) tryStageCommitIndex(commitIndex uint64) { + if !r.RestoreCommittedLogs { return } store, ok := r.logs.(CommitTrackingLogStore) diff --git a/raft_test.go b/raft_test.go index 88715b4a..38b0730b 100644 --- a/raft_test.go +++ b/raft_test.go @@ -1177,11 +1177,11 @@ func TestRaft_RestoreSnapshotOnStartup_CommitTrackingLogs(t *testing.T) { assert.Equal(t, lastIdx, last) } -func TestRaft_FastRecovery(t *testing.T) { +func TestRaft_RestoreCommittedLogs(t *testing.T) { // Make the cluster conf := inmemConfig(t) conf.TrailingLogs = 10 - conf.FastRecovery = true + conf.RestoreCommittedLogs = true opts := &MakeClusterOpts{ Peers: 1, Bootstrap: true, @@ -1227,7 +1227,7 @@ func TestRaft_FastRecovery(t *testing.T) { t.Fatalf("should trim logs to %d: but is %d", snap.Index-conf.TrailingLogs+1, firstIdx) } - // Commit a lot of things (for fast recovery test) + // Commit a lot of things (for restore committed logs test) for i := 0; i < 100; i++ { future = leader.Apply([]byte(fmt.Sprintf("test%d", i)), 0) } From ed47a253fce113220f50b0aec81d553c6f0d3ecc Mon Sep 17 00:00:00 2001 From: peterxcli Date: Fri, 11 Oct 2024 14:49:49 +0800 Subject: [PATCH 27/37] docs: documented GetCommitIndex in CommitTrackingLogStore interface --- log.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/log.go b/log.go index a11b26af..179a2a62 100644 --- a/log.go +++ b/log.go @@ -209,5 +209,9 @@ type CommitTrackingLogStore interface { // // idx is the new commit index to stage. StageCommitIndex(idx uint64) error + + // GetCommitIndex returns the latest persisted commit index from the latest log entry + // in the store at startup. + // It is ok to return a value higher than the last index in the log (But it should never happen). GetCommitIndex() (uint64, error) } From ad87d867cd15140e499b3536f0435f7146affef7 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Fri, 11 Oct 2024 15:12:38 +0800 Subject: [PATCH 28/37] docs: change fastRecovery flag to recoverCommittedLog in all documented comments --- api.go | 4 ++-- config.go | 4 ++-- raft.go | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/api.go b/api.go index 008d6e6a..b86b96fb 100644 --- a/api.go +++ b/api.go @@ -218,8 +218,8 @@ type Raft struct { // prevote feature is disabled if set to true. preVoteDisabled bool - // RestoreCommittedLogs is used to enable fast recovery mode - // fast recovery mode is disabled if set to false. + // RestoreCommittedLogs is used to enable restore committed logs mode + // restore committed logs mode is disabled if set to false. RestoreCommittedLogs bool } diff --git a/config.go b/config.go index 975c5287..c4260085 100644 --- a/config.go +++ b/config.go @@ -235,8 +235,8 @@ type Config struct { // PreVoteDisabled deactivate the pre-vote feature when set to true PreVoteDisabled bool - // RestoreCommittedLogs controls if the Raft server should use the fast recovery - // mechanism. Fast recovery requires a LogStore implementation that + // RestoreCommittedLogs controls if the Raft server should use the restore committed logs + // mechanism. Restore committed logs requires a LogStore implementation that // support commit tracking. When such a store is used and this config // enabled, raft nodes will replay all known-committed logs on disk // before completing `NewRaft` on startup. This is mainly useful where diff --git a/raft.go b/raft.go index 9554750f..d34a2b70 100644 --- a/raft.go +++ b/raft.go @@ -1388,7 +1388,7 @@ func (r *Raft) prepareLog(l *Log, future *logFuture) *commitTuple { return nil } -// tryStageCommitIndex updates the commit index in persist store if fast recovery is enabled and log store implements CommitTrackingLogStore. +// tryStageCommitIndex updates the commit index in persist store if restore committed logs is enabled and log store implements CommitTrackingLogStore. func (r *Raft) tryStageCommitIndex(commitIndex uint64) { if !r.RestoreCommittedLogs { return From 30fc43eda5c27903e6400a9eb1232b99f357e540 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Fri, 11 Oct 2024 15:14:09 +0800 Subject: [PATCH 29/37] refactor: add a new ErrIncompatibleLogStore for recoverFromCommittedLogs --- api.go | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/api.go b/api.go index b86b96fb..9a9043e2 100644 --- a/api.go +++ b/api.go @@ -73,6 +73,10 @@ var ( // ErrLeadershipTransferInProgress is returned when the leader is rejecting // client requests because it is attempting to transfer leadership. ErrLeadershipTransferInProgress = errors.New("leadership transfer in progress") + + // ErrIncompatibleLogStore is returned when the log store does not support + // or implement some required methods. + ErrIncompatibleLogStore = errors.New("log store does not implement some required methods or malformed") ) // Raft implements a Raft node. @@ -590,7 +594,9 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna return nil, err } - r.recoverFromCommittedLogs() + if err := r.recoverFromCommittedLogs(); err != nil { + return nil, err + } // Scan through the log for any configuration change entries. snapshotIndex, _ := r.getLastSnapshot() @@ -706,29 +712,29 @@ func (r *Raft) tryRestoreSingleSnapshot(snapshot *SnapshotMeta) bool { } // recoverFromCommittedLogs recovers the Raft node from committed logs. -func (r *Raft) recoverFromCommittedLogs() { +func (r *Raft) recoverFromCommittedLogs() error { if !r.RestoreCommittedLogs { - return + return nil } // If the store implements CommitTrackingLogStore, we can read the commit index from the store. // This is useful when the store is able to track the commit index and we can avoid replaying logs. store, ok := r.logs.(CommitTrackingLogStore) if !ok { - r.logger.Warn("fast recovery enabled but log store does not support it", "log_store", fmt.Sprintf("%T", r.logs)) - return + r.logger.Warn("restore committed logs enabled but log store does not support it", "log_store", fmt.Sprintf("%T", r.logs)) + return ErrIncompatibleLogStore } commitIndex, err := store.GetCommitIndex() if err != nil { r.logger.Error("failed to get commit index from store", "error", err) - panic(err) + return err } lastIndex, err := r.logs.LastIndex() if err != nil { r.logger.Error("failed to get last log index from store", "error", err) - panic(err) + return err } if commitIndex > lastIndex { commitIndex = lastIndex @@ -736,6 +742,7 @@ func (r *Raft) recoverFromCommittedLogs() { r.setCommitIndex(commitIndex) r.processLogs(commitIndex, nil) + return nil } func (r *Raft) config() Config { From e797962bc23abf1b4139157dac7f646f7af3a8dc Mon Sep 17 00:00:00 2001 From: peterxcli Date: Fri, 11 Oct 2024 15:38:48 +0800 Subject: [PATCH 30/37] docs: clarify RestoreCommittedLogs configuration requirement - Add a notice in the Config struct documentation for RestoreCommittedLogs - Specify that Raft will fail to start with ErrIncompatibleLogStore if the requirement is not met --- config.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/config.go b/config.go index c4260085..94d25865 100644 --- a/config.go +++ b/config.go @@ -243,6 +243,9 @@ type Config struct { // the application allows relaxed-consistency reads from followers as it // will reduce how far behind the follower's FSM is when it starts. If all reads // are forwarded to the leader then there won't be observable benefit from this feature. + // + // Notice: If this is enabled, the log store MUST implement the CommitTrackingLogStore + // interface. Otherwise, Raft will fail to start and return ErrIncompatibleLogStore. RestoreCommittedLogs bool // skipStartup allows NewRaft() to bypass all background work goroutines From 500567fca085e868ced6ed2aa1fe9572ab6d0358 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Fri, 11 Oct 2024 16:23:25 +0800 Subject: [PATCH 31/37] refactor: rename recoverFromCommittedLogs to restoreFromCommittedLogs --- api.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/api.go b/api.go index 9a9043e2..44cf561f 100644 --- a/api.go +++ b/api.go @@ -594,7 +594,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna return nil, err } - if err := r.recoverFromCommittedLogs(); err != nil { + if err := r.restoreFromCommittedLogs(); err != nil { return nil, err } @@ -711,8 +711,8 @@ func (r *Raft) tryRestoreSingleSnapshot(snapshot *SnapshotMeta) bool { return true } -// recoverFromCommittedLogs recovers the Raft node from committed logs. -func (r *Raft) recoverFromCommittedLogs() error { +// restoreFromCommittedLogs recovers the Raft node from committed logs. +func (r *Raft) restoreFromCommittedLogs() error { if !r.RestoreCommittedLogs { return nil } From cfffcb5b7e506de502c3c52e9c88ea87958e2737 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Fri, 11 Oct 2024 16:26:40 +0800 Subject: [PATCH 32/37] refactor!: update MakeCluster functions to return error - Update makeCluster to return (*cluster, error) - Modify MakeCluster, MakeClusterNoBootstrap, and MakeClusterCustom to return error - Update all test cases to handle potential errors from cluster creation - Replace t.Fatalf() calls with t.Logf() and error returns in makeCluster BREAKING CHANGE: MakeCluster, MakeClusterNoBootstrap, and MakeClusterCustom now return an additional error value, which needs to be handled in existing tests. --- raft_test.go | 278 +++++++++++++++++++++++++++++++++------------------ testing.go | 22 ++-- 2 files changed, 192 insertions(+), 108 deletions(-) diff --git a/raft_test.go b/raft_test.go index 38b0730b..b7dd8757 100644 --- a/raft_test.go +++ b/raft_test.go @@ -23,12 +23,14 @@ import ( ) func TestRaft_StartStop(t *testing.T) { - c := MakeCluster(1, t, nil) + c, err := MakeCluster(1, t, nil) + require.NoError(t, err) c.Close() } func TestRaft_AfterShutdown(t *testing.T) { - c := MakeCluster(1, t, nil) + c, err := MakeCluster(1, t, nil) + require.NoError(t, err) c.Close() raft := c.rafts[0] @@ -64,7 +66,8 @@ func TestRaft_AfterShutdown(t *testing.T) { func TestRaft_LiveBootstrap(t *testing.T) { // Make the cluster. - c := MakeClusterNoBootstrap(3, t, nil) + c, err := MakeClusterNoBootstrap(3, t, nil) + require.NoError(t, err) defer c.Close() // Build the configuration. @@ -104,7 +107,8 @@ func TestRaft_LiveBootstrap(t *testing.T) { func TestRaft_LiveBootstrap_From_NonVoter(t *testing.T) { // Make the cluster. - c := MakeClusterNoBootstrap(2, t, nil) + c, err := MakeClusterNoBootstrap(2, t, nil) + require.NoError(t, err) defer c.Close() // Build the configuration. @@ -128,7 +132,8 @@ func TestRaft_LiveBootstrap_From_NonVoter(t *testing.T) { } func TestRaft_RecoverCluster_NoState(t *testing.T) { - c := MakeClusterNoBootstrap(1, t, nil) + c, err := MakeClusterNoBootstrap(1, t, nil) + require.NoError(t, err) defer c.Close() r := c.rafts[0] @@ -141,8 +146,9 @@ func TestRaft_RecoverCluster_NoState(t *testing.T) { }, } cfg := r.config() - err := RecoverCluster(&cfg, &MockFSM{}, r.logs, r.stable, + err = RecoverCluster(&cfg, &MockFSM{}, r.logs, r.stable, r.snapshots, r.trans, configuration) + require.Error(t, err) if err == nil || !strings.Contains(err.Error(), "no initial state") { t.Fatalf("should have failed for no initial state: %v", err) } @@ -155,7 +161,8 @@ func TestRaft_RecoverCluster(t *testing.T) { conf := inmemConfig(t) conf.TrailingLogs = 10 conf.SnapshotThreshold = uint64(snapshotThreshold) - c := MakeCluster(3, t, conf) + c, err := MakeCluster(3, t, conf) + require.NoError(t, err) defer c.Close() // Perform some commits. @@ -255,11 +262,13 @@ func TestRaft_RecoverCluster(t *testing.T) { func TestRaft_HasExistingState(t *testing.T) { var err error // Make a cluster. - c := MakeCluster(2, t, nil) + c, err := MakeCluster(2, t, nil) + require.NoError(t, err) defer c.Close() // Make a new cluster of 1. - c1 := MakeClusterNoBootstrap(1, t, nil) + c1, err := MakeClusterNoBootstrap(1, t, nil) + require.NoError(t, err) // Make sure the initial state is clean. var hasState bool @@ -296,7 +305,8 @@ func TestRaft_HasExistingState(t *testing.T) { func TestRaft_SingleNode(t *testing.T) { conf := inmemConfig(t) - c := MakeCluster(1, t, conf) + c, err := MakeCluster(1, t, conf) + require.NoError(t, err) defer c.Close() raft := c.rafts[0] @@ -339,7 +349,8 @@ func TestRaft_SingleNode(t *testing.T) { func TestRaft_TripleNode(t *testing.T) { // Make the cluster - c := MakeCluster(3, t, nil) + c, err := MakeCluster(3, t, nil) + require.NoError(t, err) defer c.Close() // Should be one leader @@ -357,7 +368,8 @@ func TestRaft_TripleNode(t *testing.T) { func TestRaft_LeaderFail(t *testing.T) { // Make the cluster - c := MakeCluster(3, t, nil) + c, err := MakeCluster(3, t, nil) + require.NoError(t, err) defer c.Close() // Should be one leader @@ -434,7 +446,8 @@ func TestRaft_LeaderFail(t *testing.T) { func TestRaft_BehindFollower(t *testing.T) { // Make the cluster - c := MakeCluster(3, t, nil) + c, err := MakeCluster(3, t, nil) + require.NoError(t, err) defer c.Close() // Disconnect one follower @@ -474,7 +487,8 @@ func TestRaft_BehindFollower(t *testing.T) { func TestRaft_ApplyNonLeader(t *testing.T) { // Make the cluster - c := MakeCluster(3, t, nil) + c, err := MakeCluster(3, t, nil) + require.NoError(t, err) defer c.Close() // Wait for a leader @@ -504,7 +518,8 @@ func TestRaft_ApplyConcurrent(t *testing.T) { conf := inmemConfig(t) conf.HeartbeatTimeout = 2 * conf.HeartbeatTimeout conf.ElectionTimeout = 2 * conf.ElectionTimeout - c := MakeCluster(3, t, conf) + c, err := MakeCluster(3, t, conf) + require.NoError(t, err) defer c.Close() // Wait for a leader @@ -556,7 +571,8 @@ func TestRaft_ApplyConcurrent_Timeout(t *testing.T) { conf.CommitTimeout = 1 * time.Millisecond conf.HeartbeatTimeout = 2 * conf.HeartbeatTimeout conf.ElectionTimeout = 2 * conf.ElectionTimeout - c := MakeCluster(1, t, conf) + c, err := MakeCluster(1, t, conf) + require.NoError(t, err) defer c.Close() // Wait for a leader @@ -592,11 +608,13 @@ func TestRaft_ApplyConcurrent_Timeout(t *testing.T) { func TestRaft_JoinNode(t *testing.T) { // Make a cluster - c := MakeCluster(2, t, nil) + c, err := MakeCluster(2, t, nil) + require.NoError(t, err) defer c.Close() // Make a new cluster of 1 - c1 := MakeClusterNoBootstrap(1, t, nil) + c1, err := MakeClusterNoBootstrap(1, t, nil) + require.NoError(t, err) // Merge clusters c.Merge(c1) @@ -621,28 +639,30 @@ func TestRaft_JoinNode(t *testing.T) { func TestRaft_JoinNode_ConfigStore(t *testing.T) { // Make a cluster conf := inmemConfig(t) - c := makeCluster(t, &MakeClusterOpts{ + c, err := makeCluster(t, &MakeClusterOpts{ Peers: 1, Bootstrap: true, Conf: conf, ConfigStoreFSM: true, }) + require.NoError(t, err) defer c.Close() // Make a new nodes - c1 := makeCluster(t, &MakeClusterOpts{ + c1, err := makeCluster(t, &MakeClusterOpts{ Peers: 1, Bootstrap: false, Conf: conf, ConfigStoreFSM: true, }) - c2 := makeCluster(t, &MakeClusterOpts{ + require.NoError(t, err) + c2, err := makeCluster(t, &MakeClusterOpts{ Peers: 1, Bootstrap: false, Conf: conf, ConfigStoreFSM: true, }) - + require.NoError(t, err) // Merge clusters c.Merge(c1) c.Merge(c2) @@ -688,7 +708,8 @@ func TestRaft_JoinNode_ConfigStore(t *testing.T) { func TestRaft_RemoveFollower(t *testing.T) { // Make a cluster - c := MakeCluster(3, t, nil) + c, err := MakeCluster(3, t, nil) + require.NoError(t, err) defer c.Close() // Get the leader @@ -729,7 +750,8 @@ func TestRaft_RemoveFollower(t *testing.T) { func TestRaft_RemoveLeader(t *testing.T) { // Make a cluster - c := MakeCluster(3, t, nil) + c, err := MakeCluster(3, t, nil) + require.NoError(t, err) defer c.Close() // Get the leader @@ -779,7 +801,8 @@ func TestRaft_RemoveLeader_NoShutdown(t *testing.T) { // Make a cluster conf := inmemConfig(t) conf.ShutdownOnRemove = false - c := MakeCluster(3, t, conf) + c, err := MakeCluster(3, t, conf) + require.NoError(t, err) defer c.Close() // Get the leader @@ -841,7 +864,8 @@ func TestRaft_RemoveLeader_NoShutdown(t *testing.T) { func TestRaft_RemoveFollower_SplitCluster(t *testing.T) { // Make a cluster. conf := inmemConfig(t) - c := MakeCluster(4, t, conf) + c, err := MakeCluster(4, t, conf) + require.NoError(t, err) defer c.Close() // Wait for a leader to get elected. @@ -877,7 +901,8 @@ func TestRaft_RemoveFollower_SplitCluster(t *testing.T) { func TestRaft_AddKnownPeer(t *testing.T) { // Make a cluster - c := MakeCluster(3, t, nil) + c, err := MakeCluster(3, t, nil) + require.NoError(t, err) defer c.Close() // Get the leader @@ -916,7 +941,8 @@ func TestRaft_AddKnownPeer(t *testing.T) { func TestRaft_RemoveUnknownPeer(t *testing.T) { // Make a cluster - c := MakeCluster(3, t, nil) + c, err := MakeCluster(3, t, nil) + require.NoError(t, err) defer c.Close() // Get the leader @@ -957,7 +983,8 @@ func TestRaft_SnapshotRestore(t *testing.T) { // Make the cluster conf := inmemConfig(t) conf.TrailingLogs = 10 - c := MakeCluster(1, t, conf) + c, err := MakeCluster(1, t, conf) + require.NoError(t, err) defer c.Close() // Commit a lot of things @@ -1001,7 +1028,7 @@ func TestRaft_SnapshotRestore(t *testing.T) { // Can't just reuse the old transport as it will be closed _, trans2 := NewInmemTransport(r.trans.LocalAddr()) cfg := r.config() - r, err := NewRaft(&cfg, r.fsm, r.logs, r.stable, r.snapshots, trans2) + r, err = NewRaft(&cfg, r.fsm, r.logs, r.stable, r.snapshots, trans2) if err != nil { t.Fatalf("err: %v", err) } @@ -1023,7 +1050,8 @@ func TestRaft_RestoreSnapshotOnStartup_Monotonic(t *testing.T) { Conf: conf, MonotonicLogs: true, } - c := MakeClusterCustom(t, opts) + c, err := MakeClusterCustom(t, opts) + require.NoError(t, err) defer c.Close() leader := c.Leader() @@ -1105,7 +1133,8 @@ func TestRaft_RestoreSnapshotOnStartup_CommitTrackingLogs(t *testing.T) { Conf: conf, CommitTrackingLogs: true, } - c := MakeClusterCustom(t, opts) + c, err := MakeClusterCustom(t, opts) + require.NoError(t, err) defer c.Close() leader := c.Leader() @@ -1188,7 +1217,8 @@ func TestRaft_RestoreCommittedLogs(t *testing.T) { Conf: conf, CommitTrackingLogs: true, } - c := MakeClusterCustom(t, opts) + c, err := MakeClusterCustom(t, opts) + require.NoError(t, err) defer c.Close() leader := c.Leader() @@ -1277,7 +1307,8 @@ func TestRaft_SnapshotRestore_Progress(t *testing.T) { // Make the cluster conf := inmemConfig(t) conf.TrailingLogs = 10 - c := MakeCluster(1, t, conf) + c, err := MakeCluster(1, t, conf) + require.NoError(t, err) defer c.Close() // Commit a lot of things @@ -1330,7 +1361,7 @@ func TestRaft_SnapshotRestore_Progress(t *testing.T) { Level: hclog.Info, Output: &logbuf, }) - r, err := NewRaft(&cfg, r.fsm, r.logs, r.stable, r.snapshots, trans2) + r, err = NewRaft(&cfg, r.fsm, r.logs, r.stable, r.snapshots, trans2) if err != nil { t.Fatalf("err: %v", err) } @@ -1396,7 +1427,8 @@ func TestRaft_NoRestoreOnStart(t *testing.T) { conf := inmemConfig(t) conf.TrailingLogs = 10 conf.NoSnapshotRestoreOnStart = true - c := MakeCluster(1, t, conf) + c, err := MakeCluster(1, t, conf) + require.NoError(t, err) // Commit a lot of things. leader := c.Leader() @@ -1425,7 +1457,7 @@ func TestRaft_NoRestoreOnStart(t *testing.T) { _, trans := NewInmemTransport(leader.localAddr) newFSM := &MockFSM{} cfg := leader.config() - _, err := NewRaft(&cfg, newFSM, leader.logs, leader.stable, leader.snapshots, trans) + _, err = NewRaft(&cfg, newFSM, leader.logs, leader.stable, leader.snapshots, trans) if err != nil { t.Fatalf("err: %v", err) } @@ -1441,7 +1473,8 @@ func TestRaft_SnapshotRestore_PeerChange(t *testing.T) { conf := inmemConfig(t) conf.ProtocolVersion = 1 conf.TrailingLogs = 10 - c := MakeCluster(3, t, conf) + c, err := MakeCluster(3, t, conf) + require.NoError(t, err) defer c.Close() // Commit a lot of things. @@ -1469,7 +1502,8 @@ func TestRaft_SnapshotRestore_PeerChange(t *testing.T) { } // Make a separate cluster. - c2 := MakeClusterNoBootstrap(2, t, conf) + c2, err := MakeClusterNoBootstrap(2, t, conf) + require.NoError(t, err) defer c2.Close() // Kill the old cluster. @@ -1547,7 +1581,8 @@ func TestRaft_AutoSnapshot(t *testing.T) { conf.SnapshotInterval = conf.CommitTimeout * 2 conf.SnapshotThreshold = 50 conf.TrailingLogs = 10 - c := MakeCluster(1, t, conf) + c, err := MakeCluster(1, t, conf) + require.NoError(t, err) defer c.Close() // Commit a lot of things @@ -1576,7 +1611,8 @@ func TestRaft_UserSnapshot(t *testing.T) { conf := inmemConfig(t) conf.SnapshotThreshold = 50 conf.TrailingLogs = 10 - c := MakeCluster(1, t, conf) + c, err := MakeCluster(1, t, conf) + require.NoError(t, err) defer c.Close() // With nothing committed, asking for a snapshot should return an error. @@ -1622,6 +1658,7 @@ func snapshotAndRestore(t *testing.T, offset uint64, monotonicLogStore bool, res conf.LeaderLeaseTimeout = 500 * time.Millisecond var c *cluster + var err error numPeers := 3 optsMonotonic := &MakeClusterOpts{ Peers: numPeers, @@ -1630,9 +1667,11 @@ func snapshotAndRestore(t *testing.T, offset uint64, monotonicLogStore bool, res MonotonicLogs: true, } if monotonicLogStore { - c = MakeClusterCustom(t, optsMonotonic) + c, err = MakeClusterCustom(t, optsMonotonic) + require.NoError(t, err) } else { - c = MakeCluster(numPeers, t, conf) + c, err = MakeCluster(numPeers, t, conf) + require.NoError(t, err) } defer c.Close() @@ -1666,9 +1705,11 @@ func snapshotAndRestore(t *testing.T, offset uint64, monotonicLogStore bool, res if restoreNewCluster { var c2 *cluster if monotonicLogStore { - c2 = MakeClusterCustom(t, optsMonotonic) + c2, err = MakeClusterCustom(t, optsMonotonic) + require.NoError(t, err) } else { - c2 = MakeCluster(numPeers, t, conf) + c2, err = MakeCluster(numPeers, t, conf) + require.NoError(t, err) } c = c2 leader = c.Leader() @@ -1777,7 +1818,8 @@ func TestRaft_SendSnapshotFollower(t *testing.T) { // Make the cluster conf := inmemConfig(t) conf.TrailingLogs = 10 - c := MakeCluster(3, t, conf) + c, err := MakeCluster(3, t, conf) + require.NoError(t, err) defer c.Close() // Disconnect one follower @@ -1819,7 +1861,8 @@ func TestRaft_SendSnapshotAndLogsFollower(t *testing.T) { // Make the cluster conf := inmemConfig(t) conf.TrailingLogs = 10 - c := MakeCluster(3, t, conf) + c, err := MakeCluster(3, t, conf) + require.NoError(t, err) defer c.Close() // Disconnect one follower @@ -1873,7 +1916,8 @@ func TestRaft_ReJoinFollower(t *testing.T) { // Enable operation after a remove. conf := inmemConfig(t) conf.ShutdownOnRemove = false - c := MakeCluster(3, t, conf) + c, err := MakeCluster(3, t, conf) + require.NoError(t, err) defer c.Close() // Get the leader. @@ -1949,7 +1993,8 @@ func TestRaft_ReJoinFollower(t *testing.T) { func TestRaft_LeaderLeaseExpire(t *testing.T) { // Make a cluster conf := inmemConfig(t) - c := MakeCluster(2, t, conf) + c, err := MakeCluster(2, t, conf) + require.NoError(t, err) defer c.Close() // Get the leader @@ -2015,7 +2060,8 @@ LOOP: func TestRaft_Barrier(t *testing.T) { // Make the cluster - c := MakeCluster(3, t, nil) + c, err := MakeCluster(3, t, nil) + require.NoError(t, err) defer c.Close() // Get the leader @@ -2043,7 +2089,8 @@ func TestRaft_Barrier(t *testing.T) { func TestRaft_VerifyLeader(t *testing.T) { // Make the cluster - c := MakeCluster(3, t, nil) + c, err := MakeCluster(3, t, nil) + require.NoError(t, err) defer c.Close() // Get the leader @@ -2060,7 +2107,8 @@ func TestRaft_VerifyLeader(t *testing.T) { func TestRaft_VerifyLeader_Single(t *testing.T) { // Make the cluster - c := MakeCluster(1, t, nil) + c, err := MakeCluster(1, t, nil) + require.NoError(t, err) defer c.Close() // Get the leader @@ -2078,7 +2126,8 @@ func TestRaft_VerifyLeader_Single(t *testing.T) { func TestRaft_VerifyLeader_Fail(t *testing.T) { // Make a cluster conf := inmemConfig(t) - c := MakeCluster(2, t, conf) + c, err := MakeCluster(2, t, conf) + require.NoError(t, err) defer c.Close() leader := c.Leader() @@ -2118,7 +2167,8 @@ func TestRaft_VerifyLeader_Fail(t *testing.T) { func TestRaft_VerifyLeader_PartialConnect(t *testing.T) { // Make a cluster conf := inmemConfig(t) - c := MakeCluster(3, t, conf) + c, err := MakeCluster(3, t, conf) + require.NoError(t, err) defer c.Close() // Get the leader @@ -2153,7 +2203,8 @@ func TestRaft_NotifyCh(t *testing.T) { ch := make(chan bool, 1) conf := inmemConfig(t) conf.NotifyCh = ch - c := MakeCluster(1, t, conf) + c, err := MakeCluster(1, t, conf) + require.NoError(t, err) defer c.Close() // Watch leaderCh for change @@ -2181,7 +2232,8 @@ func TestRaft_NotifyCh(t *testing.T) { } func TestRaft_AppendEntry(t *testing.T) { - c := MakeCluster(3, t, nil) + c, err := MakeCluster(3, t, nil) + require.NoError(t, err) defer c.Close() followers := c.Followers() ldr := c.Leader() @@ -2270,13 +2322,15 @@ func TestRaft_PreVoteMixedCluster(t *testing.T) { conf := inmemConfig(t) conf.PreVoteDisabled = tc.prevoteNum <= tc.noprevoteNum - c := MakeCluster(majority, t, conf) + c, err := MakeCluster(majority, t, conf) + require.NoError(t, err) defer c.Close() // Set up another server speaking protocol version 2. conf = inmemConfig(t) conf.PreVoteDisabled = tc.prevoteNum >= tc.noprevoteNum - c1 := MakeClusterNoBootstrap(minority, t, conf) + c1, err := MakeClusterNoBootstrap(minority, t, conf) + require.NoError(t, err) // Merge clusters. c.Merge(c1) @@ -2305,7 +2359,8 @@ func TestRaft_PreVoteAvoidElectionWithPartition(t *testing.T) { // Make a prevote cluster. conf := inmemConfig(t) conf.PreVoteDisabled = false - c := MakeCluster(5, t, conf) + c, err := MakeCluster(5, t, conf) + require.NoError(t, err) defer c.Close() oldLeaderTerm := c.Leader().getCurrentTerm() @@ -2335,7 +2390,8 @@ func TestRaft_PreVoteAvoidElectionWithPartition(t *testing.T) { func TestRaft_VotingGrant_WhenLeaderAvailable(t *testing.T) { conf := inmemConfig(t) conf.ProtocolVersion = 3 - c := MakeCluster(3, t, conf) + c, err := MakeCluster(3, t, conf) + require.NoError(t, err) defer c.Close() followers := c.Followers() ldr := c.Leader() @@ -2380,7 +2436,8 @@ func TestRaft_VotingGrant_WhenLeaderAvailable(t *testing.T) { } func TestRaft_ProtocolVersion_RejectRPC(t *testing.T) { - c := MakeCluster(3, t, nil) + c, err := MakeCluster(3, t, nil) + require.NoError(t, err) defer c.Close() followers := c.Followers() ldr := c.Leader() @@ -2398,7 +2455,7 @@ func TestRaft_ProtocolVersion_RejectRPC(t *testing.T) { // Reject a message from a future version we don't understand. var resp RequestVoteResponse - err := ldrT.RequestVote(followers[0].localID, followers[0].localAddr, &reqVote, &resp) + err = ldrT.RequestVote(followers[0].localID, followers[0].localAddr, &reqVote, &resp) if err == nil || !strings.Contains(err.Error(), "protocol version") { t.Fatalf("expected RPC to get rejected: %v", err) } @@ -2415,13 +2472,15 @@ func TestRaft_ProtocolVersion_Upgrade_1_2(t *testing.T) { // Make a cluster back on protocol version 1. conf := inmemConfig(t) conf.ProtocolVersion = 1 - c := MakeCluster(2, t, conf) + c, err := MakeCluster(2, t, conf) + require.NoError(t, err) defer c.Close() // Set up another server speaking protocol version 2. conf = inmemConfig(t) conf.ProtocolVersion = 2 - c1 := MakeClusterNoBootstrap(1, t, conf) + c1, err := MakeClusterNoBootstrap(1, t, conf) + require.NoError(t, err) // Merge clusters. c.Merge(c1) @@ -2458,14 +2517,16 @@ func TestRaft_ProtocolVersion_Upgrade_2_3(t *testing.T) { // Make a cluster back on protocol version 2. conf := inmemConfig(t) conf.ProtocolVersion = 2 - c := MakeCluster(2, t, conf) + c, err := MakeCluster(2, t, conf) + require.NoError(t, err) defer c.Close() oldAddr := c.Followers()[0].localAddr // Set up another server speaking protocol version 3. conf = inmemConfig(t) conf.ProtocolVersion = 3 - c1 := MakeClusterNoBootstrap(1, t, conf) + c1, err := MakeClusterNoBootstrap(1, t, conf) + require.NoError(t, err) // Merge clusters. c.Merge(c1) @@ -2491,9 +2552,10 @@ func TestRaft_ProtocolVersion_Upgrade_2_3(t *testing.T) { func TestRaft_LeaderID_Propagated(t *testing.T) { // Make a cluster on protocol version 3. conf := inmemConfig(t) - c := MakeCluster(3, t, conf) + c, err := MakeCluster(3, t, conf) + require.NoError(t, err) defer c.Close() - err := waitForLeader(c) + err = waitForLeader(c) require.NoError(t, err) for _, n := range c.rafts { @@ -2578,13 +2640,14 @@ func TestRaft_LeadershipTransferPickServer(t *testing.T) { } func TestRaft_LeadershipTransfer(t *testing.T) { - c := MakeCluster(3, t, nil) + c, err := MakeCluster(3, t, nil) + require.NoError(t, err) defer c.Close() oldLeader := string(c.Leader().localID) - err := c.Leader().LeadershipTransfer() - if err.Error() != nil { - t.Fatalf("Didn't expect error: %v", err.Error()) + future := c.Leader().LeadershipTransfer() + if future.Error() != nil { + t.Fatalf("Didn't expect error: %v", future.Error()) } newLeader := string(c.Leader().localID) if oldLeader == newLeader { @@ -2593,7 +2656,8 @@ func TestRaft_LeadershipTransfer(t *testing.T) { } func TestRaft_LeadershipTransferWithOneNode(t *testing.T) { - c := MakeCluster(1, t, nil) + c, err := MakeCluster(1, t, nil) + require.NoError(t, err) defer c.Close() future := c.Leader().LeadershipTransfer() @@ -2611,7 +2675,8 @@ func TestRaft_LeadershipTransferWithOneNode(t *testing.T) { func TestRaft_LeadershipTransferWithWrites(t *testing.T) { conf := inmemConfig(t) conf.Logger = hclog.New(&hclog.LoggerOptions{Level: hclog.Trace}) - c := MakeCluster(7, t, conf) + c, err := MakeCluster(7, t, conf) + require.NoError(t, err) defer c.Close() doneCh := make(chan struct{}) @@ -2664,7 +2729,8 @@ func TestRaft_LeadershipTransferWithWrites(t *testing.T) { } func TestRaft_LeadershipTransferWithSevenNodes(t *testing.T) { - c := MakeCluster(7, t, nil) + c, err := MakeCluster(7, t, nil) + require.NoError(t, err) defer c.Close() follower := c.GetInState(Follower)[0] @@ -2678,7 +2744,8 @@ func TestRaft_LeadershipTransferWithSevenNodes(t *testing.T) { } func TestRaft_LeadershipTransferToInvalidID(t *testing.T) { - c := MakeCluster(3, t, nil) + c, err := MakeCluster(3, t, nil) + require.NoError(t, err) defer c.Close() future := c.Leader().LeadershipTransferToServer(ServerID("abc"), ServerAddress("localhost")) @@ -2694,7 +2761,8 @@ func TestRaft_LeadershipTransferToInvalidID(t *testing.T) { } func TestRaft_LeadershipTransferToInvalidAddress(t *testing.T) { - c := MakeCluster(3, t, nil) + c, err := MakeCluster(3, t, nil) + require.NoError(t, err) defer c.Close() follower := c.GetInState(Follower)[0] @@ -2710,7 +2778,8 @@ func TestRaft_LeadershipTransferToInvalidAddress(t *testing.T) { } func TestRaft_LeadershipTransferToBehindServer(t *testing.T) { - c := MakeCluster(3, t, nil) + c, err := MakeCluster(3, t, nil) + require.NoError(t, err) defer c.Close() l := c.Leader() @@ -2731,7 +2800,8 @@ func TestRaft_LeadershipTransferToBehindServer(t *testing.T) { } func TestRaft_LeadershipTransferToItself(t *testing.T) { - c := MakeCluster(3, t, nil) + c, err := MakeCluster(3, t, nil) + require.NoError(t, err) defer c.Close() l := c.Leader() @@ -2748,7 +2818,8 @@ func TestRaft_LeadershipTransferToItself(t *testing.T) { } func TestRaft_LeadershipTransferLeaderRejectsClientRequests(t *testing.T) { - c := MakeCluster(3, t, nil) + c, err := MakeCluster(3, t, nil) + require.NoError(t, err) defer c.Close() l := c.Leader() l.setLeadershipTransferInProgress(true) @@ -2782,7 +2853,8 @@ func TestRaft_LeadershipTransferLeaderRejectsClientRequests(t *testing.T) { } func TestRaft_LeadershipTransferLeaderReplicationTimeout(t *testing.T) { - c := MakeCluster(3, t, nil) + c, err := MakeCluster(3, t, nil) + require.NoError(t, err) defer c.Close() l := c.Leader() @@ -2812,7 +2884,8 @@ func TestRaft_LeadershipTransferLeaderReplicationTimeout(t *testing.T) { } func TestRaft_LeadershipTransferIgnoresNonvoters(t *testing.T) { - c := MakeCluster(2, t, nil) + c, err := MakeCluster(2, t, nil) + require.NoError(t, err) defer c.Close() follower := c.Followers()[0] @@ -2849,7 +2922,8 @@ func TestRaft_LeadershipTransferStopRightAway(t *testing.T) { } func TestRaft_GetConfigurationNoBootstrap(t *testing.T) { - c := MakeCluster(2, t, nil) + c, err := MakeCluster(2, t, nil) + require.NoError(t, err) defer c.Close() // Should be one leader @@ -2886,7 +2960,8 @@ func TestRaft_GetConfigurationNoBootstrap(t *testing.T) { } func TestRaft_LogStoreIsMonotonic(t *testing.T) { - c := MakeCluster(1, t, nil) + c, err := MakeCluster(1, t, nil) + require.NoError(t, err) defer c.Close() // Should be one leader @@ -2921,7 +2996,8 @@ func TestRaft_LogStoreIsMonotonic(t *testing.T) { } func TestRaft_CacheLogWithStoreError(t *testing.T) { - c := MakeCluster(2, t, nil) + c, err := MakeCluster(2, t, nil) + require.NoError(t, err) defer c.Close() // Should be one leader @@ -2973,7 +3049,8 @@ func TestRaft_CacheLogWithStoreError(t *testing.T) { func TestRaft_ReloadConfig(t *testing.T) { conf := inmemConfig(t) conf.LeaderLeaseTimeout = 40 * time.Millisecond - c := MakeCluster(1, t, conf) + c, err := MakeCluster(1, t, conf) + require.NoError(t, err) defer c.Close() raft := c.rafts[0] @@ -3003,7 +3080,8 @@ func TestRaft_ReloadConfig(t *testing.T) { func TestRaft_ReloadConfigValidates(t *testing.T) { conf := inmemConfig(t) - c := MakeCluster(1, t, conf) + c, err := MakeCluster(1, t, conf) + require.NoError(t, err) defer c.Close() raft := c.rafts[0] @@ -3079,8 +3157,8 @@ func TestRaft_InstallSnapshot_InvalidPeers(t *testing.T) { func TestRaft_VoteNotGranted_WhenNodeNotInCluster(t *testing.T) { // Make a cluster - c := MakeCluster(3, t, nil) - + c, err := MakeCluster(3, t, nil) + require.NoError(t, err) defer c.Close() // Get the leader @@ -3151,8 +3229,8 @@ func TestRaft_VoteNotGranted_WhenNodeNotInCluster(t *testing.T) { func TestRaft_ClusterCanRegainStability_WhenNonVoterWithHigherTermJoin(t *testing.T) { // Make a cluster - c := MakeCluster(3, t, nil) - + c, err := MakeCluster(3, t, nil) + require.NoError(t, err) defer c.Close() // Get the leader @@ -3246,10 +3324,10 @@ func TestRaft_FollowerRemovalNoElection(t *testing.T) { inmemConf := inmemConfig(t) inmemConf.HeartbeatTimeout = 100 * time.Millisecond inmemConf.ElectionTimeout = 100 * time.Millisecond - c := MakeCluster(3, t, inmemConf) - + c, err := MakeCluster(3, t, inmemConf) + require.NoError(t, err) defer c.Close() - err := waitForLeader(c) + err = waitForLeader(c) require.NoError(t, err) leader := c.Leader() @@ -3399,9 +3477,10 @@ func TestRaft_runFollower_ReloadTimeoutConfigs(t *testing.T) { func TestRaft_PreVote_ShouldNotRejectLeader(t *testing.T) { // Make a cluster - c := MakeCluster(3, t, nil) + c, err := MakeCluster(3, t, nil) + require.NoError(t, err) defer c.Close() - err := waitForLeader(c) + err = waitForLeader(c) require.NoError(t, err) leader := c.Leader() @@ -3429,7 +3508,7 @@ func TestRaft_PreVote_ShouldNotRejectLeader(t *testing.T) { var resp RequestPreVoteResponse leaderT := c.trans[c.IndexOf(leader)] - if err := leaderT.RequestPreVote(follower.localID, follower.localAddr, &reqPreVote, &resp); err != nil { + if err = leaderT.RequestPreVote(follower.localID, follower.localAddr, &reqPreVote, &resp); err != nil { t.Fatalf("RequestPreVote RPC failed %v", err) } @@ -3441,9 +3520,10 @@ func TestRaft_PreVote_ShouldNotRejectLeader(t *testing.T) { func TestRaft_PreVote_ShouldRejectNonLeader(t *testing.T) { // Make a cluster - c := MakeCluster(3, t, nil) + c, err := MakeCluster(3, t, nil) + require.NoError(t, err) defer c.Close() - err := waitForLeader(c) + err = waitForLeader(c) require.NoError(t, err) // Wait until we have 2 followers diff --git a/testing.go b/testing.go index b8647e75..9c75a4e4 100644 --- a/testing.go +++ b/testing.go @@ -731,7 +731,7 @@ type MakeClusterOpts struct { // If bootstrap is true, the servers will know about each other before starting, // otherwise their transports will be wired up but they won't yet have configured // each other. -func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster { +func makeCluster(t *testing.T, opts *MakeClusterOpts) (*cluster, error) { if opts.Conf == nil { opts.Conf = inmemConfig(t) } @@ -757,7 +757,8 @@ func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster { for i := 0; i < opts.Peers; i++ { dir, err := os.MkdirTemp("", "raft") if err != nil { - t.Fatalf("err: %v", err) + t.Logf("err: %v", err) + return nil, err } store := NewInmemStore() @@ -819,27 +820,30 @@ func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster { if opts.Bootstrap { err := BootstrapCluster(peerConf, logs, store, snap, trans, configuration) if err != nil { - t.Fatalf("BootstrapCluster failed: %v", err) + t.Logf("BootstrapCluster failed: %v", err) + return nil, err } } raft, err := NewRaft(peerConf, c.fsms[i], logs, store, snap, trans) if err != nil { - t.Fatalf("NewRaft failed: %v", err) + t.Logf("NewRaft failed: %v", err) + return nil, err } raft.RegisterObserver(NewObserver(c.observationCh, false, nil)) if err != nil { - t.Fatalf("RegisterObserver failed: %v", err) + t.Logf("RegisterObserver failed: %v", err) + return nil, err } c.rafts = append(c.rafts, raft) } - return c + return c, nil } // NOTE: This is exposed for middleware testing purposes and is not a stable API -func MakeCluster(n int, t *testing.T, conf *Config) *cluster { +func MakeCluster(n int, t *testing.T, conf *Config) (*cluster, error) { return makeCluster(t, &MakeClusterOpts{ Peers: n, Bootstrap: true, @@ -848,7 +852,7 @@ func MakeCluster(n int, t *testing.T, conf *Config) *cluster { } // NOTE: This is exposed for middleware testing purposes and is not a stable API -func MakeClusterNoBootstrap(n int, t *testing.T, conf *Config) *cluster { +func MakeClusterNoBootstrap(n int, t *testing.T, conf *Config) (*cluster, error) { return makeCluster(t, &MakeClusterOpts{ Peers: n, Conf: conf, @@ -856,7 +860,7 @@ func MakeClusterNoBootstrap(n int, t *testing.T, conf *Config) *cluster { } // NOTE: This is exposed for middleware testing purposes and is not a stable API -func MakeClusterCustom(t *testing.T, opts *MakeClusterOpts) *cluster { +func MakeClusterCustom(t *testing.T, opts *MakeClusterOpts) (*cluster, error) { return makeCluster(t, opts) } From 560c0b924b9a423fc4cc23f74867b680d830d36d Mon Sep 17 00:00:00 2001 From: peterxcli Date: Fri, 11 Oct 2024 16:27:37 +0800 Subject: [PATCH 33/37] test: add test for RestoreCommittedLogs with incompatible log store --- raft_test.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/raft_test.go b/raft_test.go index b7dd8757..a9b74acf 100644 --- a/raft_test.go +++ b/raft_test.go @@ -1303,6 +1303,21 @@ func TestRaft_RestoreCommittedLogs(t *testing.T) { assert.LessOrEqual(t, commitIdx, lastIdx) } +func TestRaft_RestoreCommittedLogs_IncompatibleLogStore(t *testing.T) { + // Make the cluster + conf := inmemConfig(t) + conf.TrailingLogs = 10 + conf.RestoreCommittedLogs = true + opts := &MakeClusterOpts{ + Peers: 1, + Bootstrap: true, + Conf: conf, + CommitTrackingLogs: false, + } + _, err := MakeClusterCustom(t, opts) + require.ErrorIs(t, err, ErrIncompatibleLogStore) +} + func TestRaft_SnapshotRestore_Progress(t *testing.T) { // Make the cluster conf := inmemConfig(t) From 8c722fafaa88170cbeea1fc7a2ada657b3d15d84 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Sat, 12 Oct 2024 23:47:35 +0800 Subject: [PATCH 34/37] Revert "refactor!: update MakeCluster functions to return error" This reverts commit cfffcb5b7e506de502c3c52e9c88ea87958e2737. --- raft_test.go | 278 ++++++++++++++++++--------------------------------- testing.go | 22 ++-- 2 files changed, 108 insertions(+), 192 deletions(-) diff --git a/raft_test.go b/raft_test.go index a9b74acf..0bd22e67 100644 --- a/raft_test.go +++ b/raft_test.go @@ -23,14 +23,12 @@ import ( ) func TestRaft_StartStop(t *testing.T) { - c, err := MakeCluster(1, t, nil) - require.NoError(t, err) + c := MakeCluster(1, t, nil) c.Close() } func TestRaft_AfterShutdown(t *testing.T) { - c, err := MakeCluster(1, t, nil) - require.NoError(t, err) + c := MakeCluster(1, t, nil) c.Close() raft := c.rafts[0] @@ -66,8 +64,7 @@ func TestRaft_AfterShutdown(t *testing.T) { func TestRaft_LiveBootstrap(t *testing.T) { // Make the cluster. - c, err := MakeClusterNoBootstrap(3, t, nil) - require.NoError(t, err) + c := MakeClusterNoBootstrap(3, t, nil) defer c.Close() // Build the configuration. @@ -107,8 +104,7 @@ func TestRaft_LiveBootstrap(t *testing.T) { func TestRaft_LiveBootstrap_From_NonVoter(t *testing.T) { // Make the cluster. - c, err := MakeClusterNoBootstrap(2, t, nil) - require.NoError(t, err) + c := MakeClusterNoBootstrap(2, t, nil) defer c.Close() // Build the configuration. @@ -132,8 +128,7 @@ func TestRaft_LiveBootstrap_From_NonVoter(t *testing.T) { } func TestRaft_RecoverCluster_NoState(t *testing.T) { - c, err := MakeClusterNoBootstrap(1, t, nil) - require.NoError(t, err) + c := MakeClusterNoBootstrap(1, t, nil) defer c.Close() r := c.rafts[0] @@ -146,9 +141,8 @@ func TestRaft_RecoverCluster_NoState(t *testing.T) { }, } cfg := r.config() - err = RecoverCluster(&cfg, &MockFSM{}, r.logs, r.stable, + err := RecoverCluster(&cfg, &MockFSM{}, r.logs, r.stable, r.snapshots, r.trans, configuration) - require.Error(t, err) if err == nil || !strings.Contains(err.Error(), "no initial state") { t.Fatalf("should have failed for no initial state: %v", err) } @@ -161,8 +155,7 @@ func TestRaft_RecoverCluster(t *testing.T) { conf := inmemConfig(t) conf.TrailingLogs = 10 conf.SnapshotThreshold = uint64(snapshotThreshold) - c, err := MakeCluster(3, t, conf) - require.NoError(t, err) + c := MakeCluster(3, t, conf) defer c.Close() // Perform some commits. @@ -262,13 +255,11 @@ func TestRaft_RecoverCluster(t *testing.T) { func TestRaft_HasExistingState(t *testing.T) { var err error // Make a cluster. - c, err := MakeCluster(2, t, nil) - require.NoError(t, err) + c := MakeCluster(2, t, nil) defer c.Close() // Make a new cluster of 1. - c1, err := MakeClusterNoBootstrap(1, t, nil) - require.NoError(t, err) + c1 := MakeClusterNoBootstrap(1, t, nil) // Make sure the initial state is clean. var hasState bool @@ -305,8 +296,7 @@ func TestRaft_HasExistingState(t *testing.T) { func TestRaft_SingleNode(t *testing.T) { conf := inmemConfig(t) - c, err := MakeCluster(1, t, conf) - require.NoError(t, err) + c := MakeCluster(1, t, conf) defer c.Close() raft := c.rafts[0] @@ -349,8 +339,7 @@ func TestRaft_SingleNode(t *testing.T) { func TestRaft_TripleNode(t *testing.T) { // Make the cluster - c, err := MakeCluster(3, t, nil) - require.NoError(t, err) + c := MakeCluster(3, t, nil) defer c.Close() // Should be one leader @@ -368,8 +357,7 @@ func TestRaft_TripleNode(t *testing.T) { func TestRaft_LeaderFail(t *testing.T) { // Make the cluster - c, err := MakeCluster(3, t, nil) - require.NoError(t, err) + c := MakeCluster(3, t, nil) defer c.Close() // Should be one leader @@ -446,8 +434,7 @@ func TestRaft_LeaderFail(t *testing.T) { func TestRaft_BehindFollower(t *testing.T) { // Make the cluster - c, err := MakeCluster(3, t, nil) - require.NoError(t, err) + c := MakeCluster(3, t, nil) defer c.Close() // Disconnect one follower @@ -487,8 +474,7 @@ func TestRaft_BehindFollower(t *testing.T) { func TestRaft_ApplyNonLeader(t *testing.T) { // Make the cluster - c, err := MakeCluster(3, t, nil) - require.NoError(t, err) + c := MakeCluster(3, t, nil) defer c.Close() // Wait for a leader @@ -518,8 +504,7 @@ func TestRaft_ApplyConcurrent(t *testing.T) { conf := inmemConfig(t) conf.HeartbeatTimeout = 2 * conf.HeartbeatTimeout conf.ElectionTimeout = 2 * conf.ElectionTimeout - c, err := MakeCluster(3, t, conf) - require.NoError(t, err) + c := MakeCluster(3, t, conf) defer c.Close() // Wait for a leader @@ -571,8 +556,7 @@ func TestRaft_ApplyConcurrent_Timeout(t *testing.T) { conf.CommitTimeout = 1 * time.Millisecond conf.HeartbeatTimeout = 2 * conf.HeartbeatTimeout conf.ElectionTimeout = 2 * conf.ElectionTimeout - c, err := MakeCluster(1, t, conf) - require.NoError(t, err) + c := MakeCluster(1, t, conf) defer c.Close() // Wait for a leader @@ -608,13 +592,11 @@ func TestRaft_ApplyConcurrent_Timeout(t *testing.T) { func TestRaft_JoinNode(t *testing.T) { // Make a cluster - c, err := MakeCluster(2, t, nil) - require.NoError(t, err) + c := MakeCluster(2, t, nil) defer c.Close() // Make a new cluster of 1 - c1, err := MakeClusterNoBootstrap(1, t, nil) - require.NoError(t, err) + c1 := MakeClusterNoBootstrap(1, t, nil) // Merge clusters c.Merge(c1) @@ -639,30 +621,28 @@ func TestRaft_JoinNode(t *testing.T) { func TestRaft_JoinNode_ConfigStore(t *testing.T) { // Make a cluster conf := inmemConfig(t) - c, err := makeCluster(t, &MakeClusterOpts{ + c := makeCluster(t, &MakeClusterOpts{ Peers: 1, Bootstrap: true, Conf: conf, ConfigStoreFSM: true, }) - require.NoError(t, err) defer c.Close() // Make a new nodes - c1, err := makeCluster(t, &MakeClusterOpts{ + c1 := makeCluster(t, &MakeClusterOpts{ Peers: 1, Bootstrap: false, Conf: conf, ConfigStoreFSM: true, }) - require.NoError(t, err) - c2, err := makeCluster(t, &MakeClusterOpts{ + c2 := makeCluster(t, &MakeClusterOpts{ Peers: 1, Bootstrap: false, Conf: conf, ConfigStoreFSM: true, }) - require.NoError(t, err) + // Merge clusters c.Merge(c1) c.Merge(c2) @@ -708,8 +688,7 @@ func TestRaft_JoinNode_ConfigStore(t *testing.T) { func TestRaft_RemoveFollower(t *testing.T) { // Make a cluster - c, err := MakeCluster(3, t, nil) - require.NoError(t, err) + c := MakeCluster(3, t, nil) defer c.Close() // Get the leader @@ -750,8 +729,7 @@ func TestRaft_RemoveFollower(t *testing.T) { func TestRaft_RemoveLeader(t *testing.T) { // Make a cluster - c, err := MakeCluster(3, t, nil) - require.NoError(t, err) + c := MakeCluster(3, t, nil) defer c.Close() // Get the leader @@ -801,8 +779,7 @@ func TestRaft_RemoveLeader_NoShutdown(t *testing.T) { // Make a cluster conf := inmemConfig(t) conf.ShutdownOnRemove = false - c, err := MakeCluster(3, t, conf) - require.NoError(t, err) + c := MakeCluster(3, t, conf) defer c.Close() // Get the leader @@ -864,8 +841,7 @@ func TestRaft_RemoveLeader_NoShutdown(t *testing.T) { func TestRaft_RemoveFollower_SplitCluster(t *testing.T) { // Make a cluster. conf := inmemConfig(t) - c, err := MakeCluster(4, t, conf) - require.NoError(t, err) + c := MakeCluster(4, t, conf) defer c.Close() // Wait for a leader to get elected. @@ -901,8 +877,7 @@ func TestRaft_RemoveFollower_SplitCluster(t *testing.T) { func TestRaft_AddKnownPeer(t *testing.T) { // Make a cluster - c, err := MakeCluster(3, t, nil) - require.NoError(t, err) + c := MakeCluster(3, t, nil) defer c.Close() // Get the leader @@ -941,8 +916,7 @@ func TestRaft_AddKnownPeer(t *testing.T) { func TestRaft_RemoveUnknownPeer(t *testing.T) { // Make a cluster - c, err := MakeCluster(3, t, nil) - require.NoError(t, err) + c := MakeCluster(3, t, nil) defer c.Close() // Get the leader @@ -983,8 +957,7 @@ func TestRaft_SnapshotRestore(t *testing.T) { // Make the cluster conf := inmemConfig(t) conf.TrailingLogs = 10 - c, err := MakeCluster(1, t, conf) - require.NoError(t, err) + c := MakeCluster(1, t, conf) defer c.Close() // Commit a lot of things @@ -1028,7 +1001,7 @@ func TestRaft_SnapshotRestore(t *testing.T) { // Can't just reuse the old transport as it will be closed _, trans2 := NewInmemTransport(r.trans.LocalAddr()) cfg := r.config() - r, err = NewRaft(&cfg, r.fsm, r.logs, r.stable, r.snapshots, trans2) + r, err := NewRaft(&cfg, r.fsm, r.logs, r.stable, r.snapshots, trans2) if err != nil { t.Fatalf("err: %v", err) } @@ -1050,8 +1023,7 @@ func TestRaft_RestoreSnapshotOnStartup_Monotonic(t *testing.T) { Conf: conf, MonotonicLogs: true, } - c, err := MakeClusterCustom(t, opts) - require.NoError(t, err) + c := MakeClusterCustom(t, opts) defer c.Close() leader := c.Leader() @@ -1133,8 +1105,7 @@ func TestRaft_RestoreSnapshotOnStartup_CommitTrackingLogs(t *testing.T) { Conf: conf, CommitTrackingLogs: true, } - c, err := MakeClusterCustom(t, opts) - require.NoError(t, err) + c := MakeClusterCustom(t, opts) defer c.Close() leader := c.Leader() @@ -1217,8 +1188,7 @@ func TestRaft_RestoreCommittedLogs(t *testing.T) { Conf: conf, CommitTrackingLogs: true, } - c, err := MakeClusterCustom(t, opts) - require.NoError(t, err) + c := MakeClusterCustom(t, opts) defer c.Close() leader := c.Leader() @@ -1322,8 +1292,7 @@ func TestRaft_SnapshotRestore_Progress(t *testing.T) { // Make the cluster conf := inmemConfig(t) conf.TrailingLogs = 10 - c, err := MakeCluster(1, t, conf) - require.NoError(t, err) + c := MakeCluster(1, t, conf) defer c.Close() // Commit a lot of things @@ -1376,7 +1345,7 @@ func TestRaft_SnapshotRestore_Progress(t *testing.T) { Level: hclog.Info, Output: &logbuf, }) - r, err = NewRaft(&cfg, r.fsm, r.logs, r.stable, r.snapshots, trans2) + r, err := NewRaft(&cfg, r.fsm, r.logs, r.stable, r.snapshots, trans2) if err != nil { t.Fatalf("err: %v", err) } @@ -1442,8 +1411,7 @@ func TestRaft_NoRestoreOnStart(t *testing.T) { conf := inmemConfig(t) conf.TrailingLogs = 10 conf.NoSnapshotRestoreOnStart = true - c, err := MakeCluster(1, t, conf) - require.NoError(t, err) + c := MakeCluster(1, t, conf) // Commit a lot of things. leader := c.Leader() @@ -1472,7 +1440,7 @@ func TestRaft_NoRestoreOnStart(t *testing.T) { _, trans := NewInmemTransport(leader.localAddr) newFSM := &MockFSM{} cfg := leader.config() - _, err = NewRaft(&cfg, newFSM, leader.logs, leader.stable, leader.snapshots, trans) + _, err := NewRaft(&cfg, newFSM, leader.logs, leader.stable, leader.snapshots, trans) if err != nil { t.Fatalf("err: %v", err) } @@ -1488,8 +1456,7 @@ func TestRaft_SnapshotRestore_PeerChange(t *testing.T) { conf := inmemConfig(t) conf.ProtocolVersion = 1 conf.TrailingLogs = 10 - c, err := MakeCluster(3, t, conf) - require.NoError(t, err) + c := MakeCluster(3, t, conf) defer c.Close() // Commit a lot of things. @@ -1517,8 +1484,7 @@ func TestRaft_SnapshotRestore_PeerChange(t *testing.T) { } // Make a separate cluster. - c2, err := MakeClusterNoBootstrap(2, t, conf) - require.NoError(t, err) + c2 := MakeClusterNoBootstrap(2, t, conf) defer c2.Close() // Kill the old cluster. @@ -1596,8 +1562,7 @@ func TestRaft_AutoSnapshot(t *testing.T) { conf.SnapshotInterval = conf.CommitTimeout * 2 conf.SnapshotThreshold = 50 conf.TrailingLogs = 10 - c, err := MakeCluster(1, t, conf) - require.NoError(t, err) + c := MakeCluster(1, t, conf) defer c.Close() // Commit a lot of things @@ -1626,8 +1591,7 @@ func TestRaft_UserSnapshot(t *testing.T) { conf := inmemConfig(t) conf.SnapshotThreshold = 50 conf.TrailingLogs = 10 - c, err := MakeCluster(1, t, conf) - require.NoError(t, err) + c := MakeCluster(1, t, conf) defer c.Close() // With nothing committed, asking for a snapshot should return an error. @@ -1673,7 +1637,6 @@ func snapshotAndRestore(t *testing.T, offset uint64, monotonicLogStore bool, res conf.LeaderLeaseTimeout = 500 * time.Millisecond var c *cluster - var err error numPeers := 3 optsMonotonic := &MakeClusterOpts{ Peers: numPeers, @@ -1682,11 +1645,9 @@ func snapshotAndRestore(t *testing.T, offset uint64, monotonicLogStore bool, res MonotonicLogs: true, } if monotonicLogStore { - c, err = MakeClusterCustom(t, optsMonotonic) - require.NoError(t, err) + c = MakeClusterCustom(t, optsMonotonic) } else { - c, err = MakeCluster(numPeers, t, conf) - require.NoError(t, err) + c = MakeCluster(numPeers, t, conf) } defer c.Close() @@ -1720,11 +1681,9 @@ func snapshotAndRestore(t *testing.T, offset uint64, monotonicLogStore bool, res if restoreNewCluster { var c2 *cluster if monotonicLogStore { - c2, err = MakeClusterCustom(t, optsMonotonic) - require.NoError(t, err) + c2 = MakeClusterCustom(t, optsMonotonic) } else { - c2, err = MakeCluster(numPeers, t, conf) - require.NoError(t, err) + c2 = MakeCluster(numPeers, t, conf) } c = c2 leader = c.Leader() @@ -1833,8 +1792,7 @@ func TestRaft_SendSnapshotFollower(t *testing.T) { // Make the cluster conf := inmemConfig(t) conf.TrailingLogs = 10 - c, err := MakeCluster(3, t, conf) - require.NoError(t, err) + c := MakeCluster(3, t, conf) defer c.Close() // Disconnect one follower @@ -1876,8 +1834,7 @@ func TestRaft_SendSnapshotAndLogsFollower(t *testing.T) { // Make the cluster conf := inmemConfig(t) conf.TrailingLogs = 10 - c, err := MakeCluster(3, t, conf) - require.NoError(t, err) + c := MakeCluster(3, t, conf) defer c.Close() // Disconnect one follower @@ -1931,8 +1888,7 @@ func TestRaft_ReJoinFollower(t *testing.T) { // Enable operation after a remove. conf := inmemConfig(t) conf.ShutdownOnRemove = false - c, err := MakeCluster(3, t, conf) - require.NoError(t, err) + c := MakeCluster(3, t, conf) defer c.Close() // Get the leader. @@ -2008,8 +1964,7 @@ func TestRaft_ReJoinFollower(t *testing.T) { func TestRaft_LeaderLeaseExpire(t *testing.T) { // Make a cluster conf := inmemConfig(t) - c, err := MakeCluster(2, t, conf) - require.NoError(t, err) + c := MakeCluster(2, t, conf) defer c.Close() // Get the leader @@ -2075,8 +2030,7 @@ LOOP: func TestRaft_Barrier(t *testing.T) { // Make the cluster - c, err := MakeCluster(3, t, nil) - require.NoError(t, err) + c := MakeCluster(3, t, nil) defer c.Close() // Get the leader @@ -2104,8 +2058,7 @@ func TestRaft_Barrier(t *testing.T) { func TestRaft_VerifyLeader(t *testing.T) { // Make the cluster - c, err := MakeCluster(3, t, nil) - require.NoError(t, err) + c := MakeCluster(3, t, nil) defer c.Close() // Get the leader @@ -2122,8 +2075,7 @@ func TestRaft_VerifyLeader(t *testing.T) { func TestRaft_VerifyLeader_Single(t *testing.T) { // Make the cluster - c, err := MakeCluster(1, t, nil) - require.NoError(t, err) + c := MakeCluster(1, t, nil) defer c.Close() // Get the leader @@ -2141,8 +2093,7 @@ func TestRaft_VerifyLeader_Single(t *testing.T) { func TestRaft_VerifyLeader_Fail(t *testing.T) { // Make a cluster conf := inmemConfig(t) - c, err := MakeCluster(2, t, conf) - require.NoError(t, err) + c := MakeCluster(2, t, conf) defer c.Close() leader := c.Leader() @@ -2182,8 +2133,7 @@ func TestRaft_VerifyLeader_Fail(t *testing.T) { func TestRaft_VerifyLeader_PartialConnect(t *testing.T) { // Make a cluster conf := inmemConfig(t) - c, err := MakeCluster(3, t, conf) - require.NoError(t, err) + c := MakeCluster(3, t, conf) defer c.Close() // Get the leader @@ -2218,8 +2168,7 @@ func TestRaft_NotifyCh(t *testing.T) { ch := make(chan bool, 1) conf := inmemConfig(t) conf.NotifyCh = ch - c, err := MakeCluster(1, t, conf) - require.NoError(t, err) + c := MakeCluster(1, t, conf) defer c.Close() // Watch leaderCh for change @@ -2247,8 +2196,7 @@ func TestRaft_NotifyCh(t *testing.T) { } func TestRaft_AppendEntry(t *testing.T) { - c, err := MakeCluster(3, t, nil) - require.NoError(t, err) + c := MakeCluster(3, t, nil) defer c.Close() followers := c.Followers() ldr := c.Leader() @@ -2337,15 +2285,13 @@ func TestRaft_PreVoteMixedCluster(t *testing.T) { conf := inmemConfig(t) conf.PreVoteDisabled = tc.prevoteNum <= tc.noprevoteNum - c, err := MakeCluster(majority, t, conf) - require.NoError(t, err) + c := MakeCluster(majority, t, conf) defer c.Close() // Set up another server speaking protocol version 2. conf = inmemConfig(t) conf.PreVoteDisabled = tc.prevoteNum >= tc.noprevoteNum - c1, err := MakeClusterNoBootstrap(minority, t, conf) - require.NoError(t, err) + c1 := MakeClusterNoBootstrap(minority, t, conf) // Merge clusters. c.Merge(c1) @@ -2374,8 +2320,7 @@ func TestRaft_PreVoteAvoidElectionWithPartition(t *testing.T) { // Make a prevote cluster. conf := inmemConfig(t) conf.PreVoteDisabled = false - c, err := MakeCluster(5, t, conf) - require.NoError(t, err) + c := MakeCluster(5, t, conf) defer c.Close() oldLeaderTerm := c.Leader().getCurrentTerm() @@ -2405,8 +2350,7 @@ func TestRaft_PreVoteAvoidElectionWithPartition(t *testing.T) { func TestRaft_VotingGrant_WhenLeaderAvailable(t *testing.T) { conf := inmemConfig(t) conf.ProtocolVersion = 3 - c, err := MakeCluster(3, t, conf) - require.NoError(t, err) + c := MakeCluster(3, t, conf) defer c.Close() followers := c.Followers() ldr := c.Leader() @@ -2451,8 +2395,7 @@ func TestRaft_VotingGrant_WhenLeaderAvailable(t *testing.T) { } func TestRaft_ProtocolVersion_RejectRPC(t *testing.T) { - c, err := MakeCluster(3, t, nil) - require.NoError(t, err) + c := MakeCluster(3, t, nil) defer c.Close() followers := c.Followers() ldr := c.Leader() @@ -2470,7 +2413,7 @@ func TestRaft_ProtocolVersion_RejectRPC(t *testing.T) { // Reject a message from a future version we don't understand. var resp RequestVoteResponse - err = ldrT.RequestVote(followers[0].localID, followers[0].localAddr, &reqVote, &resp) + err := ldrT.RequestVote(followers[0].localID, followers[0].localAddr, &reqVote, &resp) if err == nil || !strings.Contains(err.Error(), "protocol version") { t.Fatalf("expected RPC to get rejected: %v", err) } @@ -2487,15 +2430,13 @@ func TestRaft_ProtocolVersion_Upgrade_1_2(t *testing.T) { // Make a cluster back on protocol version 1. conf := inmemConfig(t) conf.ProtocolVersion = 1 - c, err := MakeCluster(2, t, conf) - require.NoError(t, err) + c := MakeCluster(2, t, conf) defer c.Close() // Set up another server speaking protocol version 2. conf = inmemConfig(t) conf.ProtocolVersion = 2 - c1, err := MakeClusterNoBootstrap(1, t, conf) - require.NoError(t, err) + c1 := MakeClusterNoBootstrap(1, t, conf) // Merge clusters. c.Merge(c1) @@ -2532,16 +2473,14 @@ func TestRaft_ProtocolVersion_Upgrade_2_3(t *testing.T) { // Make a cluster back on protocol version 2. conf := inmemConfig(t) conf.ProtocolVersion = 2 - c, err := MakeCluster(2, t, conf) - require.NoError(t, err) + c := MakeCluster(2, t, conf) defer c.Close() oldAddr := c.Followers()[0].localAddr // Set up another server speaking protocol version 3. conf = inmemConfig(t) conf.ProtocolVersion = 3 - c1, err := MakeClusterNoBootstrap(1, t, conf) - require.NoError(t, err) + c1 := MakeClusterNoBootstrap(1, t, conf) // Merge clusters. c.Merge(c1) @@ -2567,10 +2506,9 @@ func TestRaft_ProtocolVersion_Upgrade_2_3(t *testing.T) { func TestRaft_LeaderID_Propagated(t *testing.T) { // Make a cluster on protocol version 3. conf := inmemConfig(t) - c, err := MakeCluster(3, t, conf) - require.NoError(t, err) + c := MakeCluster(3, t, conf) defer c.Close() - err = waitForLeader(c) + err := waitForLeader(c) require.NoError(t, err) for _, n := range c.rafts { @@ -2655,14 +2593,13 @@ func TestRaft_LeadershipTransferPickServer(t *testing.T) { } func TestRaft_LeadershipTransfer(t *testing.T) { - c, err := MakeCluster(3, t, nil) - require.NoError(t, err) + c := MakeCluster(3, t, nil) defer c.Close() oldLeader := string(c.Leader().localID) - future := c.Leader().LeadershipTransfer() - if future.Error() != nil { - t.Fatalf("Didn't expect error: %v", future.Error()) + err := c.Leader().LeadershipTransfer() + if err.Error() != nil { + t.Fatalf("Didn't expect error: %v", err.Error()) } newLeader := string(c.Leader().localID) if oldLeader == newLeader { @@ -2671,8 +2608,7 @@ func TestRaft_LeadershipTransfer(t *testing.T) { } func TestRaft_LeadershipTransferWithOneNode(t *testing.T) { - c, err := MakeCluster(1, t, nil) - require.NoError(t, err) + c := MakeCluster(1, t, nil) defer c.Close() future := c.Leader().LeadershipTransfer() @@ -2690,8 +2626,7 @@ func TestRaft_LeadershipTransferWithOneNode(t *testing.T) { func TestRaft_LeadershipTransferWithWrites(t *testing.T) { conf := inmemConfig(t) conf.Logger = hclog.New(&hclog.LoggerOptions{Level: hclog.Trace}) - c, err := MakeCluster(7, t, conf) - require.NoError(t, err) + c := MakeCluster(7, t, conf) defer c.Close() doneCh := make(chan struct{}) @@ -2744,8 +2679,7 @@ func TestRaft_LeadershipTransferWithWrites(t *testing.T) { } func TestRaft_LeadershipTransferWithSevenNodes(t *testing.T) { - c, err := MakeCluster(7, t, nil) - require.NoError(t, err) + c := MakeCluster(7, t, nil) defer c.Close() follower := c.GetInState(Follower)[0] @@ -2759,8 +2693,7 @@ func TestRaft_LeadershipTransferWithSevenNodes(t *testing.T) { } func TestRaft_LeadershipTransferToInvalidID(t *testing.T) { - c, err := MakeCluster(3, t, nil) - require.NoError(t, err) + c := MakeCluster(3, t, nil) defer c.Close() future := c.Leader().LeadershipTransferToServer(ServerID("abc"), ServerAddress("localhost")) @@ -2776,8 +2709,7 @@ func TestRaft_LeadershipTransferToInvalidID(t *testing.T) { } func TestRaft_LeadershipTransferToInvalidAddress(t *testing.T) { - c, err := MakeCluster(3, t, nil) - require.NoError(t, err) + c := MakeCluster(3, t, nil) defer c.Close() follower := c.GetInState(Follower)[0] @@ -2793,8 +2725,7 @@ func TestRaft_LeadershipTransferToInvalidAddress(t *testing.T) { } func TestRaft_LeadershipTransferToBehindServer(t *testing.T) { - c, err := MakeCluster(3, t, nil) - require.NoError(t, err) + c := MakeCluster(3, t, nil) defer c.Close() l := c.Leader() @@ -2815,8 +2746,7 @@ func TestRaft_LeadershipTransferToBehindServer(t *testing.T) { } func TestRaft_LeadershipTransferToItself(t *testing.T) { - c, err := MakeCluster(3, t, nil) - require.NoError(t, err) + c := MakeCluster(3, t, nil) defer c.Close() l := c.Leader() @@ -2833,8 +2763,7 @@ func TestRaft_LeadershipTransferToItself(t *testing.T) { } func TestRaft_LeadershipTransferLeaderRejectsClientRequests(t *testing.T) { - c, err := MakeCluster(3, t, nil) - require.NoError(t, err) + c := MakeCluster(3, t, nil) defer c.Close() l := c.Leader() l.setLeadershipTransferInProgress(true) @@ -2868,8 +2797,7 @@ func TestRaft_LeadershipTransferLeaderRejectsClientRequests(t *testing.T) { } func TestRaft_LeadershipTransferLeaderReplicationTimeout(t *testing.T) { - c, err := MakeCluster(3, t, nil) - require.NoError(t, err) + c := MakeCluster(3, t, nil) defer c.Close() l := c.Leader() @@ -2899,8 +2827,7 @@ func TestRaft_LeadershipTransferLeaderReplicationTimeout(t *testing.T) { } func TestRaft_LeadershipTransferIgnoresNonvoters(t *testing.T) { - c, err := MakeCluster(2, t, nil) - require.NoError(t, err) + c := MakeCluster(2, t, nil) defer c.Close() follower := c.Followers()[0] @@ -2937,8 +2864,7 @@ func TestRaft_LeadershipTransferStopRightAway(t *testing.T) { } func TestRaft_GetConfigurationNoBootstrap(t *testing.T) { - c, err := MakeCluster(2, t, nil) - require.NoError(t, err) + c := MakeCluster(2, t, nil) defer c.Close() // Should be one leader @@ -2975,8 +2901,7 @@ func TestRaft_GetConfigurationNoBootstrap(t *testing.T) { } func TestRaft_LogStoreIsMonotonic(t *testing.T) { - c, err := MakeCluster(1, t, nil) - require.NoError(t, err) + c := MakeCluster(1, t, nil) defer c.Close() // Should be one leader @@ -3011,8 +2936,7 @@ func TestRaft_LogStoreIsMonotonic(t *testing.T) { } func TestRaft_CacheLogWithStoreError(t *testing.T) { - c, err := MakeCluster(2, t, nil) - require.NoError(t, err) + c := MakeCluster(2, t, nil) defer c.Close() // Should be one leader @@ -3064,8 +2988,7 @@ func TestRaft_CacheLogWithStoreError(t *testing.T) { func TestRaft_ReloadConfig(t *testing.T) { conf := inmemConfig(t) conf.LeaderLeaseTimeout = 40 * time.Millisecond - c, err := MakeCluster(1, t, conf) - require.NoError(t, err) + c := MakeCluster(1, t, conf) defer c.Close() raft := c.rafts[0] @@ -3095,8 +3018,7 @@ func TestRaft_ReloadConfig(t *testing.T) { func TestRaft_ReloadConfigValidates(t *testing.T) { conf := inmemConfig(t) - c, err := MakeCluster(1, t, conf) - require.NoError(t, err) + c := MakeCluster(1, t, conf) defer c.Close() raft := c.rafts[0] @@ -3172,8 +3094,8 @@ func TestRaft_InstallSnapshot_InvalidPeers(t *testing.T) { func TestRaft_VoteNotGranted_WhenNodeNotInCluster(t *testing.T) { // Make a cluster - c, err := MakeCluster(3, t, nil) - require.NoError(t, err) + c := MakeCluster(3, t, nil) + defer c.Close() // Get the leader @@ -3244,8 +3166,8 @@ func TestRaft_VoteNotGranted_WhenNodeNotInCluster(t *testing.T) { func TestRaft_ClusterCanRegainStability_WhenNonVoterWithHigherTermJoin(t *testing.T) { // Make a cluster - c, err := MakeCluster(3, t, nil) - require.NoError(t, err) + c := MakeCluster(3, t, nil) + defer c.Close() // Get the leader @@ -3339,10 +3261,10 @@ func TestRaft_FollowerRemovalNoElection(t *testing.T) { inmemConf := inmemConfig(t) inmemConf.HeartbeatTimeout = 100 * time.Millisecond inmemConf.ElectionTimeout = 100 * time.Millisecond - c, err := MakeCluster(3, t, inmemConf) - require.NoError(t, err) + c := MakeCluster(3, t, inmemConf) + defer c.Close() - err = waitForLeader(c) + err := waitForLeader(c) require.NoError(t, err) leader := c.Leader() @@ -3492,10 +3414,9 @@ func TestRaft_runFollower_ReloadTimeoutConfigs(t *testing.T) { func TestRaft_PreVote_ShouldNotRejectLeader(t *testing.T) { // Make a cluster - c, err := MakeCluster(3, t, nil) - require.NoError(t, err) + c := MakeCluster(3, t, nil) defer c.Close() - err = waitForLeader(c) + err := waitForLeader(c) require.NoError(t, err) leader := c.Leader() @@ -3523,7 +3444,7 @@ func TestRaft_PreVote_ShouldNotRejectLeader(t *testing.T) { var resp RequestPreVoteResponse leaderT := c.trans[c.IndexOf(leader)] - if err = leaderT.RequestPreVote(follower.localID, follower.localAddr, &reqPreVote, &resp); err != nil { + if err := leaderT.RequestPreVote(follower.localID, follower.localAddr, &reqPreVote, &resp); err != nil { t.Fatalf("RequestPreVote RPC failed %v", err) } @@ -3535,10 +3456,9 @@ func TestRaft_PreVote_ShouldNotRejectLeader(t *testing.T) { func TestRaft_PreVote_ShouldRejectNonLeader(t *testing.T) { // Make a cluster - c, err := MakeCluster(3, t, nil) - require.NoError(t, err) + c := MakeCluster(3, t, nil) defer c.Close() - err = waitForLeader(c) + err := waitForLeader(c) require.NoError(t, err) // Wait until we have 2 followers diff --git a/testing.go b/testing.go index 9c75a4e4..b8647e75 100644 --- a/testing.go +++ b/testing.go @@ -731,7 +731,7 @@ type MakeClusterOpts struct { // If bootstrap is true, the servers will know about each other before starting, // otherwise their transports will be wired up but they won't yet have configured // each other. -func makeCluster(t *testing.T, opts *MakeClusterOpts) (*cluster, error) { +func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster { if opts.Conf == nil { opts.Conf = inmemConfig(t) } @@ -757,8 +757,7 @@ func makeCluster(t *testing.T, opts *MakeClusterOpts) (*cluster, error) { for i := 0; i < opts.Peers; i++ { dir, err := os.MkdirTemp("", "raft") if err != nil { - t.Logf("err: %v", err) - return nil, err + t.Fatalf("err: %v", err) } store := NewInmemStore() @@ -820,30 +819,27 @@ func makeCluster(t *testing.T, opts *MakeClusterOpts) (*cluster, error) { if opts.Bootstrap { err := BootstrapCluster(peerConf, logs, store, snap, trans, configuration) if err != nil { - t.Logf("BootstrapCluster failed: %v", err) - return nil, err + t.Fatalf("BootstrapCluster failed: %v", err) } } raft, err := NewRaft(peerConf, c.fsms[i], logs, store, snap, trans) if err != nil { - t.Logf("NewRaft failed: %v", err) - return nil, err + t.Fatalf("NewRaft failed: %v", err) } raft.RegisterObserver(NewObserver(c.observationCh, false, nil)) if err != nil { - t.Logf("RegisterObserver failed: %v", err) - return nil, err + t.Fatalf("RegisterObserver failed: %v", err) } c.rafts = append(c.rafts, raft) } - return c, nil + return c } // NOTE: This is exposed for middleware testing purposes and is not a stable API -func MakeCluster(n int, t *testing.T, conf *Config) (*cluster, error) { +func MakeCluster(n int, t *testing.T, conf *Config) *cluster { return makeCluster(t, &MakeClusterOpts{ Peers: n, Bootstrap: true, @@ -852,7 +848,7 @@ func MakeCluster(n int, t *testing.T, conf *Config) (*cluster, error) { } // NOTE: This is exposed for middleware testing purposes and is not a stable API -func MakeClusterNoBootstrap(n int, t *testing.T, conf *Config) (*cluster, error) { +func MakeClusterNoBootstrap(n int, t *testing.T, conf *Config) *cluster { return makeCluster(t, &MakeClusterOpts{ Peers: n, Conf: conf, @@ -860,7 +856,7 @@ func MakeClusterNoBootstrap(n int, t *testing.T, conf *Config) (*cluster, error) } // NOTE: This is exposed for middleware testing purposes and is not a stable API -func MakeClusterCustom(t *testing.T, opts *MakeClusterOpts) (*cluster, error) { +func MakeClusterCustom(t *testing.T, opts *MakeClusterOpts) *cluster { return makeCluster(t, opts) } From 300a6e7924dad9c2833611b21198ad71e263ee5d Mon Sep 17 00:00:00 2001 From: peterxcli Date: Sat, 12 Oct 2024 23:58:35 +0800 Subject: [PATCH 35/37] refactor: update makeCluster to return errors - Add PropagateError option to MakeClusterOpts - Update makeCluster to return (*cluster, error) - Modify MakeCluster, MakeClusterNo --- raft_test.go | 11 +++++++---- testing.go | 36 ++++++++++++++++++++++++++++++++---- 2 files changed, 39 insertions(+), 8 deletions(-) diff --git a/raft_test.go b/raft_test.go index 0bd22e67..53505100 100644 --- a/raft_test.go +++ b/raft_test.go @@ -621,27 +621,30 @@ func TestRaft_JoinNode(t *testing.T) { func TestRaft_JoinNode_ConfigStore(t *testing.T) { // Make a cluster conf := inmemConfig(t) - c := makeCluster(t, &MakeClusterOpts{ + c, err := makeCluster(t, &MakeClusterOpts{ Peers: 1, Bootstrap: true, Conf: conf, ConfigStoreFSM: true, }) + require.NoError(t, err) defer c.Close() // Make a new nodes - c1 := makeCluster(t, &MakeClusterOpts{ + c1, err := makeCluster(t, &MakeClusterOpts{ Peers: 1, Bootstrap: false, Conf: conf, ConfigStoreFSM: true, }) - c2 := makeCluster(t, &MakeClusterOpts{ + require.NoError(t, err) + c2, err := makeCluster(t, &MakeClusterOpts{ Peers: 1, Bootstrap: false, Conf: conf, ConfigStoreFSM: true, }) + require.NoError(t, err) // Merge clusters c.Merge(c1) @@ -1284,7 +1287,7 @@ func TestRaft_RestoreCommittedLogs_IncompatibleLogStore(t *testing.T) { Conf: conf, CommitTrackingLogs: false, } - _, err := MakeClusterCustom(t, opts) + _, err := MakeClusterCustomWithErr(t, opts) require.ErrorIs(t, err, ErrIncompatibleLogStore) } diff --git a/testing.go b/testing.go index b8647e75..82877aae 100644 --- a/testing.go +++ b/testing.go @@ -725,13 +725,14 @@ type MakeClusterOpts struct { LongstopTimeout time.Duration MonotonicLogs bool CommitTrackingLogs bool + PropagateError bool // If true, return errors instead of calling t.Fatal } // makeCluster will return a cluster with the given config and number of peers. // If bootstrap is true, the servers will know about each other before starting, // otherwise their transports will be wired up but they won't yet have configured // each other. -func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster { +func makeCluster(t *testing.T, opts *MakeClusterOpts) (*cluster, error) { if opts.Conf == nil { opts.Conf = inmemConfig(t) } @@ -757,6 +758,9 @@ func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster { for i := 0; i < opts.Peers; i++ { dir, err := os.MkdirTemp("", "raft") if err != nil { + if opts.PropagateError { + return nil, fmt.Errorf("failed to create temp dir: %v", err) + } t.Fatalf("err: %v", err) } @@ -819,12 +823,18 @@ func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster { if opts.Bootstrap { err := BootstrapCluster(peerConf, logs, store, snap, trans, configuration) if err != nil { + if opts.PropagateError { + return nil, fmt.Errorf("BootstrapCluster failed: %v", err) + } t.Fatalf("BootstrapCluster failed: %v", err) } } raft, err := NewRaft(peerConf, c.fsms[i], logs, store, snap, trans) if err != nil { + if opts.PropagateError { + return nil, fmt.Errorf("NewRaft failed: %v", err) + } t.Fatalf("NewRaft failed: %v", err) } @@ -835,28 +845,46 @@ func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster { c.rafts = append(c.rafts, raft) } - return c + return c, nil } // NOTE: This is exposed for middleware testing purposes and is not a stable API func MakeCluster(n int, t *testing.T, conf *Config) *cluster { - return makeCluster(t, &MakeClusterOpts{ + c, err := makeCluster(t, &MakeClusterOpts{ Peers: n, Bootstrap: true, Conf: conf, }) + if err != nil { + t.Fatalf("failed to make cluster: %v", err) + } + return c } // NOTE: This is exposed for middleware testing purposes and is not a stable API func MakeClusterNoBootstrap(n int, t *testing.T, conf *Config) *cluster { - return makeCluster(t, &MakeClusterOpts{ + c, err := makeCluster(t, &MakeClusterOpts{ Peers: n, Conf: conf, }) + if err != nil { + t.Fatalf("failed to make cluster: %v", err) + } + return c } // NOTE: This is exposed for middleware testing purposes and is not a stable API func MakeClusterCustom(t *testing.T, opts *MakeClusterOpts) *cluster { + c, err := makeCluster(t, opts) + if err != nil { + t.Fatalf("failed to make cluster: %v", err) + } + return c +} + +// NOTE: This is exposed for middleware testing purposes and is not a stable API +func MakeClusterCustomWithErr(t *testing.T, opts *MakeClusterOpts) (*cluster, error) { + opts.PropagateError = true return makeCluster(t, opts) } From 8d11a28582d268e8a9aa366425313a940b44f340 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Sun, 13 Oct 2024 00:09:49 +0800 Subject: [PATCH 36/37] Use wrapped err --- testing.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/testing.go b/testing.go index 82877aae..0683155f 100644 --- a/testing.go +++ b/testing.go @@ -759,7 +759,7 @@ func makeCluster(t *testing.T, opts *MakeClusterOpts) (*cluster, error) { dir, err := os.MkdirTemp("", "raft") if err != nil { if opts.PropagateError { - return nil, fmt.Errorf("failed to create temp dir: %v", err) + return nil, fmt.Errorf("failed to create temp dir: %w", err) } t.Fatalf("err: %v", err) } @@ -824,7 +824,7 @@ func makeCluster(t *testing.T, opts *MakeClusterOpts) (*cluster, error) { err := BootstrapCluster(peerConf, logs, store, snap, trans, configuration) if err != nil { if opts.PropagateError { - return nil, fmt.Errorf("BootstrapCluster failed: %v", err) + return nil, fmt.Errorf("BootstrapCluster failed: %w", err) } t.Fatalf("BootstrapCluster failed: %v", err) } @@ -833,13 +833,16 @@ func makeCluster(t *testing.T, opts *MakeClusterOpts) (*cluster, error) { raft, err := NewRaft(peerConf, c.fsms[i], logs, store, snap, trans) if err != nil { if opts.PropagateError { - return nil, fmt.Errorf("NewRaft failed: %v", err) + return nil, fmt.Errorf("NewRaft failed: %w", err) } t.Fatalf("NewRaft failed: %v", err) } raft.RegisterObserver(NewObserver(c.observationCh, false, nil)) if err != nil { + if opts.PropagateError { + return nil, fmt.Errorf("RegisterObserver failed: %w", err) + } t.Fatalf("RegisterObserver failed: %v", err) } c.rafts = append(c.rafts, raft) From 1bdf1610a8b880f53ca307f68000bbe978279779 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Tue, 15 Oct 2024 21:39:54 +0800 Subject: [PATCH 37/37] docs: clarify GetCommitIndex behavior in CommitTrackingLogStore interface - Specify that GetCommitIndex should not return a value higher than the last index in the log - Clarify that if a higher value is returned, the last index in the log will be used instead - Add instruction to return (0, nil) when no commit index is found in the log store --- log.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/log.go b/log.go index 179a2a62..6856680b 100644 --- a/log.go +++ b/log.go @@ -212,6 +212,10 @@ type CommitTrackingLogStore interface { // GetCommitIndex returns the latest persisted commit index from the latest log entry // in the store at startup. - // It is ok to return a value higher than the last index in the log (But it should never happen). + // + // GetCommitIndex should not return a value higher than the last index in the log. + // If that happens, the last index in the log will be used. + // + // When no commit index is found in the log store, return (0, nil). GetCommitIndex() (uint64, error) }