VReplication: Address SwitchTraffic bugs around replication lag and cancel on error #17616

Status: Open. Wants to merge 15 commits into main.
go/test/endtoend/vreplication/cluster_test.go (2 changes: 1 addition & 1 deletion)
@@ -856,7 +856,7 @@ func (vc *VitessCluster) getVttabletsInKeyspace(t *testing.T, cell *Cell, ksName
 	tablets := make(map[string]*cluster.VttabletProcess)
 	for _, shard := range keyspace.Shards {
 		for _, tablet := range shard.Tablets {
-			if tablet.Vttablet.GetTabletStatus() == "SERVING" {
+			if tablet.Vttablet.GetTabletStatus() == "SERVING" && (tabletType == "" || strings.EqualFold(tablet.Vttablet.GetTabletType(), tabletType)) {
 				log.Infof("Serving status of tablet %s is %s, %s", tablet.Name, tablet.Vttablet.ServingStatus, tablet.Vttablet.GetTabletStatus())
 				tablets[tablet.Name] = tablet.Vttablet
 			}
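The new guard keeps only SERVING tablets whose type matches the optional filter, compared case-insensitively, and an empty filter preserves the previous keep-everything behavior. A minimal standalone sketch of just that condition (the `matches` helper is illustrative, not part of the PR):

```go
package main

import (
	"fmt"
	"strings"
)

// matches mirrors the added condition: an empty filter accepts any tablet
// type; otherwise the comparison ignores case, so "PRIMARY" matches "primary".
func matches(tabletType, filter string) bool {
	return filter == "" || strings.EqualFold(tabletType, filter)
}

func main() {
	fmt.Println(matches("PRIMARY", "primary")) // true
	fmt.Println(matches("REPLICA", "primary")) // false
	fmt.Println(matches("RDONLY", ""))         // true: empty filter keeps the old behavior
}
```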
go/test/endtoend/vreplication/vreplication_test.go (146 changes: 146 additions & 0 deletions)
@@ -826,6 +826,11 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
 		shardNames = append(shardNames, shardName)
 	}
 	testSwitchTrafficPermissionChecks(t, workflowType, sourceKs, shardNames, targetKs, workflow)
+
+	testSwitchWritesErrorHandling(t, []*cluster.VttabletProcess{productTab}, []*cluster.VttabletProcess{customerTab1, customerTab2},
+		workflow, workflowType)
+
+	// Now let's confirm that the switch works as expected without an error.
 	switchWrites(t, workflowType, ksWorkflow, false)
 
 	checkThatVDiffFails(t, targetKs, workflow)
@@ -1057,6 +1062,7 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou
 	require.NoError(t, vc.AddShards(t, cells, keyspace, targetShards, defaultReplicas, defaultRdonly, tabletIDBase, targetKsOpts))
 
 	tablets := vc.getVttabletsInKeyspace(t, defaultCell, ksName, "primary")
+	var sourceTablets, targetTablets []*cluster.VttabletProcess
 
 	// Test multi-primary setups, like a Galera cluster, which have auto increment steps > 1.
 	for _, tablet := range tablets {
@@ -1069,9 +1075,11 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou
targetShards = "," + targetShards + ","
for _, tab := range tablets {
if strings.Contains(targetShards, ","+tab.Shard+",") {
targetTablets = append(targetTablets, tab)
log.Infof("Waiting for vrepl to catch up on %s since it IS a target shard", tab.Shard)
catchup(t, tab, workflow, "Reshard")
} else {
sourceTablets = append(sourceTablets, tab)
log.Infof("Not waiting for vrepl to catch up on %s since it is NOT a target shard", tab.Shard)
continue
}
@@ -1085,6 +1093,10 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou
 	if dryRunResultSwitchWrites != nil {
 		reshardAction(t, "SwitchTraffic", workflow, ksName, "", "", callNames, "primary", "--dry-run")
 	}
+	if tableName == "customer" {
+		testSwitchWritesErrorHandling(t, sourceTablets, targetTablets, workflow, "reshard")
+	}
+	// Now let's confirm that the switch works as expected without an error.
 	reshardAction(t, "SwitchTraffic", workflow, ksName, "", "", callNames, "primary")
 	reshardAction(t, "Complete", workflow, ksName, "", "", "", "")
 	for tabletName, count := range counts {
@@ -1656,6 +1668,140 @@ func testSwitchTrafficPermissionChecks(t *testing.T, workflowType, sourceKeyspac
 	})
 }
 
+// testSwitchWritesErrorHandling confirms that switching writes works as expected
+// in the face of vreplication lag (canSwitch() precheck) and when canceling the
+// switch due to replication failing to catch up in time.
+// The workflow MUST be migrating the customer table from the source to the
+// target keyspace AND the workflow must currently have reads switched but not
+// writes.
+func testSwitchWritesErrorHandling(t *testing.T, sourceTablets, targetTablets []*cluster.VttabletProcess, workflow, workflowType string) {
+	t.Run("validate switch writes error handling", func(t *testing.T) {
+		vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
+		defer vtgateConn.Close()
+		require.NotZero(t, len(sourceTablets), "no source tablets provided")
+		require.NotZero(t, len(targetTablets), "no target tablets provided")
+		sourceKs := sourceTablets[0].Keyspace
+		targetKs := targetTablets[0].Keyspace
+		ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow)
+		var err error
+		sourceConns := make([]*mysql.Conn, len(sourceTablets))
+		for i, tablet := range sourceTablets {
+			sourceConns[i], err = tablet.TabletConn(tablet.Keyspace, true)
+			require.NoError(t, err)
+			defer sourceConns[i].Close()
+		}
+		targetConns := make([]*mysql.Conn, len(targetTablets))
+		for i, tablet := range targetTablets {
+			targetConns[i], err = tablet.TabletConn(tablet.Keyspace, true)
+			require.NoError(t, err)
+			defer targetConns[i].Close()
+		}
+		startingTestRowID := 10000000
+		numTestRows := 100
+		addTestRows := func() {
+			for i := 0; i < numTestRows; i++ {
+				execVtgateQuery(t, vtgateConn, sourceTablets[0].Keyspace, fmt.Sprintf("insert into customer (cid, name) values (%d, 'laggingCustomer')",
+					startingTestRowID+i))
+			}
+		}
+		deleteTestRows := func() {
+			execVtgateQuery(t, vtgateConn, sourceTablets[0].Keyspace, fmt.Sprintf("delete from customer where cid >= %d", startingTestRowID))
+		}
+		addIndex := func() {
+			for _, targetConn := range targetConns {
+				execQuery(t, targetConn, "set session sql_mode=''")
+				execQuery(t, targetConn, "alter table customer add unique index name_idx (name)")
+			}
+		}
+		dropIndex := func() {
+			for _, targetConn := range targetConns {
+				execQuery(t, targetConn, "alter table customer drop index name_idx")
+			}
+		}
+		lockTargetTable := func() {
+			for _, targetConn := range targetConns {
+				execQuery(t, targetConn, "lock table customer read")
+			}
+		}
+		unlockTargetTable := func() {
+			for _, targetConn := range targetConns {
+				execQuery(t, targetConn, "unlock tables")
+			}
+		}
+		cleanupTestData := func() {
+			dropIndex()
+			deleteTestRows()
+		}
+		restartWorkflow := func() {
+			err = vc.VtctldClient.ExecuteCommand("workflow", "--keyspace", targetKs, "start", "--workflow", workflow)
+			require.NoError(t, err, "failed to start workflow: %v", err)
+		}
+		waitForTargetToCatchup := func() {
+			waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
+			waitForNoWorkflowLag(t, vc, targetKs, workflow)
+		}
+
+		// First let's test that the prechecks work as expected. We ALTER
+		// the table on the target shards to add a unique index on the name
+		// field.
+		addIndex()
+		// Then we replicate some test rows across the target shards by
+		// inserting them in the source keyspace.
+		addTestRows()
+		// Now the workflow should go into the error state and the lag should
+		// start to climb. So we sleep for three times the max lag duration
+		// that we will set for the SwitchTraffic call.
+		lagDuration := 3 * time.Second
+		time.Sleep(lagDuration * 3)
+		out, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs,
+			"SwitchTraffic", "--tablet-types=primary", "--timeout=30s", "--max-replication-lag-allowed", lagDuration.String())
+		// It should fail in the canSwitch() precheck.
+		require.Error(t, err)
+		require.Regexp(t, fmt.Sprintf(".*cannot switch traffic for workflow %s at this time: replication lag [0-9]+s is higher than allowed lag %s.*",
+			workflow, lagDuration.String()), out)
+		require.NotContains(t, out, "cancel migration failed")
+		// Confirm that queries still work fine.
+		execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1")
+		cleanupTestData()
+		// We have to restart the workflow again as the duplicate key error
+		// is a permanent/terminal one.
+		restartWorkflow()
+		waitForTargetToCatchup()
+
+		// Now let's test that the cancel works by setting the command timeout
+		// to a fraction (6s) of the default max repl lag duration (30s). First
+		// we lock the customer table on the target tablets so that we cannot
+		// apply the INSERTs and catch up.
+		lockTargetTable()
+		addTestRows()
+		timeout := lagDuration * 2 // 6s
+		// Use the default max-replication-lag-allowed value of 30s.
+		// We run the command in a goroutine so that we can unblock things
+		// after the timeout is reached -- as the vplayer query is blocking
+		// on the table lock in the MySQL layer.
+		wg := sync.WaitGroup{}
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			out, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs,
+				"SwitchTraffic", "--tablet-types=primary", "--timeout", timeout.String())
+		}()
+		time.Sleep(timeout)
+		// Now we can unblock things and let it continue.
+		unlockTargetTable()
+		wg.Wait()
+		// It should fail due to the command context timeout and we should
+		// successfully cancel.
+		require.Error(t, err)
+		require.Contains(t, out, "failed to sync up replication between the source and target")
+		require.NotContains(t, out, "cancel migration failed")
+		// Confirm that queries still work fine.
+		execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1")
+		deleteTestRows()
+		waitForTargetToCatchup()
+	})
+}
+
 // restartWorkflow confirms that a workflow can be successfully
 // stopped and started.
 func restartWorkflow(t *testing.T, ksWorkflow string) {
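For orientation, the first scenario in the new test maps onto a single vtctldclient call. A sketch in the test suite's own style (the workflow and keyspace names are placeholders; only the flags that appear in the diff above are assumed to exist):

```go
// With a unique index on the target and conflicting rows inserted on the
// source, replication cannot catch up, so a 3s allowed lag should make the
// canSwitch() precheck reject the switch quickly.
out, err := vc.VtctldClient.ExecuteCommandWithOutput("MoveTables",
	"--workflow", "commerce2customer", // placeholder name
	"--target-keyspace", "customer", // placeholder name
	"SwitchTraffic", "--tablet-types=primary",
	"--timeout=30s", "--max-replication-lag-allowed=3s")
// Expected: err != nil and out matches
// "replication lag [0-9]+s is higher than allowed lag 3s".
```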
go/vt/vtctl/workflow/server.go (45 changes: 31 additions & 14 deletions)
@@ -2915,8 +2915,10 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
return handleError("failed to migrate the workflow streams", err)
}
if cancel {
sw.cancelMigration(ctx, sm)
return 0, sw.logs(), nil
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return 0, sw.logs(), err
}

// We stop writes on the source before stopping the source streams so that the catchup time
@@ -2928,7 +2930,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 	// we actually stop them.
 	ts.Logger().Infof("Stopping source writes")
 	if err := sw.stopSourceWrites(ctx); err != nil {
-		sw.cancelMigration(ctx, sm)
+		if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+			err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+		}
 		return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err)
 	}
 
@@ -2946,7 +2950,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
ts.Logger().Errorf("stream in stopStreams: key %s shard %s stream %+v", key, stream.BinlogSource.Shard, stream.BinlogSource)
}
}
sw.cancelMigration(ctx, sm)
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return handleError(fmt.Sprintf("failed to stop the workflow streams in the %s keyspace", ts.SourceKeyspaceName()), err)
}

@@ -2956,7 +2962,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 	// the tablet's deny list check and the first mysqld side table lock.
 	for cnt := 1; cnt <= lockTablesCycles; cnt++ {
 		if err := ts.executeLockTablesOnSource(ctx); err != nil {
-			sw.cancelMigration(ctx, sm)
+			if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+				err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+			}
 			return handleError(fmt.Sprintf("failed to execute LOCK TABLES (attempt %d of %d) on sources", cnt, lockTablesCycles), err)
 		}
 		// No need to UNLOCK the tables as the connection was closed once the locks were acquired
@@ -2977,7 +2985,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 	}
 	ts.Logger().Infof("Waiting for streams to catchup")
 	if err := sw.waitForCatchup(ctx, waitTimeout); err != nil {
-		sw.cancelMigration(ctx, sm)
+		if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+			err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+		}
 		return handleError("failed to sync up replication between the source and target", err)
 	}
 
@@ -2986,7 +2996,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 	}
 	ts.Logger().Infof("Migrating streams")
 	if err := sw.migrateStreams(ctx, sm); err != nil {
-		sw.cancelMigration(ctx, sm)
+		if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+			err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+		}
 		return handleError("failed to migrate the workflow streams", err)
 	}
 
@@ -2995,7 +3007,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 	}
 	ts.Logger().Infof("Resetting sequences")
 	if err := sw.resetSequences(ctx); err != nil {
-		sw.cancelMigration(ctx, sm)
+		if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+			err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+		}
 		return handleError("failed to reset the sequences", err)
 	}
 
@@ -3004,7 +3018,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 	}
 	ts.Logger().Infof("Creating reverse streams")
 	if err := sw.createReverseVReplication(ctx); err != nil {
-		sw.cancelMigration(ctx, sm)
+		if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+			err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+		}
 		return handleError("failed to create the reverse vreplication streams", err)
 	}
 
@@ -3019,7 +3035,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 		initSeqCtx, cancel := context.WithTimeout(ctx, waitTimeout/2)
 		defer cancel()
 		if err := sw.initializeTargetSequences(initSeqCtx, sequenceMetadata); err != nil {
-			sw.cancelMigration(ctx, sm)
+			if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+				err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+			}
 			return handleError(fmt.Sprintf("failed to initialize the sequences used in the %s keyspace", ts.TargetKeyspaceName()), err)
 		}
 	}
@@ -3069,15 +3087,14 @@ func (s *Server) canSwitch(ctx context.Context, ts *trafficSwitcher, maxAllowedR
 	if err != nil {
 		return "", err
 	}
+	if wf.MaxVReplicationTransactionLag > maxAllowedReplLagSecs {
+		return fmt.Sprintf(cannotSwitchHighLag, wf.MaxVReplicationTransactionLag, maxAllowedReplLagSecs), nil
+	}
 	for _, stream := range wf.ShardStreams {
 		for _, st := range stream.GetStreams() {
 			if st.Message == Frozen {
 				return cannotSwitchFrozen, nil
 			}
-			// If no new events have been replicated after the copy phase then it will be 0.
-			if vreplLag := time.Now().Unix() - st.TimeUpdated.Seconds; vreplLag > maxAllowedReplLagSecs {
-				return fmt.Sprintf(cannotSwitchHighLag, vreplLag, maxAllowedReplLagSecs), nil
-			}
 			switch st.State {
 			case binlogdatapb.VReplicationWorkflowState_Copying.String():
 				return cannotSwitchCopyIncomplete, nil
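Every failure path in switchWrites now follows the same shape: try to cancel, and if the cancellation itself fails, surface both errors instead of losing one. A stdlib-only sketch of that semantics (the real code uses vterrors.Errorf with vtrpcpb.Code_CANCELED rather than fmt.Errorf, and the helper name here is hypothetical):

```go
package main

import (
	"errors"
	"fmt"
)

// combineCancelErr mirrors the pattern above: keep the original failure, and
// if canceling the migration also failed, append that error after a blank
// line so the operator sees both problems.
func combineCancelErr(original, cancelErr error) error {
	if cancelErr != nil {
		return fmt.Errorf("%v\n\n%v", original, cancelErr)
	}
	return original
}

func main() {
	orig := errors.New("failed to sync up replication between the source and target")
	cerr := errors.New("could not restart source streams")
	fmt.Println(combineCancelErr(orig, cerr))
}
```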
go/vt/vtctl/workflow/stream_migrator.go (14 changes: 11 additions & 3 deletions)
@@ -203,12 +203,15 @@ func (sm *StreamMigrator) Templates() []*VReplicationStream {
 }
 
 // CancelStreamMigrations cancels the stream migrations.
-func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) {
+func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) error {
 	if sm.streams == nil {
-		return
+		return nil
 	}
+	errs := &concurrency.AllErrorRecorder{}
 
-	_ = sm.deleteTargetStreams(ctx)
+	if err := sm.deleteTargetStreams(ctx); err != nil {
+		errs.RecordError(fmt.Errorf("could not delete target streams: %v", err))
+	}
 
 	// Restart the source streams, but leave the Reshard workflow's reverse
 	// variant stopped.
@@ -221,8 +224,13 @@ func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) {
 		return err
 	})
 	if err != nil {
+		errs.RecordError(fmt.Errorf("could not restart source streams: %v", err))
 		sm.logger.Errorf("Cancel stream migrations failed: could not restart source streams: %v", err)
 	}
+	if errs.HasErrors() {
+		return errs.AggrError(vterrors.Aggregate)
+	}
+	return nil
 }
 
 // MigrateStreams migrates N streams
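CancelStreamMigrations now records each failure (deleting the target streams, restarting the source streams) and returns them aggregated rather than discarding them. A standalone sketch of the same flow using only the standard library (the real code uses concurrency.AllErrorRecorder and vterrors.Aggregate; the function name here is illustrative):

```go
package main

import (
	"errors"
	"fmt"
)

// cancelOutcome mirrors the new control flow: collect whichever of the two
// cleanup steps failed, then return nil or a single aggregated error.
func cancelOutcome(deleteErr, restartErr error) error {
	var errs []error
	if deleteErr != nil {
		errs = append(errs, fmt.Errorf("could not delete target streams: %v", deleteErr))
	}
	if restartErr != nil {
		errs = append(errs, fmt.Errorf("could not restart source streams: %v", restartErr))
	}
	return errors.Join(errs...) // nil when both steps succeeded
}

func main() {
	fmt.Println(cancelOutcome(nil, errors.New("source tablet unreachable")))
}
```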
go/vt/vtctl/workflow/switcher.go (4 changes: 2 additions & 2 deletions)
@@ -124,8 +124,8 @@ func (r *switcher) stopStreams(ctx context.Context, sm *StreamMigrator) ([]strin
 	return sm.StopStreams(ctx)
 }
 
-func (r *switcher) cancelMigration(ctx context.Context, sm *StreamMigrator) {
-	r.ts.cancelMigration(ctx, sm)
+func (r *switcher) cancelMigration(ctx context.Context, sm *StreamMigrator) error {
+	return r.ts.cancelMigration(ctx, sm)
 }
 
 func (r *switcher) lockKeyspace(ctx context.Context, keyspace, action string, opts ...topo.LockOption) (context.Context, func(*error), error) {
go/vt/vtctl/workflow/switcher_dry_run.go (3 changes: 2 additions & 1 deletion)
@@ -301,8 +301,9 @@ func (dr *switcherDryRun) stopStreams(ctx context.Context, sm *StreamMigrator) (
 	return nil, nil
 }
 
-func (dr *switcherDryRun) cancelMigration(ctx context.Context, sm *StreamMigrator) {
+func (dr *switcherDryRun) cancelMigration(ctx context.Context, sm *StreamMigrator) error {
 	dr.drLog.Log("Cancel migration as requested")
+	return nil
 }
 
 func (dr *switcherDryRun) lockKeyspace(ctx context.Context, keyspace, _ string, _ ...topo.LockOption) (context.Context, func(*error), error) {
go/vt/vtctl/workflow/switcher_interface.go (2 changes: 1 addition & 1 deletion)
@@ -27,7 +27,7 @@ import (

 type iswitcher interface {
 	lockKeyspace(ctx context.Context, keyspace, action string, opts ...topo.LockOption) (context.Context, func(*error), error)
-	cancelMigration(ctx context.Context, sm *StreamMigrator)
+	cancelMigration(ctx context.Context, sm *StreamMigrator) error
 	stopStreams(ctx context.Context, sm *StreamMigrator) ([]string, error)
 	stopSourceWrites(ctx context.Context) error
 	waitForCatchup(ctx context.Context, filteredReplicationWaitTime time.Duration) error
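Since cancelMigration gained an error return in the iswitcher interface, both implementations (switcher and switcherDryRun) had to change in lockstep. A compile-time assertion, a common Go idiom and not part of this PR, would catch any future drift between the interface and its implementations:

```go
// These declarations fail to compile if either type stops satisfying iswitcher.
var (
	_ iswitcher = (*switcher)(nil)
	_ iswitcher = (*switcherDryRun)(nil)
)
```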