Skip to content

Commit

Permalink
Utilize WAL to patch operation history
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Oct 16, 2023
1 parent f23e34a commit 5f45b66
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 459 deletions.
26 changes: 15 additions & 11 deletions tests/robustness/linearizability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,18 +178,18 @@ type testScenario struct {
}

func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testScenario) {
report := report.TestReport{Logger: lg}
r := report.TestReport{Logger: lg}
var err error
report.Cluster, err = e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&s.cluster))
r.Cluster, err = e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&s.cluster))
if err != nil {
t.Fatal(err)
}
defer report.Cluster.Close()
defer r.Cluster.Close()

if s.failpoint == nil {
s.failpoint = pickRandomFailpoint(t, report.Cluster)
s.failpoint = pickRandomFailpoint(t, r.Cluster)
} else {
err = validateFailpoint(report.Cluster, s.failpoint)
err = validateFailpoint(r.Cluster, s.failpoint)
if err != nil {
t.Fatal(err)
}
Expand All @@ -200,15 +200,19 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce
// Refer to: https://github.com/golang/go/issues/49929
panicked := true
defer func() {
report.Report(t, panicked)
r.Report(t, panicked)
}()
report.Client = s.run(ctx, t, lg, report.Cluster)
forcestopCluster(report.Cluster)
r.Client = s.run(ctx, t, lg, r.Cluster)
forcestopCluster(r.Cluster)

watchProgressNotifyEnabled := report.Cluster.Cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0
validateGotAtLeastOneProgressNotify(t, report.Client, s.watch.requestProgress || watchProgressNotifyEnabled)
watchProgressNotifyEnabled := r.Cluster.Cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0
validateGotAtLeastOneProgressNotify(t, r.Client, s.watch.requestProgress || watchProgressNotifyEnabled)
persistedRequests, err := report.PersistedRequestsCluster(lg, r.Cluster)
if err != nil {
t.Error(err)
}
validateConfig := validate.Config{ExpectRevisionUnique: s.traffic.ExpectUniqueRevision()}
report.Visualize = validate.ValidateAndReturnVisualize(t, lg, validateConfig, report.Client, 5*time.Minute)
r.Visualize = validate.ValidateAndReturnVisualize(t, lg, validateConfig, r.Client, persistedRequests, 5*time.Minute)

panicked = false
}
Expand Down
Binary file not shown.
171 changes: 75 additions & 96 deletions tests/robustness/validate/patch_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,131 +15,110 @@
package validate

import (
"fmt"

"github.com/anishathalye/porcupine"

"go.etcd.io/etcd/tests/v3/robustness/model"
"go.etcd.io/etcd/tests/v3/robustness/report"
"go.etcd.io/etcd/tests/v3/robustness/traffic"
)

func patchedOperationHistory(reports []report.ClientReport) []porcupine.Operation {
allOperations := operations(reports)
uniqueEvents := uniqueWatchEvents(reports)
return patchOperationsWithWatchEvents(allOperations, uniqueEvents)
}

