From 084ace8cb596559b02c2aa1ae1787ac1c7ea8f57 Mon Sep 17 00:00:00 2001
From: Matt Lord
Date: Wed, 22 Jan 2025 15:46:19 -0500
Subject: [PATCH 01/15] Unify vrepl lag calculation

Signed-off-by: Matt Lord
---
 go/vt/vtctl/workflow/server.go    |  3 +-
 go/vt/vtctl/workflow/workflows.go | 73 +++++++++++++++++++------------
 2 files changed, 46 insertions(+), 30 deletions(-)

diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go
index 0c8da8d9363..2d52d951189 100644
--- a/go/vt/vtctl/workflow/server.go
+++ b/go/vt/vtctl/workflow/server.go
@@ -3075,7 +3075,8 @@ func (s *Server) canSwitch(ctx context.Context, ts *trafficSwitcher, maxAllowedR
 				return cannotSwitchFrozen, nil
 			}
 			// If no new events have been replicated after the copy phase then it will be 0.
-			if vreplLag := time.Now().Unix() - st.TimeUpdated.Seconds; vreplLag > maxAllowedReplLagSecs {
+			vreplLag := int64(getVReplicationTrxLag(st.TransactionTimestamp, st.TimeUpdated, binlogdatapb.VReplicationWorkflowState(binlogdatapb.VReplicationWorkflowState_value[st.State])))
+			if vreplLag > maxAllowedReplLagSecs {
 				return fmt.Sprintf(cannotSwitchHighLag, vreplLag, maxAllowedReplLagSecs), nil
 			}
 			switch st.State {
diff --git a/go/vt/vtctl/workflow/workflows.go b/go/vt/vtctl/workflow/workflows.go
index da0ee5dfec7..f5dcc29ed1d 100644
--- a/go/vt/vtctl/workflow/workflows.go
+++ b/go/vt/vtctl/workflow/workflows.go
@@ -446,35 +446,11 @@ func (wf *workflowFetcher) scanWorkflow(
 		workflow.WorkflowSubType = res.WorkflowSubType.String()
 		workflow.DeferSecondaryKeys = res.DeferSecondaryKeys
 
-		// MaxVReplicationTransactionLag estimates the actual statement processing lag
-		// between the source and the target. If we are still processing source events it
-		// is the difference b/w current time and the timestamp of the last event. If
-		// heartbeats are more recent than the last event, then the lag is the time since
-		// the last heartbeat as there can be an actual event immediately after the
-		// heartbeat, but which has not yet been processed on the target.
-		// We don't allow switching during the copy phase, so in that case we just return
-		// a large lag. All timestamps are in seconds since epoch.
-		if rstream.TransactionTimestamp == nil {
-			rstream.TransactionTimestamp = &vttimepb.Time{}
-		}
-		lastTransactionTime := rstream.TransactionTimestamp.Seconds
-		if rstream.TimeHeartbeat == nil {
-			rstream.TimeHeartbeat = &vttimepb.Time{}
-		}
-		lastHeartbeatTime := rstream.TimeHeartbeat.Seconds
-		if stream.State == binlogdatapb.VReplicationWorkflowState_Copying.String() {
-			meta.maxVReplicationTransactionLag = math.MaxInt64
-		} else {
-			if lastTransactionTime == 0 /* no new events after copy */ ||
-				lastHeartbeatTime > lastTransactionTime /* no recent transactions, so all caught up */ {
-
-				lastTransactionTime = lastHeartbeatTime
-			}
-			now := time.Now().Unix() /* seconds since epoch */
-			transactionReplicationLag := float64(now - lastTransactionTime)
-			if transactionReplicationLag > meta.maxVReplicationTransactionLag {
-				meta.maxVReplicationTransactionLag = transactionReplicationLag
-			}
+		// MaxVReplicationTransactionLag estimates the max statement processing lag
+		// between the source and the target across all of the workflow streams.
+		transactionReplicationLag := getVReplicationTrxLag(rstream.TransactionTimestamp, rstream.TimeUpdated, rstream.State)
+		if transactionReplicationLag > meta.maxVReplicationTransactionLag {
+			meta.maxVReplicationTransactionLag = transactionReplicationLag
 		}
 	}
@@ -670,3 +646,42 @@ func getStreamState(stream *vtctldatapb.Workflow_Stream, rstream *tabletmanagerd
 	}
 	return rstream.State.String()
 }
+
+type GenericStream interface {
+	*tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream | *vtctldatapb.Workflow_Stream
+}
+
+type Stream[T GenericStream] struct {
+	TransactionTimestamp *vttimepb.Time
+	TimeUpdated          *vttimepb.Time
+	State                binlogdatapb.VReplicationWorkflowState
+}
+
+// getVReplicationTrxLag estimates the actual statement processing lag between the
+// source and the target. If we are still processing source events it is the
+// difference between current time and the timestamp of the last event. If
+// heartbeats are more recent than the last event, then the lag is the time since
+// the last heartbeat as there can be an actual event immediately after the
+// heartbeat, but which has not yet been processed on the target. We don't allow
+// switching during the copy phase, so in that case we just return a large lag.
+// All timestamps are in seconds since epoch.
+func getVReplicationTrxLag(trxTs, updatedTs *vttimepb.Time, state binlogdatapb.VReplicationWorkflowState) float64 {
+	if trxTs == nil {
+		trxTs = &vttimepb.Time{}
+	}
+	lastTransactionTime := trxTs.Seconds
+	if updatedTs == nil {
+		updatedTs = &vttimepb.Time{}
+	}
+	lastUpdateTime := updatedTs.Seconds
+	if state == binlogdatapb.VReplicationWorkflowState_Copying {
+		return math.MaxInt64
+	}
+	if lastTransactionTime == 0 /* no new events after copy */ ||
+		lastUpdateTime > lastTransactionTime /* no recent transactions, so all caught up */ {
+
+		lastTransactionTime = lastUpdateTime
+	}
+	now := time.Now().Unix() /* seconds since epoch */
+	return float64(now - lastTransactionTime)
+}

From fbf4334cf45d779079a274f58710b44abf70a637 Mon Sep 17 00:00:00 2001
From: Matt Lord
Date: Wed, 22 Jan 2025 22:52:07 +0000
Subject: [PATCH 02/15] Maintain the locks in the new cancel context

Signed-off-by: Matt Lord
---
 go/vt/vtctl/workflow/traffic_switcher.go |  5 ++++-
 go/vt/vtctl/workflow/workflows.go        | 10 ----------
 2 files changed, 4 insertions(+), 11 deletions(-)

diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go
index 4fc34992b0f..1732f3425b7 100644
--- a/go/vt/vtctl/workflow/traffic_switcher.go
+++ b/go/vt/vtctl/workflow/traffic_switcher.go
@@ -1148,8 +1148,11 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
 	}
 	// We create a new context while canceling the migration, so that we are independent of the original
 	// context being cancelled prior to or during the cancel operation.
+	// Create a child context that cannot be canceled by the parent, so that we maintain the locks.
+	cctx := context.WithoutCancel(ctx)
+	// Now create a child context from that which has a timeout.
 	cmTimeout := 60 * time.Second
-	cmCtx, cmCancel := context.WithTimeout(context.Background(), cmTimeout)
+	cmCtx, cmCancel := context.WithTimeout(cctx, cmTimeout)
 	defer cmCancel()
 	if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
diff --git a/go/vt/vtctl/workflow/workflows.go b/go/vt/vtctl/workflow/workflows.go
index f5dcc29ed1d..99739d3556b 100644
--- a/go/vt/vtctl/workflow/workflows.go
+++ b/go/vt/vtctl/workflow/workflows.go
@@ -647,16 +647,6 @@ func getStreamState(stream *vtctldatapb.Workflow_Stream, rstream *tabletmanagerd
 	return rstream.State.String()
 }
 
-type GenericStream interface {
-	*tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream | *vtctldatapb.Workflow_Stream
-}
-
-type Stream[T GenericStream] struct {
-	TransactionTimestamp *vttimepb.Time
-	TimeUpdated          *vttimepb.Time
-	State                binlogdatapb.VReplicationWorkflowState
-}
-
 // getVReplicationTrxLag estimates the actual statement processing lag between the
 // source and the target. If we are still processing source events it is the
 // difference between current time and the timestamp of the last event. If

From 4e8e1099acfd9532033e49fd659b07f50979abc3 Mon Sep 17 00:00:00 2001
From: Matt Lord
Date: Wed, 22 Jan 2025 23:46:01 +0000
Subject: [PATCH 03/15] Add e2e test

Signed-off-by: Matt Lord
---
 .../vreplication/vreplication_test.go | 99 +++++++++++++++++++
 go/vt/vtctl/workflow/server.go        |  8 +-
 go/vt/vtctl/workflow/workflows.go     |  8 +-
 3 files changed, 106 insertions(+), 9 deletions(-)

diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go
index 617e5f79f6a..55d0ec5c07f 100644
--- a/go/test/endtoend/vreplication/vreplication_test.go
+++ b/go/test/endtoend/vreplication/vreplication_test.go
@@ -826,6 +826,105 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
 		shardNames = append(shardNames, shardName)
 	}
 	testSwitchTrafficPermissionChecks(t, workflowType, sourceKs, shardNames, targetKs, workflow)
+
+	// Confirm that switching writes works as expected in the face of
+	// vreplication lag (canSwitch() precheck) and when cancelling the
+	// switch due to replication failing to catch up in time.
+	t.Run("validate switch writes error handling", func(t *testing.T) {
+		productConn, err := productTab.TabletConn("product", true)
+		require.NoError(t, err)
+		defer productConn.Close()
+		customerConn1, err := customerTab1.TabletConn("customer", true)
+		require.NoError(t, err)
+		customerConn2, err := customerTab2.TabletConn("customer", true)
+		require.NoError(t, err)
+		startingTestRowID := 10000000
+		numTestRows := 100
+		addTestRows := func() {
+			for i := 0; i < numTestRows; i++ {
+				execQuery(t, productConn, fmt.Sprintf("insert into customer (cid, name) values (%d, 'laggingCustomer')", startingTestRowID+i))
+			}
+		}
+		deleteTestRows := func() {
+			execQuery(t, productConn, fmt.Sprintf("delete from customer where cid >= %d", startingTestRowID))
+		}
+		addIndex := func() {
+			for _, customerConn := range []*mysql.Conn{customerConn1, customerConn2} {
+				execQuery(t, customerConn, "set session sql_mode=''")
+				execQuery(t, customerConn, "alter table customer add unique index name_idx (name)")
+			}
+		}
+		dropIndex := func() {
+			for _, customerConn := range []*mysql.Conn{customerConn1, customerConn2} {
+				execQuery(t, customerConn, "alter table customer drop index name_idx")
+			}
+		}
+		lockTargetTable := func() {
+			for _, customerConn := range []*mysql.Conn{customerConn1, customerConn2} {
+				execQuery(t, customerConn, "lock table customer read")
+			}
+		}
+		unlockTargetTable := func() {
+			for _, customerConn := range []*mysql.Conn{customerConn1, customerConn2} {
+				execQuery(t, customerConn, "unlock tables")
+			}
+		}
+		cleanupTestData := func() {
+			dropIndex()
+			deleteTestRows()
+		}
+		restartWorkflow := func() {
+			// We have to restart the workflow again as the duplicate key error
+			// is a permanent/terminal one.
+			err = vc.VtctldClient.ExecuteCommand("workflow", "--keyspace", targetKs, "start", "--workflow", workflow)
+			require.NoError(t, err, "failed to start workflow: %v", err)
+		}
+		waitForTargetToCatchup := func() {
+			restartWorkflow()
+			waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
+			waitForNoWorkflowLag(t, vc, targetKs, workflow)
+		}
+
+		// First let's test that the pre-checks work as expected.
+		// We ALTER the table on the customer (target) shard to
+		// add a unique index on the name field.
+		addIndex()
+		// Then we insert some test rows across both shards in the product
+		// (source) keyspace.
+		addTestRows()
+		// Now the workflow should go into the error state and the lag should
+		// start to climb. So we sleep for twice the max lag duration that we
+		// will set for the SwitchTraffic call.
+		lagDuration := 3 * time.Second
+		time.Sleep(lagDuration * 3)
+		out, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs,
+			"SwitchTraffic", "--tablet-types=primary", "--timeout=30s", "--max-replication-lag-allowed", lagDuration.String())
+		// It should fail in the canSwitch() precheck.
+		require.Error(t, err)
+		require.Regexp(t, fmt.Sprintf(".*cannot switch traffic for workflow %s at this time: replication lag [0-9]+s is higher than allowed lag %s.*",
+			workflow, lagDuration.String()), out)
+		cleanupTestData()
+		waitForTargetToCatchup()
+
+		// Now let's test that the cancel works by setting the command timeout
+		// to half the duration. Lock the customer table on the target tablets
+		// so that we cannot apply the INSERTs and catch up.
+		lockTargetTable()
+		addTestRows()
+		timeout := lagDuration * 2
+		// Use the default max-replication-lag-allowed value of 30s.
+		out, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs,
+			"SwitchTraffic", "--tablet-types=primary", "--timeout", timeout.String())
+		// It should fail due to the command context timeout and we should
+		// successfully cancel.
+		require.Error(t, err)
+		require.Contains(t, out, "failed to sync up replication between the source and target")
+		unlockTargetTable()
+		deleteTestRows()
+		waitForTargetToCatchup()
+	})
+
+	// Now let's confirm that it now works as expected.
 	switchWrites(t, workflowType, ksWorkflow, false)
 	checkThatVDiffFails(t, targetKs, workflow)
diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go
index 2d52d951189..732e798bc04 100644
--- a/go/vt/vtctl/workflow/server.go
+++ b/go/vt/vtctl/workflow/server.go
@@ -3069,16 +3069,14 @@ func (s *Server) canSwitch(ctx context.Context, ts *trafficSwitcher, maxAllowedR
 	if err != nil {
 		return "", err
 	}
+	if wf.MaxVReplicationLag > maxAllowedReplLagSecs {
+		return fmt.Sprintf(cannotSwitchHighLag, wf.MaxVReplicationLag, maxAllowedReplLagSecs), nil
+	}
 	for _, stream := range wf.ShardStreams {
 		for _, st := range stream.GetStreams() {
 			if st.Message == Frozen {
 				return cannotSwitchFrozen, nil
 			}
-			// If no new events have been replicated after the copy phase then it will be 0.
-			vreplLag := int64(getVReplicationTrxLag(st.TransactionTimestamp, st.TimeUpdated, binlogdatapb.VReplicationWorkflowState(binlogdatapb.VReplicationWorkflowState_value[st.State])))
-			if vreplLag > maxAllowedReplLagSecs {
-				return fmt.Sprintf(cannotSwitchHighLag, vreplLag, maxAllowedReplLagSecs), nil
-			}
 			switch st.State {
 			case binlogdatapb.VReplicationWorkflowState_Copying.String():
 				return cannotSwitchCopyIncomplete, nil
diff --git a/go/vt/vtctl/workflow/workflows.go b/go/vt/vtctl/workflow/workflows.go
index 99739d3556b..7e800bb28c7 100644
--- a/go/vt/vtctl/workflow/workflows.go
+++ b/go/vt/vtctl/workflow/workflows.go
@@ -667,11 +667,11 @@ func getVReplicationTrxLag(trxTs, updatedTs *vttimepb.Time, state binlogdatapb.V
 	if state == binlogdatapb.VReplicationWorkflowState_Copying {
 		return math.MaxInt64
 	}
-	if lastTransactionTime == 0 /* no new events after copy */ ||
-		lastUpdateTime > lastTransactionTime /* no recent transactions, so all caught up */ {
-
+	if state == binlogdatapb.VReplicationWorkflowState_Running && // We could be in the ERROR state
+		(lastTransactionTime == 0 /* No new events after copy */ ||
+			lastUpdateTime > lastTransactionTime /* No recent transactions, so all caught up */) {
 		lastTransactionTime = lastUpdateTime
 	}
-	now := time.Now().Unix() /* seconds since epoch */
+	now := time.Now().Unix() // Seconds since epoch
 	return float64(now - lastTransactionTime)
 }

From 9fb48f794538785cd4457631bfc010c7f214c318 Mon Sep 17 00:00:00 2001
From: Matt Lord
Date: Thu, 23 Jan 2025 04:23:41 +0000
Subject: [PATCH 04/15] Use trx lag

Signed-off-by: Matt Lord
---
 go/vt/vtctl/workflow/server.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go
index 732e798bc04..ad2009da2a8 100644
--- a/go/vt/vtctl/workflow/server.go
+++ b/go/vt/vtctl/workflow/server.go
@@ -3069,8 +3069,8 @@ func (s *Server) canSwitch(ctx context.Context, ts *trafficSwitcher, maxAllowedR
 	if err != nil {
 		return "", err
 	}
-	if wf.MaxVReplicationLag > maxAllowedReplLagSecs {
-		return fmt.Sprintf(cannotSwitchHighLag, wf.MaxVReplicationLag, maxAllowedReplLagSecs), nil
+	if wf.MaxVReplicationTransactionLag > maxAllowedReplLagSecs {
+		return fmt.Sprintf(cannotSwitchHighLag, wf.MaxVReplicationTransactionLag, maxAllowedReplLagSecs), nil
 	}
 	for _, stream := range wf.ShardStreams {
 		for _, st := range stream.GetStreams() {

From 433d7ffb752b1199ea3ae759d5d50745e836c1ef Mon Sep 17 00:00:00 2001
From: Matt Lord
Date: Thu, 23 Jan 2025 04:31:53 +0000
Subject: [PATCH 05/15] Improve comments

Signed-off-by: Matt Lord
---
 go/vt/vtctl/workflow/traffic_switcher.go | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go
index 1732f3425b7..4333586a880 100644
--- a/go/vt/vtctl/workflow/traffic_switcher.go
+++ b/go/vt/vtctl/workflow/traffic_switcher.go
@@ -1147,10 +1147,11 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
 	}
 	// We create a new context while canceling the migration, so that we are independent of the original
-	// context being cancelled prior to or during the cancel operation.
-	// Create a child context that cannot be canceled by the parent, so that we maintain the locks.
+	// context being canceled prior to or during the cancel operation itself.
+	// First we create a copy of the parent context, so that we maintain the locks, but which cannot be
+	// canceled by the parent context.
 	cctx := context.WithoutCancel(ctx)
-	// Now create a child context from that which has a timeout.
+	// Now we create a child context from that which has a timeout.
 	cmTimeout := 60 * time.Second
 	cmCtx, cmCancel := context.WithTimeout(cctx, cmTimeout)
 	defer cmCancel()
 	if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {

From a7f400c970d245d8504b87539244336d45fe2c9d Mon Sep 17 00:00:00 2001
From: Matt Lord
Date: Thu, 23 Jan 2025 04:33:42 +0000
Subject: [PATCH 06/15] Nitty var name change

Signed-off-by: Matt Lord
---
 go/vt/vtctl/workflow/traffic_switcher.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go
index 4333586a880..8b035d9c6c5 100644
--- a/go/vt/vtctl/workflow/traffic_switcher.go
+++ b/go/vt/vtctl/workflow/traffic_switcher.go
@@ -1150,10 +1150,10 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
 	// context being canceled prior to or during the cancel operation itself.
 	// First we create a copy of the parent context, so that we maintain the locks, but which cannot be
 	// canceled by the parent context.
-	cctx := context.WithoutCancel(ctx)
+	wcCtx := context.WithoutCancel(ctx)
 	// Now we create a child context from that which has a timeout.
 	cmTimeout := 60 * time.Second
-	cmCtx, cmCancel := context.WithTimeout(cctx, cmTimeout)
+	cmCtx, cmCancel := context.WithTimeout(wcCtx, cmTimeout)
 	defer cmCancel()
 	if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {

From 3964c7ce92814b847f9de5df79c50c45c45e6b6d Mon Sep 17 00:00:00 2001
From: Matt Lord
Date: Thu, 23 Jan 2025 04:38:27 +0000
Subject: [PATCH 07/15] More nits

Signed-off-by: Matt Lord
---
 .../endtoend/vreplication/vreplication_test.go | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go
index 55d0ec5c07f..1f8f3ff6c9d 100644
--- a/go/test/endtoend/vreplication/vreplication_test.go
+++ b/go/test/endtoend/vreplication/vreplication_test.go
@@ -842,7 +842,8 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
 		numTestRows := 100
 		addTestRows := func() {
 			for i := 0; i < numTestRows; i++ {
-				execQuery(t, productConn, fmt.Sprintf("insert into customer (cid, name) values (%d, 'laggingCustomer')", startingTestRowID+i))
+				execQuery(t, productConn, fmt.Sprintf("insert into customer (cid, name) values (%d, 'laggingCustomer')",
+					startingTestRowID+i))
 			}
 		}
 		deleteTestRows := func() {
 			execQuery(t, productConn, fmt.Sprintf("delete from customer where cid >= %d", startingTestRowID))
 		}
@@ -885,9 +886,9 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
 			waitForNoWorkflowLag(t, vc, targetKs, workflow)
 		}
 
-		// First let's test that the pre-checks work as expected.
-		// We ALTER the table on the customer (target) shard to
-		// add a unique index on the name field.
+		// First let's test that the pre-checks work as expected. We ALTER
+		// the table on the customer (target) shard to add a unique index on
+		// the name field.
 		addIndex()
 		// Then we insert some test rows across both shards in the product
 		// (source) keyspace.
 		addTestRows()
@@ -907,11 +908,12 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
 		waitForTargetToCatchup()
 
 		// Now let's test that the cancel works by setting the command timeout
-		// to half the duration. Lock the customer table on the target tablets
-		// so that we cannot apply the INSERTs and catch up.
+		// to a fraction (6s) of the default max repl lag duration (30s). First
+		// we lock the customer table on the target tablets so that we cannot
+		// apply the INSERTs and catch up.
 		lockTargetTable()
 		addTestRows()
-		timeout := lagDuration * 2
+		timeout := lagDuration * 2 // 6s
 		// Use the default max-replication-lag-allowed value of 30s.
 		out, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs,
 			"SwitchTraffic", "--tablet-types=primary", "--timeout", timeout.String())

From 49c11052d9da8c78e43821b8259b98e500aaa479 Mon Sep 17 00:00:00 2001
From: Matt Lord
Date: Thu, 23 Jan 2025 11:21:17 -0500
Subject: [PATCH 08/15] Improve error handling

Signed-off-by: Matt Lord
---
 go/vt/vtctl/workflow/server.go             | 38 ++++++++++++++++------
 go/vt/vtctl/workflow/switcher.go           |  4 +--
 go/vt/vtctl/workflow/switcher_dry_run.go   |  3 +-
 go/vt/vtctl/workflow/switcher_interface.go |  2 +-
 go/vt/vtctl/workflow/traffic_switcher.go   | 11 ++++++-
 5 files changed, 43 insertions(+), 15 deletions(-)

diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go
index ad2009da2a8..42f6a114b13 100644
--- a/go/vt/vtctl/workflow/server.go
+++ b/go/vt/vtctl/workflow/server.go
@@ -2915,8 +2915,10 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 		return handleError("failed to migrate the workflow streams", err)
 	}
 	if cancel {
-		sw.cancelMigration(ctx, sm)
-		return 0, sw.logs(), nil
+		if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+			err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+		}
+		return 0, sw.logs(), err
 	}
 
 	// We stop writes on the source before stopping the source streams so that the catchup time
@@ -2928,7 +2930,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 	// we actually stop them.
 	ts.Logger().Infof("Stopping source writes")
 	if err := sw.stopSourceWrites(ctx); err != nil {
-		sw.cancelMigration(ctx, sm)
+		if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+			err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+		}
 		return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err)
 	}
 
@@ -2946,7 +2950,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 				ts.Logger().Errorf("stream in stopStreams: key %s shard %s stream %+v", key, stream.BinlogSource.Shard, stream.BinlogSource)
 			}
 		}
-		sw.cancelMigration(ctx, sm)
+		if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+			err = vterrors.Wrap(err, fmt.Sprintf("(%v)", cerr))
+		}
 		return handleError(fmt.Sprintf("failed to stop the workflow streams in the %s keyspace", ts.SourceKeyspaceName()), err)
 	}
 
@@ -2956,7 +2962,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 	// the tablet's deny list check and the first mysqld side table lock.
 	for cnt := 1; cnt <= lockTablesCycles; cnt++ {
 		if err := ts.executeLockTablesOnSource(ctx); err != nil {
-			sw.cancelMigration(ctx, sm)
+			if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+				err = vterrors.Wrap(err, fmt.Sprintf("(%v)", cerr))
+			}
 			return handleError(fmt.Sprintf("failed to execute LOCK TABLES (attempt %d of %d) on sources", cnt, lockTablesCycles), err)
 		}
 		// No need to UNLOCK the tables as the connection was closed once the locks were acquired
@@ -2977,7 +2985,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 	}
 	ts.Logger().Infof("Waiting for streams to catchup")
 	if err := sw.waitForCatchup(ctx, waitTimeout); err != nil {
-		sw.cancelMigration(ctx, sm)
+		if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+			err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+		}
 		return handleError("failed to sync up replication between the source and target", err)
 	}
 
@@ -2986,7 +2996,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 	}
 	ts.Logger().Infof("Migrating streams")
 	if err := sw.migrateStreams(ctx, sm); err != nil {
-		sw.cancelMigration(ctx, sm)
+		if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+			err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+		}
 		return handleError("failed to migrate the workflow streams", err)
 	}
 
@@ -2995,7 +3007,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 	}
 	ts.Logger().Infof("Resetting sequences")
 	if err := sw.resetSequences(ctx); err != nil {
-		sw.cancelMigration(ctx, sm)
+		if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+			err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+		}
 		return handleError("failed to reset the sequences", err)
 	}
 
@@ -3004,7 +3018,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 	}
 	ts.Logger().Infof("Creating reverse streams")
 	if err := sw.createReverseVReplication(ctx); err != nil {
-		sw.cancelMigration(ctx, sm)
+		if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+			err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+		}
 		return handleError("failed to create the reverse vreplication streams", err)
 	}
 
@@ -3019,7 +3035,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 		initSeqCtx, cancel := context.WithTimeout(ctx, waitTimeout/2)
 		defer cancel()
 		if err := sw.initializeTargetSequences(initSeqCtx, sequenceMetadata); err != nil {
-			sw.cancelMigration(ctx, sm)
+			if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+				err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+			}
 			return handleError(fmt.Sprintf("failed to initialize the sequences used in the %s keyspace", ts.TargetKeyspaceName()), err)
 		}
 	}
diff --git a/go/vt/vtctl/workflow/switcher.go b/go/vt/vtctl/workflow/switcher.go
index 789822b5be9..03d1a413e98 100644
--- a/go/vt/vtctl/workflow/switcher.go
+++ b/go/vt/vtctl/workflow/switcher.go
@@ -124,8 +124,8 @@ func (r *switcher) stopStreams(ctx context.Context, sm *StreamMigrator) ([]strin
 	return sm.StopStreams(ctx)
 }
 
-func (r *switcher) cancelMigration(ctx context.Context, sm *StreamMigrator) {
-	r.ts.cancelMigration(ctx, sm)
+func (r *switcher) cancelMigration(ctx context.Context, sm *StreamMigrator) error {
+	return r.ts.cancelMigration(ctx, sm)
 }
 
 func (r *switcher) lockKeyspace(ctx context.Context, keyspace, action string, opts ...topo.LockOption) (context.Context, func(*error), error) {
diff --git a/go/vt/vtctl/workflow/switcher_dry_run.go b/go/vt/vtctl/workflow/switcher_dry_run.go
index 29c40f85a69..c7f66b93e14 100644
--- a/go/vt/vtctl/workflow/switcher_dry_run.go
+++ b/go/vt/vtctl/workflow/switcher_dry_run.go
@@ -301,8 +301,9 @@ func (dr *switcherDryRun) stopStreams(ctx context.Context, sm *StreamMigrator) (
 	return nil, nil
 }
 
-func (dr *switcherDryRun) cancelMigration(ctx context.Context, sm *StreamMigrator) {
+func (dr *switcherDryRun) cancelMigration(ctx context.Context, sm *StreamMigrator) error {
 	dr.drLog.Log("Cancel migration as requested")
+	return nil
 }
 
 func (dr *switcherDryRun) lockKeyspace(ctx context.Context, keyspace, _ string, _ ...topo.LockOption) (context.Context, func(*error), error) {
diff --git a/go/vt/vtctl/workflow/switcher_interface.go b/go/vt/vtctl/workflow/switcher_interface.go
index 560b7a695fd..e780add1a2c 100644
--- a/go/vt/vtctl/workflow/switcher_interface.go
+++ b/go/vt/vtctl/workflow/switcher_interface.go
@@ -27,7 +27,7 @@ import (
 type iswitcher interface {
 	lockKeyspace(ctx context.Context, keyspace, action string, opts ...topo.LockOption) (context.Context, func(*error), error)
-	cancelMigration(ctx context.Context, sm *StreamMigrator)
+	cancelMigration(ctx context.Context, sm *StreamMigrator) error
 	stopStreams(ctx context.Context, sm *StreamMigrator) ([]string, error)
 	stopSourceWrites(ctx context.Context) error
 	waitForCatchup(ctx context.Context, filteredReplicationWaitTime time.Duration) error
diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go
index 8b035d9c6c5..2a5e982270e 100644
--- a/go/vt/vtctl/workflow/traffic_switcher.go
+++ b/go/vt/vtctl/workflow/traffic_switcher.go
@@ -1137,8 +1137,9 @@ func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context) error {
 
 // cancelMigration attempts to revert all changes made during the migration so that we can get back to the
 // state when traffic switching (or reversing) was initiated.
-func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrator) {
+func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrator) error {
 	var err error
+	cancelErrs := &concurrency.AllErrorRecorder{}
 
 	if ctx.Err() != nil {
 		// Even though we create a new context later on we still record any context error:
@@ -1162,6 +1163,7 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
 		err = ts.changeShardsAccess(cmCtx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites)
 	}
 	if err != nil {
+		cancelErrs.RecordError(fmt.Errorf("could not revert denied tables / shard access: %v", err))
 		ts.Logger().Errorf("Cancel migration failed: could not revert denied tables / shard access: %v", err)
 	}
 
@@ -1174,13 +1176,20 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
 		return err
 	})
 	if err != nil {
+		cancelErrs.RecordError(fmt.Errorf("could not restart vreplication: %v", err))
 		ts.Logger().Errorf("Cancel migration failed: could not restart vreplication: %v", err)
 	}
 	err = ts.deleteReverseVReplication(cmCtx)
 	if err != nil {
+		cancelErrs.RecordError(fmt.Errorf("could not delete reverse vreplication streams: %v", err))
 		ts.Logger().Errorf("Cancel migration failed: could not delete reverse vreplication streams: %v", err)
 	}
+
+	if cancelErrs.HasErrors() {
+		return vterrors.Wrap(cancelErrs.AggrError(vterrors.Aggregate), "cancel migration failed, manual cleanup work may be necessary")
+	}
+	return nil
 }
 
 func (ts *trafficSwitcher) freezeTargetVReplication(ctx context.Context) error {

From a239dc6b11f736bba02193c0452904ae7904b327 Mon Sep 17 00:00:00 2001
From: Matt Lord
Date: Thu, 23 Jan 2025 16:42:29 +0000
Subject: [PATCH 09/15] Check that we do not have cancel migration failed in errors

Signed-off-by: Matt Lord
---
 go/test/endtoend/vreplication/vreplication_test.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go
index 1f8f3ff6c9d..32ef7a95374 100644
--- a/go/test/endtoend/vreplication/vreplication_test.go
+++ b/go/test/endtoend/vreplication/vreplication_test.go
@@ -904,6 +904,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
 		require.Error(t, err)
 		require.Regexp(t, fmt.Sprintf(".*cannot switch traffic for workflow %s at this time: replication lag [0-9]+s is higher than allowed lag %s.*",
 			workflow, lagDuration.String()), out)
+		require.NotContains(t, out, "cancel migration failed")
 		cleanupTestData()
 		waitForTargetToCatchup()
 
@@ -921,6 +922,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
 		// successfully cancel.
 		require.Error(t, err)
 		require.Contains(t, out, "failed to sync up replication between the source and target")
+		require.NotContains(t, out, "cancel migration failed")
 		unlockTargetTable()
 		deleteTestRows()
 		waitForTargetToCatchup()

From 45adff7e0c0a47533b56f58b7fa8c8e20b12824a Mon Sep 17 00:00:00 2001
From: Matt Lord
Date: Thu, 23 Jan 2025 13:13:11 -0500
Subject: [PATCH 10/15] Correct vreplication trx lag calculation

Signed-off-by: Matt Lord
---
 .../vreplication/vreplication_test.go |  4 ++++
 go/vt/vtctl/workflow/workflows.go     | 23 +++++++++++++------
 2 files changed, 20 insertions(+), 7 deletions(-)

diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go
index 32ef7a95374..64075cfbf93 100644
--- a/go/test/endtoend/vreplication/vreplication_test.go
+++ b/go/test/endtoend/vreplication/vreplication_test.go
@@ -905,6 +905,8 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
 			workflow, lagDuration.String()), out)
 		require.NotContains(t, out, "cancel migration failed")
+		// Confirm that queries still work fine.
+		execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1")
 		cleanupTestData()
 		waitForTargetToCatchup()
 
@@ -923,6 +925,8 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
 		require.Error(t, err)
 		require.Contains(t, out, "failed to sync up replication between the source and target")
 		require.NotContains(t, out, "cancel migration failed")
+		// Confirm that queries still work fine.
+		execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1")
 		unlockTargetTable()
 		deleteTestRows()
 		waitForTargetToCatchup()
diff --git a/go/vt/vtctl/workflow/workflows.go b/go/vt/vtctl/workflow/workflows.go
index 7e800bb28c7..2ee66b8756b 100644
--- a/go/vt/vtctl/workflow/workflows.go
+++ b/go/vt/vtctl/workflow/workflows.go
@@ -448,7 +448,7 @@ func (wf *workflowFetcher) scanWorkflow(
 		// MaxVReplicationTransactionLag estimates the max statement processing lag
 		// between the source and the target across all of the workflow streams.
-		transactionReplicationLag := getVReplicationTrxLag(rstream.TransactionTimestamp, rstream.TimeUpdated, rstream.State)
+		transactionReplicationLag := getVReplicationTrxLag(rstream.TransactionTimestamp, rstream.TimeUpdated, rstream.TimeHeartbeat, rstream.State)
 		if transactionReplicationLag > meta.maxVReplicationTransactionLag {
 			meta.maxVReplicationTransactionLag = transactionReplicationLag
 		}
@@ -655,7 +655,7 @@ func getStreamState(stream *vtctldatapb.Workflow_Stream, rstream *tabletmanagerd
 // heartbeat, but which has not yet been processed on the target. We don't allow
 // switching during the copy phase, so in that case we just return a large lag.
 // All timestamps are in seconds since epoch.
-func getVReplicationTrxLag(trxTs, updatedTs *vttimepb.Time, state binlogdatapb.VReplicationWorkflowState) float64 {
+func getVReplicationTrxLag(trxTs, updatedTs, heartbeatTs *vttimepb.Time, state binlogdatapb.VReplicationWorkflowState) float64 {
 	if trxTs == nil {
 		trxTs = &vttimepb.Time{}
 	}
@@ -663,14 +663,23 @@ func getVReplicationTrxLag(trxTs, updatedTs *vttimepb.Time, state binlogdatapb.V
 	if updatedTs == nil {
 		updatedTs = &vttimepb.Time{}
 	}
-	lastUpdateTime := updatedTs.Seconds
+	lastUpdatedTime := updatedTs.Seconds
+	if heartbeatTs == nil {
+		heartbeatTs = &vttimepb.Time{}
+	}
+	lastHeartbeatTime := heartbeatTs.Seconds
 	if state == binlogdatapb.VReplicationWorkflowState_Copying {
 		return math.MaxInt64
 	}
-	if state == binlogdatapb.VReplicationWorkflowState_Running && // We could be in the ERROR state
-		(lastTransactionTime == 0 /* No new events after copy */ ||
-			lastUpdateTime > lastTransactionTime /* No recent transactions, so all caught up */) {
-		lastTransactionTime = lastUpdateTime
+	// We do NOT update the heartbeat timestamp when we are regularly updating the
+	// position as we replication transactions (GTIDs).
+	// When we DO record a heartbeat, we set the updated time to the same value.
+	// When recording that we are throttled, we update the updated time but NOT
+	// the heartbeat time.
+	if lastTransactionTime == 0 /* No replicated events after copy */ ||
+		(lastUpdatedTime == lastHeartbeatTime && /* The last update was from a heartbeat */
+			lastUpdatedTime > lastTransactionTime /* No recent transactions, only heartbeats, so all caught up */) {
+		lastTransactionTime = lastUpdatedTime
 	}
 	now := time.Now().Unix() // Seconds since epoch
 	return float64(now - lastTransactionTime)

From 7019af3edfe64f6d256a54781cddc0c9aa2d0e5a Mon Sep 17 00:00:00 2001
From: Matt Lord
Date: Thu, 23 Jan 2025 16:46:30 -0500
Subject: [PATCH 11/15] Fix another bug in cancel: reverting the denied tables list

Signed-off-by: Matt Lord
---
 .../vreplication/vreplication_test.go    | 17 ++++++++--
 go/vt/vtctl/workflow/server.go           |  4 +--
 go/vt/vtctl/workflow/stream_migrator.go  | 14 +++++++--
 go/vt/vtctl/workflow/traffic_switcher.go | 31 +++++++++++++------
 4 files changed, 48 insertions(+), 18 deletions(-)

diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go
index 64075cfbf93..45695c54ec8 100644
--- a/go/test/endtoend/vreplication/vreplication_test.go
+++ b/go/test/endtoend/vreplication/vreplication_test.go
@@ -918,8 +918,20 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
 		addTestRows()
 		timeout := lagDuration * 2 // 6s
 		// Use the default max-replication-lag-allowed value of 30s.
-		out, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs,
-			"SwitchTraffic", "--tablet-types=primary", "--timeout", timeout.String())
+		// We run the command in a goroutine so that we can unblock things
+		// after the timeout is reached -- as the vplayer query is blocking
+		// on the table lock.
+		wg := sync.WaitGroup{}
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			out, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs,
+				"SwitchTraffic", "--tablet-types=primary", "--timeout", timeout.String())
+		}()
+		time.Sleep(timeout)
+		// Now we can unblock things and let it continue.
+		unlockTargetTable()
+		wg.Wait()
 		// It should fail due to the command context timeout and we should
 		// successfully cancel.
 		require.Error(t, err)
 		require.Contains(t, out, "failed to sync up replication between the source and target")
 		require.NotContains(t, out, "cancel migration failed")
 		// Confirm that queries still work fine.
 		execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1")
-		unlockTargetTable()
 		deleteTestRows()
 		waitForTargetToCatchup()
 	})
diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go
index 42f6a114b13..1e86761c71b 100644
--- a/go/vt/vtctl/workflow/server.go
+++ b/go/vt/vtctl/workflow/server.go
@@ -2951,7 +2951,7 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 			}
 		}
 		if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
-			err = vterrors.Wrap(err, fmt.Sprintf("(%v)", cerr))
+			err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
 		}
 		return handleError(fmt.Sprintf("failed to stop the workflow streams in the %s keyspace", ts.SourceKeyspaceName()), err)
 	}
@@ -2963,7 +2963,7 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 	for cnt := 1; cnt <= lockTablesCycles; cnt++ {
 		if err := ts.executeLockTablesOnSource(ctx); err != nil {
 			if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
-				err = vterrors.Wrap(err, fmt.Sprintf("(%v)", cerr))
+				err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
 			}
 			return handleError(fmt.Sprintf("failed to execute LOCK TABLES (attempt %d of %d) on sources", cnt, lockTablesCycles), err)
 		}
diff --git a/go/vt/vtctl/workflow/stream_migrator.go b/go/vt/vtctl/workflow/stream_migrator.go
index a700a1338dd..47d5a48c4f9 100644
--- a/go/vt/vtctl/workflow/stream_migrator.go
+++ b/go/vt/vtctl/workflow/stream_migrator.go
@@ -203,12 +203,15 @@ func (sm *StreamMigrator) Templates() []*VReplicationStream {
 }
 
 // CancelStreamMigrations cancels the stream migrations.
-func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) {
+func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) error {
 	if sm.streams == nil {
-		return
+		return nil
 	}
+	errs := &concurrency.AllErrorRecorder{}
 
-	_ = sm.deleteTargetStreams(ctx)
+	if err := sm.deleteTargetStreams(ctx); err != nil {
+		errs.RecordError(err)
+	}
 
 	// Restart the source streams, but leave the Reshard workflow's reverse
 	// variant stopped.
@@ -221,8 +224,13 @@ func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) {
 		return err
 	})
 	if err != nil {
+		errs.RecordError(err)
 		sm.logger.Errorf("Cancel stream migrations failed: could not restart source streams: %v", err)
 	}
+	if errs.HasErrors() {
+		return errs.AggrError(vterrors.Aggregate)
+	}
+	return nil
 }
 
 // MigrateStreams migrates N streams
diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go
index 2a5e982270e..c8bf6c8eb68 100644
--- a/go/vt/vtctl/workflow/traffic_switcher.go
+++ b/go/vt/vtctl/workflow/traffic_switcher.go
@@ -754,7 +754,7 @@ func (ts *trafficSwitcher) changeShardsAccess(ctx context.Context, keyspace stri
 
 func (ts *trafficSwitcher) allowTargetWrites(ctx context.Context) error {
 	if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
-		return ts.switchDeniedTables(ctx)
+		return ts.switchDeniedTables(ctx, false)
 	}
 	return ts.changeShardsAccess(ctx, ts.TargetKeyspaceName(), ts.TargetShards(), allowWrites)
 }
@@ -1062,7 +1062,7 @@ func (ts *trafficSwitcher) waitForCatchup(ctx context.Context, filteredReplicati
 func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error {
 	var err error
 	if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
-		err = ts.switchDeniedTables(ctx)
+		err = ts.switchDeniedTables(ctx, false)
 	} else {
 		err = ts.changeShardsAccess(ctx, ts.SourceKeyspaceName(), ts.SourceShards(), disallowWrites)
 	}
@@ -1075,16 +1075,25 @@ func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error {
 
 // switchDeniedTables switches the denied tables rules for the traffic switch.
 // They are removed on the source side and added on the target side.
-func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context) error {
+// If backward is true, then we swap this logic, removing on the target side
+// and adding on the source side. You would want to do that e.g. when canceling
+// a failed (and currently partial) traffic switch as the source and target
+// have already been switched in the trafficSwitcher.
+func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context, backward bool) error {
 	if ts.MigrationType() != binlogdatapb.MigrationType_TABLES {
 		return nil
 	}
 
+	rmsource, rmtarget := false, true
+	if backward {
+		rmsource, rmtarget = true, false
+	}
+
 	egrp, ectx := errgroup.WithContext(ctx)
 	egrp.Go(func() error {
 		return ts.ForAllSources(func(source *MigrationSource) error {
 			if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.SourceKeyspaceName(), source.GetShard().ShardName(), func(si *topo.ShardInfo) error {
-				return si.UpdateDeniedTables(ectx, topodatapb.TabletType_PRIMARY, nil, false, ts.Tables())
+				return si.UpdateDeniedTables(ectx, topodatapb.TabletType_PRIMARY, nil, rmsource, ts.Tables())
 			}); err != nil {
 				return err
 			}
@@ -1107,7 +1116,7 @@ func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context) error {
 	egrp.Go(func() error {
 		return ts.ForAllTargets(func(target *MigrationTarget) error {
 			if _, err := ts.TopoServer().UpdateShardFields(ectx, ts.TargetKeyspaceName(), target.GetShard().ShardName(), func(si *topo.ShardInfo) error {
-				return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables())
+				return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, rmtarget, ts.Tables())
 			}); err != nil {
 				return err
 			}
@@ -1153,12 +1162,12 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
 	// canceled by the parent context.
 	wcCtx := context.WithoutCancel(ctx)
 	// Now we create a child context from that which has a timeout.
-	cmTimeout := 60 * time.Second
+	cmTimeout := 2 * time.Minute
 	cmCtx, cmCancel := context.WithTimeout(wcCtx, cmTimeout)
 	defer cmCancel()
 
 	if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
-		err = ts.switchDeniedTables(cmCtx)
+		err = ts.switchDeniedTables(cmCtx, true /* revert */)
 	} else {
 		err = ts.changeShardsAccess(cmCtx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites)
 	}
@@ -1167,7 +1176,10 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
 	if err != nil {
 		cancelErrs.RecordError(fmt.Errorf("could not revert denied tables / shard access: %v", err))
 		ts.Logger().Errorf("Cancel migration failed: could not revert denied tables / shard access: %v", err)
 	}
 
-	sm.CancelStreamMigrations(cmCtx)
+	if err := sm.CancelStreamMigrations(cmCtx); err != nil {
+		cancelErrs.RecordError(fmt.Errorf("could not cancel stream migrations: %v", err))
+		ts.Logger().Errorf("Cancel migration failed: could not cancel stream migrations: %v", err)
+	}
 
 	err = ts.ForAllTargets(func(target *MigrationTarget) error {
 		query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s",
@@ -1180,8 +1192,7 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
 		ts.Logger().Errorf("Cancel migration failed: could not restart vreplication: %v", err)
 	}
 
-	err = ts.deleteReverseVReplication(cmCtx)
-	if err != nil {
+	if err := ts.deleteReverseVReplication(cmCtx); err != nil {
 		cancelErrs.RecordError(fmt.Errorf("could not delete reverse vreplication streams: %v", err))
 		ts.Logger().Errorf("Cancel migration failed: could not delete reverse vreplication streams: %v", err)
 	}

From bb1207ac64be886e8c7e6ebb148efdbd8f9cdf43 Mon Sep 17 00:00:00 2001
From: Matt Lord
Date: Thu, 23 Jan 2025 16:59:39 -0500
Subject: [PATCH 12/15] Changes from self review

Signed-off-by: Matt Lord
---
 .../endtoend/vreplication/vreplication_test.go | 16 ++++++++--------
 go/vt/vtctl/workflow/stream_migrator.go        |  4 ++--
 go/vt/vtctl/workflow/traffic_switcher.go       | 11 ++++++-----
 go/vt/vtctl/workflow/workflows.go              |  2 +-
 4 files changed, 17 insertions(+), 16 deletions(-)

diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go
index 45695c54ec8..9b9acfec810 100644
--- a/go/test/endtoend/vreplication/vreplication_test.go
+++ b/go/test/endtoend/vreplication/vreplication_test.go
@@ -828,7 +828,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
 	testSwitchTrafficPermissionChecks(t, workflowType, sourceKs, shardNames, targetKs, workflow)
 
 	// Confirm that switching writes works as expected in the face of
-	// vreplication lag (canSwitch() precheck) and when cancelling the
+	// vreplication lag (canSwitch() precheck) and when canceling the
 	// switch due to replication failing to catch up in time.
 	t.Run("validate switch writes error handling", func(t *testing.T) {
 		productConn, err := productTab.TabletConn("product", true)
 		require.NoError(t, err)
 		defer productConn.Close()
@@ -875,23 +875,20 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
 			deleteTestRows()
 		}
 		restartWorkflow := func() {
-			// We have to restart the workflow again as the duplicate key error
-			// is a permanent/terminal one.
 			err = vc.VtctldClient.ExecuteCommand("workflow", "--keyspace", targetKs, "start", "--workflow", workflow)
 			require.NoError(t, err, "failed to start workflow: %v", err)
 		}
 		waitForTargetToCatchup := func() {
-			restartWorkflow()
 			waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
 			waitForNoWorkflowLag(t, vc, targetKs, workflow)
 		}
 
-		// First let's test that the pre-checks work as expected. We ALTER
-		// the table on the customer (target) shard to add a unique index on
-		// the name field.
+		// First let's test that the pre-checks work as expected. We ALTER
+		// the table on the customer (target) shards to add a unique index on
+		// the name field.
 		addIndex()
-		// Then we insert some test rows across both shards in the product
-		// (source) keyspace.
+		// Then we replicate some test rows across both customer shards by
+		// inserting them in the product (source) keyspace.
 		addTestRows()
 		// Now the workflow should go into the error state and the lag should
 		// start to climb. So we sleep for twice the max lag duration that we
 		// will set for the SwitchTraffic call.
@@ -908,6 +905,9 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
 		// Confirm that queries still work fine.
 		execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1")
 		cleanupTestData()
+		// We have to restart the workflow again as the duplicate key error
+		// is a permanent/terminal one.
+		restartWorkflow()
 		waitForTargetToCatchup()
 
 		// Now let's test that the cancel works by setting the command timeout
@@ -920,7 +920,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
 		// Use the default max-replication-lag-allowed value of 30s.
 		// We run the command in a goroutine so that we can unblock things
 		// after the timeout is reached -- as the vplayer query is blocking
-		// on the table lock.
+		// on the table lock in the MySQL layer.
 		wg := sync.WaitGroup{}
 		wg.Add(1)
 		go func() {
diff --git a/go/vt/vtctl/workflow/stream_migrator.go b/go/vt/vtctl/workflow/stream_migrator.go
index 47d5a48c4f9..7b4fe69074f 100644
--- a/go/vt/vtctl/workflow/stream_migrator.go
+++ b/go/vt/vtctl/workflow/stream_migrator.go
@@ -210,7 +210,7 @@ func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) error {
 	errs := &concurrency.AllErrorRecorder{}
 
 	if err := sm.deleteTargetStreams(ctx); err != nil {
-		errs.RecordError(err)
+		errs.RecordError(fmt.Errorf("could not delete target streams: %v", err))
 	}
 
 	// Restart the source streams, but leave the Reshard workflow's reverse
 	// variant stopped.
@@ -224,7 +224,7 @@ func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) error {
 		return err
 	})
 	if err != nil {
-		errs.RecordError(err)
+		errs.RecordError(fmt.Errorf("could not restart source streams: %v", err))
 		sm.logger.Errorf("Cancel stream migrations failed: could not restart source streams: %v", err)
 	}
 	if errs.HasErrors() {
diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go
index c8bf6c8eb68..1d4ef438f5e 100644
--- a/go/vt/vtctl/workflow/traffic_switcher.go
+++ b/go/vt/vtctl/workflow/traffic_switcher.go
@@ -1074,11 +1074,12 @@ func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error {
 }
 
 // switchDeniedTables switches the denied tables rules for the traffic switch.
-// They are removed on the source side and added on the target side.
-// If backward is true, then we swap this logic, removing on the target side
-// and adding on the source side. You would want to do that e.g. when canceling
-// a failed (and currently partial) traffic switch as the source and target
-// have already been switched in the trafficSwitcher.
+// They are added on the source side and removed on the target side.
+// If backward is true, then we swap this logic, removing on the source side
+// and adding on the target side. You would want to do that e.g. when canceling
+// a failed (and currently partial) traffic switch as we may have already
+// switched the denied tables entries and in any event we need to go back to
+// the original state.
 func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context, backward bool) error {
 	if ts.MigrationType() != binlogdatapb.MigrationType_TABLES {
 		return nil
 	}
diff --git a/go/vt/vtctl/workflow/workflows.go b/go/vt/vtctl/workflow/workflows.go
index 2ee66b8756b..0418e005682 100644
--- a/go/vt/vtctl/workflow/workflows.go
+++ b/go/vt/vtctl/workflow/workflows.go
@@ -672,7 +672,7 @@ func getVReplicationTrxLag(trxTs, updatedTs, heartbeatTs *vttimepb.Time, state b
 		return math.MaxInt64
 	}
 	// We do NOT update the heartbeat timestamp when we are regularly updating the
-	// position as we replication transactions (GTIDs).
+	// position as we replicate transactions (GTIDs).
 	// When we DO record a heartbeat, we set the updated time to the same value.
 	// When recording that we are throttled, we update the updated time but NOT
 	// the heartbeat time.

From ad09bb01d8336d57e8ff3b03848858d64a4fcec3 Mon Sep 17 00:00:00 2001
From: Matt Lord
Date: Thu, 23 Jan 2025 22:48:41 +0000
Subject: [PATCH 13/15] Minor optimization for copying phase

Signed-off-by: Matt Lord
---
 go/vt/vtctl/workflow/workflows.go | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/go/vt/vtctl/workflow/workflows.go b/go/vt/vtctl/workflow/workflows.go
index 0418e005682..a1b4393f2c0 100644
--- a/go/vt/vtctl/workflow/workflows.go
+++ b/go/vt/vtctl/workflow/workflows.go
@@ -656,6 +656,9 @@ func getStreamState(stream *vtctldatapb.Workflow_Stream, rstream *tabletmanagerd
 // switching during the copy phase, so in that case we just return a large lag.
 // All timestamps are in seconds since epoch.
 func getVReplicationTrxLag(trxTs, updatedTs, heartbeatTs *vttimepb.Time, state binlogdatapb.VReplicationWorkflowState) float64 {
+	if state == binlogdatapb.VReplicationWorkflowState_Copying {
+		return math.MaxInt64
+	}
 	if trxTs == nil {
 		trxTs = &vttimepb.Time{}
 	}
@@ -668,9 +671,6 @@ func getVReplicationTrxLag(trxTs, updatedTs, heartbeatTs *vttimepb.Time, state b
 		heartbeatTs = &vttimepb.Time{}
 	}
 	lastHeartbeatTime := heartbeatTs.Seconds
-	if state == binlogdatapb.VReplicationWorkflowState_Copying {
-		return math.MaxInt64
-	}
 	// We do NOT update the heartbeat timestamp when we are regularly updating the
 	// position as we replicate transactions (GTIDs).
 	// When we DO record a heartbeat, we set the updated time to the same value.
From 76396decac35fdfabd9c707ecb4c5c718771981a Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 24 Jan 2025 02:06:00 -0500 Subject: [PATCH 14/15] Generalize subtest and test reshard Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/cluster_test.go | 2 +- .../vreplication/vreplication_test.go | 254 ++++++++++-------- 2 files changed, 140 insertions(+), 116 deletions(-) diff --git a/go/test/endtoend/vreplication/cluster_test.go b/go/test/endtoend/vreplication/cluster_test.go index bcf50d43702..1cf44e1d4c7 100644 --- a/go/test/endtoend/vreplication/cluster_test.go +++ b/go/test/endtoend/vreplication/cluster_test.go @@ -856,7 +856,7 @@ func (vc *VitessCluster) getVttabletsInKeyspace(t *testing.T, cell *Cell, ksName tablets := make(map[string]*cluster.VttabletProcess) for _, shard := range keyspace.Shards { for _, tablet := range shard.Tablets { - if tablet.Vttablet.GetTabletStatus() == "SERVING" { + if tablet.Vttablet.GetTabletStatus() == "SERVING" && (tabletType == "" || strings.EqualFold(tablet.Vttablet.GetTabletType(), tabletType)) { log.Infof("Serving status of tablet %s is %s, %s", tablet.Name, tablet.Vttablet.ServingStatus, tablet.Vttablet.GetTabletStatus()) tablets[tablet.Name] = tablet.Vttablet } diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 9b9acfec810..aa8ed044a5a 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -827,121 +827,8 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl } testSwitchTrafficPermissionChecks(t, workflowType, sourceKs, shardNames, targetKs, workflow) - // Confirm that switching writes works as expected in the face of - // vreplication lag (canSwitch() precheck) and when canceling the - // switch due to replication failing to catch up in time. 
-	t.Run("validate switch writes error handling", func(t *testing.T) {
-		productConn, err := productTab.TabletConn("product", true)
-		require.NoError(t, err)
-		defer productConn.Close()
-		customerConn1, err := customerTab1.TabletConn("customer", true)
-		require.NoError(t, err)
-		customerConn2, err := customerTab2.TabletConn("customer", true)
-		require.NoError(t, err)
-		startingTestRowID := 10000000
-		numTestRows := 100
-		addTestRows := func() {
-			for i := 0; i < numTestRows; i++ {
-				execQuery(t, productConn, fmt.Sprintf("insert into customer (cid, name) values (%d, 'laggingCustomer')",
-					startingTestRowID+i))
-			}
-		}
-		deleteTestRows := func() {
-			execQuery(t, productConn, fmt.Sprintf("delete from customer where cid >= %d", startingTestRowID))
-		}
-		addIndex := func() {
-			for _, customerConn := range []*mysql.Conn{customerConn1, customerConn2} {
-				execQuery(t, customerConn, "set session sql_mode=''")
-				execQuery(t, customerConn, "alter table customer add unique index name_idx (name)")
-			}
-		}
-		dropIndex := func() {
-			for _, customerConn := range []*mysql.Conn{customerConn1, customerConn2} {
-				execQuery(t, customerConn, "alter table customer drop index name_idx")
-			}
-		}
-		lockTargetTable := func() {
-			for _, customerConn := range []*mysql.Conn{customerConn1, customerConn2} {
-				execQuery(t, customerConn, "lock table customer read")
-			}
-		}
-		unlockTargetTable := func() {
-			for _, customerConn := range []*mysql.Conn{customerConn1, customerConn2} {
-				execQuery(t, customerConn, "unlock tables")
-			}
-		}
-		cleanupTestData := func() {
-			dropIndex()
-			deleteTestRows()
-		}
-		restartWorkflow := func() {
-			err = vc.VtctldClient.ExecuteCommand("workflow", "--keyspace", targetKs, "start", "--workflow", workflow)
-			require.NoError(t, err, "failed to start workflow: %v", err)
-		}
-		waitForTargetToCatchup := func() {
-			waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
-			waitForNoWorkflowLag(t, vc, targetKs, workflow)
-		}
-
-		// First let's test that the pre-checks work as expected. We ALTER
-		// the table on the customer (target) shards to add a unique index on
-		// the name field.
-		addIndex()
-		// Then we replicate some test rows across both customer shards by
-		// inserting them in the product (source) keyspace.
-		addTestRows()
-		// Now the workflow should go into the error state and the lag should
-		// start to climb. So we sleep for twice the max lag duration that we
-		// will set for the SwitchTraffic call.
-		lagDuration := 3 * time.Second
-		time.Sleep(lagDuration * 3)
-		out, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs,
-			"SwitchTraffic", "--tablet-types=primary", "--timeout=30s", "--max-replication-lag-allowed", lagDuration.String())
-		// It should fail in the canSwitch() precheck.
-		require.Error(t, err)
-		require.Regexp(t, fmt.Sprintf(".*cannot switch traffic for workflow %s at this time: replication lag [0-9]+s is higher than allowed lag %s.*",
-			workflow, lagDuration.String()), out)
-		require.NotContains(t, out, "cancel migration failed")
-		// Confirm that queries still work fine.
-		execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1")
-		cleanupTestData()
-		// We have to restart the workflow again as the duplicate key error
-		// is a permanent/terminal one.
-		restartWorkflow()
-		waitForTargetToCatchup()
-
-		// Now let's test that the cancel works by setting the command timeout
-		// to a fraction (6s) of the default max repl lag duration (30s). First
First - // we lock the customer table on the target tablets so that we cannot - // apply the INSERTs and catch up. - lockTargetTable() - addTestRows() - timeout := lagDuration * 2 // 6s - // Use the default max-replication-lag-allowed value of 30s. - // We run the command in a goroutine so that we can unblock things - // after the timeout is reached -- as the vplayer query is blocking - // on the table lock in the MySQL layer. - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - out, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs, - "SwitchTraffic", "--tablet-types=primary", "--timeout", timeout.String()) - }() - time.Sleep(timeout) - // Now we can unblock things and let it continue. - unlockTargetTable() - wg.Wait() - // It should fail due to the command context timeout and we should - // successfully cancel. - require.Error(t, err) - require.Contains(t, out, "failed to sync up replication between the source and target") - require.NotContains(t, out, "cancel migration failed") - // Confirm that queries still work fine. - execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1") - deleteTestRows() - waitForTargetToCatchup() - }) + testSwitchWritesErrorHandling(t, []*cluster.VttabletProcess{productTab}, []*cluster.VttabletProcess{customerTab1, customerTab2}, + sourceKs, targetKs, workflow, workflowType) // Now let's confirm that it now works as expected. switchWrites(t, workflowType, ksWorkflow, false) @@ -1175,6 +1062,7 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou require.NoError(t, vc.AddShards(t, cells, keyspace, targetShards, defaultReplicas, defaultRdonly, tabletIDBase, targetKsOpts)) tablets := vc.getVttabletsInKeyspace(t, defaultCell, ksName, "primary") + var sourceTablets, targetTablets []*cluster.VttabletProcess // Test multi-primary setups, like a Galera cluster, which have auto increment steps > 1. for _, tablet := range tablets { @@ -1187,9 +1075,11 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou targetShards = "," + targetShards + "," for _, tab := range tablets { if strings.Contains(targetShards, ","+tab.Shard+",") { + targetTablets = append(targetTablets, tab) log.Infof("Waiting for vrepl to catch up on %s since it IS a target shard", tab.Shard) catchup(t, tab, workflow, "Reshard") } else { + sourceTablets = append(sourceTablets, tab) log.Infof("Not waiting for vrepl to catch up on %s since it is NOT a target shard", tab.Shard) continue } @@ -1203,6 +1093,9 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou if dryRunResultSwitchWrites != nil { reshardAction(t, "SwitchTraffic", workflow, ksName, "", "", callNames, "primary", "--dry-run") } + if tableName == "customer" { + testSwitchWritesErrorHandling(t, sourceTablets, targetTablets, ksName, ksName, workflow, "reshard") + } reshardAction(t, "SwitchTraffic", workflow, ksName, "", "", callNames, "primary") reshardAction(t, "Complete", workflow, ksName, "", "", "", "") for tabletName, count := range counts { @@ -1774,6 +1667,137 @@ func testSwitchTrafficPermissionChecks(t *testing.T, workflowType, sourceKeyspac }) } +// testSwitchWritesErrorHandling confirms that switching writes works as expected +// in the face of vreplication lag (canSwitch() precheck) and when canceling the +// switch due to replication failing to catch up in time. 
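+// It first forces a duplicate key error on the target side to make the
+// replication lag climb, confirming that SwitchTraffic fails the canSwitch()
+// precheck, and then holds a read lock on the customer table on the target
+// tablets so that the SwitchTraffic command times out and the migration is
+// canceled cleanly.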
+// The workflow MUST be migrating the customer table from the source to the +// target keyspace AND the workflow must currently have reads switched but not +// writes. +func testSwitchWritesErrorHandling(t *testing.T, sourceTablets, targetTablets []*cluster.VttabletProcess, sourceKs, targetKs, workflow, workflowType string) { + t.Run("validate switch writes error handling", func(t *testing.T) { + ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow) + vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + require.NotZero(t, len(sourceTablets), "no source tablets provided for keyspace %s", sourceKs) + require.NotZero(t, len(targetTablets), "no target tablets provided for keyspace %s", targetKs) + var err error + sourceConns := make([]*mysql.Conn, len(sourceTablets)) + for i, tablet := range sourceTablets { + sourceConns[i], err = tablet.TabletConn(tablet.Keyspace, true) + require.NoError(t, err) + defer sourceConns[i].Close() + } + targetConns := make([]*mysql.Conn, len(targetTablets)) + for i, tablet := range targetTablets { + targetConns[i], err = tablet.TabletConn(tablet.Keyspace, true) + require.NoError(t, err) + defer targetConns[i].Close() + } + startingTestRowID := 10000000 + numTestRows := 100 + addTestRows := func() { + for i := 0; i < numTestRows; i++ { + execVtgateQuery(t, vtgateConn, sourceTablets[0].Keyspace, fmt.Sprintf("insert into customer (cid, name) values (%d, 'laggingCustomer')", + startingTestRowID+i)) + } + } + deleteTestRows := func() { + execVtgateQuery(t, vtgateConn, sourceTablets[0].Keyspace, fmt.Sprintf("delete from customer where cid >= %d", startingTestRowID)) + } + addIndex := func() { + for _, targetConn := range targetConns { + execQuery(t, targetConn, "set session sql_mode=''") + execQuery(t, targetConn, "alter table customer add unique index name_idx (name)") + } + } + dropIndex := func() { + for _, targetConn := range targetConns { + execQuery(t, targetConn, "alter table customer drop index name_idx") + } + } + lockTargetTable := func() { + for _, targetConn := range targetConns { + execQuery(t, targetConn, "lock table customer read") + } + } + unlockTargetTable := func() { + for _, targetConn := range targetConns { + execQuery(t, targetConn, "unlock tables") + } + } + cleanupTestData := func() { + dropIndex() + deleteTestRows() + } + restartWorkflow := func() { + err = vc.VtctldClient.ExecuteCommand("workflow", "--keyspace", targetKs, "start", "--workflow", workflow) + require.NoError(t, err, "failed to start workflow: %v", err) + } + waitForTargetToCatchup := func() { + waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) + waitForNoWorkflowLag(t, vc, targetKs, workflow) + } + + // First let's test that the prechecks work as expected. We ALTER + // the table on the target shards to add a unique index on the name + // field. + addIndex() + // Then we replicate some test rows across the target shards by + // inserting them in the source keyspace. + addTestRows() + // Now the workflow should go into the error state and the lag should + // start to climb. So we sleep for twice the max lag duration that we + // will set for the SwitchTraffic call. 
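+ // lagDuration is also the value we pass below as --max-replication-lag-allowed,
+ // so by the time we attempt the switch the reported lag should comfortably
+ // exceed the allowed maximum.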
+ lagDuration := 3 * time.Second + time.Sleep(lagDuration * 3) + out, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs, + "SwitchTraffic", "--tablet-types=primary", "--timeout=30s", "--max-replication-lag-allowed", lagDuration.String()) + // It should fail in the canSwitch() precheck. + require.Error(t, err) + require.Regexp(t, fmt.Sprintf(".*cannot switch traffic for workflow %s at this time: replication lag [0-9]+s is higher than allowed lag %s.*", + workflow, lagDuration.String()), out) + require.NotContains(t, out, "cancel migration failed") + // Confirm that queries still work fine. + execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1") + cleanupTestData() + // We have to restart the workflow again as the duplicate key error + // is a permanent/terminal one. + restartWorkflow() + waitForTargetToCatchup() + + // Now let's test that the cancel works by setting the command timeout + // to a fraction (6s) of the default max repl lag duration (30s). First + // we lock the customer table on the target tablets so that we cannot + // apply the INSERTs and catch up. + lockTargetTable() + addTestRows() + timeout := lagDuration * 2 // 6s + // Use the default max-replication-lag-allowed value of 30s. + // We run the command in a goroutine so that we can unblock things + // after the timeout is reached -- as the vplayer query is blocking + // on the table lock in the MySQL layer. + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + out, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs, + "SwitchTraffic", "--tablet-types=primary", "--timeout", timeout.String()) + }() + time.Sleep(timeout) + // Now we can unblock things and let it continue. + unlockTargetTable() + wg.Wait() + // It should fail due to the command context timeout and we should + // successfully cancel. + require.Error(t, err) + require.Contains(t, out, "failed to sync up replication between the source and target") + require.NotContains(t, out, "cancel migration failed") + // Confirm that queries still work fine. + execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1") + deleteTestRows() + waitForTargetToCatchup() + }) +} + // restartWorkflow confirms that a workflow can be successfully // stopped and started. func restartWorkflow(t *testing.T, ksWorkflow string) { From 2253a88b4bae3c581bc98fd93abc3417afa1b59d Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 24 Jan 2025 07:50:56 -0500 Subject: [PATCH 15/15] Derive sourceKs and targetKs from input tablets Signed-off-by: Matt Lord --- .../endtoend/vreplication/vreplication_test.go | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index aa8ed044a5a..f12e8edc759 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -828,9 +828,9 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl testSwitchTrafficPermissionChecks(t, workflowType, sourceKs, shardNames, targetKs, workflow) testSwitchWritesErrorHandling(t, []*cluster.VttabletProcess{productTab}, []*cluster.VttabletProcess{customerTab1, customerTab2}, - sourceKs, targetKs, workflow, workflowType) + workflow, workflowType) - // Now let's confirm that it now works as expected. 
+ // Now let's confirm that it works as expected with an error. switchWrites(t, workflowType, ksWorkflow, false) checkThatVDiffFails(t, targetKs, workflow) @@ -1094,8 +1094,9 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou reshardAction(t, "SwitchTraffic", workflow, ksName, "", "", callNames, "primary", "--dry-run") } if tableName == "customer" { - testSwitchWritesErrorHandling(t, sourceTablets, targetTablets, ksName, ksName, workflow, "reshard") + testSwitchWritesErrorHandling(t, sourceTablets, targetTablets, workflow, "reshard") } + // Now let's confirm that it works as expected with an error. reshardAction(t, "SwitchTraffic", workflow, ksName, "", "", callNames, "primary") reshardAction(t, "Complete", workflow, ksName, "", "", "", "") for tabletName, count := range counts { @@ -1673,12 +1674,15 @@ func testSwitchTrafficPermissionChecks(t *testing.T, workflowType, sourceKeyspac // The workflow MUST be migrating the customer table from the source to the // target keyspace AND the workflow must currently have reads switched but not // writes. -func testSwitchWritesErrorHandling(t *testing.T, sourceTablets, targetTablets []*cluster.VttabletProcess, sourceKs, targetKs, workflow, workflowType string) { +func testSwitchWritesErrorHandling(t *testing.T, sourceTablets, targetTablets []*cluster.VttabletProcess, workflow, workflowType string) { t.Run("validate switch writes error handling", func(t *testing.T) { - ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow) vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) - require.NotZero(t, len(sourceTablets), "no source tablets provided for keyspace %s", sourceKs) - require.NotZero(t, len(targetTablets), "no target tablets provided for keyspace %s", targetKs) + defer vtgateConn.Close() + require.NotZero(t, len(sourceTablets), "no source tablets provided") + require.NotZero(t, len(targetTablets), "no target tablets provided") + sourceKs := sourceTablets[0].Keyspace + targetKs := targetTablets[0].Keyspace + ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow) var err error sourceConns := make([]*mysql.Conn, len(sourceTablets)) for i, tablet := range sourceTablets {