From ec03c226783930ea26d0178806540023e4d662cb Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Mon, 25 Nov 2024 20:11:22 +0900 Subject: [PATCH] Allow ruler to retrieve proto format query response Signed-off-by: SungJin1212 --- CHANGELOG.md | 1 + docs/configuration/config-file-reference.md | 5 + go.mod | 2 +- integration/ruler_test.go | 85 +++++--- pkg/cortex/modules.go | 2 +- pkg/querier/codec/protobuf_codec_test.go | 182 ++++++++++++++++ .../tripperware/instantquery/instant_query.go | 42 +++- .../instantquery/instant_query_test.go | 82 +++++++- pkg/querier/tripperware/query.go | 8 +- .../tripperware/queryrange/marshaling_test.go | 2 +- .../tripperware/queryrange/query_range.go | 2 +- .../queryrange/query_range_test.go | 4 +- .../queryrange/split_by_interval_test.go | 2 +- pkg/querier/tripperware/roundtrip.go | 2 +- pkg/querier/tripperware/roundtrip_test.go | 2 +- pkg/ruler/frontend_client.go | 45 +++- pkg/ruler/frontend_client_pool.go | 4 +- pkg/ruler/frontend_client_test.go | 195 +++++++++++++++++- pkg/ruler/frontend_decoder.go | 76 +++++++ pkg/ruler/frontend_decoder_test.go | 118 ++++++++++- pkg/ruler/ruler.go | 20 +- 21 files changed, 805 insertions(+), 76 deletions(-) create mode 100644 pkg/querier/codec/protobuf_codec_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index ebdbdce8737..31c0ea56722 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ * [CHANGE] Change all max async concurrency default values `50` to `3` #6268 * [CHANGE] Change default value of `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` from `50` to `3` #6265 * [CHANGE] Enable Compactor and Alertmanager in target all. #6204 +* [FEATURE] Ruler: Add an experimental flag `-ruler.query-response-format` to retrieve query response as a proto format. #6345 * [FEATURE] Ruler: Pagination support for List Rules API. #6299 * [FEATURE] Query Frontend/Querier: Add protobuf codec `-api.querier-default-codec` and the option to choose response compression type `-querier.response-compression`. #5527 * [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 67ae897e024..b166d2ced4a 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4262,6 +4262,11 @@ The `ruler_config` configures the Cortex ruler. # CLI flag: -ruler.frontend-address [frontend_address: | default = ""] +# [Experimental] Query response format to get query results from Query Frontend +# when the rule evaluation. Supported values: json,protobuf +# CLI flag: -ruler.query-response-format +[query_response_format: | default = "protobuf"] + frontend_client: # gRPC client max receive message size (bytes). # CLI flag: -ruler.frontendClient.grpc-max-recv-msg-size diff --git a/go.mod b/go.mod index c2d979d1c18..01cc831abc1 100644 --- a/go.mod +++ b/go.mod @@ -80,6 +80,7 @@ require ( github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 github.com/cespare/xxhash/v2 v2.3.0 github.com/google/go-cmp v0.6.0 + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 github.com/sercand/kuberesolver/v4 v4.0.0 go.opentelemetry.io/collector/pdata v1.20.0 golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 @@ -186,7 +187,6 @@ require ( github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect github.com/ncw/swift v1.0.53 // indirect github.com/oklog/run v1.1.0 // indirect diff --git a/integration/ruler_test.go b/integration/ruler_test.go index f7d16507d1a..0164a09c918 100644 --- a/integration/ruler_test.go +++ b/integration/ruler_test.go @@ -1670,42 +1670,61 @@ func TestRulerEvalWithQueryFrontend(t *testing.T) { distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") require.NoError(t, s.StartAndWaitReady(distributor, ingester)) - queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "") - require.NoError(t, s.Start(queryFrontend)) - - ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ - "-ruler.frontend-address": queryFrontend.NetworkGRPCEndpoint(), - }), "") - querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ - "-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(), - }), "") - require.NoError(t, s.StartAndWaitReady(ruler, querier)) - - c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), user) - require.NoError(t, err) + for _, format := range []string{"protobuf", "json"} { + t.Run(fmt.Sprintf("format:%s", format), func(t *testing.T) { + queryFrontendFlag := mergeFlags(flags, map[string]string{ + "-ruler.query-response-format": format, + }) + queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", queryFrontendFlag, "") + require.NoError(t, s.Start(queryFrontend)) - expression := "metric" - groupName := "rule_group" - ruleName := "rule_name" - require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, ruleName, expression), namespace)) + querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(queryFrontendFlag, map[string]string{ + "-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(), + }), "") + require.NoError(t, s.StartAndWaitReady(querier)) - rgMatcher := ruleGroupMatcher(user, namespace, groupName) - // Wait until ruler has loaded the group. - require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics)) - // Wait until rule group has tried to evaluate the rule. - require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics)) + rulerFlag := mergeFlags(queryFrontendFlag, map[string]string{ + "-ruler.frontend-address": queryFrontend.NetworkGRPCEndpoint(), + }) + ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), rulerFlag, "") + require.NoError(t, s.StartAndWaitReady(ruler)) - matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user) - // Check that cortex_ruler_query_frontend_clients went up - require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ruler_query_frontend_clients"}, e2e.WaitMissingMetrics)) - // Check that cortex_ruler_queries_total went up - require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics)) - // Check that cortex_ruler_queries_failed_total is zero - require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics)) - // Check that cortex_ruler_write_requests_total went up - require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics)) - // Check that cortex_ruler_write_requests_failed_total is zero - require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics)) + t.Cleanup(func() { + _ = s.Stop(ruler) + _ = s.Stop(queryFrontend) + _ = s.Stop(querier) + }) + + c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), user) + require.NoError(t, err) + + expression := "metric" // vector + //expression := "scalar(count(up == 1)) > bool 1" // scalar + groupName := "rule_group" + ruleName := "rule_name" + require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, ruleName, expression), namespace)) + + rgMatcher := ruleGroupMatcher(user, namespace, groupName) + // Wait until ruler has loaded the group. + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics)) + // Wait until rule group has tried to evaluate the rule. + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics)) + // Make sure not to fail + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics)) + + matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user) + // Check that cortex_ruler_query_frontend_clients went up + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ruler_query_frontend_clients"}, e2e.WaitMissingMetrics)) + // Check that cortex_ruler_queries_total went up + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics)) + // Check that cortex_ruler_queries_failed_total is zero + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics)) + // Check that cortex_ruler_write_requests_total went up + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics)) + // Check that cortex_ruler_write_requests_failed_total is zero + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics)) + }) + } } func parseAlertFromRule(t *testing.T, rules interface{}) *alertingRule { diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 222d321168d..720b2cf76fd 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -474,7 +474,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro prometheusCodec := queryrange.NewPrometheusCodec(false, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec) // ShardedPrometheusCodec is same as PrometheusCodec but to be used on the sharded queries (it sum up the stats) shardedPrometheusCodec := queryrange.NewPrometheusCodec(true, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec) - instantQueryCodec := instantquery.NewInstantQueryCodec(t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec) + instantQueryCodec := instantquery.NewInstantQueryCodec(t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec, t.Cfg.Ruler.QueryResponseFormat) queryRangeMiddlewares, cache, err := queryrange.Middlewares( t.Cfg.QueryRange, diff --git a/pkg/querier/codec/protobuf_codec_test.go b/pkg/querier/codec/protobuf_codec_test.go new file mode 100644 index 00000000000..82125c11f86 --- /dev/null +++ b/pkg/querier/codec/protobuf_codec_test.go @@ -0,0 +1,182 @@ +package codec + +import ( + "testing" + + "github.com/gogo/protobuf/proto" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql" + v1 "github.com/prometheus/prometheus/web/api/v1" + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/querier/tripperware" +) + +func TestPrometheusResponse_ProtoMarshalUnMarshal(t *testing.T) { + var tests = []struct { + name string + resp *v1.Response + expectedUnmarshalResp tripperware.PrometheusResponse + }{ + { + name: "vector", + resp: &v1.Response{ + Status: "success", + Data: &v1.QueryData{ + ResultType: "vector", + Result: promql.Vector{ + { + Metric: labels.FromStrings("name", "value"), + T: 1234, + F: 5.67, + }, + }, + }, + }, + expectedUnmarshalResp: tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: "vector", + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("name", "value")), + Sample: &cortexpb.Sample{ + TimestampMs: 1234, + Value: 5.67, + }, + }, + }, + }, + }, + }, + }, + }, + }, + { + name: "matrix", + resp: &v1.Response{ + Status: "success", + Data: &v1.QueryData{ + ResultType: "matrix", + Result: promql.Matrix{ + { + Metric: labels.FromStrings("name1", "value1"), + Floats: []promql.FPoint{ + {T: 12, F: 3.4}, + {T: 56, F: 7.8}, + }, + }, + { + Metric: labels.FromStrings("name2", "value2"), + Floats: []promql.FPoint{ + {T: 12, F: 3.4}, + {T: 56, F: 7.8}, + }, + }, + }, + }, + }, + expectedUnmarshalResp: tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: "matrix", + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Matrix{ + Matrix: &tripperware.Matrix{ + SampleStreams: []tripperware.SampleStream{ + { + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("name1", "value1")), + Samples: []cortexpb.Sample{ + { + TimestampMs: 12, + Value: 3.4, + }, + { + TimestampMs: 56, + Value: 7.8, + }, + }, + }, + { + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("name2", "value2")), + Samples: []cortexpb.Sample{ + { + TimestampMs: 12, + Value: 3.4, + }, + { + TimestampMs: 56, + Value: 7.8, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + { + name: "scalar", + resp: &v1.Response{ + Status: "success", + Data: &v1.QueryData{ + ResultType: "scalar", + Result: promql.Scalar{T: 1000, V: 2}, + }, + }, + expectedUnmarshalResp: tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: "scalar", + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_RawBytes{ + RawBytes: []byte(`{"resultType":"scalar","result":[1,"2"]}`), + }, + }, + }, + }, + }, + { + name: "string", + resp: &v1.Response{ + Status: "success", + Data: &v1.QueryData{ + ResultType: "string", + Result: promql.String{T: 1000, V: "2"}, + }, + }, + expectedUnmarshalResp: tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: "string", + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_RawBytes{ + RawBytes: []byte(`{"resultType":"string","result":[1,"2"]}`), + }, + }, + }, + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + prometheusResponse, err := createPrometheusQueryResponse(test.resp) + require.NoError(t, err) + + b, err := proto.Marshal(prometheusResponse) + require.NoError(t, err) + + var resp tripperware.PrometheusResponse + err = proto.Unmarshal(b, &resp) + require.NoError(t, err) + + require.Equal(t, test.expectedUnmarshalResp, resp) + }) + } +} diff --git a/pkg/querier/tripperware/instantquery/instant_query.go b/pkg/querier/tripperware/instantquery/instant_query.go index 034090a8b9d..285d3433b57 100644 --- a/pkg/querier/tripperware/instantquery/instant_query.go +++ b/pkg/querier/tripperware/instantquery/instant_query.go @@ -11,9 +11,11 @@ import ( "time" jsoniter "github.com/json-iterator/go" + "github.com/munnerz/goautoneg" "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" "github.com/prometheus/common/model" + v1 "github.com/prometheus/prometheus/web/api/v1" "github.com/weaveworks/common/httpgrpc" "google.golang.org/grpc/status" @@ -28,16 +30,20 @@ var ( SortMapKeys: true, ValidateJsonRawMessage: false, }.Froze() + + rulerMIMEType = v1.MIMEType{Type: "application", SubType: tripperware.QueryResponseCortexMIMESubType} + jsonMIMEType = v1.MIMEType{Type: "application", SubType: "json"} ) type instantQueryCodec struct { tripperware.Codec - compression tripperware.Compression - defaultCodecType tripperware.CodecType - now func() time.Time + compression tripperware.Compression + defaultCodecType tripperware.CodecType + now func() time.Time + rulerQueryResponseFormat string } -func NewInstantQueryCodec(compressionStr string, defaultCodecTypeStr string) instantQueryCodec { +func NewInstantQueryCodec(compressionStr string, defaultCodecTypeStr string, rulerQueryResponseFormat string) instantQueryCodec { compression := tripperware.NonCompression // default if compressionStr == string(tripperware.GzipCompression) { compression = tripperware.GzipCompression @@ -49,9 +55,10 @@ func NewInstantQueryCodec(compressionStr string, defaultCodecTypeStr string) ins } return instantQueryCodec{ - compression: compression, - defaultCodecType: defaultCodecType, - now: time.Now, + compression: compression, + defaultCodecType: defaultCodecType, + now: time.Now, + rulerQueryResponseFormat: rulerQueryResponseFormat, } } @@ -167,7 +174,7 @@ func (c instantQueryCodec) EncodeRequest(ctx context.Context, r tripperware.Requ return req.WithContext(ctx), nil } -func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Response) (*http.Response, error) { +func (c instantQueryCodec) EncodeResponse(ctx context.Context, req *http.Request, res tripperware.Response) (*http.Response, error) { sp, _ := opentracing.StartSpanFromContext(ctx, "APIResponse.ToHTTPResponse") defer sp.Finish() @@ -176,7 +183,7 @@ func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Res return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid response format") } - b, err := json.Marshal(a) + contentType, b, err := marshalResponse(a, req.Header.Get("Accept")) if err != nil { return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error encoding response: %v", err) } @@ -185,7 +192,7 @@ func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Res resp := http.Response{ Header: http.Header{ - "Content-Type": []string{tripperware.ApplicationJson}, + "Content-Type": []string{contentType}, }, Body: io.NopCloser(bytes.NewBuffer(b)), StatusCode: http.StatusOK, @@ -213,3 +220,18 @@ func decorateWithParamName(err error, field string) error { } return fmt.Errorf(errTmpl, field, err) } + +func marshalResponse(resp *tripperware.PrometheusResponse, acceptHeader string) (string, []byte, error) { + for _, clause := range goautoneg.ParseAccept(acceptHeader) { + if jsonMIMEType.Satisfies(clause) { + b, err := json.Marshal(resp) + return tripperware.ApplicationJson, b, err + } else if rulerMIMEType.Satisfies(clause) { + b, err := resp.Marshal() + return tripperware.QueryResponseCortexMIMEType, b, err + } + } + + b, err := json.Marshal(resp) + return tripperware.ApplicationJson, b, err +} diff --git a/pkg/querier/tripperware/instantquery/instant_query_test.go b/pkg/querier/tripperware/instantquery/instant_query_test.go index 718bc460ab1..c83cb80ddba 100644 --- a/pkg/querier/tripperware/instantquery/instant_query_test.go +++ b/pkg/querier/tripperware/instantquery/instant_query_test.go @@ -25,7 +25,7 @@ import ( "github.com/cortexproject/cortex/pkg/querier/tripperware" ) -var testInstantQueryCodec = NewInstantQueryCodec(string(tripperware.NonCompression), string(tripperware.ProtobufCodecType)) +var testInstantQueryCodec = NewInstantQueryCodec(string(tripperware.NonCompression), string(tripperware.ProtobufCodecType), "json") const testHistogramResponse = `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"prometheus_http_request_duration_seconds","handler":"/metrics","instance":"localhost:9090","job":"prometheus"},"histogram":[1719528871.898,{"count":"6342","sum":"43.31319875499995","buckets":[[0,"0.0013810679320049755","0.0015060652591874421","1"],[0,"0.0015060652591874421","0.001642375811042411","7"],[0,"0.001642375811042411","0.0017910235218841233","5"],[0,"0.0017910235218841233","0.001953125","13"],[0,"0.001953125","0.0021298979153618314","19"],[0,"0.0021298979153618314","0.0023226701464896895","13"],[0,"0.0023226701464896895","0.002532889755177753","13"],[0,"0.002532889755177753","0.002762135864009951","15"],[0,"0.002762135864009951","0.0030121305183748843","12"],[0,"0.0030121305183748843","0.003284751622084822","34"],[0,"0.003284751622084822","0.0035820470437682465","188"],[0,"0.0035820470437682465","0.00390625","372"],[0,"0.00390625","0.004259795830723663","400"],[0,"0.004259795830723663","0.004645340292979379","411"],[0,"0.004645340292979379","0.005065779510355506","425"],[0,"0.005065779510355506","0.005524271728019902","425"],[0,"0.005524271728019902","0.0060242610367497685","521"],[0,"0.0060242610367497685","0.006569503244169644","621"],[0,"0.006569503244169644","0.007164094087536493","593"],[0,"0.007164094087536493","0.0078125","506"],[0,"0.0078125","0.008519591661447326","458"],[0,"0.008519591661447326","0.009290680585958758","346"],[0,"0.009290680585958758","0.010131559020711013","285"],[0,"0.010131559020711013","0.011048543456039804","196"],[0,"0.011048543456039804","0.012048522073499537","129"],[0,"0.012048522073499537","0.013139006488339287","85"],[0,"0.013139006488339287","0.014328188175072986","65"],[0,"0.014328188175072986","0.015625","54"],[0,"0.015625","0.01703918332289465","53"],[0,"0.01703918332289465","0.018581361171917516","20"],[0,"0.018581361171917516","0.020263118041422026","21"],[0,"0.020263118041422026","0.022097086912079608","15"],[0,"0.022097086912079608","0.024097044146999074","11"],[0,"0.024097044146999074","0.026278012976678575","2"],[0,"0.026278012976678575","0.028656376350145972","3"],[0,"0.028656376350145972","0.03125","3"],[0,"0.04052623608284405","0.044194173824159216","2"]]}]}]}}` @@ -199,6 +199,12 @@ func TestCompressedResponse(t *testing.T) { func TestResponse(t *testing.T) { t.Parallel() + jsonHttpReq := &http.Request{ + Header: map[string][]string{ + "Accept": {"application/json"}, + }, + } + for i, tc := range []struct { jsonBody string promBody *tripperware.PrometheusResponse @@ -263,6 +269,7 @@ func TestResponse(t *testing.T) { Result: tripperware.PrometheusQueryResult{ Result: &tripperware.PrometheusQueryResult_Matrix{ Matrix: &tripperware.Matrix{ + SampleStreams: []tripperware.SampleStream{ { Labels: []cortexpb.LabelAdapter{ @@ -323,7 +330,7 @@ func TestResponse(t *testing.T) { promBody: &tripperware.PrometheusResponse{ Status: "success", Data: tripperware.PrometheusData{ - ResultType: model.ValString.String(), + ResultType: model.ValScalar.String(), Result: tripperware.PrometheusQueryResult{ Result: &tripperware.PrometheusQueryResult_RawBytes{ RawBytes: []byte(`{"resultType":"scalar","result":[1,"13"]}`), @@ -388,7 +395,8 @@ func TestResponse(t *testing.T) { Body: io.NopCloser(bytes.NewBuffer([]byte(tc.jsonBody))), ContentLength: int64(len(tc.jsonBody)), } - resp2, err := testInstantQueryCodec.EncodeResponse(context.Background(), resp) + + resp2, err := testInstantQueryCodec.EncodeResponse(context.Background(), jsonHttpReq, resp) require.NoError(t, err) assert.Equal(t, response, resp2) }) @@ -400,6 +408,11 @@ func TestMergeResponse(t *testing.T) { defaultReq := &tripperware.PrometheusRequest{ Query: "sum(up)", } + jsonHttpReq := &http.Request{ + Header: map[string][]string{ + "Accept": {"application/json"}, + }, + } for _, tc := range []struct { name string req tripperware.Request @@ -665,7 +678,7 @@ func TestMergeResponse(t *testing.T) { cancelCtx() return } - dr, err := testInstantQueryCodec.EncodeResponse(ctx, resp) + dr, err := testInstantQueryCodec.EncodeResponse(ctx, jsonHttpReq, resp) assert.Equal(t, tc.expectedErr, err) contents, err := io.ReadAll(dr.Body) assert.Equal(t, tc.expectedErr, err) @@ -680,6 +693,11 @@ func TestMergeResponseProtobuf(t *testing.T) { defaultReq := &tripperware.PrometheusRequest{ Query: "sum(up)", } + jsonHttpReq := &http.Request{ + Header: map[string][]string{ + "Accept": {"application/json"}, + }, + } for _, tc := range []struct { name string req tripperware.Request @@ -1680,7 +1698,7 @@ func TestMergeResponseProtobuf(t *testing.T) { cancelCtx() return } - dr, err := testInstantQueryCodec.EncodeResponse(ctx, resp) + dr, err := testInstantQueryCodec.EncodeResponse(ctx, jsonHttpReq, resp) assert.Equal(t, tc.expectedErr, err) contents, err := io.ReadAll(dr.Body) assert.Equal(t, tc.expectedErr, err) @@ -1690,6 +1708,60 @@ func TestMergeResponseProtobuf(t *testing.T) { } } +func Test_marshalResponseContentType(t *testing.T) { + mockResp := &tripperware.PrometheusResponse{} + + tests := []struct { + name string + acceptHeader string + expectedContentType string + }{ + { + name: "empty accept header", + acceptHeader: "", + expectedContentType: "application/json", + }, + { + name: "type and subtype are *", + acceptHeader: "*/*", + expectedContentType: "application/json", + }, + { + name: "sub type is *", + acceptHeader: "application/*", + expectedContentType: "application/json", + }, + { + name: "json type", + acceptHeader: "application/json", + expectedContentType: "application/json", + }, + { + name: "proto type", + acceptHeader: "application/x-protobuf", + expectedContentType: "application/json", + }, + { + name: "json type and cortex type", + acceptHeader: "application/json,application/x-cortex-query+proto", + expectedContentType: "application/json", + }, + { + name: "cortex type", + acceptHeader: "application/x-cortex-query+proto", + expectedContentType: "application/x-cortex-query+proto", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + contentType, _, err := marshalResponse(mockResp, test.acceptHeader) + require.NoError(t, err) + require.Equal(t, test.expectedContentType, contentType) + }) + } +} + func Benchmark_Decode(b *testing.B) { maxSamplesCount := 1000000 samples := make([]tripperware.SampleStream, maxSamplesCount) diff --git a/pkg/querier/tripperware/query.go b/pkg/querier/tripperware/query.go index 28e7c2430b4..28bb47804c5 100644 --- a/pkg/querier/tripperware/query.go +++ b/pkg/querier/tripperware/query.go @@ -12,10 +12,9 @@ import ( "time" "unsafe" - "github.com/golang/snappy" - "github.com/go-kit/log" "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" jsoniter "github.com/json-iterator/go" "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" @@ -48,6 +47,9 @@ const ( ProtobufCodecType CodecType = "protobuf" ApplicationProtobuf string = "application/x-protobuf" ApplicationJson string = "application/json" + + QueryResponseCortexMIMEType = "application/" + QueryResponseCortexMIMESubType + QueryResponseCortexMIMESubType = "x-cortex-query+proto" ) // Codec is used to encode/decode query range requests and responses so they can be passed down to middlewares. @@ -62,7 +64,7 @@ type Codec interface { // EncodeRequest encodes a Request into an http request. EncodeRequest(context.Context, Request) (*http.Request, error) // EncodeResponse encodes a Response into an http response. - EncodeResponse(context.Context, Response) (*http.Response, error) + EncodeResponse(context.Context, *http.Request, Response) (*http.Response, error) } // Merger is used by middlewares making multiple requests to merge back all responses into a single one. diff --git a/pkg/querier/tripperware/queryrange/marshaling_test.go b/pkg/querier/tripperware/queryrange/marshaling_test.go index 4652a0e10bd..d0efd0e8d4d 100644 --- a/pkg/querier/tripperware/queryrange/marshaling_test.go +++ b/pkg/querier/tripperware/queryrange/marshaling_test.go @@ -80,7 +80,7 @@ func BenchmarkPrometheusCodec_EncodeResponse(b *testing.B) { b.ReportAllocs() for n := 0; n < b.N; n++ { - _, err := PrometheusCodec.EncodeResponse(context.Background(), res) + _, err := PrometheusCodec.EncodeResponse(context.Background(), nil, res) require.NoError(b, err) } } diff --git a/pkg/querier/tripperware/queryrange/query_range.go b/pkg/querier/tripperware/queryrange/query_range.go index 46f1affb48a..a71dd57c1cc 100644 --- a/pkg/querier/tripperware/queryrange/query_range.go +++ b/pkg/querier/tripperware/queryrange/query_range.go @@ -229,7 +229,7 @@ func (c prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ return &resp, nil } -func (prometheusCodec) EncodeResponse(ctx context.Context, res tripperware.Response) (*http.Response, error) { +func (prometheusCodec) EncodeResponse(ctx context.Context, _ *http.Request, res tripperware.Response) (*http.Response, error) { sp, _ := opentracing.StartSpanFromContext(ctx, "APIResponse.ToHTTPResponse") defer sp.Finish() diff --git a/pkg/querier/tripperware/queryrange/query_range_test.go b/pkg/querier/tripperware/queryrange/query_range_test.go index 256d2800ff6..780077274dc 100644 --- a/pkg/querier/tripperware/queryrange/query_range_test.go +++ b/pkg/querier/tripperware/queryrange/query_range_test.go @@ -307,7 +307,7 @@ func TestResponse(t *testing.T) { Body: io.NopCloser(bytes.NewBuffer([]byte(tc.jsonBody))), ContentLength: int64(len(tc.jsonBody)), } - resp2, err := PrometheusCodec.EncodeResponse(context.Background(), resp) + resp2, err := PrometheusCodec.EncodeResponse(context.Background(), nil, resp) require.NoError(t, err) assert.Equal(t, response, resp2) cancelCtx() @@ -431,7 +431,7 @@ func TestResponseWithStats(t *testing.T) { Body: io.NopCloser(bytes.NewBuffer([]byte(tc.jsonBody))), ContentLength: int64(len(tc.jsonBody)), } - resp2, err := PrometheusCodec.EncodeResponse(context.Background(), resp) + resp2, err := PrometheusCodec.EncodeResponse(context.Background(), nil, resp) require.NoError(t, err) assert.Equal(t, response, resp2) }) diff --git a/pkg/querier/tripperware/queryrange/split_by_interval_test.go b/pkg/querier/tripperware/queryrange/split_by_interval_test.go index 4d7e4a9a68a..028a1e16d85 100644 --- a/pkg/querier/tripperware/queryrange/split_by_interval_test.go +++ b/pkg/querier/tripperware/queryrange/split_by_interval_test.go @@ -276,7 +276,7 @@ func TestSplitByDay(t *testing.T) { mergedResponse, err := PrometheusCodec.MergeResponse(context.Background(), nil, parsedResponse, parsedResponse) require.NoError(t, err) - mergedHTTPResponse, err := PrometheusCodec.EncodeResponse(context.Background(), mergedResponse) + mergedHTTPResponse, err := PrometheusCodec.EncodeResponse(context.Background(), nil, mergedResponse) require.NoError(t, err) mergedHTTPResponseBody, err := io.ReadAll(mergedHTTPResponse.Body) diff --git a/pkg/querier/tripperware/roundtrip.go b/pkg/querier/tripperware/roundtrip.go index 69b46dd66b0..c92bf486a22 100644 --- a/pkg/querier/tripperware/roundtrip.go +++ b/pkg/querier/tripperware/roundtrip.go @@ -211,7 +211,7 @@ func (q roundTripper) RoundTrip(r *http.Request) (*http.Response, error) { return nil, err } - return q.codec.EncodeResponse(r.Context(), response) + return q.codec.EncodeResponse(r.Context(), r, response) } // Do implements Handler. diff --git a/pkg/querier/tripperware/roundtrip_test.go b/pkg/querier/tripperware/roundtrip_test.go index 263c9a9b0e4..40ac06598c1 100644 --- a/pkg/querier/tripperware/roundtrip_test.go +++ b/pkg/querier/tripperware/roundtrip_test.go @@ -59,7 +59,7 @@ func (c mockCodec) DecodeRequest(_ context.Context, r *http.Request, _ []string) return mockRequest{}, nil } -func (c mockCodec) EncodeResponse(_ context.Context, resp Response) (*http.Response, error) { +func (c mockCodec) EncodeResponse(_ context.Context, _ *http.Request, resp Response) (*http.Response, error) { r := resp.(*mockResponse) return &http.Response{ Header: http.Header{ diff --git a/pkg/ruler/frontend_client.go b/pkg/ruler/frontend_client.go index e75997ce1aa..a3f697f1bba 100644 --- a/pkg/ruler/frontend_client.go +++ b/pkg/ruler/frontend_client.go @@ -22,22 +22,29 @@ const ( orgIDHeader = "X-Scope-OrgID" instantQueryPath = "/api/v1/query" mimeTypeForm = "application/x-www-form-urlencoded" - contentTypeJSON = "application/json" ) +var jsonDecoder JsonDecoder +var protobufDecoder ProtobufDecoder + type FrontendClient struct { client httpgrpc.HTTPClient timeout time.Duration prometheusHTTPPrefix string - jsonDecoder JsonDecoder + queryResponseFormat string + decoders map[string]Decoder } -func NewFrontendClient(client httpgrpc.HTTPClient, timeout time.Duration, prometheusHTTPPrefix string) *FrontendClient { +func NewFrontendClient(client httpgrpc.HTTPClient, timeout time.Duration, prometheusHTTPPrefix, queryResponseFormat string) *FrontendClient { return &FrontendClient{ client: client, timeout: timeout, prometheusHTTPPrefix: prometheusHTTPPrefix, - jsonDecoder: JsonDecoder{}, + queryResponseFormat: queryResponseFormat, + decoders: map[string]Decoder{ + jsonDecoder.ContentType(): jsonDecoder, + protobufDecoder.ContentType(): protobufDecoder, + }, } } @@ -55,6 +62,14 @@ func (p *FrontendClient) makeRequest(ctx context.Context, qs string, ts time.Tim return nil, err } + acceptHeader := "" + switch p.queryResponseFormat { + case queryResponseFormatJson: + acceptHeader = jsonDecoder.ContentType() + case queryResponseFormatProtobuf: + acceptHeader = fmt.Sprintf("%s,%s", protobufDecoder.ContentType(), jsonDecoder.ContentType()) + } + req := &httpgrpc.HTTPRequest{ Method: http.MethodPost, Url: p.prometheusHTTPPrefix + instantQueryPath, @@ -63,7 +78,7 @@ func (p *FrontendClient) makeRequest(ctx context.Context, qs string, ts time.Tim {Key: textproto.CanonicalMIMEHeaderKey("User-Agent"), Values: []string{fmt.Sprintf("Cortex/%s", version.Version)}}, {Key: textproto.CanonicalMIMEHeaderKey("Content-Type"), Values: []string{mimeTypeForm}}, {Key: textproto.CanonicalMIMEHeaderKey("Content-Length"), Values: []string{strconv.Itoa(len(body))}}, - {Key: textproto.CanonicalMIMEHeaderKey("Accept"), Values: []string{contentTypeJSON}}, + {Key: textproto.CanonicalMIMEHeaderKey("Accept"), Values: []string{acceptHeader}}, {Key: textproto.CanonicalMIMEHeaderKey(orgIDHeader), Values: []string{orgID}}, }, } @@ -91,7 +106,15 @@ func (p *FrontendClient) InstantQuery(ctx context.Context, qs string, t time.Tim return nil, err } - vector, warning, err := p.jsonDecoder.Decode(resp.Body) + contentType := extractHeader(resp.Headers, "Content-Type") + decoder, ok := p.decoders[contentType] + if !ok { + err = fmt.Errorf("unknown content type: %s", contentType) + level.Error(log).Log("err", err, "query", qs) + return nil, err + } + + vector, warning, err := decoder.Decode(resp.Body) if err != nil { level.Error(log).Log("err", err, "query", qs) return nil, err @@ -103,3 +126,13 @@ func (p *FrontendClient) InstantQuery(ctx context.Context, qs string, t time.Tim return vector, nil } + +func extractHeader(headers []*httpgrpc.Header, target string) string { + for _, h := range headers { + if h.Key == target && len(h.Values) > 0 { + return h.Values[0] + } + } + + return "" +} diff --git a/pkg/ruler/frontend_client_pool.go b/pkg/ruler/frontend_client_pool.go index b9b512cfab1..7b131621aa6 100644 --- a/pkg/ruler/frontend_client_pool.go +++ b/pkg/ruler/frontend_client_pool.go @@ -20,6 +20,7 @@ import ( type frontendPool struct { timeout time.Duration + queryResponseFormat string prometheusHTTPPrefix string grpcConfig grpcclient.Config @@ -29,6 +30,7 @@ type frontendPool struct { func newFrontendPool(cfg Config, log log.Logger, reg prometheus.Registerer) *client.Pool { p := &frontendPool{ timeout: cfg.FrontendTimeout, + queryResponseFormat: cfg.QueryResponseFormat, prometheusHTTPPrefix: cfg.PrometheusHTTPPrefix, grpcConfig: cfg.GRPCClientConfig, frontendClientRequestDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ @@ -68,7 +70,7 @@ func (f *frontendPool) createFrontendClient(addr string) (client.PoolClient, err } return &frontendClient{ - FrontendClient: NewFrontendClient(httpgrpc.NewHTTPClient(conn), f.timeout, f.prometheusHTTPPrefix), + FrontendClient: NewFrontendClient(httpgrpc.NewHTTPClient(conn), f.timeout, f.prometheusHTTPPrefix, f.queryResponseFormat), HealthClient: grpc_health_v1.NewHealthClient(conn), }, nil } diff --git a/pkg/ruler/frontend_client_test.go b/pkg/ruler/frontend_client_test.go index 104780f58c5..cedaeacb36b 100644 --- a/pkg/ruler/frontend_client_test.go +++ b/pkg/ruler/frontend_client_test.go @@ -7,6 +7,8 @@ import ( "testing" "time" + "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/querier/tripperware" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" "github.com/stretchr/testify/require" @@ -28,7 +30,7 @@ func TestTimeout(t *testing.T) { } ctx := context.Background() ctx = user.InjectOrgID(ctx, "userID") - frontendClient := NewFrontendClient(mockHTTPGRPCClient(mockClientFn), time.Second*5, "/prometheus") + frontendClient := NewFrontendClient(mockHTTPGRPCClient(mockClientFn), time.Second*5, "/prometheus", "json") _, err := frontendClient.InstantQuery(ctx, "query", time.Now()) require.Equal(t, context.DeadlineExceeded, err) } @@ -37,12 +39,12 @@ func TestNoOrgId(t *testing.T) { mockClientFn := func(ctx context.Context, _ *httpgrpc.HTTPRequest, _ ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) { return nil, nil } - frontendClient := NewFrontendClient(mockHTTPGRPCClient(mockClientFn), time.Second*5, "/prometheus") + frontendClient := NewFrontendClient(mockHTTPGRPCClient(mockClientFn), time.Second*5, "/prometheus", "json") _, err := frontendClient.InstantQuery(context.Background(), "query", time.Now()) require.Equal(t, user.ErrNoOrgID, err) } -func TestInstantQuery(t *testing.T) { +func TestInstantQueryJsonCodec(t *testing.T) { tests := []struct { description string responseBody string @@ -148,10 +150,195 @@ func TestInstantQuery(t *testing.T) { } ctx := context.Background() ctx = user.InjectOrgID(ctx, "userID") - frontendClient := NewFrontendClient(mockHTTPGRPCClient(mockClientFn), time.Second*5, "/prometheus") + frontendClient := NewFrontendClient(mockHTTPGRPCClient(mockClientFn), time.Second*5, "/prometheus", "json") vector, err := frontendClient.InstantQuery(ctx, "query", time.Now()) require.Equal(t, test.expected, vector) require.Equal(t, test.expectedErr, err) }) } } + +func TestInstantQueryProtoCodec(t *testing.T) { + var tests = []struct { + description string + responseBody *tripperware.PrometheusResponse + expected promql.Vector + expectedErr error + }{ + { + description: "empty vector", + responseBody: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: "vector", + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{}, + }, + }, + }, + }, + }, + expected: promql.Vector{}, + expectedErr: nil, + }, + { + description: "vector with series", + responseBody: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: "vector", + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("foo", "bar")), + Sample: &cortexpb.Sample{ + Value: 1.234, + TimestampMs: 1724146338123, + }, + }, + { + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("bar", "baz")), + Sample: &cortexpb.Sample{ + Value: 5.678, + TimestampMs: 1724146338456, + }, + }, + }, + }, + }, + }, + }, + }, + expected: promql.Vector{ + { + Metric: labels.FromStrings("foo", "bar"), + T: 1724146338123, + F: 1.234, + }, + { + Metric: labels.FromStrings("bar", "baz"), + T: 1724146338456, + F: 5.678, + }, + }, + expectedErr: nil, + }, + { + description: "get scalar", + responseBody: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: "scalar", + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_RawBytes{ + RawBytes: []byte(`{"resultType":"scalar","result":[1724146338.123,"1.234"]}`), + }, + }, + }, + }, + expected: promql.Vector{ + { + Metric: labels.EmptyLabels(), + T: 1724146338123, + F: 1.234, + }, + }, + expectedErr: nil, + }, + { + description: "get matrix", + responseBody: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: "matrix", + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Matrix{ + Matrix: &tripperware.Matrix{ + SampleStreams: []tripperware.SampleStream{}, + }, + }, + }, + }, + }, + expectedErr: errors.New("rule result is not a vector or scalar"), + }, + { + description: "get string", + responseBody: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: "string", + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_RawBytes{ + RawBytes: []byte(`{"resultType":"string","result":[1724146338.123,"string"]}`), + }, + }, + }, + }, + expectedErr: errors.New("rule result is not a vector or scalar"), + }, + } + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + mockClientFn := func(ctx context.Context, _ *httpgrpc.HTTPRequest, _ ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) { + d, err := test.responseBody.Marshal() + if err != nil { + return nil, err + } + return &httpgrpc.HTTPResponse{ + Code: http.StatusOK, + Headers: []*httpgrpc.Header{ + {Key: "Content-Type", Values: []string{"application/x-cortex-query+proto"}}, + }, + Body: d, + }, nil + } + ctx := context.Background() + ctx = user.InjectOrgID(ctx, "userID") + frontendClient := NewFrontendClient(mockHTTPGRPCClient(mockClientFn), time.Second*5, "/prometheus", "protobuf") + vector, err := frontendClient.InstantQuery(ctx, "query", time.Now()) + require.Equal(t, test.expected, vector) + require.Equal(t, test.expectedErr, err) + }) + } +} + +func Test_extractHeader(t *testing.T) { + tests := []struct { + description string + headers []*httpgrpc.Header + expectedOutput string + }{ + { + description: "protobuf", + headers: []*httpgrpc.Header{ + { + Key: "Content-Type", + Values: []string{"application/x-protobuf"}, + }, + }, + expectedOutput: "application/x-protobuf", + }, + { + description: "json", + headers: []*httpgrpc.Header{ + { + Key: "Content-Type", + Values: []string{"application/json"}, + }, + }, + expectedOutput: "application/json", + }, + } + + target := "Content-Type" + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + require.Equal(t, test.expectedOutput, extractHeader(test.headers, target)) + }) + } +} diff --git a/pkg/ruler/frontend_decoder.go b/pkg/ruler/frontend_decoder.go index c8e653a05f0..747d01583b2 100644 --- a/pkg/ruler/frontend_decoder.go +++ b/pkg/ruler/frontend_decoder.go @@ -10,6 +10,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" + "github.com/cortexproject/cortex/pkg/querier/tripperware" "github.com/cortexproject/cortex/pkg/util/api" ) @@ -18,8 +19,18 @@ const ( ) type JsonDecoder struct{} +type ProtobufDecoder struct{} type Warning []string +type Decoder interface { + Decode(body []byte) (promql.Vector, Warning, error) + ContentType() string +} + +func (j JsonDecoder) ContentType() string { + return "application/json" +} + func (j JsonDecoder) Decode(body []byte) (promql.Vector, Warning, error) { var response api.Response @@ -86,3 +97,68 @@ func (j JsonDecoder) scalarToPromQLVector(scalar model.Scalar) promql.Vector { Metric: labels.Labels{}, }} } + +func (p ProtobufDecoder) ContentType() string { + return tripperware.QueryResponseCortexMIMEType +} + +func (p ProtobufDecoder) Decode(body []byte) (promql.Vector, Warning, error) { + resp := tripperware.PrometheusResponse{} + if err := resp.Unmarshal(body); err != nil { + return nil, nil, err + } + + if resp.Status == statusError { + return nil, resp.Warnings, fmt.Errorf("failed to execute query with error: %s", resp.Error) + } + + switch resp.Data.ResultType { + case "scalar": + data := struct { + Type model.ValueType `json:"resultType"` + Result json.RawMessage `json:"result"` + }{} + + if err := json.Unmarshal(resp.Data.Result.GetRawBytes(), &data); err != nil { + return nil, nil, err + } + + var s model.Scalar + if err := json.Unmarshal(data.Result, &s); err != nil { + return nil, nil, err + } + return p.scalarToPromQLVector(s), resp.Warnings, nil + case "vector": + return p.vectorToPromQLVector(resp.Data.Result.GetVector()), resp.Warnings, nil + default: + return nil, resp.Warnings, errors.New("rule result is not a vector or scalar") + } +} + +func (p ProtobufDecoder) vectorToPromQLVector(vector *tripperware.Vector) promql.Vector { + v := make([]promql.Sample, 0, len(vector.Samples)) + for _, sample := range vector.Samples { + metric := make([]labels.Label, 0, len(sample.Labels)) + for _, lb := range sample.Labels { + metric = append(metric, labels.Label{ + Name: lb.Name, + Value: lb.Value, + }) + } + v = append(v, promql.Sample{ + T: sample.Sample.TimestampMs, + F: sample.Sample.Value, + Metric: metric, + }) + } + + return v +} + +func (p ProtobufDecoder) scalarToPromQLVector(s model.Scalar) promql.Vector { + return promql.Vector{promql.Sample{ + T: int64(s.Timestamp), + F: float64(s.Value), + Metric: labels.Labels{}, + }} +} diff --git a/pkg/ruler/frontend_decoder_test.go b/pkg/ruler/frontend_decoder_test.go index cc24fe732ba..2e945d259ce 100644 --- a/pkg/ruler/frontend_decoder_test.go +++ b/pkg/ruler/frontend_decoder_test.go @@ -7,11 +7,125 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/querier/tripperware" ) -func TestDecode(t *testing.T) { - jsonDecoder := JsonDecoder{} +func TestProtoDecode(t *testing.T) { + tests := []struct { + description string + resp *tripperware.PrometheusResponse + expectedVector promql.Vector + expectedWarning []string + expectedErr error + }{ + { + description: "vector", + resp: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: "vector", + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("foo", "bar")), + Sample: &cortexpb.Sample{ + Value: 1.234, + TimestampMs: 1724146338123, + }, + }, + }, + }, + }, + }, + }, + Warnings: []string{"a", "b", "c"}, + }, + expectedVector: promql.Vector{ + { + Metric: labels.FromStrings("foo", "bar"), + T: 1724146338123, + F: 1.234, + }, + }, + expectedWarning: []string{"a", "b", "c"}, + }, + { + description: "matrix", + resp: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: "matrix", + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Matrix{}, + }, + }, + Warnings: []string{"a", "b", "c"}, + }, + expectedVector: nil, + expectedWarning: []string{"a", "b", "c"}, + expectedErr: errors.New("rule result is not a vector or scalar"), + }, + { + description: "scalar", + resp: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: "scalar", + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_RawBytes{ + RawBytes: []byte(`{"resultType":"scalar","result":[1724146338.123,"1.234"]}`), + }, + }, + }, + Warnings: []string{"a", "b", "c"}, + }, + expectedVector: promql.Vector{ + { + Metric: labels.EmptyLabels(), + T: 1724146338123, + F: 1.234, + }, + }, + expectedWarning: []string{"a", "b", "c"}, + }, + { + description: "string", + resp: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: "string", + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_RawBytes{ + RawBytes: []byte(`{"resultType":"string","result":[1724146338.123,"1.234"]}`), + }, + }, + }, + Warnings: []string{"a", "b", "c"}, + }, + expectedVector: nil, + expectedWarning: []string{"a", "b", "c"}, + expectedErr: errors.New("rule result is not a vector or scalar"), + }, + } + + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + b, err := test.resp.Marshal() + require.NoError(t, err) + + vector, _, err := protobufDecoder.Decode(b) + require.Equal(t, test.expectedErr, err) + require.Equal(t, test.expectedVector, vector) + require.Equal(t, test.expectedWarning, test.resp.Warnings) + }) + } +} +func TestJsonDecode(t *testing.T) { tests := []struct { description string body string diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index d77b4d0a413..5094e4d305d 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -47,10 +47,13 @@ import ( var ( supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle} + supportedQueryResponseFormats = []string{queryResponseFormatJson, queryResponseFormatProtobuf} + // Validation errors. - errInvalidShardingStrategy = errors.New("invalid sharding strategy") - errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0") - errInvalidMaxConcurrentEvals = errors.New("invalid max concurrent evals, the value must be greater than 0") + errInvalidShardingStrategy = errors.New("invalid sharding strategy") + errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0") + errInvalidMaxConcurrentEvals = errors.New("invalid max concurrent evals, the value must be greater than 0") + errInvalidQueryResponseFormat = errors.New("invalid query response format") ) const ( @@ -82,6 +85,10 @@ const ( unknownHealthFilter string = "unknown" okHealthFilter string = "ok" errHealthFilter string = "err" + + // query response formats + queryResponseFormatJson = "json" + queryResponseFormatProtobuf = "protobuf" ) type DisabledRuleGroupErr struct { @@ -96,6 +103,8 @@ func (e *DisabledRuleGroupErr) Error() string { type Config struct { // This is used for query to query frontend to evaluate rules FrontendAddress string `yaml:"frontend_address"` + // Query response format of query frontend for evaluating rules + QueryResponseFormat string `yaml:"query_response_format"` // HTTP timeout duration when querying to query frontend to evaluate rules FrontendTimeout time.Duration `yaml:"-"` // Query frontend GRPC Client configuration. @@ -185,6 +194,10 @@ func (cfg *Config) Validate(limits validation.Limits, log log.Logger) error { if cfg.ConcurrentEvalsEnabled && cfg.MaxConcurrentEvals <= 0 { return errInvalidMaxConcurrentEvals } + + if !util.StringsContain(supportedQueryResponseFormats, cfg.QueryResponseFormat) { + return errInvalidQueryResponseFormat + } return nil } @@ -207,6 +220,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { flagext.DeprecatedFlag(f, "ruler.alertmanager-use-v2", "This flag is no longer functional. V1 API is deprecated and removed", util_log.Logger) f.StringVar(&cfg.FrontendAddress, "ruler.frontend-address", "", "[Experimental] GRPC listen address of the Query Frontend, in host:port format. If set, Ruler queries to Query Frontends via gRPC. If not set, ruler queries to Ingesters directly.") + f.StringVar(&cfg.QueryResponseFormat, "ruler.query-response-format", queryResponseFormatProtobuf, fmt.Sprintf("[Experimental] Query response format to get query results from Query Frontend when the rule evaluation. Supported values: %s", strings.Join(supportedQueryResponseFormats, ","))) cfg.ExternalURL.URL, _ = url.Parse("") // Must be non-nil f.Var(&cfg.ExternalURL, "ruler.external.url", "URL of alerts return path.") f.DurationVar(&cfg.EvaluationInterval, "ruler.evaluation-interval", 1*time.Minute, "How frequently to evaluate rules")