Skip to content

Commit

Permalink
feat: minor refactor of code to prevent returning of a timer
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 committed Jan 21, 2025
1 parent 0a51b8f commit 7b6f05d
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 11 deletions.
6 changes: 2 additions & 4 deletions go/vt/vtorc/logic/tablet_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtorc/config"
"vitess.io/vitess/go/vt/vtorc/db"
"vitess.io/vitess/go/vt/vtorc/inst"
"vitess.io/vitess/go/vt/vttablet/tmclient"
Expand Down Expand Up @@ -109,7 +108,7 @@ func updateShardsToWatch() {

// OpenTabletDiscovery opens the vitess topo if enables and returns a ticker
// channel for polling.
func OpenTabletDiscovery() <-chan time.Time {
func OpenTabletDiscovery() {
ts = topo.Open()
tmc = inst.InitializeTMC()
// Clear existing cache and perform a new refresh.
Expand All @@ -122,10 +121,9 @@ func OpenTabletDiscovery() <-chan time.Time {
// it on a timer.
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer cancel()
if err := refreshAllInformation(ctx); err != nil {
if err := refreshTopoTick(ctx); err != nil {
log.Errorf("failed to initialize topo information: %+v", err)
}
return time.Tick(config.GetTopoInformationRefreshDuration()) //nolint SA1015: using time.Tick leaks the underlying ticker
}

// getAllTablets gets all tablets from all cells using a goroutine per cell.
Expand Down
9 changes: 5 additions & 4 deletions go/vt/vtorc/logic/vtorc.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,11 @@ func ContinuousDiscovery() {

go handleDiscoveryRequests()

OpenTabletDiscovery()
healthTick := time.Tick(config.HealthPollSeconds * time.Second)
caretakingTick := time.Tick(time.Minute)
recoveryTick := time.Tick(config.GetRecoveryPollDuration())
tabletTopoTick := OpenTabletDiscovery()
tabletTopoTick := time.Tick(config.GetTopoInformationRefreshDuration())
var recoveryEntrance int64
var snapshotTopologiesTick <-chan time.Time
if config.GetSnapshotTopologyInterval() > 0 {
Expand Down Expand Up @@ -308,16 +309,16 @@ func ContinuousDiscovery() {
}()
case <-tabletTopoTick:
ctx, cancel := context.WithTimeout(context.Background(), config.GetTopoInformationRefreshDuration())
if err := refreshAllInformation(ctx); err != nil {
if err := refreshTopoTick(ctx); err != nil {
log.Errorf("failed to refresh topo information: %+v", err)
}
cancel()
}
}
}

// refreshAllInformation refreshes both shard and tablet information. This is meant to be run on tablet topo ticks.
func refreshAllInformation(ctx context.Context) error {
// refreshTopoTick refreshes information from the topo server on a time tick.
func refreshTopoTick(ctx context.Context) error {
// Create an errgroup
eg, ctx := errgroup.WithContext(ctx)

Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtorc/logic/vtorc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func waitForLocksReleaseAndGetTimeWaitedFor() time.Duration {
return time.Since(start)
}

func TestRefreshAllInformation(t *testing.T) {
func TestRefreshTopoTick(t *testing.T) {
// Store the old flags and restore on test completion
oldTs := ts
defer func() {
Expand All @@ -85,15 +85,15 @@ func TestRefreshAllInformation(t *testing.T) {
// Test error
ctx, cancel := context.WithCancel(context.Background())
cancel() // cancel context to simulate timeout
require.Error(t, refreshAllInformation(ctx))
require.Error(t, refreshTopoTick(ctx))
require.False(t, process.FirstDiscoveryCycleComplete.Load())
_, discoveredOnce = process.HealthTest()
require.False(t, discoveredOnce)

// Test success
ctx2, cancel2 := context.WithCancel(context.Background())
defer cancel2()
require.NoError(t, refreshAllInformation(ctx2))
require.NoError(t, refreshTopoTick(ctx2))
require.True(t, process.FirstDiscoveryCycleComplete.Load())
_, discoveredOnce = process.HealthTest()
require.True(t, discoveredOnce)
Expand Down

0 comments on commit 7b6f05d

Please sign in to comment.