Skip to content

Commit

Permalink
feat: added support for watching shards records as well, and for node…
Browse files Browse the repository at this point in the history
…s being deleted

Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 committed Jan 21, 2025
1 parent 7b6f05d commit 6e164aa
Show file tree
Hide file tree
Showing 5 changed files with 288 additions and 44 deletions.
58 changes: 53 additions & 5 deletions go/test/endtoend/topotest/consul/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func TestKeyspaceLocking(t *testing.T) {
topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true)
}

// TestWatchAllKeyspaceRecords tests the WatchAllKeyspaceRecords method.
// TestWatchAllKeyspaceRecords tests the WatchAllKeyspaceAndShardRecords method.
// We test out different updates and see if we receive the correct update
// from the watch.
func TestWatchAllKeyspaceRecords(t *testing.T) {
Expand All @@ -233,13 +233,27 @@ func TestWatchAllKeyspaceRecords(t *testing.T) {

watchCtx, watchCancel := context.WithCancel(context.Background())
defer watchCancel()
initRecords, ch, err := ts.WatchAllKeyspaceRecords(watchCtx)
initRecords, ch, err := ts.WatchAllKeyspaceAndShardRecords(watchCtx)
require.NoError(t, err)

// Check that we have the initial records.
// The existing keyspace record should be seen.
require.Len(t, initRecords, 1)
require.EqualValues(t, KeyspaceName, initRecords[0].KeyspaceInfo.KeyspaceName())
// The existing keyspace and shard records should be seen.
require.Len(t, initRecords, 2)
var ksInfo *topo.KeyspaceInfo
var shardInfo *topo.ShardInfo
for _, record := range initRecords {
if record.KeyspaceInfo != nil {
ksInfo = record.KeyspaceInfo
}
if record.ShardInfo != nil {
shardInfo = record.ShardInfo
}
}
require.NotNil(t, ksInfo)
require.NotNil(t, shardInfo)
require.EqualValues(t, KeyspaceName, ksInfo.KeyspaceName())
require.EqualValues(t, KeyspaceName, shardInfo.Keyspace())
require.EqualValues(t, "0", shardInfo.ShardName())

// Create a new keyspace record and see that we receive an update.
newKeyspaceName := "ksTest"
Expand All @@ -258,6 +272,40 @@ func TestWatchAllKeyspaceRecords(t *testing.T) {
require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName())
require.EqualValues(t, policy.DurabilitySemiSync, record.KeyspaceInfo.Keyspace.DurabilityPolicy)

// Creating a shard should also trigger an update.
err = ts.CreateShard(context.Background(), newKeyspaceName, "-")
require.NoError(t, err)
// Wait to receive an update from the watch.
record = <-ch
require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace())
require.EqualValues(t, "-", record.ShardInfo.ShardName())
require.Nil(t, record.ShardInfo.Shard.PrimaryAlias)

primaryAlias := &topodatapb.TabletAlias{
Cell: cell,
Uid: 100,
}
// Updating a shard should also trigger an update.
_, err = ts.UpdateShardFields(context.Background(), newKeyspaceName, "-", func(si *topo.ShardInfo) error {
si.PrimaryAlias = primaryAlias
return nil
})
require.NoError(t, err)
// Wait to receive an update from the watch.
record = <-ch
require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace())
require.EqualValues(t, "-", record.ShardInfo.ShardName())
require.NotNil(t, record.ShardInfo.Shard.PrimaryAlias)

// Deleting a shard should also trigger an update.
err = ts.DeleteShard(context.Background(), newKeyspaceName, "-")
require.NoError(t, err)
// Wait to receive an update from the watch.
record = <-ch
require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace())
require.EqualValues(t, "-", record.ShardInfo.ShardName())
require.Error(t, record.Err)

