From d0f253c3e3a3dfd2d81d1a6049b50c8967bbe752 Mon Sep 17 00:00:00 2001 From: Harry John Date: Sun, 8 Sep 2024 21:20:02 -0700 Subject: [PATCH] StoreGateway: Implement metadata API limit in queryable (#6195) --- CHANGELOG.md | 3 +- pkg/querier/blocks_store_queryable.go | 43 +++++-- pkg/querier/blocks_store_queryable_test.go | 133 ++++++++++++++++++++- 3 files changed, 160 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 590d7c5bd7..b748b3d5db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,8 @@ * [ENHANCEMENT] Ingester/Ring: New `READONLY` status on ring to be used by Ingester. New ingester API to change mode of ingester #6163 * [ENHANCEMENT] Ruler: Add query statistics metrics when --ruler.query-stats-enabled=true. #6173 * [ENHANCEMENT] Ingester: Add new API `/ingester/all_user_stats` which shows loaded blocks, active timeseries and ingestion rate for a specific ingester. #6178 -* [ENHANCEMENT] Distributor: Add new `cortex_reduced_resolution_histogram_samples_total` metric to to track the number of histogram samples which resolution was reduced. #6182 +* [ENHANCEMENT] Distributor: Add new `cortex_reduced_resolution_histogram_samples_total` metric to track the number of histogram samples which resolution was reduced. #6182 +* [ENHANCEMENT] StoreGateway: Implement metadata API limit in queryable. #6195 ## 1.18.0 2024-09-03 diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index caf4de3d6b..e85f671783 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -345,7 +345,11 @@ func (q *blocksStoreQuerier) LabelNames(ctx context.Context, hints *storage.Labe spanLog, spanCtx := spanlogger.New(ctx, "blocksStoreQuerier.LabelNames") defer spanLog.Span.Finish() - minT, maxT := q.minT, q.maxT + minT, maxT, limit := q.minT, q.maxT, int64(0) + + if hints != nil { + limit = int64(hints.Limit) + } var ( resMtx sync.Mutex @@ -355,7 +359,7 @@ func (q *blocksStoreQuerier) LabelNames(ctx context.Context, hints *storage.Labe ) queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error, error) { - nameSets, warnings, queriedBlocks, err, retryableError := q.fetchLabelNamesFromStore(spanCtx, userID, clients, minT, maxT, convertedMatchers) + nameSets, warnings, queriedBlocks, err, retryableError := q.fetchLabelNamesFromStore(spanCtx, userID, clients, minT, maxT, limit, convertedMatchers) if err != nil { return nil, err, retryableError } @@ -372,6 +376,7 @@ func (q *blocksStoreQuerier) LabelNames(ctx context.Context, hints *storage.Labe return nil, nil, err } + // TODO(johrry): pass limit when merging. return strutil.MergeSlices(resNameSets...), resWarnings, nil } @@ -384,7 +389,11 @@ func (q *blocksStoreQuerier) LabelValues(ctx context.Context, name string, hints spanLog, spanCtx := spanlogger.New(ctx, "blocksStoreQuerier.LabelValues") defer spanLog.Span.Finish() - minT, maxT := q.minT, q.maxT + minT, maxT, limit := q.minT, q.maxT, int64(0) + + if hints != nil { + limit = int64(hints.Limit) + } var ( resValueSets = [][]string{} @@ -394,7 +403,7 @@ func (q *blocksStoreQuerier) LabelValues(ctx context.Context, name string, hints ) queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error, error) { - valueSets, warnings, queriedBlocks, err, retryableError := q.fetchLabelValuesFromStore(spanCtx, userID, name, clients, minT, maxT, matchers...) + valueSets, warnings, queriedBlocks, err, retryableError := q.fetchLabelValuesFromStore(spanCtx, userID, name, clients, minT, maxT, limit, matchers...) if err != nil { return nil, err, retryableError } @@ -411,6 +420,7 @@ func (q *blocksStoreQuerier) LabelValues(ctx context.Context, name string, hints return nil, nil, err } + // TODO(johrry): pass limit when merging. return strutil.MergeSlices(resValueSets...), resWarnings, nil } @@ -427,9 +437,9 @@ func (q *blocksStoreQuerier) selectSorted(ctx context.Context, sp *storage.Selec spanLog, spanCtx := spanlogger.New(ctx, "blocksStoreQuerier.selectSorted") defer spanLog.Span.Finish() - minT, maxT := q.minT, q.maxT + minT, maxT, limit := q.minT, q.maxT, int64(0) if sp != nil { - minT, maxT = sp.Start, sp.End + minT, maxT, limit = sp.Start, sp.End, int64(sp.Limit) } var ( @@ -443,7 +453,7 @@ func (q *blocksStoreQuerier) selectSorted(ctx context.Context, sp *storage.Selec ) queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error, error) { - seriesSets, queriedBlocks, warnings, numChunks, err, retryableError := q.fetchSeriesFromStores(spanCtx, sp, userID, clients, minT, maxT, matchers, maxChunksLimit, leftChunksLimit) + seriesSets, queriedBlocks, warnings, numChunks, err, retryableError := q.fetchSeriesFromStores(spanCtx, sp, userID, clients, minT, maxT, limit, matchers, maxChunksLimit, leftChunksLimit) if err != nil { return nil, err, retryableError } @@ -471,6 +481,7 @@ func (q *blocksStoreQuerier) selectSorted(ctx context.Context, sp *storage.Selec storage.EmptySeriesSet() } + // TODO(johrry): pass limit when merging. return series.NewSeriesSetWithWarnings( storage.NewMergeSeriesSet(resSeriesSets, storage.ChainedSeriesMerge), resWarnings) @@ -593,6 +604,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( clients map[BlocksStoreClient][]ulid.ULID, minT int64, maxT int64, + limit int64, matchers []*labels.Matcher, maxChunksLimit int, leftChunksLimit int, @@ -635,7 +647,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( seriesQueryStats := &hintspb.QueryStats{} skipChunks := sp != nil && sp.Func == "series" - req, err := createSeriesRequest(minT, maxT, convertedMatchers, shardingInfo, skipChunks, blockIDs, defaultAggrs) + req, err := createSeriesRequest(minT, maxT, limit, convertedMatchers, shardingInfo, skipChunks, blockIDs, defaultAggrs) if err != nil { return errors.Wrapf(err, "failed to create series request") } @@ -825,6 +837,7 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore( clients map[BlocksStoreClient][]ulid.ULID, minT int64, maxT int64, + limit int64, matchers []storepb.LabelMatcher, ) ([][]string, annotations.Annotations, []ulid.ULID, error, error) { var ( @@ -846,7 +859,7 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore( blockIDs := blockIDs g.Go(func() error { - req, err := createLabelNamesRequest(minT, maxT, blockIDs, matchers) + req, err := createLabelNamesRequest(minT, maxT, limit, blockIDs, matchers) if err != nil { return errors.Wrapf(err, "failed to create label names request") } @@ -927,6 +940,7 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore( clients map[BlocksStoreClient][]ulid.ULID, minT int64, maxT int64, + limit int64, matchers ...*labels.Matcher, ) ([][]string, annotations.Annotations, []ulid.ULID, error, error) { var ( @@ -948,7 +962,7 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore( blockIDs := blockIDs g.Go(func() error { - req, err := createLabelValuesRequest(minT, maxT, name, blockIDs, matchers...) + req, err := createLabelValuesRequest(minT, maxT, limit, name, blockIDs, matchers...) if err != nil { return errors.Wrapf(err, "failed to create label values request") } @@ -1025,7 +1039,7 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore( return valueSets, warnings, queriedBlocks, nil, merr.Err() } -func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, shardingInfo *storepb.ShardInfo, skipChunks bool, blockIDs []ulid.ULID, aggrs []storepb.Aggr) (*storepb.SeriesRequest, error) { +func createSeriesRequest(minT, maxT, limit int64, matchers []storepb.LabelMatcher, shardingInfo *storepb.ShardInfo, skipChunks bool, blockIDs []ulid.ULID, aggrs []storepb.Aggr) (*storepb.SeriesRequest, error) { // Selectively query only specific blocks. hints := &hintspb.SeriesRequestHints{ BlockMatchers: []storepb.LabelMatcher{ @@ -1046,6 +1060,7 @@ func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, shar return &storepb.SeriesRequest{ MinTime: minT, MaxTime: maxT, + Limit: limit, Matchers: matchers, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, Hints: anyHints, @@ -1057,10 +1072,11 @@ func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, shar }, nil } -func createLabelNamesRequest(minT, maxT int64, blockIDs []ulid.ULID, matchers []storepb.LabelMatcher) (*storepb.LabelNamesRequest, error) { +func createLabelNamesRequest(minT, maxT, limit int64, blockIDs []ulid.ULID, matchers []storepb.LabelMatcher) (*storepb.LabelNamesRequest, error) { req := &storepb.LabelNamesRequest{ Start: minT, End: maxT, + Limit: limit, Matchers: matchers, } @@ -1085,10 +1101,11 @@ func createLabelNamesRequest(minT, maxT int64, blockIDs []ulid.ULID, matchers [] return req, nil } -func createLabelValuesRequest(minT, maxT int64, label string, blockIDs []ulid.ULID, matchers ...*labels.Matcher) (*storepb.LabelValuesRequest, error) { +func createLabelValuesRequest(minT, maxT, limit int64, label string, blockIDs []ulid.ULID, matchers ...*labels.Matcher) (*storepb.LabelValuesRequest, error) { req := &storepb.LabelValuesRequest{ Start: minT, End: maxT, + Limit: limit, Label: label, Matchers: convertMatchersToLabelMatcher(matchers), } diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index d89aa96f43..dd3725cbac 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -91,6 +91,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { storeSetResponses []interface{} limits BlocksStoreLimits queryLimiter *limiter.QueryLimiter + seriesLimit int expectedSeries []seriesResult expectedErr error expectedMetrics string @@ -618,6 +619,48 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { cortex_querier_storegateway_refetches_per_query_count 1 `, }, + "multiple store-gateway instances holds the required blocks with overlapping series with limit (multiple returned series)": { + seriesLimit: 1, + finderResult: bucketindex.Blocks{ + &bucketindex.Block{ID: block1}, + &bucketindex.Block{ID: block2}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, []cortexpb.Sample{{Value: 2, TimestampMs: minT + 1}}, nil, nil), + mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, []cortexpb.Sample{{Value: 1, TimestampMs: minT}}, nil, nil), + mockHintsResponse(block1), + }}: {block1}, + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, []cortexpb.Sample{{Value: 1, TimestampMs: minT}, {Value: 2, TimestampMs: minT + 1}}, nil, nil), + mockHintsResponse(block2), + }}: {block2}, + &storeGatewayClientMock{remoteAddr: "3.3.3.3", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, []cortexpb.Sample{{Value: 1, TimestampMs: minT}, {Value: 3, TimestampMs: minT + 1}}, nil, nil), + mockHintsResponse(block3), + }}: {block3}, + }, + }, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + // TODO (johrry): Update this after passing limit in merge + expectedSeries: []seriesResult{ + { + lbls: labels.New(metricNameLabel, series1Label), + values: []valueResult{ + {t: minT, v: 1}, + {t: minT + 1, v: 2}, + }, + }, { + lbls: labels.New(metricNameLabel, series2Label), + values: []valueResult{ + {t: minT, v: 1}, + {t: minT + 1, v: 3}, + }, + }, + }, + }, "multiple store-gateway instances holds the required blocks with overlapping series (multiple returned histogram series)": { finderResult: bucketindex.Blocks{ &bucketindex.Block{ID: block1}, @@ -1484,6 +1527,15 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { t.Run(testName, func(t *testing.T) { t.Parallel() + var hints *storage.SelectHints + if testData.seriesLimit > 0 { + hints = &storage.SelectHints{ + Limit: testData.seriesLimit, + Start: minT, + End: maxT, + } + } + ctx := user.InjectOrgID(context.Background(), "user-1") ctx = limiter.AddQueryLimiterToContext(ctx, testData.queryLimiter) reg := prometheus.NewPedanticRegistry() @@ -1506,7 +1558,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, metricName), } - set := q.Select(ctx, true, nil, matchers...) + set := q.Select(ctx, true, hints, matchers...) if testData.expectedErr != nil { assert.EqualError(t, set.Err(), testData.expectedErr.Error()) assert.IsType(t, set.Err(), testData.expectedErr) @@ -1594,6 +1646,7 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) { tests := map[string]struct { finderResult bucketindex.Blocks finderErr error + limit int storeSetResponses []interface{} expectedLabelNames []string expectedLabelValues []string // For __name__ @@ -1800,6 +1853,61 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) { cortex_querier_storegateway_refetches_per_query_count 1 `, }, + "multiple store-gateway instances holds the required blocks with overlapping series with limit (multiple returned series)": { + limit: 2, + finderResult: bucketindex.Blocks{ + &bucketindex.Block{ID: block1}, + &bucketindex.Block{ID: block2}, + }, + // Block1 has series1 and series2 + // Block2 has only series1 + // Block3 has only series2 + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1, series2)[:2], + Warnings: []string{}, + Hints: mockNamesHints(block1), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series1, series2)[:2], + Warnings: []string{}, + Hints: mockValuesHints(block1), + }, + }: {block1}, + &storeGatewayClientMock{ + remoteAddr: "2.2.2.2", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1)[:2], + Warnings: []string{}, + Hints: mockNamesHints(block2), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series1), + Warnings: []string{}, + Hints: mockValuesHints(block2), + }, + }: {block2}, + &storeGatewayClientMock{ + remoteAddr: "3.3.3.3", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series2)[:2], + Warnings: []string{}, + Hints: mockNamesHints(block3), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series2), + Warnings: []string{}, + Hints: mockValuesHints(block3), + }, + }: {block3}, + }, + }, + expectedLabelNames: namesFromSeries(series1, series2), + expectedLabelValues: valuesFromSeries(labels.MetricName, series1, series2), + }, "a single store-gateway instance has some missing blocks (consistency check failed)": { finderResult: bucketindex.Blocks{ &bucketindex.Block{ID: block1}, @@ -2012,6 +2120,13 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) { for testName, testData := range tests { testData := testData + var hints *storage.LabelHints + if testData.limit > 0 { + hints = &storage.LabelHints{ + Limit: testData.limit, + } + } + t.Run(testName, func(t *testing.T) { t.Parallel() @@ -2036,7 +2151,7 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) { } if testFunc == "LabelNames" { - names, warnings, err := q.LabelNames(ctx, nil) + names, warnings, err := q.LabelNames(ctx, hints) if testData.expectedErr != "" { require.Equal(t, testData.expectedErr, err.Error()) continue @@ -2053,7 +2168,7 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) { } if testFunc == "LabelValues" { - values, warnings, err := q.LabelValues(ctx, labels.MetricName, nil) + values, warnings, err := q.LabelValues(ctx, labels.MetricName, hints) if testData.expectedErr != "" { require.Equal(t, testData.expectedErr, err.Error()) continue @@ -2339,6 +2454,7 @@ type storeGatewayClientMock struct { func (m *storeGatewayClientMock) Series(ctx context.Context, in *storepb.SeriesRequest, opts ...grpc.CallOption) (storegatewaypb.StoreGateway_SeriesClient, error) { seriesClient := &storeGatewaySeriesClientMock{ + limit: in.Limit, mockedResponses: m.mockedSeriesResponses, mockedSeriesStreamErr: m.mockedSeriesStreamErr, } @@ -2346,11 +2462,17 @@ func (m *storeGatewayClientMock) Series(ctx context.Context, in *storepb.SeriesR return seriesClient, m.mockedSeriesErr } -func (m *storeGatewayClientMock) LabelNames(context.Context, *storepb.LabelNamesRequest, ...grpc.CallOption) (*storepb.LabelNamesResponse, error) { +func (m *storeGatewayClientMock) LabelNames(_ context.Context, r *storepb.LabelNamesRequest, _ ...grpc.CallOption) (*storepb.LabelNamesResponse, error) { + if r.Limit > 0 && len(m.mockedLabelNamesResponse.Names) > int(r.Limit) { + m.mockedLabelNamesResponse.Names = m.mockedLabelNamesResponse.Names[:r.Limit] + } return m.mockedLabelNamesResponse, nil } -func (m *storeGatewayClientMock) LabelValues(context.Context, *storepb.LabelValuesRequest, ...grpc.CallOption) (*storepb.LabelValuesResponse, error) { +func (m *storeGatewayClientMock) LabelValues(_ context.Context, r *storepb.LabelValuesRequest, _ ...grpc.CallOption) (*storepb.LabelValuesResponse, error) { + if r.Limit > 0 && len(m.mockedLabelValuesResponse.Values) > int(r.Limit) { + m.mockedLabelNamesResponse.Names = m.mockedLabelValuesResponse.Values[:r.Limit] + } return m.mockedLabelValuesResponse, m.mockedLabelValuesErr } @@ -2361,6 +2483,7 @@ func (m *storeGatewayClientMock) RemoteAddress() string { type storeGatewaySeriesClientMock struct { grpc.ClientStream + limit int64 mockedResponses []*storepb.SeriesResponse mockedSeriesStreamErr error }