diff --git a/server/etcdserver/txn/txn.go b/server/etcdserver/txn/txn.go index 220b7d31580..75fe55a91e5 100644 --- a/server/etcdserver/txn/txn.go +++ b/server/etcdserver/txn/txn.go @@ -275,7 +275,11 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit if isWrite { trace.AddField(traceutil.Field{Key: "read_only", Value: false}) } - _, err := checkTxn(txnRead, rt, lessor, txnPath) + // checkTxn performs pre-flight checks on the txn to confirm that it's valid (e.g. a range operation is not asking for + // a future/compacted revision, a put operation doesn't supply a lease which doesn't exist, etc.) or returns an error. + // performsWrite reports whether the txn actually performs any write operations based on the compare choices we land with + // (i.e. txnPath) - which has been pre-computed. So even if isWrite was true for the txn, performsWrite may be false. + _, performsWrite, err := checkTxn(txnRead, rt, lessor, txnPath) if err != nil { txnRead.End() return nil, nil, err @@ -286,13 +290,13 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit // serialized on the raft loop, the revision in the read view will // be the revision of the write txnWrite. var txnWrite mvcc.TxnWrite - if isWrite { + if performsWrite { txnRead.End() txnWrite = kv.Write(trace) } else { txnWrite = mvcc.NewReadOnlyTxnWrite(txnRead) } - txnResp, err := txn(ctx, lg, txnWrite, rt, isWrite, txnPath) + txnResp, err := txn(ctx, lg, txnWrite, rt, performsWrite, txnPath) txnWrite.End() trace.AddField( @@ -314,6 +318,8 @@ func txn(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt *pb.Txn // - data inconsistency across different etcd members if they applied the txn asymmetrically lg.Panic("unexpected error during txn with writes", zap.Error(err)) } else { + // Given the txn performs only read operations, it does not cause any apply layer side-effects. 
+ // Therefore, it's safe for the server to return an error here (without panic) and continue applying subsequent records. lg.Error("unexpected error during readonly txn", zap.Error(err)) } } @@ -441,8 +447,9 @@ func checkRange(rv mvcc.ReadView, req *pb.RangeRequest) error { return nil } -func checkTxn(rv mvcc.ReadView, rt *pb.TxnRequest, lessor lease.Lessor, txnPath []bool) (int, error) { +func checkTxn(rv mvcc.ReadView, rt *pb.TxnRequest, lessor lease.Lessor, txnPath []bool) (int, bool, error) { txnCount := 0 + performsWrite := false reqs := rt.Success if !txnPath[0] { reqs = rt.Failure @@ -450,24 +457,28 @@ func checkTxn(rv mvcc.ReadView, rt *pb.TxnRequest, lessor lease.Lessor, txnPath for _, req := range reqs { var err error var txns int + var nestedTxnPerformsWrite bool switch tv := req.Request.(type) { case *pb.RequestOp_RequestRange: err = checkRange(rv, tv.RequestRange) case *pb.RequestOp_RequestPut: err = checkPut(rv, lessor, tv.RequestPut) + performsWrite = true case *pb.RequestOp_RequestDeleteRange: + performsWrite = true case *pb.RequestOp_RequestTxn: - txns, err = checkTxn(rv, tv.RequestTxn, lessor, txnPath[1:]) + txns, nestedTxnPerformsWrite, err = checkTxn(rv, tv.RequestTxn, lessor, txnPath[1:]) + performsWrite = performsWrite || nestedTxnPerformsWrite txnCount += txns + 1 txnPath = txnPath[txns+1:] default: // empty union } if err != nil { - return 0, err + return 0, performsWrite, err } } - return txnCount, nil + return txnCount, performsWrite, nil } // mkGteRange determines if the range end is a >= range. 
This works around grpc diff --git a/server/etcdserver/txn/txn_test.go b/server/etcdserver/txn/txn_test.go index 850c8a95b9b..7b506aba3a5 100644 --- a/server/etcdserver/txn/txn_test.go +++ b/server/etcdserver/txn/txn_test.go @@ -307,7 +307,7 @@ func setup(t *testing.T, setup testSetup) (mvcc.KV, lease.Lessor) { return s, lessor } -func TestReadonlyTxnError(t *testing.T) { +func TestReadonlyTxnError_WhenOperationFails(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) defer betesting.Close(t, b) s := mvcc.NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, mvcc.StoreConfig{}) @@ -322,6 +322,7 @@ func TestReadonlyTxnError(t *testing.T) { s.Put([]byte("foo"), []byte("bar"), lease.NoLease) txn := &pb.TxnRequest{ + // note: since there are no compare terms here, we default to the success path Success: []*pb.RequestOp{ { Request: &pb.RequestOp_RequestRange{ @@ -339,7 +340,7 @@ func TestReadonlyTxnError(t *testing.T) { } } -func TestWriteTxnPanicWithoutApply(t *testing.T) { +func TestWriteTxnPanic_WhenOperationFails(t *testing.T) { b, bePath := betesting.NewDefaultTmpBackend(t) s := mvcc.NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, mvcc.StoreConfig{}) defer s.Close() @@ -350,6 +351,7 @@ func TestWriteTxnPanicWithoutApply(t *testing.T) { // write txn that puts some data and then fails in range due to cancelled context txn := &pb.TxnRequest{ + // note: since there are no compare terms here, we default to the success path Success: []*pb.RequestOp{ { Request: &pb.RequestOp_RequestPut{ @@ -374,7 +376,7 @@ func TestWriteTxnPanicWithoutApply(t *testing.T) { require.NoErrorf(t, err, "failed to compute DB file hash before txn") // we verify the following properties below: - // 1. server panics after a write txn aply fails (invariant: server should never try to move on from a failed write) + // 1. server panics after a write txn apply fails (invariant: server should never try to move on from a failed write) // 2. 
no writes from the txn are applied to the backend (invariant: failed write should have no side-effect on DB state besides panic) assert.Panicsf(t, func() { Txn(ctx, zaptest.NewLogger(t), txn, false, s, &lease.FakeLessor{}) }, "Expected panic in Txn with writes") dbHashAfter, err := computeFileHash(bePath) @@ -382,6 +384,74 @@ func TestWriteTxnPanicWithoutApply(t *testing.T) { require.Equalf(t, dbHashBefore, dbHashAfter, "mismatch in DB hash before and after failed write txn") } +func TestWriteTxnError_WhenPerformsReadonlyOperations(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + s := mvcc.NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, mvcc.StoreConfig{}) + defer s.Close() + + // setup cancelled context + ctx, cancel := context.WithCancel(context.TODO()) + cancel() + + // put some data to prevent early termination in rangeKeys + // we are expecting failure on cancelled context check + s.Put([]byte("foo"), []byte("bar"), lease.NoLease) + + txn := &pb.TxnRequest{ + // note: since there are no compare terms here, we default to the success path + Success: []*pb.RequestOp{ + { + Request: &pb.RequestOp_RequestRange{ + RequestRange: &pb.RangeRequest{ + Key: []byte("foo"), + }, + }, + }, + { + // Including a nested write txn below that is also effectively read-only - to test recursive correctness. 
+ Request: &pb.RequestOp_RequestTxn{ + RequestTxn: &pb.TxnRequest{ + Success: []*pb.RequestOp{ + { + Request: &pb.RequestOp_RequestRange{ + RequestRange: &pb.RangeRequest{ + Key: []byte("foo"), + }, + }, + }, + }, + Failure: []*pb.RequestOp{ + { + Request: &pb.RequestOp_RequestPut{ + RequestPut: &pb.PutRequest{ + Key: []byte("foo2"), + Value: []byte("bar2"), + }, + }, + }, + }, + }, + }, + }, + }, + Failure: []*pb.RequestOp{ + { + Request: &pb.RequestOp_RequestPut{ + RequestPut: &pb.PutRequest{ + Key: []byte("foo3"), + Value: []byte("bar3"), + }, + }, + }, + }, + } + + _, _, err := Txn(ctx, zaptest.NewLogger(t), txn, false, s, &lease.FakeLessor{}) + if err == nil || !strings.Contains(err.Error(), "applyTxn: failed Range: rangeKeys: context cancelled: context canceled") { + t.Fatalf("Expected context canceled error, got %v", err) + } +} + func TestCheckTxnAuth(t *testing.T) { be, _ := betesting.NewDefaultTmpBackend(t) defer betesting.Close(t, be)