Skip to content

Commit

Permalink
Merge pull request etcd-io#16761 from serathius/robustness-wal
Browse files Browse the repository at this point in the history
Robustness wal
  • Loading branch information
serathius committed Apr 14, 2024
2 parents 452445e + 569693b commit 3f20c2a
Show file tree
Hide file tree
Showing 9 changed files with 759 additions and 325 deletions.
26 changes: 15 additions & 11 deletions tests/robustness/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,21 @@ func TestRobustnessRegression(t *testing.T) {
}

func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testScenario) {
report := report.TestReport{Logger: lg}
r := report.TestReport{Logger: lg}
var err error
report.Cluster, err = e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&s.cluster))
r.Cluster, err = e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&s.cluster))
if err != nil {
t.Fatal(err)
}
defer report.Cluster.Close()
defer r.Cluster.Close()

if s.failpoint == nil {
s.failpoint, err = failpoint.PickRandom(report.Cluster)
s.failpoint, err = failpoint.PickRandom(r.Cluster)
if err != nil {
t.Fatal(err)
}
}
err = failpoint.Validate(report.Cluster, s.failpoint)
err = failpoint.Validate(r.Cluster, s.failpoint)
if err != nil {
t.Fatal(err)
}
Expand All @@ -88,15 +88,19 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce
// Refer to: https://github.com/golang/go/issues/49929
panicked := true
defer func() {
report.Report(t, panicked)
r.Report(t, panicked)
}()
report.Client = s.run(ctx, t, lg, report.Cluster)
forcestopCluster(report.Cluster)
r.Client = s.run(ctx, t, lg, r.Cluster)
persistedRequests, err := report.PersistedRequestsCluster(lg, r.Cluster)
if err != nil {
t.Fatal(err)
}
forcestopCluster(r.Cluster)

watchProgressNotifyEnabled := report.Cluster.Cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0
validateGotAtLeastOneProgressNotify(t, report.Client, s.watch.requestProgress || watchProgressNotifyEnabled)
watchProgressNotifyEnabled := r.Cluster.Cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0
validateGotAtLeastOneProgressNotify(t, r.Client, s.watch.requestProgress || watchProgressNotifyEnabled)
validateConfig := validate.Config{ExpectRevisionUnique: s.traffic.ExpectUniqueRevision()}
report.Visualize = validate.ValidateAndReturnVisualize(t, lg, validateConfig, report.Client, 5*time.Minute)
r.Visualize = validate.ValidateAndReturnVisualize(t, lg, validateConfig, r.Client, persistedRequests, 5*time.Minute)

panicked = false
}
Expand Down
127 changes: 77 additions & 50 deletions tests/robustness/model/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,82 +19,109 @@ import (
"strings"
)

