Skip to content

Commit

Permalink
discarded samples per labelset metrics for throttle by labelset (cort…
Browse files Browse the repository at this point in the history
…exproject#6492)

* discarded samples per labelset metrics for throttle by labelset

Signed-off-by: Ben Ye <[email protected]>

* address comment

Signed-off-by: Ben Ye <[email protected]>

---------

Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 authored Jan 8, 2025
1 parent 9571b8a commit 6b6663b
Show file tree
Hide file tree
Showing 9 changed files with 747 additions and 266 deletions.
32 changes: 28 additions & 4 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/extract"
"github.com/cortexproject/cortex/pkg/util/labelset"
"github.com/cortexproject/cortex/pkg/util/limiter"
util_log "github.com/cortexproject/cortex/pkg/util/log"
util_math "github.com/cortexproject/cortex/pkg/util/math"
Expand Down Expand Up @@ -130,7 +131,7 @@ type Distributor struct {
asyncExecutor util.AsyncExecutor

// Map to track label sets from user.
labelSetTracker *labelSetTracker
labelSetTracker *labelset.LabelSetTracker
}

// Config contains the configuration required to
Expand Down Expand Up @@ -388,7 +389,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
asyncExecutor: util.NewNoOpExecutor(),
}

d.labelSetTracker = newLabelSetTracker(d.receivedSamplesPerLabelSet)
d.labelSetTracker = labelset.NewLabelSetTracker()

if cfg.NumPushWorkers > 0 {
util_log.WarnExperimentalUse("Distributor: using goroutine worker pool")
Expand Down Expand Up @@ -810,7 +811,16 @@ func (d *Distributor) updateLabelSetMetrics() {
}
}

d.labelSetTracker.updateMetrics(activeUserSet)
d.labelSetTracker.UpdateMetrics(activeUserSet, func(user, labelSetStr string, removeUser bool) {
if removeUser {
if err := util.DeleteMatchingLabels(d.receivedSamplesPerLabelSet, map[string]string{"user": user}); err != nil {
level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_received_samples_per_labelset_total metric for user", "user", user, "err", err)
}
return
}
d.receivedSamplesPerLabelSet.DeleteLabelValues(user, sampleMetricTypeFloat, labelSetStr)
d.receivedSamplesPerLabelSet.DeleteLabelValues(user, sampleMetricTypeHistogram, labelSetStr)
})
}

func (d *Distributor) cleanStaleIngesterMetrics() {
Expand Down Expand Up @@ -913,6 +923,12 @@ func (d *Distributor) prepareMetadataKeys(req *cortexpb.WriteRequest, limits *va
return metadataKeys, validatedMetadata, firstPartialErr
}

type samplesLabelSetEntry struct {
floatSamples int64
histogramSamples int64
labels labels.Labels
}

func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.WriteRequest, userID string, limits *validation.Limits, removeReplica bool) ([]uint32, []cortexpb.PreallocTimeseries, int, int, int, error, error) {
pSpan, _ := opentracing.StartSpanFromContext(ctx, "prepareSeriesKeys")
defer pSpan.Finish()
Expand Down Expand Up @@ -1070,8 +1086,16 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
validatedExemplars += len(ts.Exemplars)
}
for h, counter := range labelSetCounters {
d.labelSetTracker.increaseSamplesLabelSet(userID, h, counter.labels, counter.floatSamples, counter.histogramSamples)
d.labelSetTracker.Track(userID, h, counter.labels)
labelSetStr := counter.labels.String()
if counter.floatSamples > 0 {
d.receivedSamplesPerLabelSet.WithLabelValues(userID, sampleMetricTypeFloat, labelSetStr).Add(float64(counter.floatSamples))
}
if counter.histogramSamples > 0 {
d.receivedSamplesPerLabelSet.WithLabelValues(userID, sampleMetricTypeHistogram, labelSetStr).Add(float64(counter.histogramSamples))
}
}

return seriesKeys, validatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, nil
}

Expand Down
95 changes: 0 additions & 95 deletions pkg/distributor/metrics.go

This file was deleted.

103 changes: 0 additions & 103 deletions pkg/distributor/metrics_test.go

This file was deleted.

Loading

0 comments on commit 6b6663b

Please sign in to comment.