diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go
index 45695c54ec8..9b9acfec810 100644
--- a/go/test/endtoend/vreplication/vreplication_test.go
+++ b/go/test/endtoend/vreplication/vreplication_test.go
@@ -828,7 +828,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
 	testSwitchTrafficPermissionChecks(t, workflowType, sourceKs, shardNames, targetKs, workflow)
 
 	// Confirm that switching writes works as expected in the face of
-	// vreplication lag (canSwitch() precheck) and when cancelling the
+	// vreplication lag (canSwitch() precheck) and when canceling the
 	// switch due to replication failing to catch up in time.
 	t.Run("validate switch writes error handling", func(t *testing.T) {
 		productConn, err := productTab.TabletConn("product", true)
@@ -875,23 +875,20 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
 			deleteTestRows()
 		}
 		restartWorkflow := func() {
-			// We have to restart the workflow again as the duplicate key error
-			// is a permanent/terminal one.
 			err = vc.VtctldClient.ExecuteCommand("workflow", "--keyspace", targetKs, "start", "--workflow", workflow)
 			require.NoError(t, err, "failed to start workflow: %v", err)
 		}
 		waitForTargetToCatchup := func() {
-			restartWorkflow()
 			waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
 			waitForNoWorkflowLag(t, vc, targetKs, workflow)
 		}
 
 		// First let's test that the pre-checks work as expected. We ALTER
-		// the table on the customer (target) shard to add a unique index on
+		// the table on the customer (target) shards to add a unique index on
 		// the name field.
 		addIndex()
-		// Then we insert some test rows across both shards in the product
-		// (source) keyspace.
+		// Then we replicate some test rows across both customer shards by
+		// inserting them in the product (source) keyspace.
 		addTestRows()
 		// Now the workflow should go into the error state and the lag should
 		// start to climb. So we sleep for twice the max lag duration that we
@@ -908,6 +905,9 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
 		// Confirm that queries still work fine.
 		execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1")
 		cleanupTestData()
+		// We have to restart the workflow again as the duplicate key error
+		// is a permanent/terminal one.
+		restartWorkflow()
 		waitForTargetToCatchup()
 
 		// Now let's test that the cancel works by setting the command timeout
@@ -920,7 +920,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
 		// Use the default max-replication-lag-allowed value of 30s.
 		// We run the command in a goroutine so that we can unblock things
 		// after the timeout is reached -- as the vplayer query is blocking
-		// on the table lock.
+		// on the table lock in the MySQL layer.
 		wg := sync.WaitGroup{}
 		wg.Add(1)
 		go func() {
diff --git a/go/vt/vtctl/workflow/stream_migrator.go b/go/vt/vtctl/workflow/stream_migrator.go
index 47d5a48c4f9..7b4fe69074f 100644
--- a/go/vt/vtctl/workflow/stream_migrator.go
+++ b/go/vt/vtctl/workflow/stream_migrator.go
@@ -210,7 +210,7 @@ func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) error {
 	errs := &concurrency.AllErrorRecorder{}
 
 	if err := sm.deleteTargetStreams(ctx); err != nil {
-		errs.RecordError(err)
+		errs.RecordError(fmt.Errorf("could not delete target streams: %v", err))
 	}
 
 	// Restart the source streams, but leave the Reshard workflow's reverse
@@ -224,7 +224,7 @@ func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) error {
 		return err
 	})
 	if err != nil {
-		errs.RecordError(err)
+		errs.RecordError(fmt.Errorf("could not restart source streams: %v", err))
 		sm.logger.Errorf("Cancel stream migrations failed: could not restart source streams: %v", err)
 	}
 	if errs.HasErrors() {
diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go
index c8bf6c8eb68..1d4ef438f5e 100644
--- a/go/vt/vtctl/workflow/traffic_switcher.go
+++ b/go/vt/vtctl/workflow/traffic_switcher.go
@@ -1074,11 +1074,12 @@ func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error {
 }
 
 // switchDeniedTables switches the denied tables rules for the traffic switch.
-// They are removed on the source side and added on the target side.
-// If backward is true, then we swap this logic, removing on the target side
-// and adding on the source side. You would want to do that e.g. when canceling
-// a failed (and currently partial) traffic switch as the source and target
-// have already been switched in the trafficSwitcher.
+// They are added on the source side and removed on the target side.
+// If backward is true, then we swap this logic, removing on the source side
+// and adding on the target side. You would want to do that e.g. when canceling
+// a failed (and currently partial) traffic switch as we may have already
+// switched the denied tables entries and in any event we need to go back to
+// the original state.
 func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context, backward bool) error {
 	if ts.MigrationType() != binlogdatapb.MigrationType_TABLES {
 		return nil
diff --git a/go/vt/vtctl/workflow/workflows.go b/go/vt/vtctl/workflow/workflows.go
index 2ee66b8756b..0418e005682 100644
--- a/go/vt/vtctl/workflow/workflows.go
+++ b/go/vt/vtctl/workflow/workflows.go
@@ -672,7 +672,7 @@ func getVReplicationTrxLag(trxTs, updatedTs, heartbeatTs *vttimepb.Time, state b
 		return math.MaxInt64
 	}
 	// We do NOT update the heartbeat timestamp when we are regularly updating the
-	// position as we replication transactions (GTIDs).
+	// position as we replicate transactions (GTIDs).
 	// When we DO record a heartbeat, we set the updated time to the same value.
 	// When recording that we are throttled, we update the updated time but NOT
 	// the heartbeat time.
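
Note: what follows is a minimal, self-contained Go sketch of the direction swap described by the revised switchDeniedTables doc comment above. It is not the Vitess implementation; updateDeniedTables and the printed output are purely illustrative stand-ins for the per-shard topo updates. The point it illustrates: the forward pass adds denied-table entries on the source side and removes them on the target side, and backward=true inverts both sides so a canceled, partially applied traffic switch can be rolled back to the original state.

package main

import "fmt"

// updateDeniedTables is a hypothetical helper standing in for the per-shard
// topo update; "add" controls whether the tables are added to or removed
// from the shard's denied-tables list.
func updateDeniedTables(side string, add bool, tables []string) {
	verb := "remove"
	if add {
		verb = "add"
	}
	fmt.Printf("%s: %s denied tables %v\n", side, verb, tables)
}

// switchDeniedTables mirrors the documented logic: forward adds on the
// source and removes on the target; backward swaps both directions.
func switchDeniedTables(backward bool, tables []string) {
	updateDeniedTables("source", !backward, tables)
	updateDeniedTables("target", backward, tables)
}

func main() {
	switchDeniedTables(false, []string{"customer"}) // normal traffic switch
	switchDeniedTables(true, []string{"customer"})  // canceling a partial switch
}

Using a single boolean to invert both sides keeps cancel and rollback on the same code path as the forward switch, which is the behavior the doc comment change is clarifying.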