// Update the keyspace record and see that we receive an update.
func() {
ki, err := ts.GetKeyspace(context.Background(), newKeyspaceName)
Expand Down
58 changes: 53 additions & 5 deletions go/test/endtoend/topotest/etcd2/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func TestNamedLocking(t *testing.T) {
topoutils.WaitForBoolValue(t, &secondCallerAcquired, true)
}

// TestWatchAllKeyspaceRecords tests the WatchAllKeyspaceRecords method.
// TestWatchAllKeyspaceRecords tests the WatchAllKeyspaceAndShardRecords method.
// We test out different updates and see if we receive the correct update
// from the watch.
func TestWatchAllKeyspaceRecords(t *testing.T) {
Expand All @@ -280,13 +280,27 @@ func TestWatchAllKeyspaceRecords(t *testing.T) {

watchCtx, watchCancel := context.WithCancel(context.Background())
defer watchCancel()
initRecords, ch, err := ts.WatchAllKeyspaceRecords(watchCtx)
initRecords, ch, err := ts.WatchAllKeyspaceAndShardRecords(watchCtx)
require.NoError(t, err)

// Check that we have the initial records.
// The existing keyspace record should be seen.
require.Len(t, initRecords, 1)
require.EqualValues(t, KeyspaceName, initRecords[0].KeyspaceInfo.KeyspaceName())
// The existing keyspace and shard records should be seen.
require.Len(t, initRecords, 2)
var ksInfo *topo.KeyspaceInfo
var shardInfo *topo.ShardInfo
for _, record := range initRecords {
if record.KeyspaceInfo != nil {
ksInfo = record.KeyspaceInfo
}
if record.ShardInfo != nil {
shardInfo = record.ShardInfo
}
}
require.NotNil(t, ksInfo)
require.NotNil(t, shardInfo)
require.EqualValues(t, KeyspaceName, ksInfo.KeyspaceName())
require.EqualValues(t, KeyspaceName, shardInfo.Keyspace())
require.EqualValues(t, "0", shardInfo.ShardName())

// Create a new keyspace record and see that we receive an update.
newKeyspaceName := "ksTest"
Expand All @@ -305,6 +319,40 @@ func TestWatchAllKeyspaceRecords(t *testing.T) {
require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName())
require.EqualValues(t, policy.DurabilitySemiSync, record.KeyspaceInfo.Keyspace.DurabilityPolicy)

// Creating a shard should also trigger an update.
err = ts.CreateShard(context.Background(), newKeyspaceName, "-")
require.NoError(t, err)
// Wait to receive an update from the watch.
record = <-ch
require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace())
require.EqualValues(t, "-", record.ShardInfo.ShardName())
require.Nil(t, record.ShardInfo.Shard.PrimaryAlias)

primaryAlias := &topodatapb.TabletAlias{
Cell: cell,
Uid: 100,
}
// Updating a shard should also trigger an update.
_, err = ts.UpdateShardFields(context.Background(), newKeyspaceName, "-", func(si *topo.ShardInfo) error {
si.PrimaryAlias = primaryAlias
return nil
})
require.NoError(t, err)
// Wait to receive an update from the watch.
record = <-ch
require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace())
require.EqualValues(t, "-", record.ShardInfo.ShardName())
require.NotNil(t, record.ShardInfo.Shard.PrimaryAlias)

// Deleting a shard should also trigger an update.
err = ts.DeleteShard(context.Background(), newKeyspaceName, "-")
require.NoError(t, err)
// Wait to receive an update from the watch.
record = <-ch
require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace())
require.EqualValues(t, "-", record.ShardInfo.ShardName())
require.Error(t, record.Err)

// Update the keyspace record and see that we receive an update.
func() {
ki, err := ts.GetKeyspace(context.Background(), newKeyspaceName)
Expand Down
58 changes: 53 additions & 5 deletions go/test/endtoend/topotest/zk2/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func TestKeyspaceLocking(t *testing.T) {
topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true)
}

