VReplication: Address SwitchTraffic bugs around replication lag and cancel on error #17616

Open
wants to merge 15 commits into base: main
101 changes: 101 additions & 0 deletions go/test/endtoend/vreplication/vreplication_test.go
@@ -826,6 +826,107 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
shardNames = append(shardNames, shardName)
}
testSwitchTrafficPermissionChecks(t, workflowType, sourceKs, shardNames, targetKs, workflow)

// Confirm that switching writes works as expected in the face of
// vreplication lag (canSwitch() precheck) and when cancelling the
// switch due to replication failing to catch up in time.
t.Run("validate switch writes error handling", func(t *testing.T) {
productConn, err := productTab.TabletConn("product", true)
require.NoError(t, err)
defer productConn.Close()
customerConn1, err := customerTab1.TabletConn("customer", true)
require.NoError(t, err)
customerConn2, err := customerTab2.TabletConn("customer", true)
require.NoError(t, err)
startingTestRowID := 10000000
numTestRows := 100
addTestRows := func() {
for i := 0; i < numTestRows; i++ {
execQuery(t, productConn, fmt.Sprintf("insert into customer (cid, name) values (%d, 'laggingCustomer')",
startingTestRowID+i))
}
}
deleteTestRows := func() {
execQuery(t, productConn, fmt.Sprintf("delete from customer where cid >= %d", startingTestRowID))
}
addIndex := func() {
for _, customerConn := range []*mysql.Conn{customerConn1, customerConn2} {
execQuery(t, customerConn, "set session sql_mode=''")
execQuery(t, customerConn, "alter table customer add unique index name_idx (name)")
}
}
dropIndex := func() {
for _, customerConn := range []*mysql.Conn{customerConn1, customerConn2} {
execQuery(t, customerConn, "alter table customer drop index name_idx")
}
}
lockTargetTable := func() {
for _, customerConn := range []*mysql.Conn{customerConn1, customerConn2} {
execQuery(t, customerConn, "lock table customer read")
}
}
unlockTargetTable := func() {
for _, customerConn := range []*mysql.Conn{customerConn1, customerConn2} {
execQuery(t, customerConn, "unlock tables")
}
}
cleanupTestData := func() {
dropIndex()
deleteTestRows()
}
restartWorkflow := func() {
// We have to restart the workflow again as the duplicate key error
// is a permanent/terminal one.
err = vc.VtctldClient.ExecuteCommand("workflow", "--keyspace", targetKs, "start", "--workflow", workflow)
require.NoError(t, err, "failed to start workflow: %v", err)
}
waitForTargetToCatchup := func() {
restartWorkflow()
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
waitForNoWorkflowLag(t, vc, targetKs, workflow)
}

// First let's test that the pre-checks work as expected. We ALTER
// the table on the customer (target) shards to add a unique index on
// the name field.
addIndex()
// Then we insert some test rows across both shards in the product
// (source) keyspace.
addTestRows()
// Now the workflow should go into the error state and the lag should
// start to climb. So we sleep for three times the max lag duration
// that we will set for the SwitchTraffic call.
lagDuration := 3 * time.Second
time.Sleep(lagDuration * 3)
out, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs,
"SwitchTraffic", "--tablet-types=primary", "--timeout=30s", "--max-replication-lag-allowed", lagDuration.String())
// It should fail in the canSwitch() precheck.
require.Error(t, err)
require.Regexp(t, fmt.Sprintf(".*cannot switch traffic for workflow %s at this time: replication lag [0-9]+s is higher than allowed lag %s.*",
workflow, lagDuration.String()), out)
cleanupTestData()
waitForTargetToCatchup()

// Now let's test that the cancel works by setting the command timeout
// to a fraction (6s) of the default max repl lag duration (30s). First
// we lock the customer table on the target tablets so that we cannot
// apply the INSERTs and catch up.
lockTargetTable()
addTestRows()
timeout := lagDuration * 2 // 6s
// Use the default max-replication-lag-allowed value of 30s.
out, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs,
"SwitchTraffic", "--tablet-types=primary", "--timeout", timeout.String())
// It should fail due to the command context timeout and we should
// successfully cancel.
require.Error(t, err)
require.Contains(t, out, "failed to sync up replication between the source and target")
unlockTargetTable()
deleteTestRows()
waitForTargetToCatchup()
})

// Now let's confirm that it works as expected.
switchWrites(t, workflowType, ksWorkflow, false)

checkThatVDiffFails(t, targetKs, workflow)
7 changes: 3 additions & 4 deletions go/vt/vtctl/workflow/server.go
@@ -3069,15 +3069,14 @@ func (s *Server) canSwitch(ctx context.Context, ts *trafficSwitcher, maxAllowedR
if err != nil {
return "", err
}
if wf.MaxVReplicationTransactionLag > maxAllowedReplLagSecs {
return fmt.Sprintf(cannotSwitchHighLag, wf.MaxVReplicationTransactionLag, maxAllowedReplLagSecs), nil
}
for _, stream := range wf.ShardStreams {
for _, st := range stream.GetStreams() {
if st.Message == Frozen {
return cannotSwitchFrozen, nil
}
// If no new events have been replicated after the copy phase then it will be 0.
if vreplLag := time.Now().Unix() - st.TimeUpdated.Seconds; vreplLag > maxAllowedReplLagSecs {
return fmt.Sprintf(cannotSwitchHighLag, vreplLag, maxAllowedReplLagSecs), nil
}
switch st.State {
case binlogdatapb.VReplicationWorkflowState_Copying.String():
return cannotSwitchCopyIncomplete, nil
8 changes: 6 additions & 2 deletions go/vt/vtctl/workflow/traffic_switcher.go
@@ -1147,9 +1147,13 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
}

// We create a new context while canceling the migration, so that we are independent of the original
// context being cancelled prior to or during the cancel operation.
// context being canceled prior to or during the cancel operation itself.
// First we create a copy of the parent context, so that we maintain the locks, but which cannot be
// canceled by the parent context.
wcCtx := context.WithoutCancel(ctx)
// Now we create a child context from that which has a timeout.
cmTimeout := 60 * time.Second
cmCtx, cmCancel := context.WithTimeout(context.Background(), cmTimeout)
cmCtx, cmCancel := context.WithTimeout(wcCtx, cmTimeout)
defer cmCancel()

if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
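As an aside on the cancelMigration change above, here is a minimal standalone sketch (standard library only; the names and values are illustrative, not Vitess code) of the pattern being introduced: context.WithoutCancel keeps the parent's values, such as the lock information the migration context carries, while insulating the cancel operation from the parent's cancellation, and the child timeout still bounds how long cancellation may run.

package main

import (
	"context"
	"fmt"
	"time"
)

type ctxKey string

func main() {
	// A parent context carrying a value (standing in for the keyspace lock
	// info) that gets canceled early, as the original request context might be.
	parent, cancel := context.WithCancel(context.Background())
	parent = context.WithValue(parent, ctxKey("locks"), "keyspace locks")
	cancel()

	// WithoutCancel keeps the parent's values but ignores its cancellation...
	wcCtx := context.WithoutCancel(parent)
	// ...and a child timeout still bounds how long the cancel operation may run.
	cmCtx, cmCancel := context.WithTimeout(wcCtx, 60*time.Second)
	defer cmCancel()

	fmt.Println(cmCtx.Value(ctxKey("locks"))) // keyspace locks
	fmt.Println(cmCtx.Err())                  // <nil>: not canceled with the parent
}

Deriving cmCtx from context.Background(), as the old line did, would have dropped the parent's values; deriving it from wcCtx keeps them without inheriting the parent's cancellation.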
63 changes: 34 additions & 29 deletions go/vt/vtctl/workflow/workflows.go
@@ -446,35 +446,11 @@ func (wf *workflowFetcher) scanWorkflow(
workflow.WorkflowSubType = res.WorkflowSubType.String()
workflow.DeferSecondaryKeys = res.DeferSecondaryKeys

// MaxVReplicationTransactionLag estimates the actual statement processing lag
// between the source and the target. If we are still processing source events it
// is the difference b/w current time and the timestamp of the last event. If
// heartbeats are more recent than the last event, then the lag is the time since
// the last heartbeat as there can be an actual event immediately after the
// heartbeat, but which has not yet been processed on the target.
// We don't allow switching during the copy phase, so in that case we just return
// a large lag. All timestamps are in seconds since epoch.
if rstream.TransactionTimestamp == nil {
rstream.TransactionTimestamp = &vttimepb.Time{}
}
lastTransactionTime := rstream.TransactionTimestamp.Seconds
if rstream.TimeHeartbeat == nil {
rstream.TimeHeartbeat = &vttimepb.Time{}
}
lastHeartbeatTime := rstream.TimeHeartbeat.Seconds
if stream.State == binlogdatapb.VReplicationWorkflowState_Copying.String() {
meta.maxVReplicationTransactionLag = math.MaxInt64
} else {
if lastTransactionTime == 0 /* no new events after copy */ ||
lastHeartbeatTime > lastTransactionTime /* no recent transactions, so all caught up */ {

lastTransactionTime = lastHeartbeatTime
}
now := time.Now().Unix() /* seconds since epoch */
transactionReplicationLag := float64(now - lastTransactionTime)
if transactionReplicationLag > meta.maxVReplicationTransactionLag {
meta.maxVReplicationTransactionLag = transactionReplicationLag
}
// MaxVReplicationTransactionLag estimates the max statement processing lag
// between the source and the target across all of the workflow streams.
transactionReplicationLag := getVReplicationTrxLag(rstream.TransactionTimestamp, rstream.TimeUpdated, rstream.State)
if transactionReplicationLag > meta.maxVReplicationTransactionLag {
meta.maxVReplicationTransactionLag = transactionReplicationLag
}
}

@@ -670,3 +646,32 @@ func getStreamState(stream *vtctldatapb.Workflow_Stream, rstream *tabletmanagerd
}
return rstream.State.String()
}

// getVReplicationTrxLag estimates the actual statement processing lag between the
// source and the target. If we are still processing source events it is the
// difference between current time and the timestamp of the last event. If
// heartbeats are more recent than the last event, then the lag is the time since
// the last heartbeat as there can be an actual event immediately after the
// heartbeat, but which has not yet been processed on the target. We don't allow
// switching during the copy phase, so in that case we just return a large lag.
// All timestamps are in seconds since epoch.
func getVReplicationTrxLag(trxTs, updatedTs *vttimepb.Time, state binlogdatapb.VReplicationWorkflowState) float64 {
if trxTs == nil {
trxTs = &vttimepb.Time{}
}
lastTransactionTime := trxTs.Seconds
if updatedTs == nil {
updatedTs = &vttimepb.Time{}
}
lastUpdateTime := updatedTs.Seconds
if state == binlogdatapb.VReplicationWorkflowState_Copying {
return math.MaxInt64
}
if state == binlogdatapb.VReplicationWorkflowState_Running && // We could be in the ERROR state
(lastTransactionTime == 0 /* No new events after copy */ ||
lastUpdateTime > lastTransactionTime /* No recent transactions, so all caught up */) {
lastTransactionTime = lastUpdateTime
}
now := time.Now().Unix() // Seconds since epoch
return float64(now - lastTransactionTime)
}
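As an aside, not part of the diff: the heartbeat-vs-transaction rule above can be illustrated with plain Unix-second timestamps. The helper below is a hypothetical, simplified sketch (the copy-phase branch that returns math.MaxInt64 is omitted, and a boolean stands in for the Running-state check).

// simplifiedTrxLag mirrors the heartbeat-vs-transaction rule above using plain
// Unix-second timestamps. running stands in for the Running-state check; a
// stream in the Error state never falls back to the heartbeat time.
func simplifiedTrxLag(now, lastTrxTime, lastUpdateTime int64, running bool) int64 {
	if running && (lastTrxTime == 0 || lastUpdateTime > lastTrxTime) {
		// No recent transactions (or none since the copy phase), so the lag
		// is measured from the last heartbeat/update instead.
		lastTrxTime = lastUpdateTime
	}
	return now - lastTrxTime
}

For example, with now=1000, a Running stream whose last transaction was at 900 but whose last update was at 998 reports a 2s lag, while a stream in the Error state with the same timestamps reports 100s, since its stalled events are what matter.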