func NewReplay(eventHistory []PersistedEvent) *EtcdReplay {
var lastEventRevision int64 = 1
for _, event := range eventHistory {
if event.Revision > lastEventRevision && event.Revision != lastEventRevision+1 {
panic("Replay requires a complete event history")
}
lastEventRevision = event.Revision
}
func NewReplay(persistedRequests []EtcdRequest) *EtcdReplay {
return &EtcdReplay{
eventHistory: eventHistory,
persistedRequests: persistedRequests,
}
}

type EtcdReplay struct {
eventHistory []PersistedEvent
persistedRequests []EtcdRequest

// Cached state and event index used for it's calculation
cachedState *EtcdState
eventHistoryIndex int
cacheIndex int
cacheState *EtcdState
}

func (r *EtcdReplay) StateForRevision(revision int64) (EtcdState, error) {
if revision < 1 {
return EtcdState{}, fmt.Errorf("invalid revision: %d", revision)
}
if r.cachedState == nil || r.cachedState.Revision > revision {
r.reset()
r.moveToRevision(revision)
if r.cacheIndex > len(r.persistedRequests) && r.cacheState.Revision < revision {
return EtcdState{}, fmt.Errorf("requested revision higher then available in even history, requested: %d, model: %d", revision, r.cacheState.Revision)
}
return *r.cacheState, nil
}

for r.eventHistoryIndex < len(r.eventHistory) && r.cachedState.Revision < revision {
nextRequest, nextRevision, nextIndex := r.next()
newState, _ := r.cachedState.Step(nextRequest)
if newState.Revision != nextRevision {
return EtcdState{}, fmt.Errorf("model returned different revision than one present in event history, model: %d, event: %d", newState.Revision, nextRevision)
func (r *EtcdReplay) EventsForWatch(watch WatchRequest) (events []PersistedEvent) {
if watch.Revision == 0 {
r.reset()
} else {
r.moveToRevision(watch.Revision - 1)
}
for r.cacheIndex < len(r.persistedRequests) {
request := r.persistedRequests[r.cacheIndex]
newState, response := r.cacheState.Step(request)
for _, e := range toWatchEvents(r.cacheState, request, response) {
if e.Match(watch) {
events = append(events, e)
}
}
r.cachedState = &newState
r.eventHistoryIndex = nextIndex
r.cacheState = &newState
r.cacheIndex++
}
if r.eventHistoryIndex > len(r.eventHistory) && r.cachedState.Revision < revision {
return EtcdState{}, fmt.Errorf("requested revision higher then available in even history, requested: %d, model: %d", revision, r.cachedState.Revision)
return events
}

func (r *EtcdReplay) moveToRevision(revision int64) {
if r.cacheState == nil || r.cacheState.Revision > revision {
r.reset()
}
for r.cacheIndex < len(r.persistedRequests) && r.cacheState.Revision < revision {
newState, _ := r.cacheState.Step(r.persistedRequests[r.cacheIndex])
r.cacheState = &newState
r.cacheIndex++
}
return *r.cachedState, nil
}

func (r *EtcdReplay) reset() {
state := freshEtcdState()
r.cachedState = &state
r.eventHistoryIndex = 0
r.cacheState = &state
r.cacheIndex = 0
}

func (r *EtcdReplay) next() (request EtcdRequest, revision int64, index int) {
revision = r.eventHistory[r.eventHistoryIndex].Revision
index = r.eventHistoryIndex
operations := []EtcdOperation{}
for index < len(r.eventHistory) && r.eventHistory[index].Revision == revision {
event := r.eventHistory[index]
switch event.Type {
case PutOperation:
operations = append(operations, EtcdOperation{
Type: event.Type,
Put: PutOptions{Key: event.Key, Value: event.Value},
})
case DeleteOperation:
operations = append(operations, EtcdOperation{
Type: event.Type,
Delete: DeleteOptions{Key: event.Key},
})
func toWatchEvents(prevState *EtcdState, request EtcdRequest, response MaybeEtcdResponse) (events []PersistedEvent) {
if request.Type == Txn && response.Error == "" {
var ops []EtcdOperation
if response.Txn.Failure {
ops = request.Txn.OperationsOnFailure
} else {
ops = request.Txn.OperationsOnSuccess
}
for _, op := range ops {
switch op.Type {
case RangeOperation:
case DeleteOperation:
e := PersistedEvent{
Event: Event{
Type: op.Type,
Key: op.Delete.Key,
},
Revision: response.Revision,
}
if _, ok := prevState.KeyValues[op.Delete.Key]; ok {
events = append(events, e)
}
case PutOperation:
e := PersistedEvent{
Event: Event{
Type: op.Type,
Key: op.Put.Key,
Value: op.Put.Value,
},
Revision: response.Revision,
}
if _, ok := prevState.KeyValues[op.Put.Key]; !ok {
e.IsCreate = true
}
events = append(events, e)
default:
panic(fmt.Sprintf("unsupported operation type: %v", op))
}
}
index++
}
return EtcdRequest{
Type: Txn,
Txn: &TxnRequest{
OperationsOnSuccess: operations,
},
}, revision, index
return events
}

type WatchEvent struct {
Expand Down
Loading

0 comments on commit 3f20c2a

Please sign in to comment.