release-24.1: changefeedccl: add changefeed.total_ranges metric #130983

Merged
1 change: 1 addition & 0 deletions docs/generated/metrics/metrics.html
@@ -830,6 +830,7 @@
<tr><td>APPLICATION</td><td>changefeed.sink_batch_hist_nanos</td><td>Time spent batched in the sink buffer before being flushed and acknowledged</td><td>Changefeeds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.sink_io_inflight</td><td>The number of keys currently inflight as IO requests being sent to the sink</td><td>Messages</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.size_based_flushes</td><td>Total size based flushes across all feeds</td><td>Flushes</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.total_ranges</td><td>The total number of ranges being watched by changefeed aggregators</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.usage.error_count</td><td>Count of errors encountered while generating usage metrics for changefeeds</td><td>Errors</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.usage.query_duration</td><td>Time taken by the queries used to generate usage metrics for changefeeds</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.usage.table_bytes</td><td>Aggregated number of bytes of data per table watched by changefeeds</td><td>Storage</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
54 changes: 54 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -1364,6 +1364,60 @@ func TestChangefeedLaggingRangesMetrics(t *testing.T) {
cdcTest(t, testFn, feedTestNoTenants, feedTestEnterpriseSinks)
}

func TestChangefeedTotalRangesMetric(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
registry := s.Server.JobRegistry().(*jobs.Registry)
metrics := registry.MetricsStruct().Changefeed.(*Metrics)
defaultSLI, err := metrics.getSLIMetrics(defaultSLIScope)
require.NoError(t, err)
totalRanges := defaultSLI.TotalRanges

// Total ranges should start at zero.
require.Zero(t, totalRanges.Value())

assertTotalRanges := func(expected int64) {
testutils.SucceedsSoon(t, func() error {
if actual := totalRanges.Value(); expected != actual {
return errors.Newf("expected total ranges to be %d, but got %d", expected, actual)
}
return nil
})
}

sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, "CREATE TABLE foo (x int)")

// We expect one range after creating a changefeed on a single table.
fooFeed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH lagging_ranges_polling_interval='1s'")
assertTotalRanges(1)

// We expect total ranges to be zero again after pausing the changefeed.
require.NoError(t, fooFeed.(cdctest.EnterpriseTestFeed).Pause())
assertTotalRanges(0)

// We once again expect one range after resuming the changefeed.
require.NoError(t, fooFeed.(cdctest.EnterpriseTestFeed).Resume())
assertTotalRanges(1)

// We expect two ranges after starting another changefeed on a single table.
barFeed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH lagging_ranges_polling_interval='1s'")
assertTotalRanges(2)

// We expect there to still be one range after cancelling one of the changefeeds.
require.NoError(t, fooFeed.Close())
assertTotalRanges(1)

// We expect there to be no ranges left after cancelling the other changefeed.
require.NoError(t, barFeed.Close())
assertTotalRanges(0)
}

cdcTest(t, testFn, feedTestEnterpriseSinks)
}

func TestChangefeedBackfillObservability(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
22 changes: 12 additions & 10 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
@@ -39,10 +39,10 @@ import (
// the caller with information about the state of the kvfeed.
type MonitoringConfig struct {
// LaggingRangesCallback is called periodically with the number of lagging ranges
// in the kvfeed.
LaggingRangesCallback func(int64)
// and total ranges watched by the kvfeed.
LaggingRangesCallback func(lagging int64, total int64)
// LaggingRangesPollingInterval is how often the kv feed will poll for
// lagging ranges.
// lagging ranges and total ranges.
LaggingRangesPollingInterval time.Duration
// LaggingRangesThreshold is how far behind a range must be to be considered
// lagging.
@@ -176,15 +176,15 @@ func Run(ctx context.Context, cfg Config) error {

func startLaggingRangesObserver(
g ctxgroup.Group,
updateLaggingRanges func(int64),
updateLaggingRanges func(lagging int64, total int64),
pollingInterval time.Duration,
threshold time.Duration,
) func(fn kvcoord.ForEachRangeFn) {
) kvcoord.RangeObserver {
return func(fn kvcoord.ForEachRangeFn) {
g.GoCtx(func(ctx context.Context) error {
// Reset metrics on shutdown.
defer func() {
updateLaggingRanges(0)
updateLaggingRanges(0 /* lagging */, 0 /* total */)
}()

var timer timeutil.Timer
@@ -198,9 +198,11 @@ func startLaggingRangesObserver(
case <-timer.C:
timer.Read = true

count := int64(0)
var laggingCount, totalCount int64
thresholdTS := timeutil.Now().Add(-1 * threshold)
err := fn(func(rfCtx kvcoord.RangeFeedContext, feed kvcoord.PartialRangeFeed) error {
totalCount += 1

// The resolved timestamp of a range determines the timestamp which is caught up to.
// However, during catchup scans, this is not set. For catchup scans, we consider the
// time the partial rangefeed was created to be its resolved ts. Note that a range can
@@ -212,14 +214,14 @@
}

if ts.Less(hlc.Timestamp{WallTime: thresholdTS.UnixNano()}) {
count += 1
laggingCount += 1
}
return nil
})
if err != nil {
return err
}
updateLaggingRanges(count)
updateLaggingRanges(laggingCount, totalCount)
timer.Reset(pollingInterval)
}
}
@@ -251,7 +253,7 @@ type kvFeed struct {
codec keys.SQLCodec

onBackfillCallback func() func()
rangeObserver func(fn kvcoord.ForEachRangeFn)
rangeObserver kvcoord.RangeObserver
schemaChangeEvents changefeedbase.SchemaChangeEventClass
schemaChangePolicy changefeedbase.SchemaChangePolicy

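For reference, the kvfeed changes above route both lagging and total range counts through a single two-argument callback. The sketch below shows how changefeed code might plug that callback into the kvfeed's monitoring config; it is a minimal sketch only, and the function name monitoringConfigSketch, the sm parameter, and the interval/threshold values are illustrative rather than part of this PR.

```go
// Sketch only: assumes it lives in package changefeedccl, where *sliMetrics is
// defined and the kvfeed package is already a dependency.
package changefeedccl

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvfeed"
)

func monitoringConfigSketch(sm *sliMetrics) kvfeed.MonitoringConfig {
	return kvfeed.MonitoringConfig{
		// Called every polling interval with (lagging, total); the callback
		// applies Inc/Dec deltas to both gauges (see metrics.go below).
		LaggingRangesCallback:        sm.getLaggingRangesCallback(),
		LaggingRangesPollingInterval: time.Second,
		LaggingRangesThreshold:       3 * time.Minute,
	}
}
```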
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
@@ -33,7 +33,7 @@ type rangeFeedConfig struct {
Spans []kvcoord.SpanTimePair
WithDiff bool
WithFiltering bool
RangeObserver func(fn kvcoord.ForEachRangeFn)
RangeObserver kvcoord.RangeObserver
Knobs TestingKnobs
}

29 changes: 22 additions & 7 deletions pkg/ccl/changefeedccl/metrics.go
@@ -81,6 +81,7 @@ type AggMetrics struct {
AggregatorProgress *aggmetric.AggGauge
CheckpointProgress *aggmetric.AggGauge
LaggingRanges *aggmetric.AggGauge
TotalRanges *aggmetric.AggGauge
CloudstorageBufferedBytes *aggmetric.AggGauge
KafkaThrottlingNanos *aggmetric.AggHistogram

@@ -157,6 +158,7 @@ type sliMetrics struct {
AggregatorProgress *aggmetric.Gauge
CheckpointProgress *aggmetric.Gauge
LaggingRanges *aggmetric.Gauge
TotalRanges *aggmetric.Gauge
CloudstorageBufferedBytes *aggmetric.Gauge
KafkaThrottlingNanos *aggmetric.Histogram

@@ -936,12 +938,18 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
Measurement: "Unix Timestamp Nanoseconds",
Unit: metric.Unit_TIMESTAMP_NS,
}
metaLaggingRangePercentage := metric.Metadata{
metaLaggingRanges := metric.Metadata{
Name: "changefeed.lagging_ranges",
Help: "The number of ranges considered to be lagging behind",
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
metaTotalRanges := metric.Metadata{
Name: "changefeed.total_ranges",
Help: "The total number of ranges being watched by changefeed aggregators",
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
metaCloudstorageBufferedBytes := metric.Metadata{
Name: "changefeed.cloudstorage_buffered_bytes",
Help: "The number of bytes buffered in cloudstorage sink files which have not been emitted yet",
@@ -1044,7 +1052,8 @@
SchemaRegistrations: b.Counter(metaSchemaRegistryRegistrations),
AggregatorProgress: b.FunctionalGauge(metaAggregatorProgress, functionalGaugeMinFn),
CheckpointProgress: b.FunctionalGauge(metaCheckpointProgress, functionalGaugeMinFn),
LaggingRanges: b.Gauge(metaLaggingRangePercentage),
LaggingRanges: b.Gauge(metaLaggingRanges),
TotalRanges: b.Gauge(metaTotalRanges),
CloudstorageBufferedBytes: b.Gauge(metaCloudstorageBufferedBytes),
KafkaThrottlingNanos: b.Histogram(metric.HistogramOptions{
Metadata: metaChangefeedKafkaThrottlingNanos,
@@ -1118,6 +1127,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
SchemaRegistryRetries: a.SchemaRegistryRetries.AddChild(scope),
SchemaRegistrations: a.SchemaRegistrations.AddChild(scope),
LaggingRanges: a.LaggingRanges.AddChild(scope),
TotalRanges: a.TotalRanges.AddChild(scope),
CloudstorageBufferedBytes: a.CloudstorageBufferedBytes.AddChild(scope),
KafkaThrottlingNanos: a.KafkaThrottlingNanos.AddChild(scope),
// TODO(#130358): Again, this doesn't belong here, but it's the most
@@ -1155,7 +1165,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
// getLaggingRangesCallback returns a function which can be called to update the
// lagging ranges metric. It should be called with the current number of lagging
// ranges.
func (s *sliMetrics) getLaggingRangesCallback() func(int64) {
func (s *sliMetrics) getLaggingRangesCallback() func(lagging int64, total int64) {
// Because this gauge is shared between changefeeds in the same metrics scope,
// we must instead modify it using `Inc` and `Dec` (as opposed to `Update`) to
// ensure values written by others are not overwritten. The code below is used
Expand All @@ -1172,13 +1182,18 @@ func (s *sliMetrics) getLaggingRangesCallback() func(int64) {
// If 1 lagging range is deleted, last=11,i=10: X.Dec(11-10) = X.Dec(1)
last := struct {
syncutil.Mutex
v int64
lagging int64
total int64
}{}
return func(i int64) {
return func(lagging int64, total int64) {
last.Lock()
defer last.Unlock()
s.LaggingRanges.Dec(last.v - i)
last.v = i

s.LaggingRanges.Dec(last.lagging - lagging)
last.lagging = lagging

s.TotalRanges.Dec(last.total - total)
last.total = total
}
}

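The comment above getLaggingRangesCallback explains why the shared gauges are adjusted with Inc and Dec deltas rather than overwritten with Update: multiple changefeeds in the same metrics scope report into one gauge, so each caller may only add or remove its own contribution. Below is a self-contained toy illustration of that pattern in plain Go; the gauge type is a stand-in, not a CockroachDB API, and it assumes the same track-the-last-reported-value approach used by the callback.

```go
package main

import (
	"fmt"
	"sync"
)

// gauge is a stand-in for a shared metrics gauge that is only updated by deltas.
type gauge struct {
	mu sync.Mutex
	v  int64
}

func (g *gauge) dec(d int64) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.v -= d
}

// newCallback mirrors the pattern in getLaggingRangesCallback: remember the last
// value this caller reported and apply only the difference to the shared gauge.
func newCallback(g *gauge) func(cur int64) {
	var mu sync.Mutex
	var last int64
	return func(cur int64) {
		mu.Lock()
		defer mu.Unlock()
		g.dec(last - cur) // dec of a negative delta is an increment
		last = cur
	}
}

func main() {
	shared := &gauge{}
	feedA, feedB := newCallback(shared), newCallback(shared)
	feedA(5)              // feed A watches 5 ranges -> gauge 5
	feedB(3)              // feed B watches 3 ranges -> gauge 8
	feedA(2)              // feed A drops to 2       -> gauge 5
	feedB(0)              // feed B shuts down       -> gauge 2
	fmt.Println(shared.v) // 2
}
```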
8 changes: 6 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -87,13 +87,17 @@ var rangefeedRangeStuckThreshold = settings.RegisterDurationSetting(
// ForEachRangeFn is used to execute `fn` over each range in a rangefeed.
type ForEachRangeFn func(fn ActiveRangeFeedIterFn) error

// A RangeObserver is a function that observes the ranges in a rangefeed
// by polling fn.
type RangeObserver func(fn ForEachRangeFn)

type rangeFeedConfig struct {
disableMuxRangeFeed bool
overSystemTable bool
withDiff bool
withFiltering bool
withMetadata bool
rangeObserver func(ForEachRangeFn)
rangeObserver RangeObserver

knobs struct {
// onRangefeedEvent invoked on each rangefeed event.
@@ -156,7 +160,7 @@ func WithFiltering() RangeFeedOption {

// WithRangeObserver is called when the rangefeed starts with a function that
// can be used to iterate over all the ranges.
func WithRangeObserver(observer func(ForEachRangeFn)) RangeFeedOption {
func WithRangeObserver(observer RangeObserver) RangeFeedOption {
return optionFunc(func(c *rangeFeedConfig) {
c.rangeObserver = observer
})
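Taken together, the new kvcoord.RangeObserver type spells out the contract the kvfeed relies on: the rangefeed calls the observer once at startup with a ForEachRangeFn, and the observer may invoke that function at any later time to walk the currently active ranges. The sketch below is a stripped-down version of the polling loop in startLaggingRangesObserver above, reduced to a plain counter; the helper name and channel plumbing are illustrative, only the kvcoord types shown in this diff are assumed, and it presumes an import of github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord.

```go
// countRangesObserver returns a RangeObserver that, each time a value arrives
// on poll, walks the active ranges and sends the total on counts.
func countRangesObserver(poll <-chan struct{}, counts chan<- int64) kvcoord.RangeObserver {
	return func(forEachRange kvcoord.ForEachRangeFn) {
		// Invoked once when the rangefeed starts; forEachRange stays valid for
		// the lifetime of the rangefeed and can be re-invoked at any time.
		go func() {
			for range poll {
				var n int64
				_ = forEachRange(func(
					_ kvcoord.RangeFeedContext, _ kvcoord.PartialRangeFeed,
				) error {
					n++
					return nil
				})
				counts <- n
			}
		}()
	}
}
```

Such an observer would be installed when starting the rangefeed via kvcoord.WithRangeObserver(countRangesObserver(poll, counts)).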