From 59340f2f9ceb23cc62ee00a152db38565a83ae00 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 22 Jan 2025 23:46:01 +0000 Subject: [PATCH] Add e2e test Signed-off-by: Matt Lord --- .../vreplication/vreplication_test.go | 38 ++++++++++++++++--- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 617e5f79f6a..3b7dac2394a 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -826,7 +826,35 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl shardNames = append(shardNames, shardName) } testSwitchTrafficPermissionChecks(t, workflowType, sourceKs, shardNames, targetKs, workflow) - switchWrites(t, workflowType, ksWorkflow, false) + + // Confirm that switching writes works as expected in the face of + // vreplication lag and cancelling the switch. + t.Run("validate switch writes", func(t *testing.T) { + // First let's test that the pre-checks work as expected. + // Lock the customer table so that we can't replicate the coming + // sentinal row. + productConn, err := productTab.TabletConn("product", true) + require.NoError(t, err) + defer productConn.Close() + customerConn, err := customerTab1.TabletConn("customer", true) + require.NoError(t, err) + execQuery(t, customerConn, "lock tables customer read") + // Insert that sentinal row on the source. + execQuery(t, productConn, "insert into customer(cid, name) values(9999999, 'laggingCustomer')") + // Sleep twice as long as we will set the max replication lag to. + lagDuration := 3 * time.Second + time.Sleep(lagDuration * 2) + _, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--tablet-types=primary", "--workflow", workflow, + "--target-keyspace", targetKs, "SwitchWrites", "--timeout=30s", "--max-replication-lag-allowed", lagDuration.String()) + require.Error(t, err) + t.Logf("expected error: %v", err) + execQuery(t, customerConn, "unlock tables") + + // Now confirm that it works as expected. + catchup(t, customerTab1, workflow, workflowType) + catchup(t, customerTab2, workflow, workflowType) + switchWrites(t, workflowType, ksWorkflow, false) + }) checkThatVDiffFails(t, targetKs, workflow) @@ -920,14 +948,14 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl execVtgateQuery(t, vtgateConn, "customer", "delete from customer where name like 'tempCustomer%'") waitForRowCountInTablet(t, customerTab1, "customer", "customer", 1) - waitForRowCountInTablet(t, customerTab2, "customer", "customer", 2) - waitForRowCount(t, vtgateConn, "customer", "customer.customer", 3) + waitForRowCountInTablet(t, customerTab2, "customer", "customer", 3) + waitForRowCount(t, vtgateConn, "customer", "customer.customer", 4) query = "insert into customer (name, cid) values('george', 5)" execVtgateQuery(t, vtgateConn, "customer", query) waitForRowCountInTablet(t, customerTab1, "customer", "customer", 1) - waitForRowCountInTablet(t, customerTab2, "customer", "customer", 3) - waitForRowCount(t, vtgateConn, "customer", "customer.customer", 4) + waitForRowCountInTablet(t, customerTab2, "customer", "customer", 4) + waitForRowCount(t, vtgateConn, "customer", "customer.customer", 5) } }) }