diff --git a/go/test/endtoend/topotest/consul/main_test.go b/go/test/endtoend/topotest/consul/main_test.go index c6d48f44930..f37ea24ebae 100644 --- a/go/test/endtoend/topotest/consul/main_test.go +++ b/go/test/endtoend/topotest/consul/main_test.go @@ -26,7 +26,9 @@ import ( topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils" "vitess.io/vitess/go/vt/log" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vtctl/reparentutil/policy" "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" @@ -221,6 +223,59 @@ func TestKeyspaceLocking(t *testing.T) { topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true) } +// TestWatchAllKeyspaceRecords tests the WatchAllKeyspaceRecords method. +// We test out different updates and see if we receive the correct update +// from the watch. +func TestWatchAllKeyspaceRecords(t *testing.T) { + // Create the topo server connection. + ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot) + require.NoError(t, err) + + watchCtx, watchCancel := context.WithCancel(context.Background()) + defer watchCancel() + initRecords, ch, err := ts.WatchAllKeyspaceRecords(watchCtx) + require.NoError(t, err) + + // Check that we have the initial records. + // The existing keyspace record should be seen. + require.Len(t, initRecords, 1) + require.EqualValues(t, KeyspaceName, initRecords[0].KeyspaceInfo.KeyspaceName()) + + // Create a new keyspace record and see that we receive an update. + newKeyspaceName := "ksTest" + err = ts.CreateKeyspace(context.Background(), newKeyspaceName, &topodatapb.Keyspace{ + KeyspaceType: topodatapb.KeyspaceType_NORMAL, + DurabilityPolicy: policy.DurabilitySemiSync, + }) + require.NoError(t, err) + defer func() { + err = ts.DeleteKeyspace(context.Background(), newKeyspaceName) + require.NoError(t, err) + }() + + // Wait to receive an update from the watch. + record := <-ch + require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName()) + require.EqualValues(t, policy.DurabilitySemiSync, record.KeyspaceInfo.Keyspace.DurabilityPolicy) + + // Update the keyspace record and see that we receive an update. + func() { + ki, err := ts.GetKeyspace(context.Background(), newKeyspaceName) + require.NoError(t, err) + ctx, unlock, err := ts.LockKeyspace(context.Background(), newKeyspaceName, "TestWatchAllKeyspaceRecords") + require.NoError(t, err) + defer unlock(&err) + ki.DurabilityPolicy = policy.DurabilityCrossCell + err = ts.UpdateKeyspace(ctx, ki) + require.NoError(t, err) + }() + + // Wait to receive an update from the watch. + record = <-ch + require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName()) + require.EqualValues(t, policy.DurabilityCrossCell, record.KeyspaceInfo.Keyspace.DurabilityPolicy) +} + func execute(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { t.Helper() qr, err := conn.ExecuteFetch(query, 1000, true) diff --git a/go/test/endtoend/topotest/etcd2/main_test.go b/go/test/endtoend/topotest/etcd2/main_test.go index ee2b542109b..38c4e55059a 100644 --- a/go/test/endtoend/topotest/etcd2/main_test.go +++ b/go/test/endtoend/topotest/etcd2/main_test.go @@ -26,7 +26,9 @@ import ( topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils" "vitess.io/vitess/go/test/endtoend/utils" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vtctl/reparentutil/policy" "vitess.io/vitess/go/vt/log" @@ -268,6 +270,59 @@ func TestNamedLocking(t *testing.T) { topoutils.WaitForBoolValue(t, &secondCallerAcquired, true) } +// TestWatchAllKeyspaceRecords tests the WatchAllKeyspaceRecords method. +// We test out different updates and see if we receive the correct update +// from the watch. +func TestWatchAllKeyspaceRecords(t *testing.T) { + // Create the topo server connection. + ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot) + require.NoError(t, err) + + watchCtx, watchCancel := context.WithCancel(context.Background()) + defer watchCancel() + initRecords, ch, err := ts.WatchAllKeyspaceRecords(watchCtx) + require.NoError(t, err) + + // Check that we have the initial records. + // The existing keyspace record should be seen. + require.Len(t, initRecords, 1) + require.EqualValues(t, KeyspaceName, initRecords[0].KeyspaceInfo.KeyspaceName()) + + // Create a new keyspace record and see that we receive an update. + newKeyspaceName := "ksTest" + err = ts.CreateKeyspace(context.Background(), newKeyspaceName, &topodatapb.Keyspace{ + KeyspaceType: topodatapb.KeyspaceType_NORMAL, + DurabilityPolicy: policy.DurabilitySemiSync, + }) + require.NoError(t, err) + defer func() { + err = ts.DeleteKeyspace(context.Background(), newKeyspaceName) + require.NoError(t, err) + }() + + // Wait to receive an update from the watch. + record := <-ch + require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName()) + require.EqualValues(t, policy.DurabilitySemiSync, record.KeyspaceInfo.Keyspace.DurabilityPolicy) + + // Update the keyspace record and see that we receive an update. + func() { + ki, err := ts.GetKeyspace(context.Background(), newKeyspaceName) + require.NoError(t, err) + ctx, unlock, err := ts.LockKeyspace(context.Background(), newKeyspaceName, "TestWatchAllKeyspaceRecords") + require.NoError(t, err) + defer unlock(&err) + ki.DurabilityPolicy = policy.DurabilityCrossCell + err = ts.UpdateKeyspace(ctx, ki) + require.NoError(t, err) + }() + + // Wait to receive an update from the watch. + record = <-ch + require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName()) + require.EqualValues(t, policy.DurabilityCrossCell, record.KeyspaceInfo.Keyspace.DurabilityPolicy) +} + func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result { t.Helper() var res []*sqltypes.Result diff --git a/go/test/endtoend/topotest/zk2/main_test.go b/go/test/endtoend/topotest/zk2/main_test.go index c6569519a3d..afa3a1de438 100644 --- a/go/test/endtoend/topotest/zk2/main_test.go +++ b/go/test/endtoend/topotest/zk2/main_test.go @@ -25,7 +25,9 @@ import ( topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils" "vitess.io/vitess/go/test/endtoend/utils" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vtctl/reparentutil/policy" "vitess.io/vitess/go/vt/log" @@ -197,6 +199,59 @@ func TestKeyspaceLocking(t *testing.T) { topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true) } +// TestWatchAllKeyspaceRecords tests the WatchAllKeyspaceRecords method. +// We test out different updates and see if we receive the correct update +// from the watch. +func TestWatchAllKeyspaceRecords(t *testing.T) { + // Create the topo server connection. + ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot) + require.NoError(t, err) + + watchCtx, watchCancel := context.WithCancel(context.Background()) + defer watchCancel() + initRecords, ch, err := ts.WatchAllKeyspaceRecords(watchCtx) + require.NoError(t, err) + + // Check that we have the initial records. + // The existing keyspace record should be seen. + require.Len(t, initRecords, 1) + require.EqualValues(t, KeyspaceName, initRecords[0].KeyspaceInfo.KeyspaceName()) + + // Create a new keyspace record and see that we receive an update. + newKeyspaceName := "ksTest" + err = ts.CreateKeyspace(context.Background(), newKeyspaceName, &topodatapb.Keyspace{ + KeyspaceType: topodatapb.KeyspaceType_NORMAL, + DurabilityPolicy: policy.DurabilitySemiSync, + }) + require.NoError(t, err) + defer func() { + err = ts.DeleteKeyspace(context.Background(), newKeyspaceName) + require.NoError(t, err) + }() + + // Wait to receive an update from the watch. + record := <-ch + require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName()) + require.EqualValues(t, policy.DurabilitySemiSync, record.KeyspaceInfo.Keyspace.DurabilityPolicy) + + // Update the keyspace record and see that we receive an update. + func() { + ki, err := ts.GetKeyspace(context.Background(), newKeyspaceName) + require.NoError(t, err) + ctx, unlock, err := ts.LockKeyspace(context.Background(), newKeyspaceName, "TestWatchAllKeyspaceRecords") + require.NoError(t, err) + defer unlock(&err) + ki.DurabilityPolicy = policy.DurabilityCrossCell + err = ts.UpdateKeyspace(ctx, ki) + require.NoError(t, err) + }() + + // Wait to receive an update from the watch. + record = <-ch + require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName()) + require.EqualValues(t, policy.DurabilityCrossCell, record.KeyspaceInfo.Keyspace.DurabilityPolicy) +} + func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result { t.Helper() var res []*sqltypes.Result