Skip to content

Commit

Permalink
tx_throttler: remove unused topology watchers
Browse files Browse the repository at this point in the history
Signed-off-by: deepthi <[email protected]>
  • Loading branch information
deepthi committed Oct 31, 2023
1 parent 9b04f1d commit fecbcf5
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 105 deletions.

This file was deleted.

43 changes: 9 additions & 34 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,25 +38,20 @@ import (
)

// These vars store the functions used to create the topo server, healthcheck,
// topology watchers and go/vt/throttler. These are provided here so that they can be overridden
// and go/vt/throttler. These are provided here so that they can be overridden
// in tests to generate mocks.
type healthCheckFactoryFunc func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck
type topologyWatcherFactoryFunc func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface
type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error)

var (
healthCheckFactory healthCheckFactoryFunc
topologyWatcherFactory topologyWatcherFactoryFunc
throttlerFactory throttlerFactoryFunc
healthCheckFactory healthCheckFactoryFunc
throttlerFactory throttlerFactoryFunc
)

func resetTxThrottlerFactories() {
healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck {
return discovery.NewHealthCheck(context.Background(), discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ","))
}
topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface {
return discovery.NewCellTabletsWatcher(context.Background(), topoServer, hc, discovery.NewFilterByKeyspace([]string{keyspace}), cell, refreshInterval, true, topoReadConcurrency)
}
throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) {
return throttler.NewThrottlerFromConfig(name, unit, threadCount, maxRate, maxReplicationLagConfig, time.Now)
}
Expand Down Expand Up @@ -149,7 +144,8 @@ type txThrottler struct {
topoServer *topo.Server

// stats
throttlerRunning *stats.Gauge
throttlerRunning *stats.Gauge
// TODO(deepthi): deprecated, should be deleted in v20
topoWatchers *stats.GaugesWithSingleLabel
healthChecksReadTotal *stats.CountersWithMultiLabels
healthChecksRecordedTotal *stats.CountersWithMultiLabels
Expand All @@ -170,10 +166,9 @@ type txThrottlerStateImpl struct {

// throttleMu serializes calls to throttler.Throttler.Throttle(threadId).
// That method is required to be called in serial for each threadId.
throttleMu sync.Mutex
throttler ThrottlerInterface
stopHealthCheck context.CancelFunc
topologyWatchers map[string]TopologyWatcherInterface
throttleMu sync.Mutex
throttler ThrottlerInterface
stopHealthCheck context.CancelFunc

healthCheck discovery.HealthCheck
healthCheckChan chan *discovery.TabletHealth
Expand Down Expand Up @@ -204,8 +199,7 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler {
config: config,
topoServer: topoServer,
throttlerRunning: env.Exporter().NewGauge(TxThrottlerName+"Running", "transaction throttler running state"),
topoWatchers: env.Exporter().NewGaugesWithSingleLabel(TxThrottlerName+"TopoWatchers", "transaction throttler topology watchers", "cell"),
healthChecksReadTotal: env.Exporter().NewCountersWithMultiLabels(TxThrottlerName+"HealthchecksRead", "transaction throttler healthchecks read",
topoWatchers: env.Exporter().NewGaugesWithSingleLabel(TxThrottlerName+"TopoWatchers", "DEPRECATED: transaction throttler topology watchers", "cell"), healthChecksReadTotal: env.Exporter().NewCountersWithMultiLabels(TxThrottlerName+"HealthchecksRead", "transaction throttler healthchecks read",
[]string{"cell", "DbType"}),
healthChecksRecordedTotal: env.Exporter().NewCountersWithMultiLabels(TxThrottlerName+"HealthchecksRecorded", "transaction throttler healthchecks recorded",
[]string{"cell", "DbType"}),
Expand Down Expand Up @@ -322,31 +316,12 @@ func (ts *txThrottlerStateImpl) initHealthCheckStream(topoServer *topo.Server, t
ts.healthCheck = healthCheckFactory(topoServer, target.Cell, ts.healthCheckCells)
ts.healthCheckChan = ts.healthCheck.Subscribe()

ts.topologyWatchers = make(
map[string]TopologyWatcherInterface, len(ts.healthCheckCells))
for _, cell := range ts.healthCheckCells {
ts.topologyWatchers[cell] = topologyWatcherFactory(
topoServer,
ts.healthCheck,
cell,
target.Keyspace,
target.Shard,
discovery.DefaultTopologyWatcherRefreshInterval,
discovery.DefaultTopoReadConcurrency,
)
ts.txThrottler.topoWatchers.Add(cell, 1)
}
}

func (ts *txThrottlerStateImpl) closeHealthCheckStream() {
if ts.healthCheck == nil {
return
}
for cell, watcher := range ts.topologyWatchers {
watcher.Stop()
ts.txThrottler.topoWatchers.Reset(cell)
}
ts.topologyWatchers = nil
ts.stopHealthCheck()
ts.healthCheck.Close()
}
Expand Down
13 changes: 0 additions & 13 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package txthrottler
// Commands to generate the mocks for this test.
//go:generate mockgen -destination mock_healthcheck_test.go -package txthrottler -mock_names "HealthCheck=MockHealthCheck" vitess.io/vitess/go/vt/discovery HealthCheck
//go:generate mockgen -destination mock_throttler_test.go -package txthrottler vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler ThrottlerInterface
//go:generate mockgen -destination mock_topology_watcher_test.go -package txthrottler vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler TopologyWatcherInterface

import (
"context"
Expand Down Expand Up @@ -74,16 +73,6 @@ func TestEnabledThrottler(t *testing.T) {
return mockHealthCheck
}

topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface {
assert.Equal(t, ts, topoServer)
assert.Contains(t, []string{"cell1", "cell2"}, cell)
assert.Equal(t, "keyspace", keyspace)
assert.Equal(t, "shard", shard)
result := NewMockTopologyWatcherInterface(mockCtrl)
result.EXPECT().Stop()
return result
}

mockThrottler := NewMockThrottlerInterface(mockCtrl)
throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) {
assert.Equal(t, 1, threadCount)
Expand Down Expand Up @@ -131,7 +120,6 @@ func TestEnabledThrottler(t *testing.T) {
throttlerStateImpl := throttlerImpl.state.(*txThrottlerStateImpl)
assert.Equal(t, map[topodatapb.TabletType]bool{topodatapb.TabletType_REPLICA: true}, throttlerStateImpl.tabletTypes)
assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get())
assert.Equal(t, map[string]int64{"cell1": 1, "cell2": 1}, throttlerImpl.topoWatchers.Counts())

assert.False(t, throttlerImpl.Throttle(100, "some_workload"))
assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Counts()["some_workload"])
Expand Down Expand Up @@ -162,7 +150,6 @@ func TestEnabledThrottler(t *testing.T) {
assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some_workload"])
throttlerImpl.Close()
assert.Zero(t, throttlerImpl.throttlerRunning.Get())
assert.Equal(t, map[string]int64{"cell1": 0, "cell2": 0}, throttlerImpl.topoWatchers.Counts())
}

func TestFetchKnownCells(t *testing.T) {
Expand Down

0 comments on commit fecbcf5

Please sign in to comment.