// TestWatchAllKeyspaceRecords tests the WatchAllKeyspaceRecords method.
// TestWatchAllKeyspaceRecords tests the WatchAllKeyspaceAndShardRecords method.
// We test out different updates and see if we receive the correct update
// from the watch.
func TestWatchAllKeyspaceRecords(t *testing.T) {
Expand All @@ -209,13 +209,27 @@ func TestWatchAllKeyspaceRecords(t *testing.T) {

watchCtx, watchCancel := context.WithCancel(context.Background())
defer watchCancel()
initRecords, ch, err := ts.WatchAllKeyspaceRecords(watchCtx)
initRecords, ch, err := ts.WatchAllKeyspaceAndShardRecords(watchCtx)
require.NoError(t, err)

// Check that we have the initial records.
// The existing keyspace record should be seen.
require.Len(t, initRecords, 1)
require.EqualValues(t, KeyspaceName, initRecords[0].KeyspaceInfo.KeyspaceName())
// The existing keyspace and shard records should be seen.
require.Len(t, initRecords, 2)
var ksInfo *topo.KeyspaceInfo
var shardInfo *topo.ShardInfo
for _, record := range initRecords {
if record.KeyspaceInfo != nil {
ksInfo = record.KeyspaceInfo
}
if record.ShardInfo != nil {
shardInfo = record.ShardInfo
}
}
require.NotNil(t, ksInfo)
require.NotNil(t, shardInfo)
require.EqualValues(t, KeyspaceName, ksInfo.KeyspaceName())
require.EqualValues(t, KeyspaceName, shardInfo.Keyspace())
require.EqualValues(t, "0", shardInfo.ShardName())

// Create a new keyspace record and see that we receive an update.
newKeyspaceName := "ksTest"
Expand All @@ -234,6 +248,40 @@ func TestWatchAllKeyspaceRecords(t *testing.T) {
require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName())
require.EqualValues(t, policy.DurabilitySemiSync, record.KeyspaceInfo.Keyspace.DurabilityPolicy)

// Creating a shard should also trigger an update.
err = ts.CreateShard(context.Background(), newKeyspaceName, "-")
require.NoError(t, err)
// Wait to receive an update from the watch.
record = <-ch
require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace())
require.EqualValues(t, "-", record.ShardInfo.ShardName())
require.Nil(t, record.ShardInfo.Shard.PrimaryAlias)

primaryAlias := &topodatapb.TabletAlias{
Cell: cell,
Uid: 100,
}
// Updating a shard should also trigger an update.
_, err = ts.UpdateShardFields(context.Background(), newKeyspaceName, "-", func(si *topo.ShardInfo) error {
si.PrimaryAlias = primaryAlias
return nil
})
require.NoError(t, err)
// Wait to receive an update from the watch.
record = <-ch
require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace())
require.EqualValues(t, "-", record.ShardInfo.ShardName())
require.NotNil(t, record.ShardInfo.Shard.PrimaryAlias)

// Deleting a shard should also trigger an update.
err = ts.DeleteShard(context.Background(), newKeyspaceName, "-")
require.NoError(t, err)
// Wait to receive an update from the watch.
record = <-ch
require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace())
require.EqualValues(t, "-", record.ShardInfo.ShardName())
require.Error(t, record.Err)

