Skip to content

Commit

Permalink
Changes from self review
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Jan 23, 2025
1 parent 7019af3 commit bb1207a
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 16 deletions.
16 changes: 8 additions & 8 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
testSwitchTrafficPermissionChecks(t, workflowType, sourceKs, shardNames, targetKs, workflow)

// Confirm that switching writes works as expected in the face of
// vreplication lag (canSwitch() precheck) and when cancelling the
// vreplication lag (canSwitch() precheck) and when canceling the
// switch due to replication failing to catch up in time.
t.Run("validate switch writes error handling", func(t *testing.T) {
productConn, err := productTab.TabletConn("product", true)
Expand Down Expand Up @@ -875,23 +875,20 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
deleteTestRows()
}
restartWorkflow := func() {
// We have to restart the workflow again as the duplicate key error
// is a permanent/terminal one.
err = vc.VtctldClient.ExecuteCommand("workflow", "--keyspace", targetKs, "start", "--workflow", workflow)
require.NoError(t, err, "failed to start workflow: %v", err)
}
waitForTargetToCatchup := func() {
restartWorkflow()
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
waitForNoWorkflowLag(t, vc, targetKs, workflow)
}

// First let's test that the pre-checks work as expected. We ALTER
// the table on the customer (target) shard to add a unique index on
// the table on the customer (target) shards to add a unique index on
// the name field.
addIndex()
// Then we insert some test rows across both shards in the product
// (source) keyspace.
// Then we replicate some test rows across both customer shards by
// inserting them in the product (source) keyspace.
addTestRows()
// Now the workflow should go into the error state and the lag should
// start to climb. So we sleep for twice the max lag duration that we
Expand All @@ -908,6 +905,9 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
// Confirm that queries still work fine.
execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1")
cleanupTestData()
// We have to restart the workflow again as the duplicate key error
// is a permanent/terminal one.
restartWorkflow()
waitForTargetToCatchup()

// Now let's test that the cancel works by setting the command timeout
Expand All @@ -920,7 +920,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
// Use the default max-replication-lag-allowed value of 30s.
// We run the command in a goroutine so that we can unblock things
// after the timeout is reached -- as the vplayer query is blocking
// on the table lock.
// on the table lock in the MySQL layer.
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtctl/workflow/stream_migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) error {
errs := &concurrency.AllErrorRecorder{}

if err := sm.deleteTargetStreams(ctx); err != nil {
errs.RecordError(err)
errs.RecordError(fmt.Errorf("could not delete target streams: %v", err))
}

// Restart the source streams, but leave the Reshard workflow's reverse
Expand All @@ -224,7 +224,7 @@ func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) error {
return err
})
if err != nil {
errs.RecordError(err)
errs.RecordError(fmt.Errorf("could not restart source streams: %v", err))
sm.logger.Errorf("Cancel stream migrations failed: could not restart source streams: %v", err)
}
if errs.HasErrors() {
Expand Down
11 changes: 6 additions & 5 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1074,11 +1074,12 @@ func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error {
}

// switchDeniedTables switches the denied tables rules for the traffic switch.
// They are removed on the source side and added on the target side.
// If backward is true, then we swap this logic, removing on the target side
// and adding on the source side. You would want to do that e.g. when canceling
// a failed (and currently partial) traffic switch as the source and target
// have already been switched in the trafficSwitcher.
// They are added on the source side and removed on the target side.
// If backward is true, then we swap this logic, removing on the source side
// and adding on the target side. You would want to do that e.g. when canceling
// a failed (and currently partial) traffic switch as we may have already
// switched the denied tables entries and in any event we need to go back to
// the original state.
func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context, backward bool) error {
if ts.MigrationType() != binlogdatapb.MigrationType_TABLES {
return nil
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/workflow/workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ func getVReplicationTrxLag(trxTs, updatedTs, heartbeatTs *vttimepb.Time, state b
return math.MaxInt64
}
// We do NOT update the heartbeat timestamp when we are regularly updating the
// position as we replication transactions (GTIDs).
// position as we replicate transactions (GTIDs).
// When we DO record a heartbeat, we set the updated time to the same value.
// When recording that we are throttled, we update the updated time but NOT
// the heartbeat time.
Expand Down

0 comments on commit bb1207a

Please sign in to comment.