Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle read-only write txn failures gracefully #18803

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions server/etcdserver/txn/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,11 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
if isWrite {
trace.AddField(traceutil.Field{Key: "read_only", Value: false})
}
_, err := checkTxn(txnRead, rt, lessor, txnPath)
// checkTxn performs pre-flight checks on the txn to confirm that it's valid (e.g range operation is not asking for
// a future/compacted revision, put operations doesn't supply a lease which doesn't exist, etc) or returns an error.
// performsWrite says if the txn actually performs any write operations based on the compare choices we land with
// (i.e. txnPath) - which has been pre-computed. So even if isWrite was true for the txn, performsWrite may be false.
_, performsWrite, err := checkTxn(txnRead, rt, lessor, txnPath)
if err != nil {
txnRead.End()
return nil, nil, err
Expand All @@ -286,13 +290,13 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
// serialized on the raft loop, the revision in the read view will
// be the revision of the write txnWrite.
var txnWrite mvcc.TxnWrite
if isWrite {
if performsWrite {
txnRead.End()
txnWrite = kv.Write(trace)
} else {
txnWrite = mvcc.NewReadOnlyTxnWrite(txnRead)
}
txnResp, err := txn(ctx, lg, txnWrite, rt, isWrite, txnPath)
txnResp, err := txn(ctx, lg, txnWrite, rt, performsWrite, txnPath)
txnWrite.End()

trace.AddField(
Expand All @@ -314,6 +318,8 @@ func txn(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt *pb.Txn
// - data inconsistency across different etcd members if they applied the txn asymmetrically
lg.Panic("unexpected error during txn with writes", zap.Error(err))
} else {
// Given the txn performs only read operations, it does not cause any apply layer side-effects.
// Therefore, it's safe for the server to return an error here (without panic) and continue applying subsequent records.
lg.Error("unexpected error during readonly txn", zap.Error(err))
}
}
Expand Down Expand Up @@ -441,33 +447,38 @@ func checkRange(rv mvcc.ReadView, req *pb.RangeRequest) error {
return nil
}

func checkTxn(rv mvcc.ReadView, rt *pb.TxnRequest, lessor lease.Lessor, txnPath []bool) (int, error) {
func checkTxn(rv mvcc.ReadView, rt *pb.TxnRequest, lessor lease.Lessor, txnPath []bool) (int, bool, error) {
txnCount := 0
performsWrite := false
reqs := rt.Success
if !txnPath[0] {
reqs = rt.Failure
}
for _, req := range reqs {
var err error
var txns int
var nestedTxnPerformsWrite bool
switch tv := req.Request.(type) {
case *pb.RequestOp_RequestRange:
err = checkRange(rv, tv.RequestRange)
case *pb.RequestOp_RequestPut:
err = checkPut(rv, lessor, tv.RequestPut)
performsWrite = true
case *pb.RequestOp_RequestDeleteRange:
performsWrite = true
case *pb.RequestOp_RequestTxn:
txns, err = checkTxn(rv, tv.RequestTxn, lessor, txnPath[1:])
txns, nestedTxnPerformsWrite, err = checkTxn(rv, tv.RequestTxn, lessor, txnPath[1:])
performsWrite = performsWrite || nestedTxnPerformsWrite
txnCount += txns + 1
txnPath = txnPath[txns+1:]
default:
// empty union
}
if err != nil {
return 0, err
return 0, performsWrite, err
}
}
return txnCount, nil
return txnCount, performsWrite, nil
}

// mkGteRange determines if the range end is a >= range. This works around grpc
Expand Down
76 changes: 73 additions & 3 deletions server/etcdserver/txn/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func setup(t *testing.T, setup testSetup) (mvcc.KV, lease.Lessor) {
return s, lessor
}

func TestReadonlyTxnError(t *testing.T) {
func TestReadonlyTxnError_WhenOperationFails(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, b)
s := mvcc.NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, mvcc.StoreConfig{})
Expand All @@ -322,6 +322,7 @@ func TestReadonlyTxnError(t *testing.T) {
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)

txn := &pb.TxnRequest{
// note: since there are no compare terms here, we default to the success path
Success: []*pb.RequestOp{
{
Request: &pb.RequestOp_RequestRange{
Expand All @@ -339,7 +340,7 @@ func TestReadonlyTxnError(t *testing.T) {
}
}

func TestWriteTxnPanicWithoutApply(t *testing.T) {
func TestWriteTxnPanic_WhenOperationFails(t *testing.T) {
b, bePath := betesting.NewDefaultTmpBackend(t)
s := mvcc.NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, mvcc.StoreConfig{})
defer s.Close()
Expand All @@ -350,6 +351,7 @@ func TestWriteTxnPanicWithoutApply(t *testing.T) {

// write txn that puts some data and then fails in range due to cancelled context
txn := &pb.TxnRequest{
// note: since there are no compare terms here, we default to the success path
Success: []*pb.RequestOp{
{
Request: &pb.RequestOp_RequestPut{
Expand All @@ -374,14 +376,82 @@ func TestWriteTxnPanicWithoutApply(t *testing.T) {
require.NoErrorf(t, err, "failed to compute DB file hash before txn")

// we verify the following properties below:
// 1. server panics after a write txn aply fails (invariant: server should never try to move on from a failed write)
// 1. server panics after a write txn apply fails (invariant: server should never try to move on from a failed write)
// 2. no writes from the txn are applied to the backend (invariant: failed write should have no side-effect on DB state besides panic)
assert.Panicsf(t, func() { Txn(ctx, zaptest.NewLogger(t), txn, false, s, &lease.FakeLessor{}) }, "Expected panic in Txn with writes")
dbHashAfter, err := computeFileHash(bePath)
require.NoErrorf(t, err, "failed to compute DB file hash after txn")
require.Equalf(t, dbHashBefore, dbHashAfter, "mismatch in DB hash before and after failed write txn")
}

func TestWriteTxnError_WhenPerformsReadonlyOperations(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
s := mvcc.NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, mvcc.StoreConfig{})
defer s.Close()

// setup cancelled context
ctx, cancel := context.WithCancel(context.TODO())
cancel()

// put some data to prevent early termination in rangeKeys
// we are expecting failure on cancelled context check
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)

txn := &pb.TxnRequest{
// note: since there are no compare terms here, we default to the success path
Success: []*pb.RequestOp{
{
Request: &pb.RequestOp_RequestRange{
RequestRange: &pb.RangeRequest{
Key: []byte("foo"),
},
},
},
{
// Including a nested write txn below that is also effectively read-only - to test recursive correctness.
Request: &pb.RequestOp_RequestTxn{
RequestTxn: &pb.TxnRequest{
Success: []*pb.RequestOp{
{
Request: &pb.RequestOp_RequestRange{
RequestRange: &pb.RangeRequest{
Key: []byte("foo"),
},
},
},
},
Failure: []*pb.RequestOp{
{
Request: &pb.RequestOp_RequestPut{
RequestPut: &pb.PutRequest{
Key: []byte("foo2"),
Value: []byte("bar2"),
},
},
},
},
},
},
},
},
Failure: []*pb.RequestOp{
{
Request: &pb.RequestOp_RequestPut{
RequestPut: &pb.PutRequest{
Key: []byte("foo3"),
Value: []byte("bar3"),
},
},
},
},
}

_, _, err := Txn(ctx, zaptest.NewLogger(t), txn, false, s, &lease.FakeLessor{})
if err == nil || !strings.Contains(err.Error(), "applyTxn: failed Range: rangeKeys: context cancelled: context canceled") {
t.Fatalf("Expected context canceled error, got %v", err)
}
}

func TestCheckTxnAuth(t *testing.T) {
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)
Expand Down
Loading