diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 617e5f79f6a..55d0ec5c07f 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -826,6 +826,105 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl shardNames = append(shardNames, shardName) } testSwitchTrafficPermissionChecks(t, workflowType, sourceKs, shardNames, targetKs, workflow) + + // Confirm that switching writes works as expected in the face of + // vreplication lag (canSwitch() precheck) and when cancelling the + // switch due to replication failing to catch up in time. + t.Run("validate switch writes error handling", func(t *testing.T) { + productConn, err := productTab.TabletConn("product", true) + require.NoError(t, err) + defer productConn.Close() + customerConn1, err := customerTab1.TabletConn("customer", true) + require.NoError(t, err) + customerConn2, err := customerTab2.TabletConn("customer", true) + require.NoError(t, err) + startingTestRowID := 10000000 + numTestRows := 100 + addTestRows := func() { + for i := 0; i < numTestRows; i++ { + execQuery(t, productConn, fmt.Sprintf("insert into customer (cid, name) values (%d, 'laggingCustomer')", startingTestRowID+i)) + } + } + deleteTestRows := func() { + execQuery(t, productConn, fmt.Sprintf("delete from customer where cid >= %d", startingTestRowID)) + } + addIndex := func() { + for _, customerConn := range []*mysql.Conn{customerConn1, customerConn2} { + execQuery(t, customerConn, "set session sql_mode=''") + execQuery(t, customerConn, "alter table customer add unique index name_idx (name)") + } + } + dropIndex := func() { + for _, customerConn := range []*mysql.Conn{customerConn1, customerConn2} { + execQuery(t, customerConn, "alter table customer drop index name_idx") + } + } + lockTargetTable := func() { + for _, customerConn := range []*mysql.Conn{customerConn1, customerConn2} { + execQuery(t, customerConn, "lock table customer read") + } + } + unlockTargetTable := func() { + for _, customerConn := range []*mysql.Conn{customerConn1, customerConn2} { + execQuery(t, customerConn, "unlock tables") + } + } + cleanupTestData := func() { + dropIndex() + deleteTestRows() + } + restartWorkflow := func() { + // We have to restart the workflow again as the duplicate key error + // is a permanent/terminal one. + err = vc.VtctldClient.ExecuteCommand("workflow", "--keyspace", targetKs, "start", "--workflow", workflow) + require.NoError(t, err, "failed to start workflow: %v", err) + } + waitForTargetToCatchup := func() { + restartWorkflow() + waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) + waitForNoWorkflowLag(t, vc, targetKs, workflow) + } + + // First let's test that the pre-checks work as expected. + // We ALTER the table on the customer (target) shard to + // add a unique index on the name field. + addIndex() + // Then we insert some test rows across both shards in the product + // (source) keyspace. + addTestRows() + // Now the workflow should go into the error state and the lag should + // start to climb. So we sleep for twice the max lag duration that we + // will set for the SwitchTraffic call. + lagDuration := 3 * time.Second + time.Sleep(lagDuration * 3) + out, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs, + "SwitchTraffic", "--tablet-types=primary", "--timeout=30s", "--max-replication-lag-allowed", lagDuration.String()) + // It should fail in the canSwitch() precheck. + require.Error(t, err) + require.Regexp(t, fmt.Sprintf(".*cannot switch traffic for workflow %s at this time: replication lag [0-9]+s is higher than allowed lag %s.*", + workflow, lagDuration.String()), out) + cleanupTestData() + waitForTargetToCatchup() + + // Now let's test that the cancel works by setting the command timeout + // to half the duration. Lock the customer table on the target tablets + // so that we cannot apply the INSERTs and catch up. + lockTargetTable() + addTestRows() + timeout := lagDuration * 2 + // Use the default max-replication-lag-allowed value of 30s. + out, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs, + "SwitchTraffic", "--tablet-types=primary", "--timeout", timeout.String()) + // It should fail due to the command context timeout and we should + // successfully cancel. + require.Error(t, err) + require.Contains(t, out, "failed to sync up replication between the source and target") + unlockTargetTable() + deleteTestRows() + waitForTargetToCatchup() + }) + + // Now let's confirm that it now works as expected. switchWrites(t, workflowType, ksWorkflow, false) checkThatVDiffFails(t, targetKs, workflow) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 2d52d951189..732e798bc04 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3069,16 +3069,14 @@ func (s *Server) canSwitch(ctx context.Context, ts *trafficSwitcher, maxAllowedR if err != nil { return "", err } + if wf.MaxVReplicationLag > maxAllowedReplLagSecs { + return fmt.Sprintf(cannotSwitchHighLag, wf.MaxVReplicationLag, maxAllowedReplLagSecs), nil + } for _, stream := range wf.ShardStreams { for _, st := range stream.GetStreams() { if st.Message == Frozen { return cannotSwitchFrozen, nil } - // If no new events have been replicated after the copy phase then it will be 0. - vreplLag := int64(getVReplicationTrxLag(st.TransactionTimestamp, st.TimeUpdated, binlogdatapb.VReplicationWorkflowState(binlogdatapb.VReplicationWorkflowState_value[st.State]))) - if vreplLag > maxAllowedReplLagSecs { - return fmt.Sprintf(cannotSwitchHighLag, vreplLag, maxAllowedReplLagSecs), nil - } switch st.State { case binlogdatapb.VReplicationWorkflowState_Copying.String(): return cannotSwitchCopyIncomplete, nil diff --git a/go/vt/vtctl/workflow/workflows.go b/go/vt/vtctl/workflow/workflows.go index 99739d3556b..7e800bb28c7 100644 --- a/go/vt/vtctl/workflow/workflows.go +++ b/go/vt/vtctl/workflow/workflows.go @@ -667,11 +667,11 @@ func getVReplicationTrxLag(trxTs, updatedTs *vttimepb.Time, state binlogdatapb.V if state == binlogdatapb.VReplicationWorkflowState_Copying { return math.MaxInt64 } - if lastTransactionTime == 0 /* no new events after copy */ || - lastUpdateTime > lastTransactionTime /* no recent transactions, so all caught up */ { - + if state == binlogdatapb.VReplicationWorkflowState_Running && // We could be in the ERROR state + (lastTransactionTime == 0 /* No new events after copy */ || + lastUpdateTime > lastTransactionTime /* No recent transactions, so all caught up */) { lastTransactionTime = lastUpdateTime } - now := time.Now().Unix() /* seconds since epoch */ + now := time.Now().Unix() // Seconds since epoch return float64(now - lastTransactionTime) }