func operations(reports []report.ClientReport) []porcupine.Operation {
var ops []porcupine.Operation
for _, r := range reports {
ops = append(ops, r.KeyValue...)
func removeFailedNotPersistedOperations(allOperations []porcupine.Operation, persistedRequests []model.EtcdRequest) []porcupine.Operation {
if len(persistedRequests) == 0 {
return allOperations
}
return ops
operationsReturnTime := persistedOperationsReturnTime(allOperations, persistedRequests)
return patchOperationsWithWatchEvents(allOperations, operationsReturnTime)
}

func uniqueWatchEvents(reports []report.ClientReport) map[model.Event]traffic.TimedWatchEvent {
persisted := map[model.Event]traffic.TimedWatchEvent{}
for _, r := range reports {
for _, op := range r.Watch {
for _, resp := range op.Responses {
for _, event := range resp.Events {
responseTime := resp.Time
if prev, found := persisted[event.Event]; found && prev.Time < responseTime {
responseTime = prev.Time
}
persisted[event.Event] = traffic.TimedWatchEvent{Time: responseTime, WatchEvent: event}
func persistedOperationsReturnTime(allOperations []porcupine.Operation, persistedRequests []model.EtcdRequest) map[model.EtcdOperation]int64 {
operationReturnTime := operationReturnTime(allOperations)
persisted := map[model.EtcdOperation]int64{}

lastReturnTime := requestReturnTime(operationReturnTime, persistedRequests[len(persistedRequests)-1])
for i := len(persistedRequests) - 1; i >= 0; i-- {
request := persistedRequests[i]
switch request.Type {
case model.Txn:
for _, op := range request.Txn.OperationsOnSuccess {
if op.Type != model.PutOperation {
continue
}
if _, found := persisted[op]; found {
panic(fmt.Sprintf("Unexpected duplicate event in persisted requests. %d %+v", i, op))
}
persisted[op] = lastReturnTime - 1
}
newReturnTime := requestReturnTime(operationReturnTime, request)
lastReturnTime = min(lastReturnTime, newReturnTime)
default:
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
}
}
return persisted
}

func patchOperationsWithWatchEvents(operations []porcupine.Operation, watchEvents map[model.Event]traffic.TimedWatchEvent) []porcupine.Operation {

newOperations := make([]porcupine.Operation, 0, len(operations))
lastObservedOperation := lastOperationObservedInWatch(operations, watchEvents)

func operationReturnTime(operations []porcupine.Operation) map[model.EtcdOperation]int64 {
newOperations := map[model.EtcdOperation]int64{}
for _, op := range operations {
request := op.Input.(model.EtcdRequest)
resp := op.Output.(model.MaybeEtcdResponse)
if resp.Error == "" || op.Call > lastObservedOperation.Call || request.Type != model.Txn {
// Cannot patch those requests.
newOperations = append(newOperations, op)
continue
}
event := matchWatchEvent(request.Txn, watchEvents)
if event != nil {
// Set revision and time based on watchEvent.
op.Return = event.Time.Nanoseconds()
op.Output = model.MaybeEtcdResponse{PartialResponse: true, EtcdResponse: model.EtcdResponse{Revision: event.Revision}}
newOperations = append(newOperations, op)
continue
}
if !canBeDiscarded(request.Txn) {
// Leave operation as it is as we cannot discard it.
newOperations = append(newOperations, op)
continue
switch request.Type {
case model.Txn:
for _, etcdOp := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) {
if etcdOp.Type != model.PutOperation {
continue
}
if _, found := newOperations[etcdOp]; found {
panic("Unexpected duplicate event in persisted requests.")
}
newOperations[etcdOp] = op.Return
}
case model.Range:
default:
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
}
// Remove non persisted operations
}
return newOperations
}

func lastOperationObservedInWatch(operations []porcupine.Operation, watchEvents map[model.Event]traffic.TimedWatchEvent) porcupine.Operation {
var maxCallTime int64
var lastOperation porcupine.Operation
for _, op := range operations {
request := op.Input.(model.EtcdRequest)
if request.Type != model.Txn {
continue
}
event := matchWatchEvent(request.Txn, watchEvents)
if event != nil && op.Call > maxCallTime {
maxCallTime = op.Call
lastOperation = op
}
}
return lastOperation
}

func matchWatchEvent(request *model.TxnRequest, watchEvents map[model.Event]traffic.TimedWatchEvent) *traffic.TimedWatchEvent {
for _, etcdOp := range append(request.OperationsOnSuccess, request.OperationsOnFailure...) {
if etcdOp.Type == model.PutOperation {
event, ok := watchEvents[model.Event{
Type: etcdOp.Type,
Key: etcdOp.Put.Key,
Value: etcdOp.Put.Value,
}]
if ok {
return &event
func requestReturnTime(operationTime map[model.EtcdOperation]int64, request model.EtcdRequest) int64 {
switch request.Type {
case model.Txn:
for _, op := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) {
if op.Type != model.PutOperation {
continue
}
if time, found := operationTime[op]; found {
return time
}
}
default:
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
}
return nil
}

func canBeDiscarded(request *model.TxnRequest) bool {
return operationsCanBeDiscarded(request.OperationsOnSuccess) && operationsCanBeDiscarded(request.OperationsOnFailure)
panic(fmt.Sprintf("Unknown request: %+v", request))
}

func operationsCanBeDiscarded(ops []model.EtcdOperation) bool {
return hasUniqueWriteOperation(ops) || !hasWriteOperation(ops)
}
func patchOperationsWithWatchEvents(operations []porcupine.Operation, persistedOperations map[model.EtcdOperation]int64) []porcupine.Operation {
newOperations := make([]porcupine.Operation, 0, len(operations))

func hasWriteOperation(ops []model.EtcdOperation) bool {
for _, etcdOp := range ops {
if etcdOp.Type == model.PutOperation || etcdOp.Type == model.DeleteOperation {
return true
for _, op := range operations {
request := op.Input.(model.EtcdRequest)
resp := op.Output.(model.MaybeEtcdResponse)
if resp.Error == "" || request.Type != model.Txn {
// Not patching successful requests or non-txn requests
newOperations = append(newOperations, op)
continue
}
}
return false
}

func hasUniqueWriteOperation(ops []model.EtcdOperation) bool {
for _, etcdOp := range ops {
if etcdOp.Type == model.PutOperation {
return true
for _, etcdOp := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) {
if etcdOp.Type != model.PutOperation {
continue
}
if returnTime, found := persistedOperations[etcdOp]; found {
op.Return = returnTime
newOperations = append(newOperations, op)
break
}
}
// Remove non persisted operations
}
return false
return newOperations
}
Loading

0 comments on commit 5f45b66

Please sign in to comment.