diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index eb25969978..bdeeabebeb 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -36,6 +36,7 @@ import ( "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/extract" + "github.com/cortexproject/cortex/pkg/util/labelset" "github.com/cortexproject/cortex/pkg/util/limiter" util_log "github.com/cortexproject/cortex/pkg/util/log" util_math "github.com/cortexproject/cortex/pkg/util/math" @@ -130,7 +131,7 @@ type Distributor struct { asyncExecutor util.AsyncExecutor // Map to track label sets from user. - labelSetTracker *labelSetTracker + labelSetTracker *labelset.LabelSetTracker } // Config contains the configuration required to @@ -388,7 +389,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove asyncExecutor: util.NewNoOpExecutor(), } - d.labelSetTracker = newLabelSetTracker(d.receivedSamplesPerLabelSet) + d.labelSetTracker = labelset.NewLabelSetTracker() if cfg.NumPushWorkers > 0 { util_log.WarnExperimentalUse("Distributor: using goroutine worker pool") @@ -810,7 +811,16 @@ func (d *Distributor) updateLabelSetMetrics() { } } - d.labelSetTracker.updateMetrics(activeUserSet) + d.labelSetTracker.UpdateMetrics(activeUserSet, func(user, labelSetStr string, removeUser bool) { + if removeUser { + if err := util.DeleteMatchingLabels(d.receivedSamplesPerLabelSet, map[string]string{"user": user}); err != nil { + level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_received_samples_per_labelset_total metric for user", "user", user, "err", err) + } + return + } + d.receivedSamplesPerLabelSet.DeleteLabelValues(user, sampleMetricTypeFloat, labelSetStr) + d.receivedSamplesPerLabelSet.DeleteLabelValues(user, sampleMetricTypeHistogram, labelSetStr) + }) } func (d *Distributor) cleanStaleIngesterMetrics() { @@ -913,6 +923,12 @@ func (d *Distributor) prepareMetadataKeys(req *cortexpb.WriteRequest, limits *va return metadataKeys, validatedMetadata, firstPartialErr } +type samplesLabelSetEntry struct { + floatSamples int64 + histogramSamples int64 + labels labels.Labels +} + func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.WriteRequest, userID string, limits *validation.Limits, removeReplica bool) ([]uint32, []cortexpb.PreallocTimeseries, int, int, int, error, error) { pSpan, _ := opentracing.StartSpanFromContext(ctx, "prepareSeriesKeys") defer pSpan.Finish() @@ -1070,8 +1086,16 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write validatedExemplars += len(ts.Exemplars) } for h, counter := range labelSetCounters { - d.labelSetTracker.increaseSamplesLabelSet(userID, h, counter.labels, counter.floatSamples, counter.histogramSamples) + d.labelSetTracker.Track(userID, h, counter.labels) + labelSetStr := counter.labels.String() + if counter.floatSamples > 0 { + d.receivedSamplesPerLabelSet.WithLabelValues(userID, sampleMetricTypeFloat, labelSetStr).Add(float64(counter.floatSamples)) + } + if counter.histogramSamples > 0 { + d.receivedSamplesPerLabelSet.WithLabelValues(userID, sampleMetricTypeHistogram, labelSetStr).Add(float64(counter.histogramSamples)) + } } + return seriesKeys, validatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, nil } diff --git a/pkg/distributor/metrics.go b/pkg/distributor/metrics.go deleted file mode 100644 index 786ab954c2..0000000000 --- a/pkg/distributor/metrics.go +++ /dev/null @@ -1,95 +0,0 @@ -package distributor - -import ( - "sync" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/labels" - - "github.com/cortexproject/cortex/pkg/util" -) - -const ( - numMetricShards = 128 -) - -type labelSetTracker struct { - receivedSamplesPerLabelSet *prometheus.CounterVec - - shards []*labelSetCounterShard -} - -func newLabelSetTracker(receivedSamplesPerLabelSet *prometheus.CounterVec) *labelSetTracker { - shards := make([]*labelSetCounterShard, 0, numMetricShards) - for i := 0; i < numMetricShards; i++ { - shards = append(shards, &labelSetCounterShard{ - RWMutex: &sync.RWMutex{}, - userLabelSets: map[string]map[uint64]labels.Labels{}, - }) - } - return &labelSetTracker{shards: shards, receivedSamplesPerLabelSet: receivedSamplesPerLabelSet} -} - -type labelSetCounterShard struct { - *sync.RWMutex - userLabelSets map[string]map[uint64]labels.Labels -} - -type samplesLabelSetEntry struct { - floatSamples int64 - histogramSamples int64 - labels labels.Labels -} - -func (m *labelSetTracker) increaseSamplesLabelSet(userId string, hash uint64, labelSet labels.Labels, floatSamples, histogramSamples int64) { - s := m.shards[util.HashFP(model.Fingerprint(hash))%numMetricShards] - s.Lock() - if userEntry, ok := s.userLabelSets[userId]; ok { - if _, ok2 := userEntry[hash]; !ok2 { - userEntry[hash] = labelSet - } - } else { - s.userLabelSets[userId] = map[uint64]labels.Labels{hash: labelSet} - } - // Unlock before we update metrics. - s.Unlock() - - labelSetStr := labelSet.String() - if floatSamples > 0 { - m.receivedSamplesPerLabelSet.WithLabelValues(userId, sampleMetricTypeFloat, labelSetStr).Add(float64(floatSamples)) - } - if histogramSamples > 0 { - m.receivedSamplesPerLabelSet.WithLabelValues(userId, sampleMetricTypeHistogram, labelSetStr).Add(float64(histogramSamples)) - } -} - -// Clean up dangling user and label set from the tracker as well as metrics. -func (m *labelSetTracker) updateMetrics(userSet map[string]map[uint64]struct{}) { - for i := 0; i < numMetricShards; i++ { - shard := m.shards[i] - shard.Lock() - - for user, userEntry := range shard.userLabelSets { - limits, ok := userSet[user] - if !ok { - // If user is removed, we will delete user metrics in cleanupInactiveUser loop - // so skip deleting metrics here. - delete(shard.userLabelSets, user) - continue - } - for h, lbls := range userEntry { - // This limit no longer exists. - if _, ok := limits[h]; !ok { - delete(userEntry, h) - labelSetStr := lbls.String() - m.receivedSamplesPerLabelSet.DeleteLabelValues(user, sampleMetricTypeFloat, labelSetStr) - m.receivedSamplesPerLabelSet.DeleteLabelValues(user, sampleMetricTypeHistogram, labelSetStr) - continue - } - } - } - - shard.Unlock() - } -} diff --git a/pkg/distributor/metrics_test.go b/pkg/distributor/metrics_test.go deleted file mode 100644 index 842e4fe6c3..0000000000 --- a/pkg/distributor/metrics_test.go +++ /dev/null @@ -1,103 +0,0 @@ -package distributor - -import ( - "strings" - "testing" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/prometheus/prometheus/model/labels" - "github.com/stretchr/testify/require" -) - -func TestLabelSetCounter(t *testing.T) { - metricName := "cortex_distributor_received_samples_per_labelset_total" - reg := prometheus.NewPedanticRegistry() - dummyCounter := prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: metricName, - Help: "", - }, []string{"user", "type", "labelset"}) - counter := newLabelSetTracker(dummyCounter) - reg.MustRegister(dummyCounter) - - userID := "1" - userID2 := "2" - userID3 := "3" - - counter.increaseSamplesLabelSet(userID, 0, labels.FromStrings("foo", "bar"), 10, 0) - counter.increaseSamplesLabelSet(userID, 1, labels.FromStrings("foo", "baz"), 0, 5) - counter.increaseSamplesLabelSet(userID, 3, labels.EmptyLabels(), 20, 20) - counter.increaseSamplesLabelSet(userID2, 0, labels.FromStrings("foo", "bar"), 100, 5) - counter.increaseSamplesLabelSet(userID2, 2, labels.FromStrings("cluster", "us-west-2"), 0, 100) - - require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` - # TYPE cortex_distributor_received_samples_per_labelset_total counter - cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="histogram",user="2"} 100 - cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="1"} 10 - cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="2"} 100 - cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="2"} 5 - cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"baz\"}",type="histogram",user="1"} 5 - cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="1"} 20 - cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="1"} 20 - `), metricName)) - - // Increment metrics and add a new user. - counter.increaseSamplesLabelSet(userID, 3, labels.EmptyLabels(), 20, 20) - counter.increaseSamplesLabelSet(userID2, 2, labels.FromStrings("cluster", "us-west-2"), 0, 100) - counter.increaseSamplesLabelSet(userID2, 4, labels.FromStrings("cluster", "us-west-2"), 10, 10) - counter.increaseSamplesLabelSet(userID3, 4, labels.FromStrings("cluster", "us-east-1"), 30, 30) - - require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` - # TYPE cortex_distributor_received_samples_per_labelset_total counter - cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-east-1\"}",type="float",user="3"} 30 - cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-east-1\"}",type="histogram",user="3"} 30 - cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="float",user="2"} 10 - cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="histogram",user="2"} 210 - cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="1"} 10 - cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="2"} 100 - cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="2"} 5 - cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"baz\"}",type="histogram",user="1"} 5 - cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="1"} 40 - cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="1"} 40 - `), metricName)) - - // Remove user 2. But metrics for user 2 not cleaned up as it is expected to be cleaned up - // in cleanupInactiveUser loop. It is expected to have 3 minutes delay in this case. - userSet := map[string]map[uint64]struct { - }{ - userID: {0: struct{}{}, 1: struct{}{}, 2: struct{}{}, 3: struct{}{}}, - userID3: {0: struct{}{}, 1: struct{}{}, 2: struct{}{}, 3: struct{}{}, 4: struct{}{}}, - } - counter.updateMetrics(userSet) - require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` - # TYPE cortex_distributor_received_samples_per_labelset_total counter - cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-east-1\"}",type="float",user="3"} 30 - cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-east-1\"}",type="histogram",user="3"} 30 - cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="float",user="2"} 10 - cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="histogram",user="2"} 210 - cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="1"} 10 - cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="2"} 100 - cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="2"} 5 - cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"baz\"}",type="histogram",user="1"} 5 - cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="1"} 40 - cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="1"} 40 - `), metricName)) - - // Simulate existing limits removed for each user. - userSet = map[string]map[uint64]struct { - }{ - userID: {0: struct{}{}}, - userID2: {}, - userID3: {}, - } - counter.updateMetrics(userSet) - - require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` - # TYPE cortex_distributor_received_samples_per_labelset_total counter - cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="float",user="2"} 10 - cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="histogram",user="2"} 210 - cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="1"} 10 - cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="2"} 100 - cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="2"} 5 - `), metricName)) -} diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index dc82554ea9..d46398d4b9 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -92,6 +92,8 @@ const ( // Period at which we should reset the max inflight query requests counter. maxInflightRequestResetPeriod = 1 * time.Minute + + labelSetMetricsTickInterval = 30 * time.Second ) var ( @@ -942,6 +944,9 @@ func (i *Ingester) updateLoop(ctx context.Context) error { maxTrackerResetTicker := time.NewTicker(maxInflightRequestResetPeriod) defer maxTrackerResetTicker.Stop() + labelSetMetricsTicker := time.NewTicker(labelSetMetricsTickInterval) + defer labelSetMetricsTicker.Stop() + for { select { case <-metadataPurgeTicker.C: @@ -963,6 +968,8 @@ func (i *Ingester) updateLoop(ctx context.Context) error { i.maxInflightPushRequests.Tick() case <-userTSDBConfigTicker.C: i.updateUserTSDBConfigs() + case <-labelSetMetricsTicker.C: + i.updateLabelSetMetrics() case <-ctx.Done(): return nil case err := <-i.subservicesWatcher.Chan(): @@ -1027,6 +1034,25 @@ func (i *Ingester) updateActiveSeries(ctx context.Context) { } } +func (i *Ingester) updateLabelSetMetrics() { + activeUserSet := make(map[string]map[uint64]struct{}) + for _, userID := range i.getTSDBUsers() { + userDB := i.getTSDB(userID) + if userDB == nil { + continue + } + + limits := i.limits.LimitsPerLabelSet(userID) + activeUserSet[userID] = make(map[uint64]struct{}, len(limits)) + for _, l := range limits { + activeUserSet[userID][l.Hash] = struct{}{} + } + } + + // Update label set metrics in validate metrics. + i.validateMetrics.UpdateLabelSet(activeUserSet, i.logger) +} + func (i *Ingester) RenewTokenHandler(w http.ResponseWriter, r *http.Request) { i.lifecycler.RenewTokens(0.1, r.Context()) w.WriteHeader(http.StatusNoContent) @@ -1140,6 +1166,8 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte // process it before samples. Otherwise, we risk returning an error before ingestion. ingestedMetadata := i.pushMetadata(ctx, userID, req.GetMetadata()) + reasonCounter := newLabelSetReasonCounters() + // Keep track of some stats which are tracked only if the samples will be // successfully committed var ( @@ -1165,7 +1193,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte } } - handleAppendFailure = func(err error, timestampMs int64, lbls []cortexpb.LabelAdapter, copiedLabels labels.Labels) (rollback bool) { + handleAppendFailure = func(err error, timestampMs int64, lbls []cortexpb.LabelAdapter, copiedLabels labels.Labels, matchedLabelSetLimits []validation.LimitsPerLabelSet) (rollback bool) { // Check if the error is a soft error we can proceed on. If so, we keep track // of it, so that we can return it back to the distributor, which will return a // 400 error to the client. The client (Prometheus) will not retry on 400, and @@ -1201,6 +1229,8 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte case errors.As(cause, &errMaxSeriesPerLabelSetLimitExceeded{}): perLabelSetSeriesLimitCount++ + // We only track per labelset discarded samples for throttling by labelset limit. + reasonCounter.increment(matchedLabelSetLimits, perLabelsetSeriesLimit) updateFirstPartial(func() error { return makeMetricLimitError(perLabelsetSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause, copiedLabels)) }) @@ -1245,6 +1275,13 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte // To find out if any histogram was added to this series, we keep old value. oldSucceededHistogramsCount := succeededHistogramsCount + // Copied labels will be empty if ref is 0. + if ref == 0 { + // Copy the label set because both TSDB and the active series tracker may retain it. + copiedLabels = cortexpb.FromLabelAdaptersToLabelsWithCopy(ts.Labels) + } + matchedLabelSetLimits := i.limiter.limitsPerLabelSets(userID, copiedLabels) + for _, s := range ts.Samples { var err error @@ -1256,8 +1293,6 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte } } else { - // Copy the label set because both TSDB and the active series tracker may retain it. - copiedLabels = cortexpb.FromLabelAdaptersToLabelsWithCopy(ts.Labels) // Retain the reference in case there are multiple samples for the series. if ref, err = app.Append(0, copiedLabels, s.TimestampMs, s.Value); err == nil { // Keep track of what series needs to be expired on the postings cache @@ -1271,7 +1306,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte failedSamplesCount++ - if rollback := handleAppendFailure(err, s.TimestampMs, ts.Labels, copiedLabels); !rollback { + if rollback := handleAppendFailure(err, s.TimestampMs, ts.Labels, copiedLabels, matchedLabelSetLimits); !rollback { continue } // The error looks an issue on our side, so we should rollback @@ -1316,7 +1351,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte failedHistogramsCount++ - if rollback := handleAppendFailure(err, hp.TimestampMs, ts.Labels, copiedLabels); !rollback { + if rollback := handleAppendFailure(err, hp.TimestampMs, ts.Labels, copiedLabels, matchedLabelSetLimits); !rollback { continue } // The error looks an issue on our side, so we should rollback @@ -1431,6 +1466,14 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte i.validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramSample, userID).Add(float64(discardedNativeHistogramCount)) } + for h, counter := range reasonCounter.counters { + labelStr := counter.lbls.String() + i.validateMetrics.LabelSetTracker.Track(userID, h, counter.lbls) + for reason, count := range counter.reasonCounter { + i.validateMetrics.DiscardedSamplesPerLabelSet.WithLabelValues(reason, userID, labelStr).Add(float64(count)) + } + } + // Distributor counts both samples, metadata and histograms, so for consistency ingester does the same. i.ingestionRate.Add(int64(succeededSamplesCount + succeededHistogramsCount + ingestedMetadata)) @@ -3190,3 +3233,31 @@ func putTimeSeriesChunksSlice(p []client.TimeSeriesChunk) { tsChunksPool.Put(p[:0]) } } + +type labelSetReasonCounters struct { + counters map[uint64]*labelSetReasonCounter +} + +type labelSetReasonCounter struct { + reasonCounter map[string]int + lbls labels.Labels +} + +func newLabelSetReasonCounters() *labelSetReasonCounters { + return &labelSetReasonCounters{counters: make(map[uint64]*labelSetReasonCounter)} +} + +func (c *labelSetReasonCounters) increment(matchedLabelSetLimits []validation.LimitsPerLabelSet, reason string) { + for _, l := range matchedLabelSetLimits { + if rc, exists := c.counters[l.Hash]; exists { + rc.reasonCounter[reason]++ + } else { + c.counters[l.Hash] = &labelSetReasonCounter{ + reasonCounter: map[string]int{ + reason: 1, + }, + lbls: l.LabelSet, + } + } + } +} diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 6505c8f314..15fec1d1fd 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -264,6 +264,10 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) { ing.updateActiveSeries(ctx) require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(` + # HELP cortex_discarded_samples_per_labelset_total The total number of samples that were discarded for each labelset. + # TYPE cortex_discarded_samples_per_labelset_total counter + cortex_discarded_samples_per_labelset_total{labelset="{label1=\"value1\"}",reason="per_labelset_series_limit",user="1"} 1 + cortex_discarded_samples_per_labelset_total{labelset="{label2=\"value2\"}",reason="per_labelset_series_limit",user="1"} 1 # HELP cortex_discarded_samples_total The total number of samples that were discarded. # TYPE cortex_discarded_samples_total counter cortex_discarded_samples_total{reason="per_labelset_series_limit",user="1"} 2 @@ -275,7 +279,7 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) { # TYPE cortex_ingester_usage_per_labelset gauge cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3 cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2 - `), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset", "cortex_discarded_samples_total")) + `), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset", "cortex_discarded_samples_total", "cortex_discarded_samples_per_labelset_total")) // Should apply composite limits limits.LimitsPerLabelSet = append(limits.LimitsPerLabelSet, @@ -311,6 +315,10 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) { // Should backfill ing.updateActiveSeries(ctx) require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(` + # HELP cortex_discarded_samples_per_labelset_total The total number of samples that were discarded for each labelset. + # TYPE cortex_discarded_samples_per_labelset_total counter + cortex_discarded_samples_per_labelset_total{labelset="{label1=\"value1\"}",reason="per_labelset_series_limit",user="1"} 1 + cortex_discarded_samples_per_labelset_total{labelset="{label2=\"value2\"}",reason="per_labelset_series_limit",user="1"} 1 # HELP cortex_discarded_samples_total The total number of samples that were discarded. # TYPE cortex_discarded_samples_total counter cortex_discarded_samples_total{reason="per_labelset_series_limit",user="1"} 2 @@ -328,7 +336,7 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) { cortex_ingester_usage_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 0 cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3 cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2 - `), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset", "cortex_discarded_samples_total")) + `), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset", "cortex_discarded_samples_total", "cortex_discarded_samples_per_labelset_total")) // Adding 5 metrics with only 1 label for i := 0; i < 5; i++ { @@ -356,6 +364,13 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) { ing.updateActiveSeries(ctx) require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(` + # HELP cortex_discarded_samples_per_labelset_total The total number of samples that were discarded for each labelset. + # TYPE cortex_discarded_samples_per_labelset_total counter + cortex_discarded_samples_per_labelset_total{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",reason="per_labelset_series_limit",user="1"} 1 + cortex_discarded_samples_per_labelset_total{labelset="{comp1=\"compValue1\"}",reason="per_labelset_series_limit",user="1"} 1 + cortex_discarded_samples_per_labelset_total{labelset="{comp2=\"compValue2\"}",reason="per_labelset_series_limit",user="1"} 1 + cortex_discarded_samples_per_labelset_total{labelset="{label1=\"value1\"}",reason="per_labelset_series_limit",user="1"} 1 + cortex_discarded_samples_per_labelset_total{labelset="{label2=\"value2\"}",reason="per_labelset_series_limit",user="1"} 1 # HELP cortex_discarded_samples_total The total number of samples that were discarded. # TYPE cortex_discarded_samples_total counter cortex_discarded_samples_total{reason="per_labelset_series_limit",user="1"} 3 @@ -373,7 +388,7 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) { cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2 cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 7 cortex_ingester_usage_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 2 - `), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset", "cortex_discarded_samples_total")) + `), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset", "cortex_discarded_samples_total", "cortex_discarded_samples_per_labelset_total")) // Should bootstrap and apply limits when configuration change limits.LimitsPerLabelSet = append(limits.LimitsPerLabelSet, @@ -659,6 +674,26 @@ func TestIngesterUserLimitExceeded(t *testing.T) { limits.MaxLocalSeriesPerUser = 1 limits.MaxLocalMetricsWithMetadataPerUser = 1 + userID := "1" + // Series + labels1 := labels.Labels{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}} + sample1 := cortexpb.Sample{ + TimestampMs: 0, + Value: 1, + } + sample2 := cortexpb.Sample{ + TimestampMs: 1, + Value: 2, + } + labels3 := labels.Labels{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "biz"}} + sample3 := cortexpb.Sample{ + TimestampMs: 1, + Value: 3, + } + // Metadata + metadata1 := &cortexpb.MetricMetadata{MetricFamilyName: "testmetric", Help: "a help for testmetric", Type: cortexpb.COUNTER} + metadata2 := &cortexpb.MetricMetadata{MetricFamilyName: "testmetric2", Help: "a help for testmetric2", Type: cortexpb.COUNTER} + dir := t.TempDir() chunksDir := filepath.Join(dir, "chunks") @@ -666,8 +701,8 @@ func TestIngesterUserLimitExceeded(t *testing.T) { require.NoError(t, os.Mkdir(chunksDir, os.ModePerm)) require.NoError(t, os.Mkdir(blocksDir, os.ModePerm)) - blocksIngesterGenerator := func() *Ingester { - ing, err := prepareIngesterWithBlocksStorageAndLimits(t, defaultIngesterTestConfig(t), limits, nil, blocksDir, prometheus.NewRegistry(), true) + blocksIngesterGenerator := func(reg prometheus.Registerer) *Ingester { + ing, err := prepareIngesterWithBlocksStorageAndLimits(t, defaultIngesterTestConfig(t), limits, nil, blocksDir, reg, true) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing)) // Wait until it's ACTIVE @@ -679,36 +714,17 @@ func TestIngesterUserLimitExceeded(t *testing.T) { } tests := []string{"blocks"} - for i, ingGenerator := range []func() *Ingester{blocksIngesterGenerator} { + for i, ingGenerator := range []func(reg prometheus.Registerer) *Ingester{blocksIngesterGenerator} { t.Run(tests[i], func(t *testing.T) { - ing := ingGenerator() - - userID := "1" - // Series - labels1 := labels.Labels{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}} - sample1 := cortexpb.Sample{ - TimestampMs: 0, - Value: 1, - } - sample2 := cortexpb.Sample{ - TimestampMs: 1, - Value: 2, - } - labels3 := labels.Labels{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "biz"}} - sample3 := cortexpb.Sample{ - TimestampMs: 1, - Value: 3, - } - // Metadata - metadata1 := &cortexpb.MetricMetadata{MetricFamilyName: "testmetric", Help: "a help for testmetric", Type: cortexpb.COUNTER} - metadata2 := &cortexpb.MetricMetadata{MetricFamilyName: "testmetric2", Help: "a help for testmetric2", Type: cortexpb.COUNTER} + reg := prometheus.NewRegistry() + ing := ingGenerator(reg) // Append only one series and one metadata first, expect no error. ctx := user.InjectOrgID(context.Background(), userID) _, err := ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels1}, []cortexpb.Sample{sample1}, []*cortexpb.MetricMetadata{metadata1}, nil, cortexpb.API)) require.NoError(t, err) - testLimits := func() { + testLimits := func(reg prometheus.Gatherer) { // Append to two series, expect series-exceeded error. _, err = ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels1, labels3}, []cortexpb.Sample{sample2, sample3}, nil, nil, cortexpb.API)) httpResp, ok := httpgrpc.HTTPResponseFromError(err) @@ -747,16 +763,24 @@ func TestIngesterUserLimitExceeded(t *testing.T) { m, err := ing.MetricsMetadata(ctx, nil) require.NoError(t, err) assert.Equal(t, []*cortexpb.MetricMetadata{metadata1}, m.Metadata) + + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_discarded_samples_total The total number of samples that were discarded. + # TYPE cortex_discarded_samples_total counter + cortex_discarded_samples_total{reason="per_user_series_limit",user="1"} 1 + `), "cortex_discarded_samples_total")) } - testLimits() + testLimits(reg) // Limits should hold after restart. services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck - ing = ingGenerator() + // Use new registry to prevent metrics registration panic. + reg = prometheus.NewRegistry() + ing = ingGenerator(reg) defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck - testLimits() + testLimits(reg) }) } @@ -781,6 +805,26 @@ func TestIngesterMetricLimitExceeded(t *testing.T) { limits.MaxLocalSeriesPerMetric = 1 limits.MaxLocalMetadataPerMetric = 1 + userID := "1" + labels1 := labels.Labels{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}} + sample1 := cortexpb.Sample{ + TimestampMs: 0, + Value: 1, + } + sample2 := cortexpb.Sample{ + TimestampMs: 1, + Value: 2, + } + labels3 := labels.Labels{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "biz"}} + sample3 := cortexpb.Sample{ + TimestampMs: 1, + Value: 3, + } + + // Metadata + metadata1 := &cortexpb.MetricMetadata{MetricFamilyName: "testmetric", Help: "a help for testmetric", Type: cortexpb.COUNTER} + metadata2 := &cortexpb.MetricMetadata{MetricFamilyName: "testmetric", Help: "a help for testmetric2", Type: cortexpb.COUNTER} + dir := t.TempDir() chunksDir := filepath.Join(dir, "chunks") @@ -788,8 +832,8 @@ func TestIngesterMetricLimitExceeded(t *testing.T) { require.NoError(t, os.Mkdir(chunksDir, os.ModePerm)) require.NoError(t, os.Mkdir(blocksDir, os.ModePerm)) - blocksIngesterGenerator := func() *Ingester { - ing, err := prepareIngesterWithBlocksStorageAndLimits(t, defaultIngesterTestConfig(t), limits, nil, blocksDir, prometheus.NewRegistry(), true) + blocksIngesterGenerator := func(reg prometheus.Registerer) *Ingester { + ing, err := prepareIngesterWithBlocksStorageAndLimits(t, defaultIngesterTestConfig(t), limits, nil, blocksDir, reg, true) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing)) // Wait until it's ACTIVE @@ -801,36 +845,17 @@ func TestIngesterMetricLimitExceeded(t *testing.T) { } tests := []string{"chunks", "blocks"} - for i, ingGenerator := range []func() *Ingester{blocksIngesterGenerator} { + for i, ingGenerator := range []func(reg prometheus.Registerer) *Ingester{blocksIngesterGenerator} { t.Run(tests[i], func(t *testing.T) { - ing := ingGenerator() - - userID := "1" - labels1 := labels.Labels{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}} - sample1 := cortexpb.Sample{ - TimestampMs: 0, - Value: 1, - } - sample2 := cortexpb.Sample{ - TimestampMs: 1, - Value: 2, - } - labels3 := labels.Labels{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "biz"}} - sample3 := cortexpb.Sample{ - TimestampMs: 1, - Value: 3, - } - - // Metadata - metadata1 := &cortexpb.MetricMetadata{MetricFamilyName: "testmetric", Help: "a help for testmetric", Type: cortexpb.COUNTER} - metadata2 := &cortexpb.MetricMetadata{MetricFamilyName: "testmetric", Help: "a help for testmetric2", Type: cortexpb.COUNTER} + reg := prometheus.NewRegistry() + ing := ingGenerator(reg) // Append only one series and one metadata first, expect no error. ctx := user.InjectOrgID(context.Background(), userID) _, err := ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels1}, []cortexpb.Sample{sample1}, []*cortexpb.MetricMetadata{metadata1}, nil, cortexpb.API)) require.NoError(t, err) - testLimits := func() { + testLimits := func(reg prometheus.Gatherer) { // Append two series, expect series-exceeded error. _, err = ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels1, labels3}, []cortexpb.Sample{sample2, sample3}, nil, nil, cortexpb.API)) httpResp, ok := httpgrpc.HTTPResponseFromError(err) @@ -869,16 +894,23 @@ func TestIngesterMetricLimitExceeded(t *testing.T) { m, err := ing.MetricsMetadata(ctx, nil) require.NoError(t, err) assert.Equal(t, []*cortexpb.MetricMetadata{metadata1}, m.Metadata) + + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_discarded_samples_total The total number of samples that were discarded. + # TYPE cortex_discarded_samples_total counter + cortex_discarded_samples_total{reason="per_metric_series_limit",user="1"} 1 + `), "cortex_discarded_samples_total")) } - testLimits() + testLimits(reg) // Limits should hold after restart. services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck - ing = ingGenerator() + reg = prometheus.NewRegistry() + ing = ingGenerator(reg) defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck - testLimits() + testLimits(reg) }) } } @@ -1717,6 +1749,16 @@ func TestIngester_Push(t *testing.T) { limits := defaultLimitsTestConfig() limits.MaxExemplars = testData.maxExemplars limits.OutOfOrderTimeWindow = model.Duration(testData.oooTimeWindow) + limits.LimitsPerLabelSet = []validation.LimitsPerLabelSet{ + { + LabelSet: labels.FromMap(map[string]string{model.MetricNameLabel: "test"}), + Hash: 0, + }, + { + LabelSet: labels.EmptyLabels(), + Hash: 1, + }, + } i, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, nil, "", registry, !testData.disableNativeHistogram) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) @@ -6190,6 +6232,84 @@ func TestIngester_UserTSDB_BlocksToDelete(t *testing.T) { }) } +func TestIngester_UpdateLabelSetMetrics(t *testing.T) { + cfg := defaultIngesterTestConfig(t) + cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour} + reg := prometheus.NewRegistry() + limits := defaultLimitsTestConfig() + userID := "1" + ctx := user.InjectOrgID(context.Background(), userID) + + limits.LimitsPerLabelSet = []validation.LimitsPerLabelSet{ + { + LabelSet: labels.FromMap(map[string]string{ + "foo": "bar", + }), + Limits: validation.LimitsPerLabelSetEntry{ + MaxSeries: 1, + }, + }, + { + LabelSet: labels.EmptyLabels(), + }, + } + tenantLimits := newMockTenantLimits(map[string]*validation.Limits{userID: &limits}) + + dir := t.TempDir() + chunksDir := filepath.Join(dir, "chunks") + blocksDir := filepath.Join(dir, "blocks") + require.NoError(t, os.Mkdir(chunksDir, os.ModePerm)) + require.NoError(t, os.Mkdir(blocksDir, os.ModePerm)) + + i, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, tenantLimits, blocksDir, reg, false) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + // Wait until it's ACTIVE + test.Poll(t, time.Second, ring.ACTIVE, func() interface{} { + return i.lifecycler.GetState() + }) + // Add user ID. + wreq := &cortexpb.WriteRequest{ + Timeseries: []cortexpb.PreallocTimeseries{ + {TimeSeries: &cortexpb.TimeSeries{ + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{model.MetricNameLabel: "test", "foo": "bar"})), + Samples: []cortexpb.Sample{{Value: 0, TimestampMs: 1}}, + }}, + }, + } + _, err = i.Push(ctx, wreq) + require.NoError(t, err) + + // Push one more series will trigger throttle by label set. + wreq = &cortexpb.WriteRequest{ + Timeseries: []cortexpb.PreallocTimeseries{ + {TimeSeries: &cortexpb.TimeSeries{ + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{model.MetricNameLabel: "test2", "foo": "bar"})), + Samples: []cortexpb.Sample{{Value: 0, TimestampMs: 0}}, + }}, + }, + } + _, err = i.Push(ctx, wreq) + require.Error(t, err) + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_discarded_samples_per_labelset_total The total number of samples that were discarded for each labelset. + # TYPE cortex_discarded_samples_per_labelset_total counter + cortex_discarded_samples_per_labelset_total{labelset="{foo=\"bar\"}",reason="per_labelset_series_limit",user="1"} 1 + # HELP cortex_discarded_samples_total The total number of samples that were discarded. + # TYPE cortex_discarded_samples_total counter + cortex_discarded_samples_total{reason="per_labelset_series_limit",user="1"} 1 + `), "cortex_discarded_samples_total", "cortex_discarded_samples_per_labelset_total")) + + // Expect per labelset validate metrics cleaned up. + i.closeAllTSDB() + i.updateLabelSetMetrics() + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_discarded_samples_total The total number of samples that were discarded. + # TYPE cortex_discarded_samples_total counter + cortex_discarded_samples_total{reason="per_labelset_series_limit",user="1"} 1 + `), "cortex_discarded_samples_total", "cortex_discarded_samples_per_labelset_total")) +} + // mockTenantLimits exposes per-tenant limits based on a provided map type mockTenantLimits struct { limits map[string]*validation.Limits diff --git a/pkg/util/labelset/tracker.go b/pkg/util/labelset/tracker.go new file mode 100644 index 0000000000..6fa703ccb2 --- /dev/null +++ b/pkg/util/labelset/tracker.go @@ -0,0 +1,111 @@ +package labelset + +import ( + "sync" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + + "github.com/cortexproject/cortex/pkg/util" +) + +const ( + numMetricShards = 128 +) + +type LabelSetTracker struct { + shards []*labelSetCounterShard +} + +// NewLabelSetTracker initializes a LabelSetTracker to keep track of active labelset limits. +func NewLabelSetTracker() *LabelSetTracker { + shards := make([]*labelSetCounterShard, 0, numMetricShards) + for i := 0; i < numMetricShards; i++ { + shards = append(shards, &labelSetCounterShard{ + RWMutex: &sync.RWMutex{}, + userLabelSets: map[string]map[uint64]labels.Labels{}, + }) + } + return &LabelSetTracker{shards: shards} +} + +type labelSetCounterShard struct { + *sync.RWMutex + userLabelSets map[string]map[uint64]labels.Labels +} + +// Track accepts userID, label set and hash of the label set limit. +func (m *LabelSetTracker) Track(userId string, hash uint64, labelSet labels.Labels) { + s := m.shards[util.HashFP(model.Fingerprint(hash))%numMetricShards] + s.Lock() + if userEntry, ok := s.userLabelSets[userId]; ok { + if _, ok2 := userEntry[hash]; !ok2 { + userEntry[hash] = labelSet + } + } else { + s.userLabelSets[userId] = map[uint64]labels.Labels{hash: labelSet} + } + // Unlock before we update metrics. + s.Unlock() +} + +// UpdateMetrics cleans up dangling user and label set from the tracker as well as metrics. +// It takes a function for user to customize the metrics cleanup logic when either a user or +// a specific label set is removed. If a user is removed then removeUser is set to true. +func (m *LabelSetTracker) UpdateMetrics(userSet map[string]map[uint64]struct{}, deleteMetricFunc func(user, labelSetStr string, removeUser bool)) { + for i := 0; i < numMetricShards; i++ { + shard := m.shards[i] + shard.Lock() + + for user, userEntry := range shard.userLabelSets { + limits, ok := userSet[user] + // Remove user if it doesn't exist or has no limits anymore. + if !ok || len(limits) == 0 { + deleteMetricFunc(user, "", true) + delete(shard.userLabelSets, user) + continue + } + for h, lbls := range userEntry { + // This limit no longer exists. + if _, ok := limits[h]; !ok { + delete(userEntry, h) + labelSetStr := lbls.String() + deleteMetricFunc(user, labelSetStr, false) + continue + } + } + } + + shard.Unlock() + } +} + +// labelSetExists is used for testing only to check the existence of a label set. +func (m *LabelSetTracker) labelSetExists(userId string, hash uint64, labelSet labels.Labels) bool { + s := m.shards[util.HashFP(model.Fingerprint(hash))%numMetricShards] + s.RLock() + defer s.RUnlock() + userEntry, ok := s.userLabelSets[userId] + if !ok { + return false + } + set, ok := userEntry[hash] + if !ok { + return false + } + return labels.Compare(set, labelSet) == 0 +} + +// userExists is used for testing only to check the existence of a user. +func (m *LabelSetTracker) userExists(userId string) bool { + for i := 0; i < numMetricShards; i++ { + shard := m.shards[i] + shard.RLock() + defer shard.RUnlock() + _, ok := shard.userLabelSets[userId] + if ok { + return true + } + } + return false +} diff --git a/pkg/util/labelset/tracker_test.go b/pkg/util/labelset/tracker_test.go new file mode 100644 index 0000000000..1354d3db8c --- /dev/null +++ b/pkg/util/labelset/tracker_test.go @@ -0,0 +1,145 @@ +package labelset + +import ( + "fmt" + "strconv" + "testing" + + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" +) + +func TestLabelSetTracker(t *testing.T) { + tracker := NewLabelSetTracker() + userID := "1" + userID2 := "2" + userID3 := "3" + + tracker.Track(userID, 0, labels.FromStrings("foo", "bar")) + require.True(t, tracker.labelSetExists(userID, 0, labels.FromStrings("foo", "bar"))) + tracker.Track(userID, 1, labels.FromStrings("foo", "baz")) + require.True(t, tracker.labelSetExists(userID, 1, labels.FromStrings("foo", "baz"))) + tracker.Track(userID, 3, labels.EmptyLabels()) + require.True(t, tracker.labelSetExists(userID, 3, labels.EmptyLabels())) + tracker.Track(userID2, 0, labels.FromStrings("foo", "bar")) + require.True(t, tracker.labelSetExists(userID2, 0, labels.FromStrings("foo", "bar"))) + tracker.Track(userID2, 2, labels.FromStrings("cluster", "us-west-2")) + require.True(t, tracker.labelSetExists(userID2, 2, labels.FromStrings("cluster", "us-west-2"))) + + // Increment metrics and add a new user. + tracker.Track(userID, 3, labels.EmptyLabels()) + require.True(t, tracker.labelSetExists(userID, 3, labels.EmptyLabels())) + tracker.Track(userID2, 2, labels.FromStrings("cluster", "us-west-2")) + require.True(t, tracker.labelSetExists(userID2, 2, labels.FromStrings("cluster", "us-west-2"))) + tracker.Track(userID2, 4, labels.FromStrings("cluster", "us-west-2")) + require.True(t, tracker.labelSetExists(userID2, 4, labels.FromStrings("cluster", "us-west-2"))) + tracker.Track(userID3, 4, labels.FromStrings("cluster", "us-east-1")) + require.True(t, tracker.labelSetExists(userID3, 4, labels.FromStrings("cluster", "us-east-1"))) + + // Remove user 2. + userSet := map[string]map[uint64]struct { + }{ + userID: {0: struct{}{}, 1: struct{}{}, 2: struct{}{}, 3: struct{}{}}, + userID3: {0: struct{}{}, 1: struct{}{}, 2: struct{}{}, 3: struct{}{}, 4: struct{}{}}, + } + tracker.UpdateMetrics(userSet, noopDeleteMetrics) + // user 2 removed. + require.False(t, tracker.userExists(userID2)) + // user 1 and 3 remain unchanged. + require.True(t, tracker.labelSetExists(userID, 0, labels.FromStrings("foo", "bar"))) + require.True(t, tracker.labelSetExists(userID, 1, labels.FromStrings("foo", "baz"))) + require.True(t, tracker.labelSetExists(userID, 3, labels.EmptyLabels())) + require.True(t, tracker.labelSetExists(userID3, 4, labels.FromStrings("cluster", "us-east-1"))) + + // Simulate existing limits removed for each user. + userSet = map[string]map[uint64]struct { + }{ + userID: {0: struct{}{}}, + userID2: {}, + userID3: {}, + } + tracker.UpdateMetrics(userSet, noopDeleteMetrics) + // User 2 and 3 removed. User 1 exists. + require.True(t, tracker.userExists(userID)) + require.False(t, tracker.userExists(userID2)) + require.False(t, tracker.userExists(userID3)) + + require.True(t, tracker.labelSetExists(userID, 0, labels.FromStrings("foo", "bar"))) + require.False(t, tracker.labelSetExists(userID, 1, labels.FromStrings("foo", "baz"))) + require.False(t, tracker.labelSetExists(userID, 3, labels.EmptyLabels())) + require.False(t, tracker.labelSetExists(userID3, 4, labels.FromStrings("cluster", "us-east-1"))) +} + +func noopDeleteMetrics(user, labelSetStr string, removeUser bool) {} + +func TestLabelSetTracker_UpdateMetrics(t *testing.T) { + tracker := NewLabelSetTracker() + userID := "1" + userID2 := "2" + userID3 := "3" + lbls := labels.FromStrings("foo", "bar") + lbls2 := labels.FromStrings("foo", "baz") + tracker.Track(userID, 0, lbls) + tracker.Track(userID, 1, lbls2) + tracker.Track(userID2, 0, lbls) + tracker.Track(userID2, 1, lbls2) + tracker.Track(userID3, 0, lbls) + tracker.Track(userID3, 1, lbls2) + + deleteCalls := make(map[string]struct{}) + mockDeleteMetrics := func(user, labelSetStr string, removeUser bool) { + deleteCalls[formatDeleteCallString(user, labelSetStr, removeUser)] = struct{}{} + } + + userSet := map[string]map[uint64]struct { + }{ + userID: {0: struct{}{}, 1: struct{}{}}, + userID2: {0: struct{}{}, 1: struct{}{}}, + userID3: {0: struct{}{}, 1: struct{}{}}, + } + // No user or label set removed, no change. + tracker.UpdateMetrics(userSet, mockDeleteMetrics) + require.Equal(t, 0, len(deleteCalls)) + + userSet = map[string]map[uint64]struct { + }{ + userID: {0: struct{}{}, 1: struct{}{}}, + userID2: {1: struct{}{}}, + userID3: {0: struct{}{}}, + } + // LabelSet removed from user 2 and 3 + tracker.UpdateMetrics(userSet, mockDeleteMetrics) + require.Equal(t, 2, len(deleteCalls)) + _, ok := deleteCalls[formatDeleteCallString(userID2, lbls.String(), false)] + require.True(t, ok) + _, ok = deleteCalls[formatDeleteCallString(userID3, lbls2.String(), false)] + require.True(t, ok) + + userSet = map[string]map[uint64]struct { + }{ + userID: {0: struct{}{}, 1: struct{}{}}, + userID2: {1: struct{}{}}, + userID3: {}, + } + // User 3 doesn't have limits anymore. Remove user. + tracker.UpdateMetrics(userSet, mockDeleteMetrics) + require.Equal(t, 3, len(deleteCalls)) + _, ok = deleteCalls[formatDeleteCallString(userID3, "", true)] + require.True(t, ok) + + userSet = map[string]map[uint64]struct { + }{ + userID3: {}, + } + // Remove user 1 and 2. + tracker.UpdateMetrics(userSet, mockDeleteMetrics) + require.Equal(t, 5, len(deleteCalls)) + _, ok = deleteCalls[formatDeleteCallString(userID, "", true)] + require.True(t, ok) + _, ok = deleteCalls[formatDeleteCallString(userID2, "", true)] + require.True(t, ok) +} + +func formatDeleteCallString(user, labelSetStr string, removeUser bool) string { + return fmt.Sprintf("%s,%s,%s", user, labelSetStr, strconv.FormatBool(removeUser)) +} diff --git a/pkg/util/validation/validate.go b/pkg/util/validation/validate.go index 3e36ddf529..26d352e29b 100644 --- a/pkg/util/validation/validate.go +++ b/pkg/util/validation/validate.go @@ -12,11 +12,13 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" "github.com/weaveworks/common/httpgrpc" "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/extract" + "github.com/cortexproject/cortex/pkg/util/labelset" ) const ( @@ -79,6 +81,9 @@ type ValidateMetrics struct { DiscardedMetadata *prometheus.CounterVec HistogramSamplesReducedResolution *prometheus.CounterVec LabelSizeBytes *prometheus.HistogramVec + + DiscardedSamplesPerLabelSet *prometheus.CounterVec + LabelSetTracker *labelset.LabelSetTracker } func registerCollector(r prometheus.Registerer, c prometheus.Collector) { @@ -97,6 +102,14 @@ func NewValidateMetrics(r prometheus.Registerer) *ValidateMetrics { []string{discardReasonLabel, "user"}, ) registerCollector(r, discardedSamples) + discardedSamplesPerLabelSet := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "cortex_discarded_samples_per_labelset_total", + Help: "The total number of samples that were discarded for each labelset.", + }, + []string{discardReasonLabel, "user", "labelset"}, + ) + registerCollector(r, discardedSamplesPerLabelSet) discardedExemplars := prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "cortex_discarded_exemplars_total", @@ -132,15 +145,50 @@ func NewValidateMetrics(r prometheus.Registerer) *ValidateMetrics { m := &ValidateMetrics{ DiscardedSamples: discardedSamples, + DiscardedSamplesPerLabelSet: discardedSamplesPerLabelSet, DiscardedExemplars: discardedExemplars, DiscardedMetadata: discardedMetadata, HistogramSamplesReducedResolution: histogramSamplesReducedResolution, LabelSizeBytes: labelSizeBytes, + LabelSetTracker: labelset.NewLabelSetTracker(), } return m } +// UpdateSamplesDiscardedForSeries updates discarded samples and discarded samples per labelset for the provided reason and series. +// Used in test only for now. +func (m *ValidateMetrics) updateSamplesDiscardedForSeries(userID, reason string, labelSetLimits []LimitsPerLabelSet, lbls labels.Labels, count int) { + matchedLimits := LimitsPerLabelSetsForSeries(labelSetLimits, lbls) + m.updateSamplesDiscarded(userID, reason, matchedLimits, count) +} + +// updateSamplesDiscarded updates discarded samples and discarded samples per labelset for the provided reason. +// The provided label set needs to be pre-filtered to match the series if applicable. +// Used in test only for now. +func (m *ValidateMetrics) updateSamplesDiscarded(userID, reason string, labelSetLimits []LimitsPerLabelSet, count int) { + m.DiscardedSamples.WithLabelValues(reason, userID).Add(float64(count)) + for _, limit := range labelSetLimits { + m.LabelSetTracker.Track(userID, limit.Hash, limit.LabelSet) + m.DiscardedSamplesPerLabelSet.WithLabelValues(reason, userID, limit.LabelSet.String()).Add(float64(count)) + } +} + +func (m *ValidateMetrics) UpdateLabelSet(userSet map[string]map[uint64]struct{}, logger log.Logger) { + m.LabelSetTracker.UpdateMetrics(userSet, func(user, labelSetStr string, removeUser bool) { + if removeUser { + // No need to clean up discarded samples per user here as it will be cleaned up elsewhere. + if err := util.DeleteMatchingLabels(m.DiscardedSamplesPerLabelSet, map[string]string{"user": user}); err != nil { + level.Warn(logger).Log("msg", "failed to remove cortex_discarded_samples_per_labelset_total metric for user", "user", user, "err", err) + } + return + } + if err := util.DeleteMatchingLabels(m.DiscardedSamplesPerLabelSet, map[string]string{"user": user, "labelset": labelSetStr}); err != nil { + level.Warn(logger).Log("msg", "failed to remove cortex_discarded_samples_per_labelset_total metric", "user", user, "labelset", labelSetStr, "err", err) + } + }) +} + // ValidateSampleTimestamp returns an err if the sample timestamp is invalid. // The returned error may retain the provided series labels. func ValidateSampleTimestamp(validateMetrics *ValidateMetrics, limits *Limits, userID string, ls []cortexpb.LabelAdapter, timestampMs int64) ValidationError { @@ -355,6 +403,9 @@ func DeletePerUserValidationMetrics(validateMetrics *ValidateMetrics, userID str if err := util.DeleteMatchingLabels(validateMetrics.DiscardedSamples, filter); err != nil { level.Warn(log).Log("msg", "failed to remove cortex_discarded_samples_total metric for user", "user", userID, "err", err) } + if err := util.DeleteMatchingLabels(validateMetrics.DiscardedSamplesPerLabelSet, filter); err != nil { + level.Warn(log).Log("msg", "failed to remove cortex_discarded_samples_per_labelset_total metric for user", "user", userID, "err", err) + } if err := util.DeleteMatchingLabels(validateMetrics.DiscardedExemplars, filter); err != nil { level.Warn(log).Log("msg", "failed to remove cortex_discarded_exemplars_total metric for user", "user", userID, "err", err) } diff --git a/pkg/util/validation/validate_test.go b/pkg/util/validation/validate_test.go index 6b6fb00bbf..80eff18691 100644 --- a/pkg/util/validation/validate_test.go +++ b/pkg/util/validation/validate_test.go @@ -5,6 +5,7 @@ import ( "strings" "testing" + "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" @@ -31,6 +32,21 @@ func TestValidateLabels(t *testing.T) { cfg.MaxLabelNamesPerSeries = 2 cfg.MaxLabelsSizeBytes = 90 cfg.EnforceMetricName = true + cfg.LimitsPerLabelSet = []LimitsPerLabelSet{ + { + Limits: LimitsPerLabelSetEntry{MaxSeries: 0}, + LabelSet: labels.FromMap(map[string]string{ + model.MetricNameLabel: "foo", + }), + Hash: 0, + }, + // Default partition + { + Limits: LimitsPerLabelSetEntry{MaxSeries: 0}, + LabelSet: labels.EmptyLabels(), + Hash: 1, + }, + } for _, c := range []struct { metric model.Metric @@ -269,6 +285,12 @@ func TestValidateLabelOrder(t *testing.T) { {Name: "a", Value: "a"}, }, "a") assert.Equal(t, expected, actual) + + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_discarded_samples_total The total number of samples that were discarded. + # TYPE cortex_discarded_samples_total counter + cortex_discarded_samples_total{reason="labels_not_sorted",user="testUser"} 1 + `), "cortex_discarded_samples_total")) } func TestValidateLabelDuplication(t *testing.T) { @@ -301,6 +323,12 @@ func TestValidateLabelDuplication(t *testing.T) { {Name: "a", Value: "a"}, }, "a") assert.Equal(t, expected, actual) + + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_discarded_samples_total The total number of samples that were discarded. + # TYPE cortex_discarded_samples_total counter + cortex_discarded_samples_total{reason="duplicate_label_names",user="testUser"} 2 + `), "cortex_discarded_samples_total")) } func TestValidateNativeHistogram(t *testing.T) { @@ -418,3 +446,132 @@ func TestValidateNativeHistogram(t *testing.T) { }) } } + +func TestValidateMetrics_UpdateSamplesDiscardedForSeries(t *testing.T) { + reg := prometheus.NewRegistry() + v := NewValidateMetrics(reg) + userID := "user" + limits := []LimitsPerLabelSet{ + { + LabelSet: labels.FromMap(map[string]string{"foo": "bar"}), + Hash: 0, + }, + { + LabelSet: labels.FromMap(map[string]string{"foo": "baz"}), + Hash: 1, + }, + { + LabelSet: labels.EmptyLabels(), + Hash: 2, + }, + } + v.updateSamplesDiscardedForSeries(userID, "dummy", limits, labels.FromMap(map[string]string{"foo": "bar"}), 100) + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_discarded_samples_per_labelset_total The total number of samples that were discarded for each labelset. + # TYPE cortex_discarded_samples_per_labelset_total counter + cortex_discarded_samples_per_labelset_total{labelset="{foo=\"bar\"}",reason="dummy",user="user"} 100 + # HELP cortex_discarded_samples_total The total number of samples that were discarded. + # TYPE cortex_discarded_samples_total counter + cortex_discarded_samples_total{reason="dummy",user="user"} 100 + `), "cortex_discarded_samples_total", "cortex_discarded_samples_per_labelset_total")) + + v.updateSamplesDiscardedForSeries(userID, "out-of-order", limits, labels.FromMap(map[string]string{"foo": "baz"}), 1) + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_discarded_samples_per_labelset_total The total number of samples that were discarded for each labelset. + # TYPE cortex_discarded_samples_per_labelset_total counter + cortex_discarded_samples_per_labelset_total{labelset="{foo=\"bar\"}",reason="dummy",user="user"} 100 + cortex_discarded_samples_per_labelset_total{labelset="{foo=\"baz\"}",reason="out-of-order",user="user"} 1 + # HELP cortex_discarded_samples_total The total number of samples that were discarded. + # TYPE cortex_discarded_samples_total counter + cortex_discarded_samples_total{reason="dummy",user="user"} 100 + cortex_discarded_samples_total{reason="out-of-order",user="user"} 1 + `), "cortex_discarded_samples_total", "cortex_discarded_samples_per_labelset_total")) + + // Match default partition. + v.updateSamplesDiscardedForSeries(userID, "too-old", limits, labels.FromMap(map[string]string{"foo": "foo"}), 1) + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_discarded_samples_per_labelset_total The total number of samples that were discarded for each labelset. + # TYPE cortex_discarded_samples_per_labelset_total counter + cortex_discarded_samples_per_labelset_total{labelset="{foo=\"bar\"}",reason="dummy",user="user"} 100 + cortex_discarded_samples_per_labelset_total{labelset="{foo=\"baz\"}",reason="out-of-order",user="user"} 1 + cortex_discarded_samples_per_labelset_total{labelset="{}",reason="too-old",user="user"} 1 + # HELP cortex_discarded_samples_total The total number of samples that were discarded. + # TYPE cortex_discarded_samples_total counter + cortex_discarded_samples_total{reason="dummy",user="user"} 100 + cortex_discarded_samples_total{reason="out-of-order",user="user"} 1 + cortex_discarded_samples_total{reason="too-old",user="user"} 1 + `), "cortex_discarded_samples_total", "cortex_discarded_samples_per_labelset_total")) +} + +func TestValidateMetrics_UpdateLabelSet(t *testing.T) { + reg := prometheus.NewRegistry() + v := NewValidateMetrics(reg) + userID := "user" + logger := log.NewNopLogger() + limits := []LimitsPerLabelSet{ + { + LabelSet: labels.FromMap(map[string]string{"foo": "bar"}), + Hash: 0, + }, + { + LabelSet: labels.FromMap(map[string]string{"foo": "baz"}), + Hash: 1, + }, + { + LabelSet: labels.EmptyLabels(), + Hash: 2, + }, + } + + v.updateSamplesDiscarded(userID, "dummy", limits, 100) + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_discarded_samples_per_labelset_total The total number of samples that were discarded for each labelset. + # TYPE cortex_discarded_samples_per_labelset_total counter + cortex_discarded_samples_per_labelset_total{labelset="{foo=\"bar\"}",reason="dummy",user="user"} 100 + cortex_discarded_samples_per_labelset_total{labelset="{foo=\"baz\"}",reason="dummy",user="user"} 100 + cortex_discarded_samples_per_labelset_total{labelset="{}",reason="dummy",user="user"} 100 + # HELP cortex_discarded_samples_total The total number of samples that were discarded. + # TYPE cortex_discarded_samples_total counter + cortex_discarded_samples_total{reason="dummy",user="user"} 100 + `), "cortex_discarded_samples_total", "cortex_discarded_samples_per_labelset_total")) + + // Remove default partition. + userSet := map[string]map[uint64]struct { + }{ + userID: {0: struct{}{}, 1: struct{}{}}, + } + v.UpdateLabelSet(userSet, logger) + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_discarded_samples_per_labelset_total The total number of samples that were discarded for each labelset. + # TYPE cortex_discarded_samples_per_labelset_total counter + cortex_discarded_samples_per_labelset_total{labelset="{foo=\"bar\"}",reason="dummy",user="user"} 100 + cortex_discarded_samples_per_labelset_total{labelset="{foo=\"baz\"}",reason="dummy",user="user"} 100 + # HELP cortex_discarded_samples_total The total number of samples that were discarded. + # TYPE cortex_discarded_samples_total counter + cortex_discarded_samples_total{reason="dummy",user="user"} 100 + `), "cortex_discarded_samples_total", "cortex_discarded_samples_per_labelset_total")) + + // Remove limit 1. + userSet = map[string]map[uint64]struct { + }{ + userID: {0: struct{}{}}, + } + v.UpdateLabelSet(userSet, logger) + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_discarded_samples_per_labelset_total The total number of samples that were discarded for each labelset. + # TYPE cortex_discarded_samples_per_labelset_total counter + cortex_discarded_samples_per_labelset_total{labelset="{foo=\"bar\"}",reason="dummy",user="user"} 100 + # HELP cortex_discarded_samples_total The total number of samples that were discarded. + # TYPE cortex_discarded_samples_total counter + cortex_discarded_samples_total{reason="dummy",user="user"} 100 + `), "cortex_discarded_samples_total", "cortex_discarded_samples_per_labelset_total")) + + // Remove user. + v.UpdateLabelSet(nil, logger) + // cortex_discarded_samples_total metric still exists as it should be cleaned up in another loop. + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_discarded_samples_total The total number of samples that were discarded. + # TYPE cortex_discarded_samples_total counter + cortex_discarded_samples_total{reason="dummy",user="user"} 100 + `), "cortex_discarded_samples_total", "cortex_discarded_samples_per_labelset_total")) +}