// Update the keyspace record and see that we receive an update.
func() {
ki, err := ts.GetKeyspace(context.Background(), newKeyspaceName)
Expand Down
59 changes: 42 additions & 17 deletions go/vt/topo/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ type KeyspaceInfo struct {
}

// NewKeyspaceInfo creates a new KeyspaceInfo.
func NewKeyspaceInfo(name string, keyspace *topodatapb.Keyspace) *KeyspaceInfo {
func NewKeyspaceInfo(name string, keyspace *topodatapb.Keyspace, version Version) *KeyspaceInfo {
return &KeyspaceInfo{
keyspace: name,
Keyspace: keyspace,
version: version,
}
}

Expand Down Expand Up @@ -440,16 +441,17 @@ func (ts *Server) GetShardNames(ctx context.Context, keyspace string) ([]string,
}

// WatchKeyspacePrefixData wraps the data we receive on the watch recursive channel
// The WatchAllKeyspaceRecords API guarantees exactly one of Value or Err will be set.
// The WatchAllKeyspaceAndShardRecords API guarantees exactly one of Value or Err will be set.
type WatchKeyspacePrefixData struct {
KeyspaceInfo *KeyspaceInfo
ShardInfo *ShardInfo
Err error
}

// WatchAllKeyspaceRecords will set a watch on the Keyspace prefix.
// WatchAllKeyspaceAndShardRecords will set a watch on the Keyspace prefix.
// It has the same contract as conn.WatchRecursive, but it also unpacks the
// contents into a Keyspace object.
func (ts *Server) WatchAllKeyspaceRecords(ctx context.Context) ([]*WatchKeyspacePrefixData, <-chan *WatchKeyspacePrefixData, error) {
func (ts *Server) WatchAllKeyspaceAndShardRecords(ctx context.Context) ([]*WatchKeyspacePrefixData, <-chan *WatchKeyspacePrefixData, error) {
if err := ctx.Err(); err != nil {
return nil, nil, err
}
Expand All @@ -463,7 +465,7 @@ func (ts *Server) WatchAllKeyspaceRecords(ctx context.Context) ([]*WatchKeyspace
return nil, nil, err
}
// Unpack the initial data.
initialRes, err := checkAndUnpackKeyspaceRecord(current...)
initialRes, err := checkAndUnpackKeyspacePrefixRecord(current...)
if err != nil {
// Cancel the watch, drain channel.
cancel()
Expand All @@ -484,14 +486,21 @@ func (ts *Server) WatchAllKeyspaceRecords(ctx context.Context) ([]*WatchKeyspace

for wd := range wdChannel {
if wd.Err != nil {
// Last error value, we're done.
// wdChannel will be closed right after
// this, no need to do anything.
changes <- &WatchKeyspacePrefixData{Err: wd.Err}
return
if IsErrType(wd.Err, NoNode) {
// One of the nodes was deleted.
// We have the path and it will be processed like normal.
// We make sure to copy the error from this to signal to the receiver
// that the node was deleted.
} else {
// Last error value, we're done.
// wdChannel will be closed right after
// this, no need to do anything.
changes <- &WatchKeyspacePrefixData{Err: wd.Err}
return
}
}

res, err := checkAndUnpackKeyspaceRecord(wd)
res, err := checkAndUnpackKeyspacePrefixRecord(wd)
if err != nil {
cancel()
for range wdChannel {
Expand All @@ -513,20 +522,36 @@ func (ts *Server) WatchAllKeyspaceRecords(ctx context.Context) ([]*WatchKeyspace
return initialRes, changes, nil
}

// checkAndUnpackKeyspaceRecord checks for Keyspace objects and unpacks them.
func checkAndUnpackKeyspaceRecord(wds ...*WatchDataRecursive) ([]*WatchKeyspacePrefixData, error) {
// checkAndUnpackKeyspacePrefixRecord checks for Keyspace objects and unpacks them.
func checkAndUnpackKeyspacePrefixRecord(wds ...*WatchDataRecursive) ([]*WatchKeyspacePrefixData, error) {
var res []*WatchKeyspacePrefixData
for _, wd := range wds {
fileDir, fileType := path.Split(wd.Path)
// Check if the file is a keyspace record.
// If it is, then we unpack it.
if fileType == KeyspaceFile {
// Check the type of file.
//path.Join(KeyspacesPath, keyspace, ShardsPath, shard, ShardFile)
switch fileType {
case KeyspaceFile:
// Unpack a keyspace record.
ksName := path.Base(fileDir)
value := &topodatapb.Keyspace{}
if err := value.UnmarshalVT(wd.Contents); err != nil {
return nil, err
}
res = append(res, &WatchKeyspacePrefixData{
KeyspaceInfo: NewKeyspaceInfo(path.Base(fileDir), value),
Err: wd.Err,
KeyspaceInfo: NewKeyspaceInfo(ksName, value, wd.Version),
})
case ShardFile:
shardName := path.Base(fileDir)
ksName := path.Base(path.Dir(path.Dir(path.Clean(fileDir))))
// Unpack a shard record.
value := &topodatapb.Shard{}
if err := value.UnmarshalVT(wd.Contents); err != nil {
return nil, err
}
res = append(res, &WatchKeyspacePrefixData{
Err: wd.Err,
ShardInfo: NewShardInfo(ksName, shardName, value, wd.Version),
})
}
}
Expand Down
Loading

0 comments on commit 6e164aa

Please sign in to comment.