From 5391e11b97a7c6d17610e4a056143e1c6530d750 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 15 Jan 2024 16:26:13 +0000 Subject: [PATCH 1/7] kvpb: mark `BarrierRequest` as `isAlone` Otherwise, `BatchRequest.RequiresConsensus()` may return `false` and not submit the barrier through Raft. Similarly, `shouldWaitOnLatchesWithoutAcquiring` will return `false` so it will contend with later writes. Barriers are not used in recent releases, so this does not have any mixed-version concerns. Epic: none Release note: None --- pkg/kv/kvpb/api.go | 2 +- pkg/kv/kvpb/batch.go | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go index b9952375625f..f97a604e9ddc 100644 --- a/pkg/kv/kvpb/api.go +++ b/pkg/kv/kvpb/api.go @@ -1579,7 +1579,7 @@ func (*RangeStatsRequest) flags() flag { return isRead } func (*QueryResolvedTimestampRequest) flags() flag { return isRead | isRange | requiresClosedTSOlderThanStorageSnapshot } -func (*BarrierRequest) flags() flag { return isWrite | isRange } +func (*BarrierRequest) flags() flag { return isWrite | isRange | isAlone } func (*IsSpanEmptyRequest) flags() flag { return isRead | isRange } // IsParallelCommit returns whether the EndTxn request is attempting to perform diff --git a/pkg/kv/kvpb/batch.go b/pkg/kv/kvpb/batch.go index c3df7a5eb68f..2798d9db3b5f 100644 --- a/pkg/kv/kvpb/batch.go +++ b/pkg/kv/kvpb/batch.go @@ -215,6 +215,12 @@ func (ba *BatchRequest) IsSingleRequest() bool { return len(ba.Requests) == 1 } +// IsSingleBarrierRequest returns true iff the batch contains a single request, +// and that request is a Barrier. +func (ba *BatchRequest) IsSingleBarrierRequest() bool { + return ba.isSingleRequestWithMethod(Barrier) +} + // IsSingleSkipsLeaseCheckRequest returns true iff the batch contains a single // request, and that request has the skipsLeaseCheck flag set. func (ba *BatchRequest) IsSingleSkipsLeaseCheckRequest() bool { @@ -349,7 +355,7 @@ func (ba *BatchRequest) IsSingleExportRequest() bool { // a no-op. The Barrier request requires consensus even though its evaluation // is a no-op. func (ba *BatchRequest) RequiresConsensus() bool { - return ba.isSingleRequestWithMethod(Barrier) || ba.isSingleRequestWithMethod(Probe) + return ba.IsSingleBarrierRequest() || ba.IsSingleProbeRequest() } // IsCompleteTransaction determines whether a batch contains every write in a From b201a8bda00a01865788fadbebeb0c4462aad5f1 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 15 Jan 2024 16:29:43 +0000 Subject: [PATCH 2/7] batcheval: add `BarrierRequest.WithLeaseAppliedIndex` This can be used to detect whether a replica has applied the barrier command yet. 
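For illustration, the returned LAI can be compared against a replica's applied
state. A minimal, hedged sketch (the helper below is not part of this patch;
imports, the replica handle, and the polling interval are assumptions):

    // waitForBarrier is a hypothetical helper: it submits a barrier over a
    // single range's span and polls until the given replica has applied it.
    func waitForBarrier(
        ctx context.Context, db *kv.DB, repl *kvserver.Replica, start, end roachpb.Key,
    ) error {
        // The barrier can't span multiple ranges when requesting an LAI.
        lai, _, err := db.BarrierWithLAI(ctx, start, end)
        if err != nil {
            return err
        }
        // Poll the replica's lease applied index until it catches up.
        for repl.GetLeaseAppliedIndex() < lai {
            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-time.After(10 * time.Millisecond):
            }
        }
        return nil
    }

A later commit in this series adds a proper Replica.WaitForLeaseAppliedIndex()
that covers the waiting half of this sketch.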
Epic: none Release note: None --- pkg/kv/batch.go | 3 +- pkg/kv/db.go | 44 ++- pkg/kv/kvpb/api.go | 14 +- pkg/kv/kvpb/api.proto | 31 +- pkg/kv/kvserver/batcheval/BUILD.bazel | 1 + pkg/kv/kvserver/batcheval/cmd_barrier.go | 12 +- pkg/kv/kvserver/batcheval/cmd_barrier_test.go | 312 ++++++++++++++++++ pkg/kv/kvserver/batcheval/result/result.go | 27 +- pkg/kv/kvserver/replica_application_result.go | 11 + 9 files changed, 435 insertions(+), 20 deletions(-) create mode 100644 pkg/kv/kvserver/batcheval/cmd_barrier_test.go diff --git a/pkg/kv/batch.go b/pkg/kv/batch.go index 183bc969a411..04aa2afa1f55 100644 --- a/pkg/kv/batch.go +++ b/pkg/kv/batch.go @@ -1052,7 +1052,7 @@ func (b *Batch) queryResolvedTimestamp(s, e interface{}) { b.initResult(1, 0, notRaw, nil) } -func (b *Batch) barrier(s, e interface{}) { +func (b *Batch) barrier(s, e interface{}, withLAI bool) { begin, err := marshalKey(s) if err != nil { b.initResult(0, 0, notRaw, err) @@ -1068,6 +1068,7 @@ func (b *Batch) barrier(s, e interface{}) { Key: begin, EndKey: end, }, + WithLeaseAppliedIndex: withLAI, } b.appendReqs(req) b.initResult(1, 0, notRaw, nil) diff --git a/pkg/kv/db.go b/pkg/kv/db.go index 7cd87fd11081..a32a9d0904f2 100644 --- a/pkg/kv/db.go +++ b/pkg/kv/db.go @@ -803,23 +803,47 @@ func (db *DB) QueryResolvedTimestamp( // writes on the specified key range to finish. func (db *DB) Barrier(ctx context.Context, begin, end interface{}) (hlc.Timestamp, error) { b := &Batch{} - b.barrier(begin, end) - err := getOneErr(db.Run(ctx, b), b) - if err != nil { + b.barrier(begin, end, false /* withLAI */) + if err := getOneErr(db.Run(ctx, b), b); err != nil { return hlc.Timestamp{}, err } - responses := b.response.Responses - if len(responses) == 0 { - return hlc.Timestamp{}, errors.Errorf("unexpected empty response for Barrier") + if l := len(b.response.Responses); l != 1 { + return hlc.Timestamp{}, errors.Errorf("got %d responses for Barrier", l) } - resp, ok := responses[0].GetInner().(*kvpb.BarrierResponse) - if !ok { - return hlc.Timestamp{}, errors.Errorf("unexpected response of type %T for Barrier", - responses[0].GetInner()) + resp := b.response.Responses[0].GetBarrier() + if resp == nil { + return hlc.Timestamp{}, errors.Errorf("unexpected response %T for Barrier", + b.response.Responses[0].GetInner()) } return resp.Timestamp, nil } +// BarrierWithLAI is like Barrier, but also returns the lease applied index and +// range descriptor at which the barrier was applied. In this case, the barrier +// can't span multiple ranges, otherwise a RangeKeyMismatchError is returned. +// +// NB: the protocol support for this was added in a patch release, and is not +// guaranteed to be present with nodes prior to 24.1. In this case, the request +// will return an empty result. 
+func (db *DB) BarrierWithLAI( + ctx context.Context, begin, end interface{}, +) (uint64, roachpb.RangeDescriptor, error) { + b := &Batch{} + b.barrier(begin, end, true /* withLAI */) + if err := getOneErr(db.Run(ctx, b), b); err != nil { + return 0, roachpb.RangeDescriptor{}, err + } + if l := len(b.response.Responses); l != 1 { + return 0, roachpb.RangeDescriptor{}, errors.Errorf("got %d responses for Barrier", l) + } + resp := b.response.Responses[0].GetBarrier() + if resp == nil { + return 0, roachpb.RangeDescriptor{}, errors.Errorf("unexpected response %T for Barrier", + b.response.Responses[0].GetInner()) + } + return resp.LeaseAppliedIndex, resp.RangeDesc, nil +} + // sendAndFill is a helper which sends the given batch and fills its results, // returning the appropriate error which is either from the first failing call, // or an "internal" error. diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go index f97a604e9ddc..97f85729ec36 100644 --- a/pkg/kv/kvpb/api.go +++ b/pkg/kv/kvpb/api.go @@ -600,6 +600,12 @@ func (r *BarrierResponse) combine(_ context.Context, c combinable, _ *BatchReque return err } r.Timestamp.Forward(otherR.Timestamp) + if r.LeaseAppliedIndex != 0 || otherR.LeaseAppliedIndex != 0 { + return errors.AssertionFailedf("can't combine BarrierResponses with LeaseAppliedIndex") + } + if r.RangeDesc.NextReplicaID != 0 || otherR.RangeDesc.NextReplicaID != 0 { + return errors.AssertionFailedf("can't combine BarrierResponses with RangeDesc") + } } return nil } @@ -1579,7 +1585,13 @@ func (*RangeStatsRequest) flags() flag { return isRead } func (*QueryResolvedTimestampRequest) flags() flag { return isRead | isRange | requiresClosedTSOlderThanStorageSnapshot } -func (*BarrierRequest) flags() flag { return isWrite | isRange | isAlone } +func (r *BarrierRequest) flags() flag { + flags := isWrite | isRange | isAlone + if r.WithLeaseAppliedIndex { + flags |= isUnsplittable // the LAI is only valid for a single range + } + return flags +} func (*IsSpanEmptyRequest) flags() flag { return isRead | isRange } // IsParallelCommit returns whether the EndTxn request is attempting to perform diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto index 78db3eba50fe..c130829cc7a1 100644 --- a/pkg/kv/kvpb/api.proto +++ b/pkg/kv/kvpb/api.proto @@ -2252,11 +2252,28 @@ message QueryResolvedTimestampResponse { (gogoproto.nullable) = false, (gogoproto.customname) = "ResolvedTS"]; } -// BarrierRequest is the request for a Barrier operation. This goes through Raft -// and has the purpose of waiting until all conflicting in-flight operations on -// this range have completed, without blocking any new operations. +// BarrierRequest is the request for a Barrier operation. This guarantees that +// all past and ongoing writes to a key span have completed and applied on the +// leaseholder. It does this by waiting for all conflicting write latches and +// then submitting a noop write through Raft, waiting for it to apply. Later +// writes are not affected -- in particular, it does not actually take out a +// latch, so writers don't have to wait for it to complete and can write below +// the barrier. message BarrierRequest { RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true]; + + // WithLeaseAppliedIndex will return the LeaseAppliedIndex of the barrier + // command in the response, allowing the caller to wait for the barrier to + // apply on an arbitrary replica. It also returns the range descriptor, so the + // caller can detect any unexpected range changes. 
+ // + // When enabled, the barrier request can no longer span multiple ranges, and + // will instead return RangeKeyMismatchError. The caller must be prepared to + // handle this. + // + // NB: This field was added in a patch release. Nodes prior to 24.1 are not + // guaranteed to support it, returning a zero LeaseAppliedIndex instead. + bool with_lease_applied_index = 2; } // BarrierResponse is the response for a Barrier operation. @@ -2266,6 +2283,14 @@ message BarrierResponse { // Timestamp at which this Barrier was evaluated. Can be used to guarantee // future operations happen on the same or newer leaseholders. util.hlc.Timestamp timestamp = 2 [(gogoproto.nullable) = false]; + + // LeaseAppliedIndex at which this Barrier was applied. Only returned when + // requested via WithLeaseAppliedIndex. + uint64 lease_applied_index = 3; + + // RangeDesc at the time the barrier was applied. Only returned when requested + // via WithLeaseAppliedIndex. + RangeDescriptor range_desc = 4 [(gogoproto.nullable) = false]; } // A RequestUnion contains exactly one of the requests. diff --git a/pkg/kv/kvserver/batcheval/BUILD.bazel b/pkg/kv/kvserver/batcheval/BUILD.bazel index c3507cd7075e..a604361299ee 100644 --- a/pkg/kv/kvserver/batcheval/BUILD.bazel +++ b/pkg/kv/kvserver/batcheval/BUILD.bazel @@ -100,6 +100,7 @@ go_test( size = "medium", srcs = [ "cmd_add_sstable_test.go", + "cmd_barrier_test.go", "cmd_clear_range_test.go", "cmd_delete_range_gchint_test.go", "cmd_delete_range_test.go", diff --git a/pkg/kv/kvserver/batcheval/cmd_barrier.go b/pkg/kv/kvserver/batcheval/cmd_barrier.go index 4b4b5e7b1e2c..45c233eab39c 100644 --- a/pkg/kv/kvserver/batcheval/cmd_barrier.go +++ b/pkg/kv/kvserver/batcheval/cmd_barrier.go @@ -44,13 +44,19 @@ func declareKeysBarrier( latchSpans.AddNonMVCC(spanset.SpanReadWrite, req.Header().Span()) } -// Barrier evaluation is a no-op, as all the latch waiting happens in -// the latch manager. +// Barrier evaluation is a no-op, but it still goes through Raft because of +// BatchRequest.RequiresConsensus(). The latch waiting happens in the latch +// manager, and the WithLeaseAppliedIndex info is populated during application. func Barrier( _ context.Context, _ storage.ReadWriter, cArgs CommandArgs, response kvpb.Response, ) (result.Result, error) { + args := cArgs.Args.(*kvpb.BarrierRequest) resp := response.(*kvpb.BarrierResponse) resp.Timestamp = cArgs.EvalCtx.Clock().Now() - return result.Result{}, nil + return result.Result{ + Local: result.LocalResult{ + PopulateBarrierResponse: args.WithLeaseAppliedIndex, + }, + }, nil } diff --git a/pkg/kv/kvserver/batcheval/cmd_barrier_test.go b/pkg/kv/kvserver/batcheval/cmd_barrier_test.go new file mode 100644 index 000000000000..6e38b592b4d9 --- /dev/null +++ b/pkg/kv/kvserver/batcheval/cmd_barrier_test.go @@ -0,0 +1,312 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package batcheval_test + +import ( + "bytes" + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/require" +) + +// TestBarrierEval tests basic Barrier evaluation. +func TestBarrierEval(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + start := roachpb.Key("a") + end := roachpb.Key("b") + + clock := hlc.NewClockForTesting(timeutil.NewManualTime(timeutil.Now())) + ts := clock.Now() + evalCtx := (&batcheval.MockEvalCtx{Clock: clock}).EvalContext() + + testutils.RunTrueAndFalse(t, "WithLeaseAppliedIndex", func(t *testing.T, withLAI bool) { + resp := kvpb.BarrierResponse{} + res, err := batcheval.Barrier(ctx, nil, batcheval.CommandArgs{ + EvalCtx: evalCtx, + Args: &kvpb.BarrierRequest{ + RequestHeader: kvpb.RequestHeader{Key: start, EndKey: end}, + WithLeaseAppliedIndex: withLAI, + }, + }, &resp) + require.NoError(t, err) + + require.Equal(t, result.Result{ + Local: result.LocalResult{ + PopulateBarrierResponse: withLAI, + }, + }, res) + + // Ignore the logical timestamp component, which is incremented per reading. + resp.Timestamp.Logical = 0 + + require.Equal(t, kvpb.BarrierResponse{ + Timestamp: ts, + }, resp) + }) +} + +// TestBarrier is an integration test for Barrier. It tests that it processes +// the request and response properly, within a single range and across multiple +// ranges. +func TestBarrier(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + // Set up a test server. + srv, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer srv.Stopper().Stop(ctx) + + store, err := srv.GetStores().(*kvserver.Stores).GetStore(srv.GetFirstStoreID()) + require.NoError(t, err) + sender := kvDB.NonTransactionalSender() + + // We'll use /a to /z as our keyspace, and split off a range at /x. + prefix := srv.Codec().TenantPrefix() + _, _, err = srv.SplitRange(append(prefix, []byte("/x")...)) + require.NoError(t, err) + + // Send Barrier request with/without LeaseAppliedIndex, and within a single + // range or across multiple ranges. + testutils.RunTrueAndFalse(t, "WithLeaseAppliedIndex", func(t *testing.T, withLAI bool) { + testutils.RunTrueAndFalse(t, "crossRange", func(t *testing.T, crossRange bool) { + start := append(prefix, []byte("/a")...) + end := append(prefix, []byte("/b")...) + if crossRange { + end = append(prefix, []byte("/z")...) 
+ } + repl := store.LookupReplica(roachpb.RKey(start)) + + tsBefore := srv.Clock().Now() + laiBefore := repl.GetLeaseAppliedIndex() + req := kvpb.BarrierRequest{ + RequestHeader: kvpb.RequestHeader{Key: start, EndKey: end}, + WithLeaseAppliedIndex: withLAI, + } + respI, pErr := kv.SendWrapped(ctx, sender, &req) + + // WithLeaseAppliedIndex should return RangeKeyMismatchError when across + // multiple ranges. + if withLAI && crossRange { + require.Error(t, pErr.GoError()) + require.IsType(t, &kvpb.RangeKeyMismatchError{}, pErr.GoError()) + return + } + + require.NoError(t, pErr.GoError()) + resp, ok := respI.(*kvpb.BarrierResponse) + require.True(t, ok) + + // The timestamp must be after the request was sent. + require.True(t, tsBefore.LessEq(resp.Timestamp)) + + // If WithLeaseAppliedIndex is set, it also returns the LAI and range + // descriptor. + if withLAI { + require.GreaterOrEqual(t, resp.LeaseAppliedIndex, laiBefore) + require.GreaterOrEqual(t, repl.GetLeaseAppliedIndex(), resp.LeaseAppliedIndex) + require.Equal(t, *repl.Desc(), resp.RangeDesc) + } else { + require.Zero(t, resp.LeaseAppliedIndex) + require.Zero(t, resp.RangeDesc) + } + }) + }) +} + +// TestBarrierLatches tests Barrier latch interactions. Specifically, that it +// waits for in-flight requests to complete, but that it does not block later +// requests. +func TestBarrierLatches(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Too slow, times out. + skip.UnderRace(t) + skip.UnderDeadlock(t) + + // Use a timeout, to prevent blocking indefinitely if something goes wrong. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // We'll do reads and writes to two separate keys, with a range split in + // between. These keys will be under the tenant prefix. + readSuffix := roachpb.Key("/read") + splitSuffix := roachpb.Key("/split") + writeSuffix := roachpb.Key("/write") + + // Set up a request evaluation filter which will block Gets to /read and Puts + // to /write. These will signal that they're blocked via blockedC, and unblock + // when unblockC is closed. + // + // Unfortunately, we can't use a magic context to specify which requests to + // block, since this does not work with external process tenants which may be + // randomly enabled. We therefore have to match the actual keys. + blockedC := make(chan struct{}, 10) + unblockC := make(chan struct{}) + + evalFilter := func(args kvserverbase.FilterArgs) *kvpb.Error { + var shouldBlock bool + if key, err := keys.StripTenantPrefix(args.Req.Header().Key); err == nil { + if args.Req.Method() == kvpb.Get && bytes.Equal(key, readSuffix) { + shouldBlock = true + } + if args.Req.Method() == kvpb.Put && bytes.Equal(key, writeSuffix) { + shouldBlock = true + } + } + if shouldBlock { + // Notify callers that we're blocking. + select { + case blockedC <- struct{}{}: + t.Logf("blocked %s", args.Req) + case <-ctx.Done(): + return kvpb.NewError(ctx.Err()) + } + // Wait to unblock. + select { + case <-unblockC: + t.Logf("unblocked %s", args.Req) + case <-ctx.Done(): + return kvpb.NewError(ctx.Err()) + } + } + return nil + } + + // Set up a test server. 
+ srv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + EvalKnobs: kvserverbase.BatchEvalTestingKnobs{ + TestingEvalFilter: evalFilter, + }, + }, + }, + }) + defer srv.Stopper().Stop(ctx) + + db := srv.DB() + store, err := srv.GetStores().(*kvserver.Stores).GetStore(srv.GetFirstStoreID()) + require.NoError(t, err) + _ = store + + // Determine the tenant prefix and keys. + prefix := srv.Codec().TenantPrefix() + readKey := append(prefix, readSuffix...) + splitKey := append(prefix, splitSuffix...) + writeKey := append(prefix, writeSuffix...) + + // Set up helpers to run barriers, both sync and async. + barrier := func(ctx context.Context, start, end roachpb.Key, withLAI bool) (err error) { + if withLAI { + _, _, err = db.BarrierWithLAI(ctx, start, end) + } else { + _, err = db.Barrier(ctx, start, end) + } + return + } + + barrierAsync := func(ctx context.Context, start, end roachpb.Key, withLAI bool) <-chan error { + errC := make(chan error, 1) + go func() { + errC <- barrier(ctx, start, end, withLAI) + }() + return errC + } + + // Split off a range at /split, to test cross-range barriers. + _, _, err = srv.SplitRange(splitKey) + require.NoError(t, err) + + // Spawn read and write requests, and wait for them to block. + go func() { + _ = db.Put(ctx, writeKey, "value") + }() + go func() { + _, _ = db.Get(ctx, readKey) + }() + + for i := 0; i < 2; i++ { + select { + case <-blockedC: + case <-ctx.Done(): + require.NoError(t, ctx.Err()) + } + } + + // Barriers should not conflict outside of these keys. + require.NoError(t, barrier(ctx, readKey.Next(), splitKey, true /* withLAI */)) + require.NoError(t, barrier(ctx, splitKey, writeKey, true /* withLAI */)) + require.Error(t, barrier(ctx, readKey.Next(), writeKey, true /* withLAI */)) // can't span ranges + require.NoError(t, barrier(ctx, readKey.Next(), writeKey, false /* withLAI */)) + + // Barriers should not conflict with read requests. + require.NoError(t, barrier(ctx, readKey, readKey.Next(), true /* withLAI */)) + + // Barriers should conflict with write requests. We send off two barriers: one + // WithLAI in a single range, and another across ranges. Neither of these + // should return in a second. + withLAIC := barrierAsync(ctx, splitKey, writeKey.Next(), true /* withLAI */) + withoutLAIC := barrierAsync(ctx, readKey, writeKey.Next(), false /* withLAI */) + select { + case err := <-withLAIC: + t.Fatalf("WithLAI=true barrier returned prematurely: %v", err) + case err := <-withoutLAIC: + t.Fatalf("WithLAI=false barrier returned prematurely: %v", err) + case <-time.After(time.Second): + } + + // While the barriers are blocked, later overlapping requests should be able + // to proceed and evaluate below them. + require.NoError(t, db.Put(ctx, splitKey, "value")) + _, err = db.Get(ctx, splitKey) + require.NoError(t, err) + + // Unblock the requests. This should now unblock the barriers as well. 
+ close(unblockC) + + select { + case err := <-withLAIC: + require.NoError(t, err) + case <-time.After(3 * time.Second): + t.Fatal("WithLAI=true barrier did not return") + } + + select { + case err := <-withoutLAIC: + require.NoError(t, err) + case <-time.After(3 * time.Second): + t.Fatal("WithLAI=false barrier did not return") + } +} diff --git a/pkg/kv/kvserver/batcheval/result/result.go b/pkg/kv/kvserver/batcheval/result/result.go index 6a2136b03376..25fb2966d6f6 100644 --- a/pkg/kv/kvserver/batcheval/result/result.go +++ b/pkg/kv/kvserver/batcheval/result/result.go @@ -52,6 +52,9 @@ type LocalResult struct { // commit fails, or we may accidentally make uncommitted values // live. EndTxns []EndTxnIntents + // PopulateBarrierResponse will populate a BarrierResponse with the lease + // applied index and range descriptor when applied. + PopulateBarrierResponse bool // When set (in which case we better be the first range), call // GossipFirstRange if the Replica holds the lease. @@ -79,6 +82,7 @@ func (lResult *LocalResult) IsZero() bool { lResult.ResolvedLocks == nil && lResult.UpdatedTxns == nil && lResult.EndTxns == nil && + !lResult.PopulateBarrierResponse && !lResult.GossipFirstRange && !lResult.MaybeGossipSystemConfig && !lResult.MaybeGossipSystemConfigIfHaveFailure && @@ -93,13 +97,13 @@ func (lResult *LocalResult) String() string { return fmt.Sprintf("LocalResult (reply: %v, "+ "#encountered intents: %d, #acquired locks: %d, #resolved locks: %d"+ "#updated txns: %d #end txns: %d, "+ - "GossipFirstRange:%t MaybeGossipSystemConfig:%t "+ + "PopulateBarrierResponse:%t GossipFirstRange:%t MaybeGossipSystemConfig:%t "+ "MaybeGossipSystemConfigIfHaveFailure:%t MaybeAddToSplitQueue:%t "+ "MaybeGossipNodeLiveness:%s ", lResult.Reply, len(lResult.EncounteredIntents), len(lResult.AcquiredLocks), len(lResult.ResolvedLocks), len(lResult.UpdatedTxns), len(lResult.EndTxns), - lResult.GossipFirstRange, lResult.MaybeGossipSystemConfig, + lResult.PopulateBarrierResponse, lResult.GossipFirstRange, lResult.MaybeGossipSystemConfig, lResult.MaybeGossipSystemConfigIfHaveFailure, lResult.MaybeAddToSplitQueue, lResult.MaybeGossipNodeLiveness) } @@ -147,6 +151,17 @@ func (lResult *LocalResult) DetachEndTxns(alwaysOnly bool) []EndTxnIntents { return r } +// DetachPopulateBarrierResponse returns (and removes) the +// PopulateBarrierResponse value from the local result. +func (lResult *LocalResult) DetachPopulateBarrierResponse() bool { + if lResult == nil { + return false + } + r := lResult.PopulateBarrierResponse + lResult.PopulateBarrierResponse = false + return r +} + // Result is the result of evaluating a KV request. That is, the // proposer (which holds the lease, at least in the case in which the command // will complete successfully) has evaluated the request and is holding on to: @@ -368,6 +383,14 @@ func (p *Result) MergeAndDestroy(q Result) error { } q.Local.EndTxns = nil + if !p.Local.PopulateBarrierResponse { + p.Local.PopulateBarrierResponse = q.Local.PopulateBarrierResponse + } else { + // PopulateBarrierResponse is only valid for a single Barrier response. 
+ return errors.AssertionFailedf("multiple PopulateBarrierResponse results") + } + q.Local.PopulateBarrierResponse = false + if p.Local.MaybeGossipNodeLiveness == nil { p.Local.MaybeGossipNodeLiveness = q.Local.MaybeGossipNodeLiveness } else if q.Local.MaybeGossipNodeLiveness != nil { diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go index ece3765973e0..91d9259daf2f 100644 --- a/pkg/kv/kvserver/replica_application_result.go +++ b/pkg/kv/kvserver/replica_application_result.go @@ -218,6 +218,17 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) { } cmd.response.EncounteredIntents = cmd.proposal.Local.DetachEncounteredIntents() cmd.response.EndTxns = cmd.proposal.Local.DetachEndTxns(pErr != nil) + + // Populate BarrierResponse if requested. + if pErr == nil && cmd.proposal.Local.DetachPopulateBarrierResponse() { + if resp := cmd.response.Reply.Responses[0].GetBarrier(); resp != nil { + resp.LeaseAppliedIndex = cmd.LeaseIndex + resp.RangeDesc = *r.Desc() + } else { + log.Fatalf(ctx, "PopulateBarrierResponse for %T", cmd.response.Reply.Responses[0].GetInner()) + } + } + if pErr == nil { cmd.localResult = cmd.proposal.Local } else if cmd.localResult != nil { From 3dc880103feec11f4b7cbd086a9aeeb317c802a4 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Fri, 19 Jan 2024 10:28:30 +0000 Subject: [PATCH 3/7] kvnemsis: add support for `Barrier` operations This only executes random `Barrier` requests, but does not verify that the barrier guarantees are actually satisfied (i.e. that all past and concurrent writes are applied before it returns). At least we get some execution coverage, and verify that it does not have negative interactions with other operations. Epic: none Release note: None --- pkg/kv/kvnemesis/applier.go | 21 ++++++++++++ pkg/kv/kvnemesis/generator.go | 33 +++++++++++++++++++ pkg/kv/kvnemesis/generator_test.go | 5 ++- pkg/kv/kvnemesis/operations.go | 14 ++++++++ pkg/kv/kvnemesis/operations.proto | 8 +++++ pkg/kv/kvnemesis/operations_test.go | 2 ++ .../kvnemesis/testdata/TestOperationsFormat/5 | 3 ++ .../kvnemesis/testdata/TestOperationsFormat/6 | 3 ++ pkg/kv/kvnemesis/validator.go | 14 ++++++++ 9 files changed, 102 insertions(+), 1 deletion(-) create mode 100644 pkg/kv/kvnemesis/testdata/TestOperationsFormat/5 create mode 100644 pkg/kv/kvnemesis/testdata/TestOperationsFormat/6 diff --git a/pkg/kv/kvnemesis/applier.go b/pkg/kv/kvnemesis/applier.go index 5cf894ddd88f..c0ef90c0afe1 100644 --- a/pkg/kv/kvnemesis/applier.go +++ b/pkg/kv/kvnemesis/applier.go @@ -136,6 +136,14 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) { case *ChangeZoneOperation: err := updateZoneConfigInEnv(ctx, env, o.Type) o.Result = resultInit(ctx, err) + case *BarrierOperation: + var err error + if o.WithLeaseAppliedIndex { + _, _, err = db.BarrierWithLAI(ctx, o.Key, o.EndKey) + } else { + _, err = db.Barrier(ctx, o.Key, o.EndKey) + } + o.Result = resultInit(ctx, err) case *ClosureTxnOperation: // Use a backoff loop to avoid thrashing on txn aborts. Don't wait between // epochs of the same transaction to avoid waiting while holding locks. 
@@ -370,6 +378,17 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) { return } o.Result.OptionalTimestamp = ts + case *BarrierOperation: + _, _, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) { + b.AddRawRequest(&kvpb.BarrierRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: o.Key, + EndKey: o.EndKey, + }, + WithLeaseAppliedIndex: o.WithLeaseAppliedIndex, + }) + }) + o.Result = resultInit(ctx, err) case *BatchOperation: b := &kv.Batch{} applyBatchOp(ctx, b, db.Run, o) @@ -421,6 +440,8 @@ func applyBatchOp( setLastReqSeq(b, subO.Seq) case *AddSSTableOperation: panic(errors.AssertionFailedf(`AddSSTable cannot be used in batches`)) + case *BarrierOperation: + panic(errors.AssertionFailedf(`Barrier cannot be used in batches`)) default: panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, subO, subO)) } diff --git a/pkg/kv/kvnemesis/generator.go b/pkg/kv/kvnemesis/generator.go index 691cca96bcc0..9a748548fefa 100644 --- a/pkg/kv/kvnemesis/generator.go +++ b/pkg/kv/kvnemesis/generator.go @@ -118,6 +118,8 @@ type ClientOperationConfig struct { DeleteRangeUsingTombstone int // AddSSTable is an operations that ingests an SSTable with random KV pairs. AddSSTable int + // Barrier is an operation that waits for in-flight writes to complete. + Barrier int } // BatchOperationConfig configures the relative probability of generating a @@ -199,6 +201,7 @@ func newAllOperationsConfig() GeneratorConfig { DeleteRange: 1, DeleteRangeUsingTombstone: 1, AddSSTable: 1, + Barrier: 1, } batchOpConfig := BatchOperationConfig{ Batch: 4, @@ -281,6 +284,12 @@ func NewDefaultConfig() GeneratorConfig { config.Ops.ClosureTxn.CommitBatchOps.AddSSTable = 0 config.Ops.ClosureTxn.TxnClientOps.AddSSTable = 0 config.Ops.ClosureTxn.TxnBatchOps.Ops.AddSSTable = 0 + // Barrier cannot be used in batches, and we omit it in txns too because it + // can result in spurious RangeKeyMismatchErrors that fail the txn operation. + config.Ops.Batch.Ops.Barrier = 0 + config.Ops.ClosureTxn.CommitBatchOps.Barrier = 0 + config.Ops.ClosureTxn.TxnClientOps.Barrier = 0 + config.Ops.ClosureTxn.TxnBatchOps.Ops.Barrier = 0 return config } @@ -474,6 +483,7 @@ func (g *generator) registerClientOps(allowed *[]opGen, c *ClientOperationConfig addOpGen(allowed, randDelRange, c.DeleteRange) addOpGen(allowed, randDelRangeUsingTombstone, c.DeleteRangeUsingTombstone) addOpGen(allowed, randAddSSTable, c.AddSSTable) + addOpGen(allowed, randBarrier, c.Barrier) } func (g *generator) registerBatchOps(allowed *[]opGen, c *BatchOperationConfig) { @@ -659,6 +669,21 @@ func randAddSSTable(g *generator, rng *rand.Rand) Operation { return addSSTable(f.Data(), sstSpan, sstTimestamp, seq, asWrites) } +func randBarrier(g *generator, rng *rand.Rand) Operation { + withLAI := rng.Float64() < 0.5 + var key, endKey string + if withLAI { + // Barriers requesting LAIs can't span multiple ranges, so we try to fit + // them within an existing range. This may race with a concurrent split, in + // which case the Barrier will fail, but that's ok -- most should still + // succeed. These errors are ignored by the validator. 
+ key, endKey = randRangeSpan(rng, g.currentSplits) + } else { + key, endKey = randSpan(rng) + } + return barrier(key, endKey, withLAI) +} + func randScan(g *generator, rng *rand.Rand) Operation { key, endKey := randSpan(rng) return scan(key, endKey) @@ -1098,3 +1123,11 @@ func addSSTable( AsWrites: asWrites, }} } + +func barrier(key, endKey string, withLAI bool) Operation { + return Operation{Barrier: &BarrierOperation{ + Key: []byte(key), + EndKey: []byte(endKey), + WithLeaseAppliedIndex: withLAI, + }} +} diff --git a/pkg/kv/kvnemesis/generator_test.go b/pkg/kv/kvnemesis/generator_test.go index 15504bad32ec..5d24f56af118 100644 --- a/pkg/kv/kvnemesis/generator_test.go +++ b/pkg/kv/kvnemesis/generator_test.go @@ -147,6 +147,8 @@ func TestRandStep(t *testing.T) { client.DeleteRangeUsingTombstone++ case *AddSSTableOperation: client.AddSSTable++ + case *BarrierOperation: + client.Barrier++ case *BatchOperation: batch.Batch++ countClientOps(&batch.Ops, nil, o.Ops...) @@ -167,7 +169,8 @@ func TestRandStep(t *testing.T) { *DeleteOperation, *DeleteRangeOperation, *DeleteRangeUsingTombstoneOperation, - *AddSSTableOperation: + *AddSSTableOperation, + *BarrierOperation: countClientOps(&counts.DB, &counts.Batch, step.Op) case *ClosureTxnOperation: countClientOps(&counts.ClosureTxn.TxnClientOps, &counts.ClosureTxn.TxnBatchOps, o.Ops...) diff --git a/pkg/kv/kvnemesis/operations.go b/pkg/kv/kvnemesis/operations.go index 74048534f847..28f5e600a5dd 100644 --- a/pkg/kv/kvnemesis/operations.go +++ b/pkg/kv/kvnemesis/operations.go @@ -41,6 +41,8 @@ func (op Operation) Result() *Result { return &o.Result case *AddSSTableOperation: return &o.Result + case *BarrierOperation: + return &o.Result case *SplitOperation: return &o.Result case *MergeOperation: @@ -131,6 +133,8 @@ func (op Operation) format(w *strings.Builder, fctx formatCtx) { o.format(w, fctx) case *AddSSTableOperation: o.format(w, fctx) + case *BarrierOperation: + o.format(w, fctx) case *SplitOperation: o.format(w, fctx) case *MergeOperation: @@ -311,6 +315,16 @@ func (op AddSSTableOperation) format(w *strings.Builder, fctx formatCtx) { } } +func (op BarrierOperation) format(w *strings.Builder, fctx formatCtx) { + if op.WithLeaseAppliedIndex { + fmt.Fprintf(w, `%s.BarrierWithLAI(ctx, %s, %s)`, + fctx.receiver, fmtKey(op.Key), fmtKey(op.EndKey)) + } else { + fmt.Fprintf(w, `%s.Barrier(ctx, %s, %s)`, fctx.receiver, fmtKey(op.Key), fmtKey(op.EndKey)) + } + op.Result.format(w) +} + func (op SplitOperation) format(w *strings.Builder, fctx formatCtx) { fmt.Fprintf(w, `%s.AdminSplit(ctx, %s)`, fctx.receiver, fmtKey(op.Key)) op.Result.format(w) diff --git a/pkg/kv/kvnemesis/operations.proto b/pkg/kv/kvnemesis/operations.proto index 19f4ac1f53af..0ef5ef22e67e 100644 --- a/pkg/kv/kvnemesis/operations.proto +++ b/pkg/kv/kvnemesis/operations.proto @@ -87,6 +87,13 @@ message AddSSTableOperation { Result result = 6 [(gogoproto.nullable) = false]; } +message BarrierOperation { + bytes key = 1; + bytes end_key = 2; + bool with_lease_applied_index = 3; + Result result = 4 [(gogoproto.nullable) = false]; +} + message SplitOperation { bytes key = 1; Result result = 2 [(gogoproto.nullable) = false]; @@ -142,6 +149,7 @@ message Operation { TransferLeaseOperation transfer_lease = 16; ChangeZoneOperation change_zone = 17; AddSSTableOperation add_sstable = 18 [(gogoproto.customname) = "AddSSTable"]; + BarrierOperation barrier = 22; } enum ResultType { diff --git a/pkg/kv/kvnemesis/operations_test.go b/pkg/kv/kvnemesis/operations_test.go index 6d435ca5d130..574b61a8caa8 
100644 --- a/pkg/kv/kvnemesis/operations_test.go +++ b/pkg/kv/kvnemesis/operations_test.go @@ -85,6 +85,8 @@ func TestOperationsFormat(t *testing.T) { { step: step(addSSTable(sstFile.Data(), sstSpan, sstTS, sstValueHeader.KVNemesisSeq.Get(), true)), }, + {step: step(barrier(k1, k2, false /* withLAI */))}, + {step: step(barrier(k3, k4, true /* withLAI */))}, } w := echotest.NewWalker(t, datapathutils.TestDataPath(t, t.Name())) diff --git a/pkg/kv/kvnemesis/testdata/TestOperationsFormat/5 b/pkg/kv/kvnemesis/testdata/TestOperationsFormat/5 new file mode 100644 index 000000000000..b848afebfc9b --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestOperationsFormat/5 @@ -0,0 +1,3 @@ +echo +---- +···db0.Barrier(ctx, tk(1), tk(2)) diff --git a/pkg/kv/kvnemesis/testdata/TestOperationsFormat/6 b/pkg/kv/kvnemesis/testdata/TestOperationsFormat/6 new file mode 100644 index 000000000000..92c0790f55de --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestOperationsFormat/6 @@ -0,0 +1,3 @@ +echo +---- +···db0.BarrierWithLAI(ctx, tk(3), tk(4)) diff --git a/pkg/kv/kvnemesis/validator.go b/pkg/kv/kvnemesis/validator.go index 43f41a0c5731..391b64eb040c 100644 --- a/pkg/kv/kvnemesis/validator.go +++ b/pkg/kv/kvnemesis/validator.go @@ -648,6 +648,20 @@ func (v *validator) processOp(op Operation) { if v.buffering == bufferingSingle { v.checkAtomic(`addSSTable`, t.Result) } + case *BarrierOperation: + execTimestampStrictlyOptional = true + if op.Barrier.WithLeaseAppliedIndex && + resultHasErrorType(t.Result, &kvpb.RangeKeyMismatchError{}) { + // Barriers requesting LAIs can't span ranges. The generator will + // optimistically try to fit the barrier inside one of the current ranges, + // but this may race with a split, so we ignore the error in this case and + // try again later. + } else { + // Fail or retry on other errors, depending on type. + v.checkNonAmbError(op, t.Result, exceptUnhandledRetry) + } + // We don't yet actually check the barrier guarantees here, i.e. that all + // concurrent writes are applied by the time it completes. Maybe later. case *ScanOperation: if _, isErr := v.checkError(op, t.Result); isErr { break From 49a1261200f977849093204588abc93d0bd3ade4 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Wed, 17 Jan 2024 13:31:38 +0000 Subject: [PATCH 4/7] kvserver: add `Replica.WaitForLeaseAppliedIndex()` This allows a caller to wait for a replica to reach a certain lease applied index. Similar functionality elsewhere is not migrated yet, out of caution. Epic: none Release note: None --- pkg/kv/kvserver/replica_command.go | 24 ++++++++ pkg/kv/kvserver/replica_command_test.go | 82 +++++++++++++++++++++++++ pkg/kv/kvserver/stores_server.go | 2 + 3 files changed, 108 insertions(+) diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 47a24d7aefa3..8007dfcb25ef 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -895,6 +895,30 @@ func waitForReplicasInit( }) } +// WaitForLeaseAppliedIndex waits for the replica to reach the given lease +// applied index, or until the context is cancelled or the replica is destroyed. +// Note that the lease applied index may regress across restarts, since we don't +// sync state machine application to disk. +// +// TODO(erikgrinaker): it would be nice if we could be notified about LAI +// updates instead, but polling will do for now. 
+func (r *Replica) WaitForLeaseAppliedIndex(ctx context.Context, lai uint64) (uint64, error) { + retryOpts := retry.Options{ + InitialBackoff: 10 * time.Millisecond, + Multiplier: 2, + MaxBackoff: time.Second, + } + for retry := retry.StartWithCtx(ctx, retryOpts); retry.Next(); { + if currentLAI := r.GetLeaseAppliedIndex(); currentLAI >= lai { + return currentLAI, nil + } + if _, err := r.IsDestroyed(); err != nil { + return 0, err + } + } + return 0, ctx.Err() +} + // ChangeReplicas atomically changes the replicas that are members of a range. // The change is performed in a distributed transaction and takes effect when // that transaction is committed. This transaction confirms that the supplied diff --git a/pkg/kv/kvserver/replica_command_test.go b/pkg/kv/kvserver/replica_command_test.go index fc6ce2cd3c78..f78113d49878 100644 --- a/pkg/kv/kvserver/replica_command_test.go +++ b/pkg/kv/kvserver/replica_command_test.go @@ -13,7 +13,9 @@ package kvserver import ( "context" "encoding/binary" + math "math" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" @@ -23,6 +25,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -608,3 +613,80 @@ func TestSynthesizeTargetsByChangeType(t *testing.T) { }) } } + +func TestWaitForLeaseAppliedIndex(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + const maxLAI = math.MaxUint64 + + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + tc := testContext{} + tc.Start(ctx, t, stopper) + db := tc.store.DB() + + // Submit a write and read it back to bump the initial LAI. + write := func(key, value string) { + require.NoError(t, db.Put(ctx, key, value)) + _, err := db.Get(ctx, key) + require.NoError(t, err) + } + write("foo", "bar") + + // Should return immediately when already at or past the LAI. + currentLAI := tc.repl.GetLeaseAppliedIndex() + require.NotZero(t, currentLAI) + resultLAI, err := tc.repl.WaitForLeaseAppliedIndex(ctx, currentLAI) + require.NoError(t, err) + require.GreaterOrEqual(t, resultLAI, currentLAI) + + // Should wait for a future LAI to be reached. + const numWrites = 10 + waitLAI := tc.repl.GetLeaseAppliedIndex() + numWrites + laiC := make(chan uint64, 1) + go func() { + lai, err := tc.repl.WaitForLeaseAppliedIndex(ctx, waitLAI) + assert.NoError(t, err) // can't use require in goroutine + laiC <- lai + }() + + select { + case lai := <-laiC: + t.Fatalf("unexpected early LAI %d", lai) + case <-time.After(time.Second): + } + + for i := 0; i < numWrites; i++ { + write("foo", "bar") + } + + select { + case lai := <-laiC: + require.GreaterOrEqual(t, lai, waitLAI) + require.GreaterOrEqual(t, tc.repl.GetLeaseAppliedIndex(), lai) + case <-time.After(5 * time.Second): + t.Fatalf("timed out waiting for LAI %d", waitLAI) + } + + // Should error on context cancellation. + cancelCtx, cancel := context.WithCancel(ctx) + cancel() + _, err = tc.repl.WaitForLeaseAppliedIndex(cancelCtx, maxLAI) + require.Error(t, err) + require.Equal(t, cancelCtx.Err(), err) + + // Should error on destroyed replicas. 
+ stopper.Stop(ctx) + + destroyErr := errors.New("destroyed") + tc.repl.mu.Lock() + tc.repl.mu.destroyStatus.Set(destroyErr, destroyReasonRemoved) + tc.repl.mu.Unlock() + + _, err = tc.repl.WaitForLeaseAppliedIndex(ctx, maxLAI) + require.Error(t, err) + require.Equal(t, destroyErr, err) +} diff --git a/pkg/kv/kvserver/stores_server.go b/pkg/kv/kvserver/stores_server.go index 4f2130ff7d53..944a1e4f2816 100644 --- a/pkg/kv/kvserver/stores_server.go +++ b/pkg/kv/kvserver/stores_server.go @@ -74,6 +74,8 @@ func (is Server) CollectChecksum( // // It is the caller's responsibility to cancel or set a timeout on the context. // If the context is never canceled, WaitForApplication will retry forever. +// +// TODO(erikgrinaker): consider using Replica.WaitForLeaseAppliedIndex(). func (is Server) WaitForApplication( ctx context.Context, req *WaitForApplicationRequest, ) (*WaitForApplicationResponse, error) { From a70825e3427e1d0aa1f41eb8366ca66961b7f328 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Tue, 16 Jan 2024 12:30:42 +0000 Subject: [PATCH 5/7] batcheval: add `PushTxnResponse.AmbiguousAbort` This indicates to the caller that the `ABORTED` status of the pushed transaction is ambiguous, and the transaction may in fact have been committed and GCed already. This information is also plumbed through the `IntentResolver` txn push APIs. Epic: none Release note: None --- pkg/kv/kvpb/api.proto | 9 ++ pkg/kv/kvserver/batcheval/BUILD.bazel | 1 + pkg/kv/kvserver/batcheval/cmd_push_txn.go | 4 + .../kvserver/batcheval/cmd_push_txn_test.go | 104 ++++++++++++++++++ .../concurrency/concurrency_manager_test.go | 22 ++-- .../kvserver/concurrency/lock_table_waiter.go | 6 +- .../concurrency/lock_table_waiter_test.go | 30 ++--- .../intentresolver/intent_resolver.go | 37 ++++--- pkg/kv/kvserver/replica_rangefeed.go | 2 +- 9 files changed, 172 insertions(+), 43 deletions(-) create mode 100644 pkg/kv/kvserver/batcheval/cmd_push_txn_test.go diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto index c130829cc7a1..039b6d5a92f2 100644 --- a/pkg/kv/kvpb/api.proto +++ b/pkg/kv/kvpb/api.proto @@ -1231,6 +1231,15 @@ message PushTxnResponse { // TODO(tschottdorf): Maybe this can be a TxnMeta instead; probably requires // factoring out the new Priority. Transaction pushee_txn = 2 [(gogoproto.nullable) = false]; + // ambiguous_abort is true if pushee_txn has status ABORTED, but the + // transaction may in fact have been committed and GCed already. Concretely, + // this means that the transaction record does not exist, but it may have + // existed in the past (according to the timestamp cache), and we can't know + // whether it committed or aborted so we pessimistically assume it aborted. + // + // NB: this field was added in a patch release, and is not guaranteed to be + // populated prior to 24.1. + bool ambiguous_abort = 3; } // A RecoverTxnRequest is arguments to the RecoverTxn() method. 
It is sent diff --git a/pkg/kv/kvserver/batcheval/BUILD.bazel b/pkg/kv/kvserver/batcheval/BUILD.bazel index a604361299ee..f694038e39ad 100644 --- a/pkg/kv/kvserver/batcheval/BUILD.bazel +++ b/pkg/kv/kvserver/batcheval/BUILD.bazel @@ -109,6 +109,7 @@ go_test( "cmd_get_test.go", "cmd_is_span_empty_test.go", "cmd_lease_test.go", + "cmd_push_txn_test.go", "cmd_query_intent_test.go", "cmd_query_resolved_timestamp_test.go", "cmd_recover_txn_test.go", diff --git a/pkg/kv/kvserver/batcheval/cmd_push_txn.go b/pkg/kv/kvserver/batcheval/cmd_push_txn.go index fc9ad00c79b9..81d7efa00abc 100644 --- a/pkg/kv/kvserver/batcheval/cmd_push_txn.go +++ b/pkg/kv/kvserver/batcheval/cmd_push_txn.go @@ -171,6 +171,10 @@ func PushTxn( // then we know we're in either the second or the third case. reply.PusheeTxn = SynthesizeTxnFromMeta(ctx, cArgs.EvalCtx, args.PusheeTxn) if reply.PusheeTxn.Status == roachpb.ABORTED { + // The transaction may actually have committed and already removed its + // intents and txn record, or it may have aborted and done the same. We + // can't know, so mark the abort as ambiguous. + reply.AmbiguousAbort = true // If the transaction is uncommittable, we don't even need to // persist an ABORTED transaction record, we can just consider it // aborted. This is good because it allows us to obey the invariant diff --git a/pkg/kv/kvserver/batcheval/cmd_push_txn_test.go b/pkg/kv/kvserver/batcheval/cmd_push_txn_test.go new file mode 100644 index 000000000000..2ddaff4741f8 --- /dev/null +++ b/pkg/kv/kvserver/batcheval/cmd_push_txn_test.go @@ -0,0 +1,104 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package batcheval_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/require" +) + +// TestPushTxnAmbiguousAbort tests PushTxn behavior when the transaction record +// is missing. In this case, the timestamp cache can tell us whether the +// transaction record may have existed in the past -- if we know it hasn't, then +// the transaction is still pending (e.g. before the record is written), but +// otherwise the transaction record is pessimistically assumed to have aborted. +// However, this state is ambiguous, as the transaction may in fact have +// committed already and GCed its transaction record. Make sure this is +// reflected in the AmbiguousAbort field. +// +// TODO(erikgrinaker): generalize this to test PushTxn more broadly. 
+func TestPushTxnAmbiguousAbort(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + clock := hlc.NewClockForTesting(timeutil.NewManualTime(timeutil.Now())) + now := clock.Now() + engine := storage.NewDefaultInMemForTesting() + defer engine.Close() + + testutils.RunTrueAndFalse(t, "CanCreateTxnRecord", func(t *testing.T, canCreateTxnRecord bool) { + evalCtx := (&batcheval.MockEvalCtx{ + Clock: clock, + CanCreateTxnRecordFn: func() (bool, kvpb.TransactionAbortedReason) { + return canCreateTxnRecord, 0 // PushTxn doesn't care about the reason + }, + }).EvalContext() + + key := roachpb.Key("foo") + pusheeTxnMeta := enginepb.TxnMeta{ + ID: uuid.MakeV4(), + Key: key, + MinTimestamp: now, + } + + resp := kvpb.PushTxnResponse{} + res, err := batcheval.PushTxn(ctx, engine, batcheval.CommandArgs{ + EvalCtx: evalCtx, + Header: kvpb.Header{ + Timestamp: clock.Now(), + }, + Args: &kvpb.PushTxnRequest{ + RequestHeader: kvpb.RequestHeader{Key: key}, + PusheeTxn: pusheeTxnMeta, + }, + }, &resp) + require.NoError(t, err) + + // There is no txn record (the engine is empty). If we can't create a txn + // record, it's because the timestamp cache can't confirm that it didn't + // exist in the past. This will return an ambiguous abort. + var expectUpdatedTxns []*roachpb.Transaction + expectTxn := roachpb.Transaction{ + TxnMeta: pusheeTxnMeta, + LastHeartbeat: pusheeTxnMeta.MinTimestamp, + } + if !canCreateTxnRecord { + expectTxn.Status = roachpb.ABORTED + expectUpdatedTxns = append(expectUpdatedTxns, &expectTxn) + } + + require.Equal(t, result.Result{ + Local: result.LocalResult{ + UpdatedTxns: expectUpdatedTxns, + }, + }, res) + require.Equal(t, kvpb.PushTxnResponse{ + PusheeTxn: expectTxn, + AmbiguousAbort: !canCreateTxnRecord, + }, resp) + }) +} diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go index c01a2e328683..7443d53ef77a 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go @@ -693,22 +693,22 @@ func (c *cluster) makeConfig() concurrency.Config { // PushTransaction implements the concurrency.IntentResolver interface. func (c *cluster) PushTransaction( ctx context.Context, pushee *enginepb.TxnMeta, h kvpb.Header, pushType kvpb.PushTxnType, -) (*roachpb.Transaction, *kvpb.Error) { +) (*roachpb.Transaction, bool, *kvpb.Error) { pusheeRecord, err := c.getTxnRecord(pushee.ID) if err != nil { - return nil, kvpb.NewError(err) + return nil, false, kvpb.NewError(err) } var pusherRecord *txnRecord if h.Txn != nil { pusherID := h.Txn.ID pusherRecord, err = c.getTxnRecord(pusherID) if err != nil { - return nil, kvpb.NewError(err) + return nil, false, kvpb.NewError(err) } push, err := c.registerPush(ctx, pusherID, pushee.ID) if err != nil { - return nil, kvpb.NewError(err) + return nil, false, kvpb.NewError(err) } defer c.unregisterPush(push) } @@ -727,10 +727,10 @@ func (c *cluster) PushTransaction( switch { case pusheeTxn.Status.IsFinalized(): // Already finalized. - return pusheeTxn, nil + return pusheeTxn, false, nil case pushType == kvpb.PUSH_TIMESTAMP && pushTo.LessEq(pusheeTxn.WriteTimestamp): // Already pushed. 
- return pusheeTxn, nil + return pusheeTxn, false, nil case pushType == kvpb.PUSH_TOUCH: pusherWins = false case txnwait.CanPushWithPriority(pusherPriority, pusheeTxn.Priority): @@ -750,16 +750,16 @@ func (c *cluster) PushTransaction( err = errors.Errorf("unexpected push type: %s", pushType) } if err != nil { - return nil, kvpb.NewError(err) + return nil, false, kvpb.NewError(err) } pusheeTxn, _ = pusheeRecord.asTxn() - return pusheeTxn, nil + return pusheeTxn, false, nil } // If PUSH_TOUCH, return error instead of waiting. if pushType == kvpb.PUSH_TOUCH { log.Eventf(ctx, "pushee not abandoned") err := kvpb.NewTransactionPushError(*pusheeTxn) - return nil, kvpb.NewError(err) + return nil, false, kvpb.NewError(err) } // Or the pusher aborted? var pusherRecordSig chan struct{} @@ -769,7 +769,7 @@ func (c *cluster) PushTransaction( if pusherTxn.Status == roachpb.ABORTED { log.Eventf(ctx, "detected pusher aborted") err := kvpb.NewTransactionAbortedError(kvpb.ABORT_REASON_PUSHER_ABORTED) - return nil, kvpb.NewError(err) + return nil, false, kvpb.NewError(err) } } // Wait until either record is updated. @@ -777,7 +777,7 @@ func (c *cluster) PushTransaction( case <-pusheeRecordSig: case <-pusherRecordSig: case <-ctx.Done(): - return nil, kvpb.NewError(ctx.Err()) + return nil, false, kvpb.NewError(ctx.Err()) } } } diff --git a/pkg/kv/kvserver/concurrency/lock_table_waiter.go b/pkg/kv/kvserver/concurrency/lock_table_waiter.go index 34370bf585c4..43fd48ed1c54 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_waiter.go +++ b/pkg/kv/kvserver/concurrency/lock_table_waiter.go @@ -124,7 +124,7 @@ type IntentResolver interface { // pushed successfully. PushTransaction( context.Context, *enginepb.TxnMeta, kvpb.Header, kvpb.PushTxnType, - ) (*roachpb.Transaction, *Error) + ) (*roachpb.Transaction, bool, *Error) // ResolveIntent synchronously resolves the provided intent. ResolveIntent(context.Context, roachpb.LockUpdate, intentresolver.ResolveOptions) *Error @@ -518,7 +518,7 @@ func (w *lockTableWaiterImpl) pushLockTxn( log.Fatalf(ctx, "unexpected WaitPolicy: %v", req.WaitPolicy) } - pusheeTxn, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType) + pusheeTxn, _, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType) if err != nil { // If pushing with an Error WaitPolicy and the push fails, then the lock // holder is still active. Transform the error into a WriteIntentError. @@ -700,7 +700,7 @@ func (w *lockTableWaiterImpl) pushRequestTxn( pushType := kvpb.PUSH_ABORT log.VEventf(ctx, 3, "pushing txn %s to detect request deadlock", ws.txn.ID.Short()) - _, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType) + _, _, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType) if err != nil { return err } diff --git a/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go b/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go index c13c9b789503..d4ffff40e764 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go +++ b/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go @@ -38,7 +38,7 @@ import ( ) type mockIntentResolver struct { - pushTxn func(context.Context, *enginepb.TxnMeta, kvpb.Header, kvpb.PushTxnType) (*roachpb.Transaction, *Error) + pushTxn func(context.Context, *enginepb.TxnMeta, kvpb.Header, kvpb.PushTxnType) (*roachpb.Transaction, bool, *Error) resolveIntent func(context.Context, roachpb.LockUpdate) *Error resolveIntents func(context.Context, []roachpb.LockUpdate) *Error } @@ -46,7 +46,7 @@ type mockIntentResolver struct { // mockIntentResolver implements the IntentResolver interface. 
func (m *mockIntentResolver) PushTransaction( ctx context.Context, txn *enginepb.TxnMeta, h kvpb.Header, pushType kvpb.PushTxnType, -) (*roachpb.Transaction, *Error) { +) (*roachpb.Transaction, bool, *Error) { return m.pushTxn(ctx, txn, h, pushType) } @@ -351,7 +351,7 @@ func testWaitPush(t *testing.T, k waitKind, makeReq func() Request, expPushTS hl pusheeArg *enginepb.TxnMeta, h kvpb.Header, pushType kvpb.PushTxnType, - ) (*roachpb.Transaction, *Error) { + ) (*roachpb.Transaction, bool, *Error) { require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg) require.Equal(t, req.Txn, h.Txn) require.Equal(t, expPushTS, h.Timestamp) @@ -384,7 +384,7 @@ func testWaitPush(t *testing.T, k waitKind, makeReq func() Request, expPushTS hl g.state = waitingState{kind: doneWaiting} g.notify() } - return resp, nil + return resp, false, nil } err := w.WaitOn(ctx, req, g) @@ -536,7 +536,7 @@ func testErrorWaitPush( pusheeArg *enginepb.TxnMeta, h kvpb.Header, pushType kvpb.PushTxnType, - ) (*roachpb.Transaction, *Error) { + ) (*roachpb.Transaction, bool, *Error) { require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg) require.Equal(t, req.Txn, h.Txn) require.Equal(t, expPushTS, h.Timestamp) @@ -544,7 +544,7 @@ func testErrorWaitPush( resp := &roachpb.Transaction{TxnMeta: *pusheeArg, Status: roachpb.PENDING} if pusheeActive { - return nil, kvpb.NewError(&kvpb.TransactionPushError{ + return nil, false, kvpb.NewError(&kvpb.TransactionPushError{ PusheeTxn: *resp, }) } @@ -565,7 +565,7 @@ func testErrorWaitPush( return nil } resp.Status = roachpb.ABORTED - return resp, nil + return resp, false, nil } err := w.WaitOn(ctx, req, g) @@ -708,7 +708,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) { pusheeArg *enginepb.TxnMeta, h kvpb.Header, pushType kvpb.PushTxnType, - ) (*roachpb.Transaction, *Error) { + ) (*roachpb.Transaction, bool, *Error) { require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg) require.Equal(t, req.Txn, h.Txn) @@ -721,7 +721,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) { // Wait for the context to hit its timeout. 
<-ctx.Done() - return nil, kvpb.NewError(ctx.Err()) + return nil, false, kvpb.NewError(ctx.Err()) } require.Equal(t, kvpb.PUSH_TOUCH, pushType) @@ -731,7 +731,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) { resp := &roachpb.Transaction{TxnMeta: *pusheeArg, Status: roachpb.PENDING} if pusheeActive { - return nil, kvpb.NewError(&kvpb.TransactionPushError{ + return nil, false, kvpb.NewError(&kvpb.TransactionPushError{ PusheeTxn: *resp, }) } @@ -752,7 +752,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) { return nil } resp.Status = roachpb.ABORTED - return resp, nil + return resp, false, nil } err := w.WaitOn(ctx, req, g) @@ -808,8 +808,8 @@ func TestLockTableWaiterIntentResolverError(t *testing.T) { g.notify() ir.pushTxn = func( _ context.Context, _ *enginepb.TxnMeta, _ kvpb.Header, _ kvpb.PushTxnType, - ) (*roachpb.Transaction, *Error) { - return nil, err1 + ) (*roachpb.Transaction, bool, *Error) { + return nil, false, err1 } err := w.WaitOn(ctx, req, g) require.Equal(t, err1, err) @@ -819,8 +819,8 @@ func TestLockTableWaiterIntentResolverError(t *testing.T) { g.notify() ir.pushTxn = func( _ context.Context, _ *enginepb.TxnMeta, _ kvpb.Header, _ kvpb.PushTxnType, - ) (*roachpb.Transaction, *Error) { - return &pusheeTxn, nil + ) (*roachpb.Transaction, bool, *Error) { + return &pusheeTxn, false, nil } ir.resolveIntent = func(_ context.Context, intent roachpb.LockUpdate) *Error { return err2 diff --git a/pkg/kv/kvserver/intentresolver/intent_resolver.go b/pkg/kv/kvserver/intentresolver/intent_resolver.go index 65ad954b0bbd..5a25b9a2a8e2 100644 --- a/pkg/kv/kvserver/intentresolver/intent_resolver.go +++ b/pkg/kv/kvserver/intentresolver/intent_resolver.go @@ -318,21 +318,26 @@ func updateIntentTxnStatus( // PushTransaction takes a transaction and pushes its record using the specified // push type and request header. It returns the transaction proto corresponding -// to the pushed transaction. +// to the pushed transaction, and in the case of an ABORTED transaction, a bool +// indicating whether the abort was ambiguous (see +// PushTxnResponse.AmbiguousAbort). +// +// NB: ambiguousAbort may be false with nodes <24.1. func (ir *IntentResolver) PushTransaction( ctx context.Context, pushTxn *enginepb.TxnMeta, h kvpb.Header, pushType kvpb.PushTxnType, -) (*roachpb.Transaction, *kvpb.Error) { +) (_ *roachpb.Transaction, ambiguousAbort bool, _ *kvpb.Error) { pushTxns := make(map[uuid.UUID]*enginepb.TxnMeta, 1) pushTxns[pushTxn.ID] = pushTxn - pushedTxns, pErr := ir.MaybePushTransactions(ctx, pushTxns, h, pushType, false /* skipIfInFlight */) + pushedTxns, ambiguousAbort, pErr := ir.MaybePushTransactions( + ctx, pushTxns, h, pushType, false /* skipIfInFlight */) if pErr != nil { - return nil, pErr + return nil, false, pErr } pushedTxn, ok := pushedTxns[pushTxn.ID] if !ok { log.Fatalf(ctx, "missing PushTxn responses for %s", pushTxn) } - return pushedTxn, nil + return pushedTxn, ambiguousAbort, nil } // MaybePushTransactions tries to push the conflicting transaction(s): @@ -340,8 +345,12 @@ func (ir *IntentResolver) PushTransaction( // it on a write/write conflict, or doing nothing if the transaction is no // longer pending. // -// Returns a set of transaction protos who correspond to the pushed -// transactions and whose intents can now be resolved, and an error. 
+// Returns a set of transaction protos who correspond to the pushed transactions +// and whose intents can now be resolved, along with a bool indicating whether +// any of the responses were an ambiguous abort (see +// PushTxnResponse.AmbiguousAbort), and an error. +// +// NB: anyAmbiguousAbort may be false with nodes <24.1. // // If skipIfInFlight is true, then no PushTxns will be sent and no intents // will be returned for any transaction for which there is another push in @@ -366,7 +375,7 @@ func (ir *IntentResolver) MaybePushTransactions( h kvpb.Header, pushType kvpb.PushTxnType, skipIfInFlight bool, -) (map[uuid.UUID]*roachpb.Transaction, *kvpb.Error) { +) (_ map[uuid.UUID]*roachpb.Transaction, anyAmbiguousAbort bool, _ *kvpb.Error) { // Decide which transactions to push and which to ignore because // of other in-flight requests. For those transactions that we // will be pushing, increment their ref count in the in-flight @@ -397,7 +406,7 @@ func (ir *IntentResolver) MaybePushTransactions( } ir.mu.Unlock() if len(pushTxns) == 0 { - return nil, nil + return nil, false, nil } pusherTxn := getPusherTxn(h) @@ -422,7 +431,7 @@ func (ir *IntentResolver) MaybePushTransactions( err := ir.db.Run(ctx, b) cleanupInFlightPushes() if err != nil { - return nil, b.MustPErr() + return nil, false, b.MustPErr() } // TODO(nvanbenschoten): if we succeed because the transaction has already @@ -432,14 +441,16 @@ func (ir *IntentResolver) MaybePushTransactions( br := b.RawResponse() pushedTxns := make(map[uuid.UUID]*roachpb.Transaction, len(br.Responses)) for _, resp := range br.Responses { - txn := &resp.GetInner().(*kvpb.PushTxnResponse).PusheeTxn + resp := resp.GetInner().(*kvpb.PushTxnResponse) + txn := &resp.PusheeTxn + anyAmbiguousAbort = anyAmbiguousAbort || resp.AmbiguousAbort if _, ok := pushedTxns[txn.ID]; ok { log.Fatalf(ctx, "have two PushTxn responses for %s", txn.ID) } pushedTxns[txn.ID] = txn log.Eventf(ctx, "%s is now %s", txn.ID, txn.Status) } - return pushedTxns, nil + return pushedTxns, anyAmbiguousAbort, nil } // runAsyncTask semi-synchronously runs a generic task function. If @@ -543,7 +554,7 @@ func (ir *IntentResolver) CleanupIntents( } } - pushedTxns, pErr := ir.MaybePushTransactions(ctx, pushTxns, h, pushType, skipIfInFlight) + pushedTxns, _, pErr := ir.MaybePushTransactions(ctx, pushTxns, h, pushType, skipIfInFlight) if pErr != nil { return 0, errors.Wrapf(pErr.GoError(), "failed to push during intent resolution") } diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 99160d3244ce..b19f63b634f6 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -126,7 +126,7 @@ func (tp *rangefeedTxnPusher) PushTxns( }, } - pushedTxnMap, pErr := tp.ir.MaybePushTransactions( + pushedTxnMap, _, pErr := tp.ir.MaybePushTransactions( ctx, pushTxnMap, h, kvpb.PUSH_TIMESTAMP, false, /* skipIfInFlight */ ) if pErr != nil { From b06378f9e62a8aa59060314861f73e48488d2b0b Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Wed, 15 Nov 2023 12:23:20 +0000 Subject: [PATCH 6/7] rangefeed: fix premature checkpoint due to intent resolution race It was possible for rangefeeds to emit a premature checkpoint, before all writes below its timestamp had been emitted. This in turn would cause changefeeds to not emit these write events at all. This could happen because `PushTxn` may return a false `ABORTED` status for a transaction that has in fact been committed, if the transaction record has been GCed (after resolving all intents). 
The timestamp cache does not retain sufficient information to disambiguate a committed transaction from an aborted one in this case, so it pessimistically assumes an abort (see `Replica.CanCreateTxnRecord` and `batcheval.SynthesizeTxnFromMeta`). However, the rangefeed txn pusher trusted this `ABORTED` status, ignoring the pending txn intents and allowing the resolved timestamp to advance past them before emitting the committed intents. This can lead to the following scenario: - A rangefeed is running on a lagging follower. - A txn writes an intent, which is replicated to the follower. - The closed timestamp advances past the intent. - The txn commits and resolves the intent at the original write timestamp, then GCs its txn record. This is not yet applied on the follower. - The rangefeed pushes the txn to advance its resolved timestamp. - The txn is GCed, so the push returns ABORTED (it can't know whether the txn was committed or aborted after its record is GCed). - The rangefeed disregards the "aborted" txn and advances the resolved timestamp, emitting a checkpoint. - The follower applies the resolved intent and emits an event below the checkpoint, violating the checkpoint guarantee. - The changefeed sees an event below its frontier and drops it, never emitting these events at all. This patch fixes the bug by submitting a barrier command to the leaseholder which waits for all past and ongoing writes (including intent resolution) to complete and apply, and then waits for the local replica to apply the barrier as well. This ensures any committed intent resolution will be applied and emitted before the transaction is removed from resolved timestamp tracking. Epic: none Release note (bug fix): fixed a bug where a changefeed could omit events in rare cases, logging the error "cdc ux violation: detected timestamp ... that is less or equal to the local frontier". This can happen if a rangefeed runs on a follower replica that lags significantly behind the leaseholder, a transaction commits and removes its transaction record before its intent resolution is applied on the follower, the follower's closed timestamp has advanced past the transaction commit timestamp, and the rangefeed attempts to push the transaction to a new timestamp (at least 10 seconds after the transaction began). This may cause the rangefeed to prematurely emit a checkpoint before emitting writes at lower timestamps, which in turn may cause the changefeed to drop these events entirely, never emitting them. 
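For illustration only, here is a minimal, self-contained Go sketch of the push-then-barrier sequence described above. The interface and function names (`pusher`, `PushTxns`, `Barrier`, `WaitForLocalApply`, `pushAndMaybeBarrier`) are assumptions made for this sketch, not the actual CockroachDB APIs touched by the patch; the point is only the control flow: push the old transactions, detect an ambiguous abort, submit a barrier, and wait for the local replica to apply it before letting the resolved timestamp advance.

package main

import (
	"context"
	"fmt"
	"time"
)

// pusher abstracts the primitives the rangefeed push loop needs. The names
// here are assumptions for the sketch, not the real CockroachDB interfaces.
type pusher interface {
	// PushTxns pushes old transactions and reports whether any of them
	// returned an ambiguous ABORTED status (record GCed, outcome unknown).
	PushTxns(ctx context.Context) (ambiguousAbort bool, err error)
	// Barrier waits for all past and ongoing writes on the range to apply on
	// the leaseholder and returns the barrier's lease applied index.
	Barrier(ctx context.Context) (lai uint64, err error)
	// WaitForLocalApply blocks until the local replica has applied lai.
	WaitForLocalApply(ctx context.Context, lai uint64) error
}

// pushAndMaybeBarrier mirrors the control flow of the fix: the extra Raft
// round-trip is only paid when the abort status is ambiguous.
func pushAndMaybeBarrier(ctx context.Context, p pusher) error {
	ambiguous, err := p.PushTxns(ctx)
	if err != nil {
		return err
	}
	if !ambiguous {
		return nil // safe to let the resolved timestamp advance as before
	}
	// Backstop timeout so a stuck barrier cannot wedge the push loop.
	ctx, cancel := context.WithTimeout(ctx, time.Minute)
	defer cancel()
	lai, err := p.Barrier(ctx)
	if err != nil {
		return fmt.Errorf("barrier failed: %w", err)
	}
	if lai == 0 {
		// Leaseholder predates LAI support; degrade to the old behavior.
		return nil
	}
	if err := p.WaitForLocalApply(ctx, lai); err != nil {
		return fmt.Errorf("waiting for local apply: %w", err)
	}
	return nil
}

// fakePusher is a trivial stand-in so the sketch compiles and runs.
type fakePusher struct{}

func (fakePusher) PushTxns(context.Context) (bool, error)          { return true, nil }
func (fakePusher) Barrier(context.Context) (uint64, error)         { return 42, nil }
func (fakePusher) WaitForLocalApply(context.Context, uint64) error { return nil }

func main() {
	if err := pushAndMaybeBarrier(context.Background(), fakePusher{}); err != nil {
		fmt.Println("push with barrier failed:", err)
	}
}

The extra Raft round-trip is confined to the rare ambiguous-abort path, and the backstop timeout keeps a stuck barrier from wedging the push loop.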
--- pkg/kv/kvclient/rangefeed/BUILD.bazel | 5 + .../rangefeed/rangefeed_external_test.go | 326 ++++++++++++++++++ pkg/kv/kvserver/rangefeed/BUILD.bazel | 1 + pkg/kv/kvserver/rangefeed/processor.go | 13 + pkg/kv/kvserver/rangefeed/processor_test.go | 24 +- pkg/kv/kvserver/rangefeed/task.go | 58 +++- pkg/kv/kvserver/rangefeed/task_test.go | 14 +- pkg/kv/kvserver/replica_rangefeed.go | 49 ++- 8 files changed, 463 insertions(+), 27 deletions(-) diff --git a/pkg/kv/kvclient/rangefeed/BUILD.bazel b/pkg/kv/kvclient/rangefeed/BUILD.bazel index e3dad86f78db..3187eefb0d8b 100644 --- a/pkg/kv/kvclient/rangefeed/BUILD.bazel +++ b/pkg/kv/kvclient/rangefeed/BUILD.bazel @@ -71,6 +71,8 @@ go_test( "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvpb", "//pkg/kv/kvserver", + "//pkg/kv/kvserver/closedts", + "//pkg/kv/kvserver/kvserverbase", "//pkg/roachpb", "//pkg/security/securityassets", "//pkg/security/securitytest", @@ -82,14 +84,17 @@ go_test( "//pkg/storage", "//pkg/testutils", "//pkg/testutils/serverutils", + "//pkg/testutils/skip", "//pkg/testutils/sqlutils", "//pkg/testutils/storageutils", "//pkg/testutils/testcluster", "//pkg/util", "//pkg/util/ctxgroup", "//pkg/util/encoding", + "//pkg/util/future", "//pkg/util/hlc", "//pkg/util/leaktest", + "//pkg/util/log", "//pkg/util/mon", "//pkg/util/retry", "//pkg/util/stop", diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go index 5e677b2cc4b1..b96110936675 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go @@ -14,26 +14,34 @@ import ( "context" "runtime/pprof" "sync" + "sync/atomic" "testing" "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/storageutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/future" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" @@ -1155,3 +1163,321 @@ func TestRangeFeedStartTimeExclusive(t *testing.T) { t.Fatal("timed out waiting for event") } } + +// TestRangeFeedIntentResolutionRace is a regression test for +// https://github.com/cockroachdb/cockroach/issues/104309, i.e. the +// following scenario: +// +// - A rangefeed is running on a lagging follower. +// - A txn writes an intent, which is replicated to the follower. +// - The closed timestamp advances past the intent. 
+// - The txn commits and resolves the intent at the original write timestamp, +// then GCs its txn record. This is not yet applied on the follower. +// - The rangefeed pushes the txn to advance its resolved timestamp. +// - The txn is GCed, so the push returns ABORTED (it can't know whether the +// txn was committed or aborted after its record is GCed). +// - The rangefeed disregards the "aborted" txn and advances the resolved +// timestamp, emitting a checkpoint. +// - The follower applies the resolved intent and emits an event below +// the checkpoint, violating the checkpoint guarantee. +// +// This scenario is addressed by running a Barrier request through Raft and +// waiting for it to apply locally before removing the txn from resolved ts +// tracking. This ensures the pending intent resolution is applied before +// the resolved ts can advance. +func TestRangeFeedIntentResolutionRace(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + skip.UnderRace(t) // too slow, times out + skip.UnderDeadlock(t) + + // Use a timeout, to prevent a hung test. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + // defer cancel() after Stopper.Stop(), so the context cancels first. + // Otherwise, the stopper will hang waiting for a rangefeed whose context is + // not yet cancelled. + + // Set up an apply filter that blocks Raft application on n3 (follower) for + // the given range. + var blockRangeOnN3 atomic.Int64 + unblockRangeC := make(chan struct{}) + applyFilter := func(args kvserverbase.ApplyFilterArgs) (int, *kvpb.Error) { + if args.StoreID == 3 { + if rangeID := blockRangeOnN3.Load(); rangeID > 0 && rangeID == int64(args.RangeID) { + t.Logf("blocked r%d on s%d", args.RangeID, args.StoreID) + select { + case <-unblockRangeC: + t.Logf("unblocked r%d on s%d", args.RangeID, args.StoreID) + case <-ctx.Done(): + return 0, kvpb.NewError(ctx.Err()) + } + } + } + return 0, nil + } + + // Set up a request filter that blocks transaction pushes for a specific key. + // This is used to prevent the rangefeed txn pusher from pushing the txn + // timestamp above the closed timestamp before it commits, only allowing the + // push to happen after the transaction has committed and GCed below the + // closed timestamp. + var blockPush atomic.Pointer[roachpb.Key] + unblockPushC := make(chan struct{}) + reqFilter := func(ctx context.Context, br *kvpb.BatchRequest) *kvpb.Error { + if br.IsSinglePushTxnRequest() { + req := br.Requests[0].GetPushTxn() + if key := blockPush.Load(); key != nil && req.Key.Equal(*key) { + t.Logf("blocked push for txn %s", req.PusheeTxn) + select { + case <-unblockPushC: + t.Logf("unblocked push for txn %s", req.PusheeTxn) + case <-ctx.Done(): + return kvpb.NewError(ctx.Err()) + } + } + } + return nil + } + + // Speed up the test by reducing various closed/resolved timestamp intervals. + const interval = 100 * time.Millisecond + st := cluster.MakeTestingClusterSettings() + kvserver.RangeFeedRefreshInterval.Override(ctx, &st.SV, interval) + closedts.SideTransportCloseInterval.Override(ctx, &st.SV, interval) + closedts.TargetDuration.Override(ctx, &st.SV, interval) + + // Start a cluster with 3 nodes. 
+ tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: st, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + TestingRequestFilter: reqFilter, + TestingApplyCalledTwiceFilter: applyFilter, + RangeFeedPushTxnsInterval: interval, + RangeFeedPushTxnsAge: interval, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + defer cancel() + + n1 := tc.Server(0) + s3 := tc.GetFirstStoreFromServer(t, 2) + clock := n1.Clock() + + // Determine the key/value we're going to write. + prefix := append(n1.Codec().TenantPrefix(), keys.ScratchRangeMin...) + key := append(prefix.Clone(), []byte("/foo")...) + value := []byte("value") + + // Split off a range and upreplicate it, with leaseholder on n1. + _, _, err := n1.SplitRange(prefix) + require.NoError(t, err) + desc := tc.AddVotersOrFatal(t, prefix, tc.Targets(1, 2)...) + t.Logf("split off range %s", desc) + + repl1 := tc.GetFirstStoreFromServer(t, 0).LookupReplica(roachpb.RKey(prefix)) // leaseholder + repl3 := tc.GetFirstStoreFromServer(t, 2).LookupReplica(roachpb.RKey(prefix)) // lagging follower + + require.True(t, repl1.OwnsValidLease(ctx, clock.NowAsClockTimestamp())) + + // Block pushes of the txn, to ensure it can write at a fixed timestamp. + // Otherwise, the rangefeed or someone else may push it beyond the closed + // timestamp. + blockPush.Store(&key) + + // We'll use n3 as our lagging follower. Start a rangefeed on it directly. + req := kvpb.RangeFeedRequest{ + Header: kvpb.Header{ + RangeID: desc.RangeID, + }, + Span: desc.RSpan().AsRawSpanWithNoLocals(), + } + eventC := make(chan *kvpb.RangeFeedEvent) + sink := newChannelSink(ctx, eventC) + fErr := future.MakeAwaitableFuture(s3.RangeFeed(&req, sink)) + require.NoError(t, fErr.Get()) // check if we've errored yet + t.Logf("started rangefeed on %s", repl3) + + // Spawn a rangefeed monitor, which posts checkpoint updates to checkpointC. + // This uses a buffered channel of size 1, and empties it out before posting a + // new update, such that it contains the latest known checkpoint and does not + // block the rangefeed. It also posts emitted values for our key to valueC, + // which should only happen once. + checkpointC := make(chan *kvpb.RangeFeedCheckpoint, 1) + valueC := make(chan *kvpb.RangeFeedValue, 1) + go func() { + defer close(checkpointC) + defer close(valueC) + for { + select { + case e := <-eventC: + switch { + case e.Checkpoint != nil: + // Clear checkpointC such that it always contains the latest. + select { + case <-checkpointC: + default: + } + checkpointC <- e.Checkpoint + case e.Val != nil && e.Val.Key.Equal(key): + select { + case valueC <- e.Val: + default: + t.Errorf("duplicate value event for %s: %s", key, e) + } + } + case <-ctx.Done(): + return + } + } + }() + + waitForCheckpoint := func(t *testing.T, ts hlc.Timestamp) hlc.Timestamp { + t.Helper() + timeoutC := time.After(3 * time.Second) + for { + select { + case c := <-checkpointC: + require.NotNil(t, c, "nil checkpoint") + if ts.LessEq(c.ResolvedTS) { + t.Logf("rangefeed checkpoint at %s >= %s", c.ResolvedTS, ts) + return c.ResolvedTS + } + case <-timeoutC: + require.Fail(t, "timed out waiting for checkpoint", "wanted %s", ts) + } + } + } + + // Wait for the initial checkpoint. + rts := waitForCheckpoint(t, clock.Now()) + t.Logf("initial checkpoint at %s", rts) + + // Start a new transaction on n1 with a fixed timestamp (to make sure it + // remains below the closed timestamp). 
Write an intent, and read it back to + // make sure it has applied. + writeTS := clock.Now() + txn := n1.DB().NewTxn(ctx, "test") + require.NoError(t, txn.SetFixedTimestamp(ctx, writeTS)) + require.NoError(t, txn.Put(ctx, key, value)) + _, err = txn.Get(ctx, key) + require.NoError(t, err) + t.Logf("wrote %s", key) + + // Wait for both the leaseholder and the follower to close the transaction's + // write timestamp. + waitForClosedTimestamp := func(t *testing.T, repl *kvserver.Replica, ts hlc.Timestamp) hlc.Timestamp { + t.Helper() + timeoutC := time.After(3 * time.Second) + for { + if closedTS := repl.GetCurrentClosedTimestamp(ctx); ts.LessEq(closedTS) { + t.Logf("replica %s closed timestamp at %s >= %s", repl, closedTS, ts) + return closedTS + } + select { + case <-time.After(10 * time.Millisecond): + case <-timeoutC: + require.Fail(t, "timeout out waiting for closed timestamp", "wanted %s", ts) + } + } + } + cts := waitForClosedTimestamp(t, repl1, writeTS) + _ = waitForClosedTimestamp(t, repl3, writeTS) + t.Logf("closed timestamp %s is above txn write timestamp %s", cts, writeTS) + + // Wait for the rangefeed checkpoint to reach writeTS.Prev(). This ensures the + // rangefeed's view of the closed timestamp has been updated, and is now only + // blocked by the intent. + waitTS := writeTS.Prev() + waitTS.Logical = 0 + rts = waitForCheckpoint(t, waitTS) + t.Logf("rangefeed caught up to txn write timestamp at %s", rts) + + // Block Raft application on repl3. + blockRangeOnN3.Store(int64(desc.RangeID)) + + // Commit the transaction, and check its commit timestamp. + require.NoError(t, txn.Commit(ctx)) + commitTS := txn.CommitTimestamp() + require.Equal(t, commitTS, writeTS) + t.Logf("txn committed at %s", writeTS) + + // Unblock transaction pushes. Since repl3 won't apply the intent resolution + // yet, the rangefeed will keep trying to push the transaction. Once the + // transaction record is GCed (which happens async), the rangefeed will see an + // ABORTED status. + // + // It may see the intermediate COMMITTED state too, but at the time of writing + // that does not matter, since the rangefeed needs to keep tracking the + // intent until it applies the resolution, and it will also see the ABORTED + // status before that happens. + blockPush.Store(nil) + close(unblockPushC) + + // Make sure repl3 does not emit a checkpoint beyond the write timestamp. Its + // closed timestamp has already advanced past it, but the unresolved intent + // should prevent the resolved timestamp from advancing, despite the false + // ABORTED state. We also make sure no value has been emitted. + waitC := time.After(3 * time.Second) + for done := false; !done; { + select { + case c := <-checkpointC: + require.NotNil(t, c) + require.False(t, commitTS.LessEq(c.ResolvedTS), + "repl %s emitted checkpoint %s beyond write timestamp %s", repl3, c.ResolvedTS, commitTS) + case v := <-valueC: + require.Fail(t, "repl3 emitted premature value %s", v) + case <-waitC: + done = true + } + } + t.Logf("checkpoint still below write timestamp") + + // Unblock application on repl3. Wait for the checkpoint to catch up to the + // commit timestamp, and the committed value to be emitted. 
+ blockRangeOnN3.Store(0) + close(unblockRangeC) + + rts = waitForCheckpoint(t, writeTS) + t.Logf("checkpoint %s caught up to write timestamp %s", rts, writeTS) + + select { + case v := <-valueC: + require.Equal(t, v.Key, key) + require.Equal(t, v.Value.Timestamp, writeTS) + t.Logf("received value %s", v) + case <-time.After(3 * time.Second): + require.Fail(t, "timed out waiting for event") + } + + // The rangefeed should still be running. + require.NoError(t, fErr.Get()) +} + +// channelSink is a rangefeed sink which posts events to a channel. +type channelSink struct { + ctx context.Context + ch chan<- *kvpb.RangeFeedEvent +} + +func newChannelSink(ctx context.Context, ch chan<- *kvpb.RangeFeedEvent) *channelSink { + return &channelSink{ctx: ctx, ch: ch} +} + +func (c *channelSink) Context() context.Context { + return c.ctx +} + +func (c *channelSink) Send(e *kvpb.RangeFeedEvent) error { + select { + case c.ch <- e: + return nil + case <-c.ctx.Done(): + return c.ctx.Err() + } +} diff --git a/pkg/kv/kvserver/rangefeed/BUILD.bazel b/pkg/kv/kvserver/rangefeed/BUILD.bazel index d72732096ce6..841b41ba1edb 100644 --- a/pkg/kv/kvserver/rangefeed/BUILD.bazel +++ b/pkg/kv/kvserver/rangefeed/BUILD.bazel @@ -24,6 +24,7 @@ go_library( "//pkg/storage/enginepb", "//pkg/util/admission", "//pkg/util/bufalloc", + "//pkg/util/contextutil", "//pkg/util/envutil", "//pkg/util/future", "//pkg/util/hlc", diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index bfec74e901e8..8cdeff5c4b47 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -53,6 +53,19 @@ var ( "periodically push txn write timestamps to advance rangefeed resolved timestamps", true, ) + + // PushTxnsBarrierEnabled is an escape hatch to disable the txn push barrier + // command in case it causes unexpected problems. This can result in + // violations of the rangefeed checkpoint guarantee, emitting premature + // checkpoints before all writes below it have been emitted in rare cases. + // See: https://github.com/cockroachdb/cockroach/issues/104309 + PushTxnsBarrierEnabled = settings.RegisterBoolSetting( + settings.SystemOnly, + "kv.rangefeed.push_txns.barrier.enabled", + "flush and apply prior writes when a txn push returns an ambiguous abort "+ + "(disabling may emit premature checkpoints before writes in rare cases)", + true, + ) ) // newErrBufferCapacityExceeded creates an error that is returned to subscribers diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index 8af6c86094c8..6fe28a43d0e5 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -833,7 +833,7 @@ func TestProcessorTxnPushAttempt(t *testing.T) { var tp testTxnPusher tp.mockPushTxns(func( ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, - ) ([]*roachpb.Transaction, error) { + ) ([]*roachpb.Transaction, bool, error) { // The txns are not in a sorted order. Enforce one. sort.Slice(txns, func(i, j int) bool { return bytes.Compare(txns[i].Key, txns[j].Key) < 0 @@ -847,34 +847,34 @@ func TestProcessorTxnPushAttempt(t *testing.T) { assert.Equal(t, txn2Meta, txns[1]) assert.Equal(t, txn3Meta, txns[2]) if t.Failed() { - return nil, errors.New("test failed") + return nil, false, errors.New("test failed") } // Push does not succeed. Protos not at larger ts. 
- return []*roachpb.Transaction{txn1Proto, txn2Proto, txn3Proto}, nil + return []*roachpb.Transaction{txn1Proto, txn2Proto, txn3Proto}, false, nil case 2: assert.Equal(t, 3, len(txns)) assert.Equal(t, txn1MetaT2Pre, txns[0]) assert.Equal(t, txn2Meta, txns[1]) assert.Equal(t, txn3Meta, txns[2]) if t.Failed() { - return nil, errors.New("test failed") + return nil, false, errors.New("test failed") } // Push succeeds. Return new protos. - return []*roachpb.Transaction{txn1ProtoT2, txn2ProtoT2, txn3ProtoT2}, nil + return []*roachpb.Transaction{txn1ProtoT2, txn2ProtoT2, txn3ProtoT2}, false, nil case 3: assert.Equal(t, 2, len(txns)) assert.Equal(t, txn2MetaT2Post, txns[0]) assert.Equal(t, txn3MetaT2Post, txns[1]) if t.Failed() { - return nil, errors.New("test failed") + return nil, false, errors.New("test failed") } // Push succeeds. Return new protos. - return []*roachpb.Transaction{txn2ProtoT3, txn3ProtoT3}, nil + return []*roachpb.Transaction{txn2ProtoT3, txn3ProtoT3}, false, nil default: - return nil, nil + return nil, false, nil } }) tp.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error { @@ -992,10 +992,10 @@ func TestProcessorTxnPushDisabled(t *testing.T) { var tp testTxnPusher tp.mockPushTxns(func( ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, - ) ([]*roachpb.Transaction, error) { + ) ([]*roachpb.Transaction, bool, error) { err := errors.Errorf("unexpected txn push for txns=%v ts=%s", txns, ts) t.Errorf("%v", err) - return nil, err + return nil, false, err }) p, stopper := newTestProcessorWithTxnPusher(t, nil /* rtsIter */, &tp, st) @@ -1547,11 +1547,11 @@ func TestProcessorContextCancellation(t *testing.T) { var pusher testTxnPusher pusher.mockPushTxns(func( ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, - ) ([]*roachpb.Transaction, error) { + ) ([]*roachpb.Transaction, bool, error) { pushReadyC <- struct{}{} <-ctx.Done() close(pushDoneC) - return nil, ctx.Err() + return nil, false, ctx.Err() }) pusher.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error { return nil diff --git a/pkg/kv/kvserver/rangefeed/task.go b/pkg/kv/kvserver/rangefeed/task.go index 2b0fad18a659..99b9e890ef7f 100644 --- a/pkg/kv/kvserver/rangefeed/task.go +++ b/pkg/kv/kvserver/rangefeed/task.go @@ -12,12 +12,14 @@ package rangefeed import ( "context" + "time" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -223,10 +225,17 @@ func (l *LegacyIntentScanner) Close() { l.iter.Close() } // cleaning up the intents of transactions that are found to be committed. type TxnPusher interface { // PushTxns attempts to push the specified transactions to a new - // timestamp. It returns the resulting transaction protos. - PushTxns(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error) + // timestamp. It returns the resulting transaction protos, and a + // bool indicating whether any txn aborts were ambiguous (see + // PushTxnResponse.AmbiguousAbort). + // + // NB: anyAmbiguousAbort may be false with nodes <24.1. 
+ PushTxns(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, bool, error) // ResolveIntents resolves the specified intents. ResolveIntents(ctx context.Context, intents []roachpb.LockUpdate) error + // Barrier waits for all past and ongoing write commands in the range to have + // applied on the leaseholder and the local replica. + Barrier(ctx context.Context) error } // txnPushAttempt pushes all old transactions that have unresolved intents on @@ -276,7 +285,7 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error { // This may cause transaction restarts, but span refreshing should // prevent a restart for any transaction that has not been written // over at a larger timestamp. - pushedTxns, err := a.p.TxnPusher.PushTxns(ctx, a.txns, a.ts) + pushedTxns, anyAmbiguousAbort, err := a.p.TxnPusher.PushTxns(ctx, a.txns, a.ts) if err != nil { return err } @@ -349,6 +358,49 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error { } } + // It's possible that the ABORTED state is a false negative, where the + // transaction was in fact committed but the txn record has been removed after + // resolving all intents (see batcheval.SynthesizeTxnFromMeta and + // Replica.CanCreateTxnRecord). If this replica has not applied the intent + // resolution yet, we may prematurely emit an MVCCAbortTxnOp and advance + // the resolved ts before emitting the committed intents. This violates the + // rangefeed checkpoint guarantee, and will at the time of writing cause the + // changefeed to drop these events entirely. See: + // https://github.com/cockroachdb/cockroach/issues/104309 + // + // PushTxns will let us know if it found such an ambiguous abort. To guarantee + // that we've applied all resolved intents in this case, submit a Barrier + // command to the leaseholder and wait for it to apply on the local replica. + // + // By the time the local replica applies the barrier it will have enqueued the + // resolved intents in the rangefeed processor's queue. These updates may not + // yet have been applied to the resolved timestamp intent tracker, but that's + // ok -- our MVCCAbortTxnOp will be enqueued and processed after them. + // + // This incurs an additional Raft write, but so would PushTxns() if we hadn't + // hit the ambiguous abort case. This will also block until ongoing writes + // have completed and applied, but that's fine since we currently run on our + // own goroutine (as opposed to on a rangefeed scheduler goroutine). + // + // NB: We can't try to reduce the span of the barrier, because LockSpans may + // not have the full set of intents. + // + // NB: PushTxnResponse.AmbiguousAbort and BarrierResponse.LeaseAppliedIndex + // are not guaranteed to be populated prior to 24.1. In that case, we degrade + // to the old (buggy) behavior. + if anyAmbiguousAbort && PushTxnsBarrierEnabled.Get(&a.p.Settings.SV) { + // The barrier will error out if our context is cancelled (which happens on + // processor shutdown) or if the replica is destroyed. Regardless, use a 1 + // minute backstop to prevent getting wedged. + // + // TODO(erikgrinaker): consider removing this once we have some confidence + // that it won't get wedged. + err := contextutil.RunWithTimeout(ctx, "pushtxns barrier", time.Minute, a.p.TxnPusher.Barrier) + if err != nil { + return err + } + } + // Inform the processor of all logical ops. 
a.p.sendEvent(ctx, event{ops: ops}, 0) diff --git a/pkg/kv/kvserver/rangefeed/task_test.go b/pkg/kv/kvserver/rangefeed/task_test.go index 94b3c95102db..3a60f27d5bd8 100644 --- a/pkg/kv/kvserver/rangefeed/task_test.go +++ b/pkg/kv/kvserver/rangefeed/task_test.go @@ -369,13 +369,13 @@ func TestInitResolvedTSScan(t *testing.T) { } type testTxnPusher struct { - pushTxnsFn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error) + pushTxnsFn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, bool, error) resolveIntentsFn func(ctx context.Context, intents []roachpb.LockUpdate) error } func (tp *testTxnPusher) PushTxns( ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, -) ([]*roachpb.Transaction, error) { +) ([]*roachpb.Transaction, bool, error) { return tp.pushTxnsFn(ctx, txns, ts) } @@ -383,8 +383,12 @@ func (tp *testTxnPusher) ResolveIntents(ctx context.Context, intents []roachpb.L return tp.resolveIntentsFn(ctx, intents) } +func (tp *testTxnPusher) Barrier(ctx context.Context) error { + return nil +} + func (tp *testTxnPusher) mockPushTxns( - fn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error), + fn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, bool, error), ) { tp.pushTxnsFn = fn } @@ -443,7 +447,7 @@ func TestTxnPushAttempt(t *testing.T) { var tp testTxnPusher tp.mockPushTxns(func( ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, - ) ([]*roachpb.Transaction, error) { + ) ([]*roachpb.Transaction, bool, error) { require.Equal(t, 4, len(txns)) require.Equal(t, txn1Meta, txns[0]) require.Equal(t, txn2Meta, txns[1]) @@ -454,7 +458,7 @@ func TestTxnPushAttempt(t *testing.T) { // Return all four protos. The PENDING txn is pushed. txn1ProtoPushed := txn1Proto.Clone() txn1ProtoPushed.WriteTimestamp = ts - return []*roachpb.Transaction{txn1ProtoPushed, txn2Proto, txn3Proto, txn4Proto}, nil + return []*roachpb.Transaction{txn1ProtoPushed, txn2Proto, txn3Proto, txn4Proto}, false, nil }) tp.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error { require.Len(t, intents, 7) diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index b19f63b634f6..410ea9444aee 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -101,8 +101,9 @@ func (s *lockedRangefeedStream) Send(e *kvpb.RangeFeedEvent) error { // rangefeedTxnPusher is a shim around intentResolver that implements the // rangefeed.TxnPusher interface. type rangefeedTxnPusher struct { - ir *intentresolver.IntentResolver - r *Replica + ir *intentresolver.IntentResolver + r *Replica + span roachpb.RSpan } // PushTxns is part of the rangefeed.TxnPusher interface. It performs a @@ -110,7 +111,7 @@ type rangefeedTxnPusher struct { // transactions. 
func (tp *rangefeedTxnPusher) PushTxns( ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, -) ([]*roachpb.Transaction, error) { +) ([]*roachpb.Transaction, bool, error) { pushTxnMap := make(map[uuid.UUID]*enginepb.TxnMeta, len(txns)) for i := range txns { txn := &txns[i] @@ -126,18 +127,18 @@ func (tp *rangefeedTxnPusher) PushTxns( }, } - pushedTxnMap, _, pErr := tp.ir.MaybePushTransactions( + pushedTxnMap, anyAmbiguousAbort, pErr := tp.ir.MaybePushTransactions( ctx, pushTxnMap, h, kvpb.PUSH_TIMESTAMP, false, /* skipIfInFlight */ ) if pErr != nil { - return nil, pErr.GoError() + return nil, false, pErr.GoError() } pushedTxns := make([]*roachpb.Transaction, 0, len(pushedTxnMap)) for _, txn := range pushedTxnMap { pushedTxns = append(pushedTxns, txn) } - return pushedTxns, nil + return pushedTxns, anyAmbiguousAbort, nil } // ResolveIntents is part of the rangefeed.TxnPusher interface. @@ -150,6 +151,40 @@ func (tp *rangefeedTxnPusher) ResolveIntents( ).GoError() } +// Barrier is part of the rangefeed.TxnPusher interface. +func (tp *rangefeedTxnPusher) Barrier(ctx context.Context) error { + // Execute a Barrier on the leaseholder, and obtain its LAI. Error out on any + // range changes (e.g. splits/merges) that we haven't applied yet. + lai, desc, err := tp.r.store.db.BarrierWithLAI(ctx, tp.span.Key, tp.span.EndKey) + if err != nil { + if errors.HasType(err, &kvpb.RangeKeyMismatchError{}) { + return errors.Wrap(err, "range barrier failed, range split") + } + return errors.Wrap(err, "range barrier failed") + } + if lai == 0 { + // We may be talking to a binary which doesn't support + // BarrierRequest.WithLeaseAppliedIndex, so just degrade to the previous + // (buggy) behavior. + return nil + } + if desc.RangeID != tp.r.RangeID { + return errors.Errorf("range barrier failed, range ID changed: %d -> %s", tp.r.RangeID, desc) + } + if !desc.RSpan().Equal(tp.span) { + return errors.Errorf("range barrier failed, range span changed: %s -> %s", tp.span, desc) + } + + // Wait for the local replica to apply it. In the common case where we are the + // leaseholder, the Barrier call will already have waited for application, so + // this succeeds immediately. + if _, err = tp.r.WaitForLeaseAppliedIndex(ctx, lai); err != nil { + return errors.Wrap(err, "range barrier failed") + } + + return nil +} + // RangeFeed registers a rangefeed over the specified span. It sends updates to // the provided stream and returns with a future error when the rangefeed is // complete. The surrounding store's ConcurrentRequestLimiter is used to limit @@ -369,7 +404,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // Create a new rangefeed. 
desc := r.Desc() - tp := rangefeedTxnPusher{ir: r.store.intentResolver, r: r} + tp := rangefeedTxnPusher{ir: r.store.intentResolver, r: r, span: desc.RSpan()} cfg := rangefeed.Config{ AmbientContext: r.AmbientContext, Clock: r.Clock(), From 1045f2d593bd7cb53453ea8d23ae2be6ddb8ac07 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Tue, 23 Jan 2024 19:55:56 +0000 Subject: [PATCH 7/7] rangefeed: assert intent commits above resolved timestamp Epic: none Release note: None --- pkg/kv/kvserver/rangefeed/processor.go | 2 +- .../kvserver/rangefeed/resolved_timestamp.go | 40 +++++-- .../rangefeed/resolved_timestamp_test.go | 101 ++++++++++-------- 3 files changed, 86 insertions(+), 57 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 8cdeff5c4b47..b817df38c985 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -790,7 +790,7 @@ func (p *Processor) consumeLogicalOps( // Determine whether the operation caused the resolved timestamp to // move forward. If so, publish a RangeFeedCheckpoint notification. - if p.rts.ConsumeLogicalOp(op) { + if p.rts.ConsumeLogicalOp(ctx, op) { p.publishCheckpoint(ctx) } } diff --git a/pkg/kv/kvserver/rangefeed/resolved_timestamp.go b/pkg/kv/kvserver/rangefeed/resolved_timestamp.go index f1f9e8d57a8f..c6793040e9b0 100644 --- a/pkg/kv/kvserver/rangefeed/resolved_timestamp.go +++ b/pkg/kv/kvserver/rangefeed/resolved_timestamp.go @@ -13,11 +13,13 @@ package rangefeed import ( "bytes" "container/heap" + "context" "fmt" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" ) @@ -128,32 +130,40 @@ func (rts *resolvedTimestamp) ForwardClosedTS(newClosedTS hlc.Timestamp) bool { // operation within its range of tracked keys. This allows the structure to // update its internal intent tracking to reflect the change. The method returns // whether this caused the resolved timestamp to move forward. -func (rts *resolvedTimestamp) ConsumeLogicalOp(op enginepb.MVCCLogicalOp) bool { - if rts.consumeLogicalOp(op) { +func (rts *resolvedTimestamp) ConsumeLogicalOp( + ctx context.Context, op enginepb.MVCCLogicalOp, +) bool { + if rts.consumeLogicalOp(ctx, op) { return rts.recompute() } rts.assertNoChange() return false } -func (rts *resolvedTimestamp) consumeLogicalOp(op enginepb.MVCCLogicalOp) bool { +func (rts *resolvedTimestamp) consumeLogicalOp( + ctx context.Context, op enginepb.MVCCLogicalOp, +) bool { switch t := op.GetValue().(type) { case *enginepb.MVCCWriteValueOp: - rts.assertOpAboveRTS(op, t.Timestamp) + rts.assertOpAboveRTS(ctx, op, t.Timestamp, true /* fatal */) return false case *enginepb.MVCCDeleteRangeOp: - rts.assertOpAboveRTS(op, t.Timestamp) + rts.assertOpAboveRTS(ctx, op, t.Timestamp, true /* fatal */) return false case *enginepb.MVCCWriteIntentOp: - rts.assertOpAboveRTS(op, t.Timestamp) + rts.assertOpAboveRTS(ctx, op, t.Timestamp, true /* fatal */) return rts.intentQ.IncRef(t.TxnID, t.TxnKey, t.TxnMinTimestamp, t.Timestamp) case *enginepb.MVCCUpdateIntentOp: return rts.intentQ.UpdateTS(t.TxnID, t.Timestamp) case *enginepb.MVCCCommitIntentOp: + // This assertion can be violated in mixed-version clusters prior + // to 24.1, so make it non-fatal for now. 
See: + // https://github.com/cockroachdb/cockroach/issues/104309 + rts.assertOpAboveRTS(ctx, op, t.Timestamp, false /* fatal */) return rts.intentQ.DecrRef(t.TxnID, t.Timestamp) case *enginepb.MVCCAbortIntentOp: @@ -264,10 +274,22 @@ func (rts *resolvedTimestamp) assertNoChange() { // assertOpAboveTimestamp asserts that this operation is at a larger timestamp // than the current resolved timestamp. A violation of this assertion would // indicate a failure of the closed timestamp mechanism. -func (rts *resolvedTimestamp) assertOpAboveRTS(op enginepb.MVCCLogicalOp, opTS hlc.Timestamp) { +func (rts *resolvedTimestamp) assertOpAboveRTS( + ctx context.Context, op enginepb.MVCCLogicalOp, opTS hlc.Timestamp, fatal bool, +) { if opTS.LessEq(rts.resolvedTS) { - panic(fmt.Sprintf("resolved timestamp %s equal to or above timestamp of operation %v", - rts.resolvedTS, op)) + // NB: MVCCLogicalOp.String() is only implemented for pointer receiver. + // We shadow the variable to avoid it escaping to the heap. + op := op + err := errors.AssertionFailedf( + "resolved timestamp %s equal to or above timestamp of operation %v", rts.resolvedTS, &op) + if fatal { + // TODO(erikgrinaker): use log.Fatalf. Panic for now, since tests expect + // it and to minimize code churn for backports. + panic(err) + } else { + log.Errorf(ctx, "%v", err) + } } } diff --git a/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go b/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go index ff5f9795131d..7496c7475bd9 100644 --- a/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go +++ b/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go @@ -11,6 +11,7 @@ package rangefeed import ( + "context" "testing" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -176,6 +177,7 @@ func TestUnresolvedIntentQueue(t *testing.T) { func TestResolvedTimestamp(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() rts := makeResolvedTimestamp() rts.Init() @@ -184,13 +186,13 @@ func TestResolvedTimestamp(t *testing.T) { // Add an intent. No closed timestamp so no resolved timestamp. txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 10})) + fwd := rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 10})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Add another intent. No closed timestamp so no resolved timestamp. txn2 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 12})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 12})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -201,16 +203,16 @@ func TestResolvedTimestamp(t *testing.T) { // Write intent at earlier timestamp. Assertion failure. require.Panics(t, func() { - rts.ConsumeLogicalOp(writeIntentOp(uuid.MakeV4(), hlc.Timestamp{WallTime: 3})) + rts.ConsumeLogicalOp(ctx, writeIntentOp(uuid.MakeV4(), hlc.Timestamp{WallTime: 3})) }) // Write value at earlier timestamp. Assertion failure. require.Panics(t, func() { - rts.ConsumeLogicalOp(writeValueOp(hlc.Timestamp{WallTime: 4})) + rts.ConsumeLogicalOp(ctx, writeValueOp(hlc.Timestamp{WallTime: 4})) }) // Write value at later timestamp. No effect on resolved timestamp. 
- fwd = rts.ConsumeLogicalOp(writeValueOp(hlc.Timestamp{WallTime: 6})) + fwd = rts.ConsumeLogicalOp(ctx, writeValueOp(hlc.Timestamp{WallTime: 6})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 5}, rts.Get()) @@ -221,12 +223,12 @@ func TestResolvedTimestamp(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 9}, rts.Get()) // Update the timestamp of txn2. No effect on the resolved timestamp. - fwd = rts.ConsumeLogicalOp(updateIntentOp(txn2, hlc.Timestamp{WallTime: 18})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn2, hlc.Timestamp{WallTime: 18})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 9}, rts.Get()) // Update the timestamp of txn1. Resolved timestamp moves forward. - fwd = rts.ConsumeLogicalOp(updateIntentOp(txn1, hlc.Timestamp{WallTime: 20})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn1, hlc.Timestamp{WallTime: 20})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) @@ -236,13 +238,13 @@ func TestResolvedTimestamp(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 17}, rts.Get()) // Write intent for earliest txn at same timestamp. No change. - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 18})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 18})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 17}, rts.Get()) // Write intent for earliest txn at later timestamp. Resolved // timestamp moves forward. - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 25})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 25})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 18}, rts.Get()) @@ -253,47 +255,47 @@ func TestResolvedTimestamp(t *testing.T) { // First transaction aborted. Resolved timestamp moves to next earliest // intent. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Third transaction at higher timestamp. No effect. txn3 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn3, hlc.Timestamp{WallTime: 30})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn3, hlc.Timestamp{WallTime: 30})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn3, hlc.Timestamp{WallTime: 31})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn3, hlc.Timestamp{WallTime: 31})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Third transaction aborted. No effect. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn3)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn3)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn3)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn3)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn3)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn3)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Fourth transaction at higher timestamp. No effect. 
txn4 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn4, hlc.Timestamp{WallTime: 45})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn4, hlc.Timestamp{WallTime: 45})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Fourth transaction committed. No effect. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn4, hlc.Timestamp{WallTime: 45})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn4, hlc.Timestamp{WallTime: 45})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Second transaction observes one intent being resolved at timestamp // above closed time. Resolved timestamp moves to closed timestamp. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 30}, rts.Get()) @@ -304,22 +306,22 @@ func TestResolvedTimestamp(t *testing.T) { // Second transaction observes another intent being resolved at timestamp // below closed time. Still one intent left. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 34}, rts.Get()) // Second transaction observes final intent being resolved at timestamp // below closed time. Resolved timestamp moves to closed timestamp. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 40}, rts.Get()) // Fifth transaction at higher timestamp. No effect. txn5 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn5, hlc.Timestamp{WallTime: 45})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn5, hlc.Timestamp{WallTime: 45})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 40}, rts.Get()) - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn5, hlc.Timestamp{WallTime: 46})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn5, hlc.Timestamp{WallTime: 46})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 40}, rts.Get()) @@ -330,7 +332,7 @@ func TestResolvedTimestamp(t *testing.T) { // Fifth transaction bumps epoch and re-writes one of its intents. Resolved // timestamp moves to the new transaction timestamp. - fwd = rts.ConsumeLogicalOp(updateIntentOp(txn5, hlc.Timestamp{WallTime: 47})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn5, hlc.Timestamp{WallTime: 47})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 46}, rts.Get()) @@ -338,48 +340,49 @@ func TestResolvedTimestamp(t *testing.T) { // its final epoch. Resolved timestamp moves forward after observing the // first intent committing at a higher timestamp and moves to the closed // timestamp after observing the second intent aborting. 
- fwd = rts.ConsumeLogicalOp(commitIntentOp(txn5, hlc.Timestamp{WallTime: 49})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn5, hlc.Timestamp{WallTime: 49})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 48}, rts.Get()) - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn5)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn5)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 50}, rts.Get()) } func TestResolvedTimestampNoClosedTimestamp(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() rts := makeResolvedTimestamp() rts.Init() // Add a value. No closed timestamp so no resolved timestamp. - fwd := rts.ConsumeLogicalOp(writeValueOp(hlc.Timestamp{WallTime: 1})) + fwd := rts.ConsumeLogicalOp(ctx, writeValueOp(hlc.Timestamp{WallTime: 1})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Add an intent. No closed timestamp so no resolved timestamp. txn1 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 1})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 1})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Update intent. No closed timestamp so no resolved timestamp. - fwd = rts.ConsumeLogicalOp(updateIntentOp(txn1, hlc.Timestamp{WallTime: 2})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn1, hlc.Timestamp{WallTime: 2})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Add another intent. No closed timestamp so no resolved timestamp. txn2 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 3})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Abort the first intent. No closed timestamp so no resolved timestamp. - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Commit the second intent. No closed timestamp so no resolved timestamp. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn2, hlc.Timestamp{WallTime: 3})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn2, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) } @@ -418,6 +421,8 @@ func TestResolvedTimestampNoIntents(t *testing.T) { func TestResolvedTimestampInit(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() + t.Run("CT Before Init", func(t *testing.T) { rts := makeResolvedTimestamp() @@ -436,7 +441,7 @@ func TestResolvedTimestampInit(t *testing.T) { // Add an intent. Not initialized so no resolved timestamp. txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) + fwd := rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -450,7 +455,7 @@ func TestResolvedTimestampInit(t *testing.T) { // Add an intent. Not initialized so no resolved timestamp. txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) + fwd := rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -469,12 +474,12 @@ func TestResolvedTimestampInit(t *testing.T) { // Abort an intent. Not initialized so no resolved timestamp. 
txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd := rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Abort that intent's transaction. Not initialized so no-op. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -482,7 +487,7 @@ func TestResolvedTimestampInit(t *testing.T) { // out with the out-of-order intent abort operation. If this abort hadn't // allowed the unresolvedTxn's ref count to drop below 0, this would // have created a reference that would never be cleaned up. - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -501,7 +506,7 @@ func TestResolvedTimestampInit(t *testing.T) { // Abort an intent. Not initialized so no resolved timestamp. txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd := rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -513,6 +518,7 @@ func TestResolvedTimestampInit(t *testing.T) { func TestResolvedTimestampTxnAborted(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() rts := makeResolvedTimestamp() rts.Init() @@ -523,7 +529,7 @@ func TestResolvedTimestampTxnAborted(t *testing.T) { // Add an intent for a new transaction. txn1 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 10})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 10})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 5}, rts.Get()) @@ -533,23 +539,23 @@ func TestResolvedTimestampTxnAborted(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 9}, rts.Get()) // Abort txn1 after a periodic txn push. Resolved timestamp advances. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) // Update one of txn1's intents. Should be ignored. - fwd = rts.ConsumeLogicalOp(updateIntentOp(txn1, hlc.Timestamp{WallTime: 20})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn1, hlc.Timestamp{WallTime: 20})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) // Abort one of txn1's intents. Should be ignored. - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) // Write another intent as txn1. Should add txn1 back into queue. // This will eventually require another txn push to evict. - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 20})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 20})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) @@ -560,7 +566,7 @@ func TestResolvedTimestampTxnAborted(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 19}, rts.Get()) // Abort txn1 again after another periodic push. Resolved timestamp advances. 
- fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 25}, rts.Get()) } @@ -569,6 +575,7 @@ func TestResolvedTimestampTxnAborted(t *testing.T) { func TestClosedTimestampLogicalPart(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() rts := makeResolvedTimestamp() rts.Init() @@ -579,7 +586,7 @@ func TestClosedTimestampLogicalPart(t *testing.T) { // Add an intent for a new transaction. txn1 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 10, Logical: 4})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 10, Logical: 4})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 10, Logical: 0}, rts.Get()) @@ -591,7 +598,7 @@ func TestClosedTimestampLogicalPart(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 10, Logical: 0}, rts.Get()) // Abort txn1. Resolved timestamp advances. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 11, Logical: 0}, rts.Get()) @@ -600,7 +607,7 @@ func TestClosedTimestampLogicalPart(t *testing.T) { // and an intent is in the next wall tick; this used to cause an issue because // of the rounding logic. txn2 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 12, Logical: 7})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 12, Logical: 7})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 11, Logical: 0}, rts.Get()) }
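
For context on the assertion introduced in this final patch, here is a toy Go sketch of the invariant it checks. The types and names (`toyResolvedTS`, `consume`) are simplifications invented for the sketch, not the real `resolvedTimestamp` code: the resolved timestamp is the minimum of the closed timestamp and a value just below the earliest unresolved intent, so a commit-intent op arriving at or below it means a checkpoint may already have been emitted that the op contradicts. That is the mixed-version scenario possible prior to 24.1, which is why the commit-intent case logs an error instead of fataling.

package main

import "fmt"

type ts int64 // simplified timestamp: wall time only

// toyResolvedTS tracks a closed timestamp and the earliest intent per txn.
type toyResolvedTS struct {
	closed  ts
	intents map[string]ts
}

// get returns min(closed, earliest intent - 1): the resolved timestamp can
// never reach an unresolved intent.
func (r *toyResolvedTS) get() ts {
	rts := r.closed
	for _, t := range r.intents {
		if t-1 < rts {
			rts = t - 1
		}
	}
	return rts
}

// consume mimics the assertion: intent and commit ops must land above the
// current resolved timestamp, while an abort simply drops the txn from
// tracking -- which is where a false ABORTED status can go wrong.
func (r *toyResolvedTS) consume(txn string, opTS ts, kind string) error {
	switch kind {
	case "abort":
		delete(r.intents, txn)
		return nil
	case "commit":
		if opTS <= r.get() {
			return fmt.Errorf("commit at %d is at or below resolved timestamp %d", opTS, r.get())
		}
		delete(r.intents, txn)
		return nil
	default: // "intent"
		if opTS <= r.get() {
			return fmt.Errorf("intent at %d is at or below resolved timestamp %d", opTS, r.get())
		}
		if cur, ok := r.intents[txn]; !ok || opTS < cur {
			r.intents[txn] = opTS
		}
		return nil
	}
}

func main() {
	r := &toyResolvedTS{closed: 10, intents: map[string]ts{}}
	_ = r.consume("a", 12, "intent") // tracked; resolved ts stays below 12
	r.closed = 20
	fmt.Println(r.get()) // 11: held back by txn a's intent

	// A false ABORTED push result drops the intent and lets the resolved
	// timestamp jump to the closed timestamp...
	_ = r.consume("a", 0, "abort")
	fmt.Println(r.get()) // 20

	// ...so the later commit of that same intent now violates the invariant.
	fmt.Println(r.consume("a", 12, "commit")) // error
}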