From 5f93620ccbf993d27758053db938937954f425c7 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Mon, 18 Nov 2024 15:56:29 +0900 Subject: [PATCH] Allow ruler to retrieve proto format query response Signed-off-by: SungJin1212 --- CHANGELOG.md | 1 + docs/configuration/config-file-reference.md | 5 + go.mod | 2 +- integration/ruler_test.go | 85 +-- pkg/cortex/modules.go | 2 +- pkg/querier/codec/protobuf_codec.go | 14 + pkg/querier/codec/protobug_codec_test.go | 93 ++++ .../tripperware/instantquery/instant_query.go | 52 +- .../instantquery/instant_query_test.go | 84 ++- pkg/querier/tripperware/query.go | 50 +- pkg/querier/tripperware/query.pb.go | 513 +++++++++++++++--- pkg/querier/tripperware/query.proto | 6 + pkg/querier/tripperware/query_test.go | 35 ++ .../tripperware/queryrange/marshaling_test.go | 2 +- .../tripperware/queryrange/query_range.go | 2 +- .../queryrange/query_range_test.go | 4 +- .../queryrange/split_by_interval_test.go | 2 +- pkg/querier/tripperware/roundtrip.go | 2 +- pkg/querier/tripperware/roundtrip_test.go | 2 +- pkg/ruler/frontend_client.go | 45 +- pkg/ruler/frontend_client_pool.go | 4 +- pkg/ruler/frontend_client_test.go | 42 +- pkg/ruler/frontend_decoder.go | 59 ++ pkg/ruler/frontend_decoder_test.go | 2 - pkg/ruler/ruler.go | 20 +- 25 files changed, 973 insertions(+), 155 deletions(-) create mode 100644 pkg/querier/codec/protobug_codec_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index a86b2cd9cc4..a4a1b59d6ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ * [CHANGE] Change all max async concurrency default values `50` to `3` #6268 * [CHANGE] Change default value of `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` from `50` to `3` #6265 * [CHANGE] Enable Compactor and Alertmanager in target all. #6204 +* [FEATURE] Ruler: Add an experimental flag `-ruler.query-response-format` to retrieve query response as a proto format. #6345 * [FEATURE] Ruler: Pagination support for List Rules API. #6299 * [FEATURE] Query Frontend/Querier: Add protobuf codec `-api.querier-default-codec` and the option to choose response compression type `-querier.response-compression`. #5527 * [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 211ad257116..9463d3a8347 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4252,6 +4252,11 @@ The `ruler_config` configures the Cortex ruler. # CLI flag: -ruler.frontend-address [frontend_address: | default = ""] +# [Experimental] Query response format to get query results from Query Frontend +# when the rule evaluation. Supported values: json,protobuf +# CLI flag: -ruler.query-response-format +[query_response_format: | default = "protobuf"] + frontend_client: # gRPC client max receive message size (bytes). # CLI flag: -ruler.frontendClient.grpc-max-recv-msg-size diff --git a/go.mod b/go.mod index 9e1eaed558a..4e84b1f7e80 100644 --- a/go.mod +++ b/go.mod @@ -80,6 +80,7 @@ require ( github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 github.com/cespare/xxhash/v2 v2.3.0 github.com/google/go-cmp v0.6.0 + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 github.com/sercand/kuberesolver/v4 v4.0.0 go.opentelemetry.io/collector/pdata v1.19.0 golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 @@ -186,7 +187,6 @@ require ( github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect github.com/ncw/swift v1.0.53 // indirect github.com/oklog/run v1.1.0 // indirect diff --git a/integration/ruler_test.go b/integration/ruler_test.go index f7d16507d1a..0164a09c918 100644 --- a/integration/ruler_test.go +++ b/integration/ruler_test.go @@ -1670,42 +1670,61 @@ func TestRulerEvalWithQueryFrontend(t *testing.T) { distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") require.NoError(t, s.StartAndWaitReady(distributor, ingester)) - queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "") - require.NoError(t, s.Start(queryFrontend)) - - ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ - "-ruler.frontend-address": queryFrontend.NetworkGRPCEndpoint(), - }), "") - querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ - "-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(), - }), "") - require.NoError(t, s.StartAndWaitReady(ruler, querier)) - - c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), user) - require.NoError(t, err) + for _, format := range []string{"protobuf", "json"} { + t.Run(fmt.Sprintf("format:%s", format), func(t *testing.T) { + queryFrontendFlag := mergeFlags(flags, map[string]string{ + "-ruler.query-response-format": format, + }) + queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", queryFrontendFlag, "") + require.NoError(t, s.Start(queryFrontend)) - expression := "metric" - groupName := "rule_group" - ruleName := "rule_name" - require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, ruleName, expression), namespace)) + querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(queryFrontendFlag, map[string]string{ + "-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(), + }), "") + require.NoError(t, s.StartAndWaitReady(querier)) - rgMatcher := ruleGroupMatcher(user, namespace, groupName) - // Wait until ruler has loaded the group. - require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics)) - // Wait until rule group has tried to evaluate the rule. - require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics)) + rulerFlag := mergeFlags(queryFrontendFlag, map[string]string{ + "-ruler.frontend-address": queryFrontend.NetworkGRPCEndpoint(), + }) + ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), rulerFlag, "") + require.NoError(t, s.StartAndWaitReady(ruler)) - matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user) - // Check that cortex_ruler_query_frontend_clients went up - require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ruler_query_frontend_clients"}, e2e.WaitMissingMetrics)) - // Check that cortex_ruler_queries_total went up - require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics)) - // Check that cortex_ruler_queries_failed_total is zero - require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics)) - // Check that cortex_ruler_write_requests_total went up - require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics)) - // Check that cortex_ruler_write_requests_failed_total is zero - require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics)) + t.Cleanup(func() { + _ = s.Stop(ruler) + _ = s.Stop(queryFrontend) + _ = s.Stop(querier) + }) + + c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), user) + require.NoError(t, err) + + expression := "metric" // vector + //expression := "scalar(count(up == 1)) > bool 1" // scalar + groupName := "rule_group" + ruleName := "rule_name" + require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, ruleName, expression), namespace)) + + rgMatcher := ruleGroupMatcher(user, namespace, groupName) + // Wait until ruler has loaded the group. + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics)) + // Wait until rule group has tried to evaluate the rule. + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics)) + // Make sure not to fail + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics)) + + matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user) + // Check that cortex_ruler_query_frontend_clients went up + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ruler_query_frontend_clients"}, e2e.WaitMissingMetrics)) + // Check that cortex_ruler_queries_total went up + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics)) + // Check that cortex_ruler_queries_failed_total is zero + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics)) + // Check that cortex_ruler_write_requests_total went up + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics)) + // Check that cortex_ruler_write_requests_failed_total is zero + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics)) + }) + } } func parseAlertFromRule(t *testing.T, rules interface{}) *alertingRule { diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 00f0b10a203..f94a783121b 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -474,7 +474,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro prometheusCodec := queryrange.NewPrometheusCodec(false, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec) // ShardedPrometheusCodec is same as PrometheusCodec but to be used on the sharded queries (it sum up the stats) shardedPrometheusCodec := queryrange.NewPrometheusCodec(true, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec) - instantQueryCodec := instantquery.NewInstantQueryCodec(t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec) + instantQueryCodec := instantquery.NewInstantQueryCodec(t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec, t.Cfg.Ruler.QueryResponseFormat) queryRangeMiddlewares, cache, err := queryrange.Middlewares( t.Cfg.QueryRange, diff --git a/pkg/querier/codec/protobuf_codec.go b/pkg/querier/codec/protobuf_codec.go index b835d573e8a..7052779502f 100644 --- a/pkg/querier/codec/protobuf_codec.go +++ b/pkg/querier/codec/protobuf_codec.go @@ -52,6 +52,11 @@ func createPrometheusQueryResponse(resp *v1.Response) (*tripperware.PrometheusRe Samples: *getVectorSamples(data), }, } + case model.ValScalar.String(): + scalar := getScalar(data) + queryResult.Result = &tripperware.PrometheusQueryResult_Scalar{ + Scalar: &scalar, + } default: json := jsoniter.ConfigCompatibleWithStandardLibrary rawBytes, err := json.Marshal(data) @@ -141,6 +146,15 @@ func getVectorSamples(data *v1.QueryData) *[]tripperware.Sample { return &vectorSamples } +func getScalar(data *v1.QueryData) tripperware.Scalar { + s := data.Result.(promql.Scalar) + + return tripperware.Scalar{ + Value: s.V, + TimestampMs: s.T, + } +} + func getStats(builtin *stats.BuiltinStats) *tripperware.PrometheusResponseSamplesStats { queryableSamplesStatsPerStepLen := len(builtin.Samples.TotalQueryableSamplesPerStep) queryableSamplesStatsPerStep := make([]*tripperware.PrometheusResponseQueryableSamplesStatsPerStep, queryableSamplesStatsPerStepLen) diff --git a/pkg/querier/codec/protobug_codec_test.go b/pkg/querier/codec/protobug_codec_test.go new file mode 100644 index 00000000000..bb95aae23b8 --- /dev/null +++ b/pkg/querier/codec/protobug_codec_test.go @@ -0,0 +1,93 @@ +package codec + +import ( + "testing" + + jsoniter "github.com/json-iterator/go" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" + v1 "github.com/prometheus/prometheus/web/api/v1" + "github.com/stretchr/testify/require" +) + +var ( + json = jsoniter.Config{ + EscapeHTML: false, // No HTML in our responses. + SortMapKeys: true, + ValidateJsonRawMessage: false, + }.Froze() +) + +func TestPrometheusResponse_JsonMarshal(t *testing.T) { + var tests = []struct { + name string + resp *v1.Response + expectedJson string + }{ + { + name: "scalar", + resp: &v1.Response{ + Status: "success", + Data: &v1.QueryData{ + ResultType: "scalar", + Result: promql.Scalar{T: 1000, V: 2}, + }, + }, + expectedJson: `{"status":"success","data":{"resultType":"scalar","result":[1,"2"]}}`, + }, + { + name: "vector", + resp: &v1.Response{ + Status: "success", + Data: &v1.QueryData{ + ResultType: "vector", + Result: promql.Vector{ + { + Metric: labels.FromStrings("name", "value"), + T: 1234, + F: 5.67, + }, + }, + }, + }, + expectedJson: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"name":"value"},"value":[1.234,"5.67"]}]}}`, + }, + { + name: "matrix", + resp: &v1.Response{ + Status: "success", + Data: &v1.QueryData{ + ResultType: parser.ValueTypeMatrix, + Result: promql.Matrix{ + { + Metric: labels.FromStrings("name1", "value1"), + Floats: []promql.FPoint{ + {T: 12, F: 3.4}, + {T: 56, F: 7.8}, + }, + }, + { + Metric: labels.FromStrings("name2", "value2"), + Floats: []promql.FPoint{ + {T: 12, F: 3.4}, + {T: 56, F: 7.8}, + }, + }, + }, + }, + }, + expectedJson: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"name1":"value1"},"values":[[0.012,"3.4"],[0.056,"7.8"]]},{"metric":{"name2":"value2"},"values":[[0.012,"3.4"],[0.056,"7.8"]]}]}}`, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + prometheusResponse, err := createPrometheusQueryResponse(test.resp) + require.NoError(t, err) + + b, err := json.Marshal(prometheusResponse) + require.NoError(t, err) + require.Equal(t, test.expectedJson, string(b)) + }) + } +} diff --git a/pkg/querier/tripperware/instantquery/instant_query.go b/pkg/querier/tripperware/instantquery/instant_query.go index 034090a8b9d..98a81798cb8 100644 --- a/pkg/querier/tripperware/instantquery/instant_query.go +++ b/pkg/querier/tripperware/instantquery/instant_query.go @@ -11,9 +11,11 @@ import ( "time" jsoniter "github.com/json-iterator/go" + "github.com/munnerz/goautoneg" "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" "github.com/prometheus/common/model" + v1 "github.com/prometheus/prometheus/web/api/v1" "github.com/weaveworks/common/httpgrpc" "google.golang.org/grpc/status" @@ -28,16 +30,20 @@ var ( SortMapKeys: true, ValidateJsonRawMessage: false, }.Froze() + + protobufMIMEType = v1.MIMEType{Type: "application", SubType: "x-protobuf"} + jsonMIMEType = v1.MIMEType{Type: "application", SubType: "json"} ) type instantQueryCodec struct { tripperware.Codec - compression tripperware.Compression - defaultCodecType tripperware.CodecType - now func() time.Time + compression tripperware.Compression + defaultCodecType tripperware.CodecType + now func() time.Time + rulerQueryResponseFormat string } -func NewInstantQueryCodec(compressionStr string, defaultCodecTypeStr string) instantQueryCodec { +func NewInstantQueryCodec(compressionStr string, defaultCodecTypeStr string, rulerQueryResponseFormat string) instantQueryCodec { compression := tripperware.NonCompression // default if compressionStr == string(tripperware.GzipCompression) { compression = tripperware.GzipCompression @@ -49,9 +55,10 @@ func NewInstantQueryCodec(compressionStr string, defaultCodecTypeStr string) ins } return instantQueryCodec{ - compression: compression, - defaultCodecType: defaultCodecType, - now: time.Now, + compression: compression, + defaultCodecType: defaultCodecType, + now: time.Now, + rulerQueryResponseFormat: rulerQueryResponseFormat, } } @@ -114,6 +121,12 @@ func (instantQueryCodec) DecodeResponse(ctx context.Context, r *http.Response, _ if resp.Data.Result.GetVector().Samples == nil { resp.Data.Result.GetVector().Samples = []tripperware.Sample{} } + case model.ValScalar.String(): + if resp.Data.Result.GetScalar() == nil { + resp.Data.Result.Result = &tripperware.PrometheusQueryResult_Scalar{ + Scalar: &tripperware.Scalar{}, + } + } } if resp.Headers == nil { @@ -167,7 +180,7 @@ func (c instantQueryCodec) EncodeRequest(ctx context.Context, r tripperware.Requ return req.WithContext(ctx), nil } -func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Response) (*http.Response, error) { +func (c instantQueryCodec) EncodeResponse(ctx context.Context, req *http.Request, res tripperware.Response) (*http.Response, error) { sp, _ := opentracing.StartSpanFromContext(ctx, "APIResponse.ToHTTPResponse") defer sp.Finish() @@ -176,7 +189,7 @@ func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Res return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid response format") } - b, err := json.Marshal(a) + contentType, b, err := marshalResponse(a, req.Header.Get("Accept")) if err != nil { return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error encoding response: %v", err) } @@ -185,7 +198,7 @@ func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Res resp := http.Response{ Header: http.Header{ - "Content-Type": []string{tripperware.ApplicationJson}, + "Content-Type": []string{contentType}, }, Body: io.NopCloser(bytes.NewBuffer(b)), StatusCode: http.StatusOK, @@ -213,3 +226,22 @@ func decorateWithParamName(err error, field string) error { } return fmt.Errorf(errTmpl, field, err) } + +func marshalResponse(resp *tripperware.PrometheusResponse, acceptHeader string) (string, []byte, error) { + if acceptHeader == "" { + b, err := json.Marshal(resp) + return tripperware.ApplicationJson, b, err + } + + for _, clause := range goautoneg.ParseAccept(acceptHeader) { + if jsonMIMEType.Satisfies(clause) { + b, err := json.Marshal(resp) + return tripperware.ApplicationJson, b, err + } else if protobufMIMEType.Satisfies(clause) { + b, err := resp.Marshal() + return tripperware.ApplicationProtobuf, b, err + } + } + + return "", nil, fmt.Errorf("failed to marshal response with accept header: %s", acceptHeader) +} diff --git a/pkg/querier/tripperware/instantquery/instant_query_test.go b/pkg/querier/tripperware/instantquery/instant_query_test.go index 718bc460ab1..a997b99dbd2 100644 --- a/pkg/querier/tripperware/instantquery/instant_query_test.go +++ b/pkg/querier/tripperware/instantquery/instant_query_test.go @@ -25,7 +25,7 @@ import ( "github.com/cortexproject/cortex/pkg/querier/tripperware" ) -var testInstantQueryCodec = NewInstantQueryCodec(string(tripperware.NonCompression), string(tripperware.ProtobufCodecType)) +var testInstantQueryCodec = NewInstantQueryCodec(string(tripperware.NonCompression), string(tripperware.ProtobufCodecType), "json") const testHistogramResponse = `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"prometheus_http_request_duration_seconds","handler":"/metrics","instance":"localhost:9090","job":"prometheus"},"histogram":[1719528871.898,{"count":"6342","sum":"43.31319875499995","buckets":[[0,"0.0013810679320049755","0.0015060652591874421","1"],[0,"0.0015060652591874421","0.001642375811042411","7"],[0,"0.001642375811042411","0.0017910235218841233","5"],[0,"0.0017910235218841233","0.001953125","13"],[0,"0.001953125","0.0021298979153618314","19"],[0,"0.0021298979153618314","0.0023226701464896895","13"],[0,"0.0023226701464896895","0.002532889755177753","13"],[0,"0.002532889755177753","0.002762135864009951","15"],[0,"0.002762135864009951","0.0030121305183748843","12"],[0,"0.0030121305183748843","0.003284751622084822","34"],[0,"0.003284751622084822","0.0035820470437682465","188"],[0,"0.0035820470437682465","0.00390625","372"],[0,"0.00390625","0.004259795830723663","400"],[0,"0.004259795830723663","0.004645340292979379","411"],[0,"0.004645340292979379","0.005065779510355506","425"],[0,"0.005065779510355506","0.005524271728019902","425"],[0,"0.005524271728019902","0.0060242610367497685","521"],[0,"0.0060242610367497685","0.006569503244169644","621"],[0,"0.006569503244169644","0.007164094087536493","593"],[0,"0.007164094087536493","0.0078125","506"],[0,"0.0078125","0.008519591661447326","458"],[0,"0.008519591661447326","0.009290680585958758","346"],[0,"0.009290680585958758","0.010131559020711013","285"],[0,"0.010131559020711013","0.011048543456039804","196"],[0,"0.011048543456039804","0.012048522073499537","129"],[0,"0.012048522073499537","0.013139006488339287","85"],[0,"0.013139006488339287","0.014328188175072986","65"],[0,"0.014328188175072986","0.015625","54"],[0,"0.015625","0.01703918332289465","53"],[0,"0.01703918332289465","0.018581361171917516","20"],[0,"0.018581361171917516","0.020263118041422026","21"],[0,"0.020263118041422026","0.022097086912079608","15"],[0,"0.022097086912079608","0.024097044146999074","11"],[0,"0.024097044146999074","0.026278012976678575","2"],[0,"0.026278012976678575","0.028656376350145972","3"],[0,"0.028656376350145972","0.03125","3"],[0,"0.04052623608284405","0.044194173824159216","2"]]}]}]}}` @@ -199,6 +199,12 @@ func TestCompressedResponse(t *testing.T) { func TestResponse(t *testing.T) { t.Parallel() + jsonHttpReq := &http.Request{ + Header: map[string][]string{ + "Accept": {"application/json"}, + }, + } + for i, tc := range []struct { jsonBody string promBody *tripperware.PrometheusResponse @@ -263,6 +269,7 @@ func TestResponse(t *testing.T) { Result: tripperware.PrometheusQueryResult{ Result: &tripperware.PrometheusQueryResult_Matrix{ Matrix: &tripperware.Matrix{ + SampleStreams: []tripperware.SampleStream{ { Labels: []cortexpb.LabelAdapter{ @@ -323,10 +330,13 @@ func TestResponse(t *testing.T) { promBody: &tripperware.PrometheusResponse{ Status: "success", Data: tripperware.PrometheusData{ - ResultType: model.ValString.String(), + ResultType: model.ValScalar.String(), Result: tripperware.PrometheusQueryResult{ - Result: &tripperware.PrometheusQueryResult_RawBytes{ - RawBytes: []byte(`{"resultType":"scalar","result":[1,"13"]}`), + Result: &tripperware.PrometheusQueryResult_Scalar{ + Scalar: &tripperware.Scalar{ + Value: 13, + TimestampMs: 1000, + }, }, }, }, @@ -388,7 +398,8 @@ func TestResponse(t *testing.T) { Body: io.NopCloser(bytes.NewBuffer([]byte(tc.jsonBody))), ContentLength: int64(len(tc.jsonBody)), } - resp2, err := testInstantQueryCodec.EncodeResponse(context.Background(), resp) + + resp2, err := testInstantQueryCodec.EncodeResponse(context.Background(), jsonHttpReq, resp) require.NoError(t, err) assert.Equal(t, response, resp2) }) @@ -400,6 +411,11 @@ func TestMergeResponse(t *testing.T) { defaultReq := &tripperware.PrometheusRequest{ Query: "sum(up)", } + jsonHttpReq := &http.Request{ + Header: map[string][]string{ + "Accept": {"application/json"}, + }, + } for _, tc := range []struct { name string req tripperware.Request @@ -665,7 +681,7 @@ func TestMergeResponse(t *testing.T) { cancelCtx() return } - dr, err := testInstantQueryCodec.EncodeResponse(ctx, resp) + dr, err := testInstantQueryCodec.EncodeResponse(ctx, jsonHttpReq, resp) assert.Equal(t, tc.expectedErr, err) contents, err := io.ReadAll(dr.Body) assert.Equal(t, tc.expectedErr, err) @@ -680,6 +696,11 @@ func TestMergeResponseProtobuf(t *testing.T) { defaultReq := &tripperware.PrometheusRequest{ Query: "sum(up)", } + jsonHttpReq := &http.Request{ + Header: map[string][]string{ + "Accept": {"application/json"}, + }, + } for _, tc := range []struct { name string req tripperware.Request @@ -1680,7 +1701,7 @@ func TestMergeResponseProtobuf(t *testing.T) { cancelCtx() return } - dr, err := testInstantQueryCodec.EncodeResponse(ctx, resp) + dr, err := testInstantQueryCodec.EncodeResponse(ctx, jsonHttpReq, resp) assert.Equal(t, tc.expectedErr, err) contents, err := io.ReadAll(dr.Body) assert.Equal(t, tc.expectedErr, err) @@ -1690,6 +1711,55 @@ func TestMergeResponseProtobuf(t *testing.T) { } } +func Test_marshalResponseContentType(t *testing.T) { + mockResp := &tripperware.PrometheusResponse{} + + tests := []struct { + name string + acceptHeader string + expectedContentType string + }{ + { + name: "empty accept header", + acceptHeader: "", + expectedContentType: "application/json", + }, + { + name: "type and subtype are *", + acceptHeader: "*/*", + expectedContentType: "application/json", + }, + { + name: "subnet is *", + acceptHeader: "application/*", + expectedContentType: "application/json", + }, + { + name: "json type", + acceptHeader: "application/json", + expectedContentType: "application/json", + }, + { + name: "proto type", + acceptHeader: "application/x-protobuf", + expectedContentType: "application/x-protobuf", + }, + { + name: "json type and proto type", + acceptHeader: "application/json,application/x-protobuf", + expectedContentType: "application/json", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + contentType, _, err := marshalResponse(mockResp, test.acceptHeader) + require.NoError(t, err) + require.Equal(t, test.expectedContentType, contentType) + }) + } +} + func Benchmark_Decode(b *testing.B) { maxSamplesCount := 1000000 samples := make([]tripperware.SampleStream, maxSamplesCount) diff --git a/pkg/querier/tripperware/query.go b/pkg/querier/tripperware/query.go index 28e7c2430b4..4042ab96621 100644 --- a/pkg/querier/tripperware/query.go +++ b/pkg/querier/tripperware/query.go @@ -12,10 +12,9 @@ import ( "time" "unsafe" - "github.com/golang/snappy" - "github.com/go-kit/log" "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" jsoniter "github.com/json-iterator/go" "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" @@ -62,7 +61,7 @@ type Codec interface { // EncodeRequest encodes a Request into an http request. EncodeRequest(context.Context, Request) (*http.Request, error) // EncodeResponse encodes a Response into an http response. - EncodeResponse(context.Context, Response) (*http.Response, error) + EncodeResponse(context.Context, *http.Request, Response) (*http.Response, error) } // Merger is used by middlewares making multiple requests to merge back all responses into a single one. @@ -533,6 +532,19 @@ func (s *PrometheusData) UnmarshalJSON(data []byte) error { SampleStreams: result.SampleStreams, }}, } + case model.ValScalar.String(): + var result struct { + Scalar Scalar `json:"result"` + } + if err := json.Unmarshal(data, &result); err != nil { + return err + } + s.Result = PrometheusQueryResult{ + Result: &PrometheusQueryResult_Scalar{Scalar: &Scalar{ + TimestampMs: result.Scalar.TimestampMs, + Value: result.Scalar.Value, + }}, + } default: s.Result = PrometheusQueryResult{ Result: &PrometheusQueryResult_RawBytes{data}, @@ -566,6 +578,17 @@ func (s *PrometheusData) MarshalJSON() ([]byte, error) { Stats: s.Stats, } return json.Marshal(res) + case model.ValScalar.String(): + res := struct { + ResultType string `json:"resultType"` + Data Scalar `json:"result"` + Stats *PrometheusResponseStats `json:"stats,omitempty"` + }{ + ResultType: s.ResultType, + Data: *s.Result.GetScalar(), + Stats: s.Stats, + } + return json.Marshal(res) default: return s.Result.GetRawBytes(), nil } @@ -769,3 +792,24 @@ func UnmarshalResponse(r *http.Response, buf []byte, resp *PrometheusResponse) e return json.Unmarshal(buf, resp) } } + +func (s *Scalar) MarshalJSON() ([]byte, error) { + return json.Marshal(model.Scalar{ + Timestamp: model.Time(s.TimestampMs), + Value: model.SampleValue(s.Value), + }) +} + +func (s *Scalar) UnmarshalJSON(buf []byte) error { + var sm model.Scalar + if err := json.Unmarshal(buf, &sm); err != nil { + return err + } + + *s = Scalar{ + TimestampMs: int64(sm.Timestamp), + Value: float64(sm.Value), + } + + return nil +} diff --git a/pkg/querier/tripperware/query.pb.go b/pkg/querier/tripperware/query.pb.go index 2e16fc9c6db..54a3ff10397 100644 --- a/pkg/querier/tripperware/query.pb.go +++ b/pkg/querier/tripperware/query.pb.go @@ -741,6 +741,7 @@ type PrometheusQueryResult struct { // *PrometheusQueryResult_Vector // *PrometheusQueryResult_RawBytes // *PrometheusQueryResult_Matrix + // *PrometheusQueryResult_Scalar Result isPrometheusQueryResult_Result `protobuf_oneof:"result"` } @@ -792,10 +793,14 @@ type PrometheusQueryResult_RawBytes struct { type PrometheusQueryResult_Matrix struct { Matrix *Matrix `protobuf:"bytes,3,opt,name=matrix,proto3,oneof"` } +type PrometheusQueryResult_Scalar struct { + Scalar *Scalar `protobuf:"bytes,4,opt,name=scalar,proto3,oneof"` +} func (*PrometheusQueryResult_Vector) isPrometheusQueryResult_Result() {} func (*PrometheusQueryResult_RawBytes) isPrometheusQueryResult_Result() {} func (*PrometheusQueryResult_Matrix) isPrometheusQueryResult_Result() {} +func (*PrometheusQueryResult_Scalar) isPrometheusQueryResult_Result() {} func (m *PrometheusQueryResult) GetResult() isPrometheusQueryResult_Result { if m != nil { @@ -825,12 +830,20 @@ func (m *PrometheusQueryResult) GetMatrix() *Matrix { return nil } +func (m *PrometheusQueryResult) GetScalar() *Scalar { + if x, ok := m.GetResult().(*PrometheusQueryResult_Scalar); ok { + return x.Scalar + } + return nil +} + // XXX_OneofWrappers is for the internal use of the proto package. func (*PrometheusQueryResult) XXX_OneofWrappers() []interface{} { return []interface{}{ (*PrometheusQueryResult_Vector)(nil), (*PrometheusQueryResult_RawBytes)(nil), (*PrometheusQueryResult_Matrix)(nil), + (*PrometheusQueryResult_Scalar)(nil), } } @@ -972,6 +985,57 @@ func (m *Matrix) GetSampleStreams() []SampleStream { return nil } +type Scalar struct { + TimestampMs int64 `protobuf:"varint,1,opt,name=timestamp_ms,json=timestampMs,proto3" json:"timestamp_ms,omitempty"` + Value float64 `protobuf:"fixed64,2,opt,name=value,proto3" json:"value,omitempty"` +} + +func (m *Scalar) Reset() { *m = Scalar{} } +func (*Scalar) ProtoMessage() {} +func (*Scalar) Descriptor() ([]byte, []int) { + return fileDescriptor_5c6ac9b241082464, []int{16} +} +func (m *Scalar) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Scalar) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Scalar.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Scalar) XXX_Merge(src proto.Message) { + xxx_messageInfo_Scalar.Merge(m, src) +} +func (m *Scalar) XXX_Size() int { + return m.Size() +} +func (m *Scalar) XXX_DiscardUnknown() { + xxx_messageInfo_Scalar.DiscardUnknown(m) +} + +var xxx_messageInfo_Scalar proto.InternalMessageInfo + +func (m *Scalar) GetTimestampMs() int64 { + if m != nil { + return m.TimestampMs + } + return 0 +} + +func (m *Scalar) GetValue() float64 { + if m != nil { + return m.Value + } + return 0 +} + func init() { proto.RegisterType((*PrometheusResponse)(nil), "tripperware.PrometheusResponse") proto.RegisterType((*PrometheusData)(nil), "tripperware.PrometheusData") @@ -989,88 +1053,91 @@ func init() { proto.RegisterType((*Vector)(nil), "tripperware.Vector") proto.RegisterType((*Sample)(nil), "tripperware.Sample") proto.RegisterType((*Matrix)(nil), "tripperware.Matrix") + proto.RegisterType((*Scalar)(nil), "tripperware.Scalar") } func init() { proto.RegisterFile("query.proto", fileDescriptor_5c6ac9b241082464) } var fileDescriptor_5c6ac9b241082464 = []byte{ - // 1208 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x56, 0x4b, 0x6f, 0x1b, 0x55, - 0x14, 0xf6, 0xf8, 0x31, 0x76, 0x8e, 0xd3, 0xa4, 0xdc, 0xf4, 0xe1, 0x94, 0x32, 0x63, 0x46, 0x20, - 0x05, 0x41, 0x1d, 0x91, 0x0a, 0x10, 0x20, 0x2a, 0x3a, 0x50, 0x48, 0x0b, 0xa5, 0xed, 0x4d, 0x55, - 0x24, 0x36, 0xd5, 0xb5, 0x7d, 0xeb, 0x0c, 0xf1, 0x3c, 0x7a, 0xe7, 0x4e, 0x13, 0xb3, 0x62, 0xcd, - 0x02, 0xb1, 0x46, 0x62, 0xc1, 0x8e, 0x05, 0x3f, 0x24, 0xcb, 0x2e, 0x2b, 0x24, 0x46, 0xd4, 0xd9, - 0xa0, 0x59, 0xf5, 0x27, 0xa0, 0xfb, 0x18, 0x7b, 0x9c, 0x38, 0x89, 0xba, 0x62, 0xe3, 0xcc, 0x39, - 0xe7, 0x3b, 0xcf, 0x7b, 0x1e, 0x81, 0xe6, 0xe3, 0x84, 0xb2, 0x51, 0x27, 0x62, 0x21, 0x0f, 0x51, - 0x93, 0x33, 0x2f, 0x8a, 0x28, 0xdb, 0x25, 0x8c, 0x5e, 0x3a, 0x37, 0x08, 0x07, 0xa1, 0xe4, 0xaf, - 0x8b, 0x2f, 0x05, 0xb9, 0x64, 0x0d, 0xc2, 0x70, 0x30, 0xa4, 0xeb, 0x92, 0xea, 0x26, 0x8f, 0xd6, - 0xfb, 0x09, 0x23, 0xdc, 0x0b, 0x03, 0x2d, 0x5f, 0x3d, 0x2c, 0x27, 0x81, 0xb6, 0x7e, 0xe9, 0xc3, - 0x81, 0xc7, 0xb7, 0x93, 0x6e, 0xa7, 0x17, 0xfa, 0xeb, 0xbd, 0x90, 0x71, 0xba, 0x17, 0xb1, 0xf0, - 0x7b, 0xda, 0xe3, 0x9a, 0x5a, 0x8f, 0x76, 0x06, 0xb9, 0xa0, 0xab, 0x3f, 0x94, 0xaa, 0xf3, 0x53, - 0x05, 0xd0, 0x5d, 0x16, 0xfa, 0x94, 0x6f, 0xd3, 0x24, 0xc6, 0x34, 0x8e, 0xc2, 0x20, 0xa6, 0xc8, - 0x01, 0x73, 0x8b, 0x13, 0x9e, 0xc4, 0x2d, 0xa3, 0x6d, 0xac, 0x2d, 0xb8, 0x90, 0xa5, 0xb6, 0x19, - 0x4b, 0x0e, 0xd6, 0x12, 0xf4, 0x25, 0x54, 0x3f, 0x27, 0x9c, 0xb4, 0xca, 0x6d, 0x63, 0xad, 0xb9, - 0xf1, 0x6a, 0xa7, 0x90, 0x62, 0x67, 0x6a, 0x52, 0x40, 0xdc, 0x0b, 0xfb, 0xa9, 0x5d, 0xca, 0x52, - 0x7b, 0xa9, 0x4f, 0x38, 0x79, 0x27, 0xf4, 0x3d, 0x4e, 0xfd, 0x88, 0x8f, 0xb0, 0x34, 0x80, 0xde, - 0x83, 0x85, 0x1b, 0x8c, 0x85, 0xec, 0xfe, 0x28, 0xa2, 0xad, 0x8a, 0xf4, 0x77, 0x31, 0x4b, 0xed, - 0x15, 0x9a, 0x33, 0x0b, 0x1a, 0x53, 0x24, 0x7a, 0x0b, 0x6a, 0x92, 0x68, 0x55, 0xa5, 0xca, 0x4a, - 0x96, 0xda, 0xcb, 0x52, 0xa5, 0x00, 0x57, 0x08, 0xf4, 0x05, 0xd4, 0x37, 0x29, 0xe9, 0x53, 0x16, - 0xb7, 0x6a, 0xed, 0xca, 0x5a, 0x73, 0xe3, 0xcd, 0x63, 0xa2, 0xcd, 0x0b, 0xa0, 0xd0, 0x6e, 0x2d, - 0x4b, 0x6d, 0xe3, 0x0a, 0xce, 0x95, 0xd1, 0x06, 0x34, 0xbe, 0x25, 0x2c, 0xf0, 0x82, 0x41, 0xdc, - 0x32, 0xdb, 0x95, 0xb5, 0x05, 0xf7, 0x42, 0x96, 0xda, 0x68, 0x57, 0xf3, 0x0a, 0x8e, 0x27, 0x38, - 0x11, 0xe6, 0xcd, 0xe0, 0x51, 0x18, 0xb7, 0xea, 0x52, 0x41, 0x86, 0xe9, 0x09, 0x46, 0x31, 0x4c, - 0x89, 0x70, 0xfe, 0x36, 0x60, 0x69, 0xb6, 0x72, 0xa8, 0x03, 0x80, 0x69, 0x9c, 0x0c, 0xb9, 0x2c, - 0x8e, 0x7a, 0x8c, 0xa5, 0x2c, 0xb5, 0x81, 0x4d, 0xb8, 0xb8, 0x80, 0x40, 0xb7, 0xc0, 0x54, 0x94, - 0x7e, 0x16, 0xe7, 0x98, 0x44, 0xef, 0x89, 0xe6, 0x54, 0x48, 0x77, 0x49, 0xbf, 0x8e, 0xa9, 0x6c, - 0x62, 0x6d, 0x01, 0xdd, 0x81, 0x9a, 0x78, 0xf2, 0x58, 0xbe, 0x49, 0x73, 0xe3, 0x8d, 0x53, 0x6a, - 0x26, 0xda, 0x22, 0x56, 0xf9, 0x49, 0xb5, 0x62, 0x7e, 0x92, 0xe1, 0xec, 0xc0, 0xd2, 0x67, 0xa4, - 0xb7, 0x4d, 0xfb, 0x93, 0x3e, 0x5b, 0x85, 0xca, 0x0e, 0x1d, 0xe9, 0xbc, 0xea, 0x59, 0x6a, 0x0b, - 0x12, 0x8b, 0x1f, 0x74, 0x0d, 0xea, 0x74, 0x8f, 0xd3, 0x80, 0xc7, 0xad, 0xb2, 0x7c, 0xb3, 0x95, - 0x19, 0xff, 0x37, 0xa4, 0xcc, 0x5d, 0xd6, 0xb1, 0xe7, 0x58, 0x9c, 0x7f, 0x38, 0x7f, 0x1a, 0x60, - 0x2a, 0x10, 0xb2, 0x65, 0x22, 0x8c, 0x4b, 0x3f, 0x15, 0x77, 0x21, 0x4b, 0x6d, 0xc5, 0xc0, 0xea, - 0x8f, 0x08, 0x83, 0x06, 0x7d, 0x59, 0xb2, 0x8a, 0x0a, 0x83, 0x06, 0x7d, 0x2c, 0x7e, 0x50, 0x1b, - 0x1a, 0x9c, 0x91, 0x1e, 0x7d, 0xe8, 0xf5, 0x75, 0xa3, 0xe5, 0x4d, 0x21, 0xd9, 0x37, 0xfb, 0xe8, - 0x1a, 0x34, 0x98, 0xce, 0xa7, 0x55, 0x93, 0x95, 0x3a, 0xd7, 0x51, 0xb3, 0xda, 0xc9, 0x67, 0xb5, - 0x73, 0x3d, 0x18, 0xb9, 0x8b, 0x59, 0x6a, 0x4f, 0x90, 0x78, 0xf2, 0x75, 0xab, 0xda, 0xa8, 0x9c, - 0xad, 0x3a, 0xbf, 0x96, 0x61, 0x71, 0x8b, 0xf8, 0xd1, 0x90, 0x6e, 0x71, 0x46, 0x89, 0x8f, 0xf6, - 0xc0, 0x1c, 0x92, 0x2e, 0x1d, 0x8a, 0x11, 0x54, 0xe9, 0xe7, 0x13, 0xdc, 0xf9, 0x5a, 0xf0, 0xef, - 0x12, 0x8f, 0xb9, 0x5f, 0x89, 0xf4, 0xff, 0x4a, 0xed, 0x97, 0xda, 0x00, 0x4a, 0xff, 0x7a, 0x9f, - 0x44, 0x9c, 0x32, 0xf1, 0xee, 0x3e, 0xe5, 0xcc, 0xeb, 0x61, 0xed, 0x0f, 0x7d, 0x04, 0xf5, 0x58, - 0x46, 0x92, 0x57, 0xfe, 0xec, 0xd4, 0xb5, 0x0a, 0x71, 0xda, 0x32, 0x4f, 0xc8, 0x30, 0xa1, 0x31, - 0xce, 0x15, 0xd0, 0x7d, 0x80, 0x6d, 0x2f, 0xe6, 0xe1, 0x80, 0x11, 0x5f, 0x34, 0x8e, 0x50, 0x6f, - 0xcf, 0x3c, 0x9c, 0xb2, 0xb0, 0x99, 0x83, 0x64, 0x1a, 0x48, 0x9b, 0x2b, 0xe8, 0xe2, 0xc2, 0xb7, - 0xf3, 0x03, 0xac, 0xcc, 0x51, 0x43, 0xaf, 0xc3, 0x22, 0xf7, 0x7c, 0x1a, 0x73, 0xe2, 0x47, 0x0f, - 0x7d, 0xb5, 0xab, 0x2a, 0xb8, 0x39, 0xe1, 0xdd, 0x8e, 0xd1, 0xa7, 0xb0, 0x30, 0xb1, 0xa3, 0x47, - 0xe2, 0xf2, 0x49, 0xe1, 0xb8, 0x55, 0x11, 0x0a, 0x9e, 0x2a, 0x39, 0x8f, 0x61, 0xf9, 0x10, 0x06, - 0x9d, 0x83, 0x5a, 0x2f, 0x4c, 0x02, 0xd5, 0x4f, 0x06, 0x56, 0x04, 0x3a, 0x0b, 0x95, 0x38, 0x51, - 0x4e, 0x0c, 0x2c, 0x3e, 0xd1, 0xfb, 0x50, 0xef, 0x26, 0xbd, 0x1d, 0xca, 0xf3, 0x4a, 0xcc, 0xba, - 0x9e, 0x3a, 0x95, 0x20, 0x9c, 0x83, 0x9d, 0x18, 0x96, 0x0f, 0xc9, 0x90, 0x05, 0xd0, 0x0d, 0x93, - 0xa0, 0x4f, 0x98, 0x47, 0x55, 0xa2, 0x35, 0x5c, 0xe0, 0x88, 0x90, 0x86, 0xe1, 0x2e, 0x65, 0xda, - 0xbd, 0x22, 0x04, 0x37, 0x11, 0xee, 0xe4, 0x04, 0x1b, 0x58, 0x11, 0xd3, 0xf0, 0xab, 0x85, 0xf0, - 0x1d, 0x1f, 0x2e, 0x1e, 0x33, 0xd3, 0x08, 0x4f, 0x1b, 0xc2, 0x90, 0x25, 0x7c, 0xfb, 0xb4, 0x55, - 0xa0, 0xd0, 0x6a, 0x23, 0x34, 0xc5, 0x78, 0x6a, 0xfd, 0x49, 0xa3, 0x38, 0xfb, 0x65, 0xb0, 0x4e, - 0x56, 0x44, 0x77, 0xe0, 0x3c, 0x0f, 0x39, 0x19, 0xca, 0x5d, 0x45, 0xba, 0xc3, 0x5c, 0xaa, 0xc7, - 0x78, 0x35, 0x4b, 0xed, 0xf9, 0x00, 0x3c, 0x9f, 0x8d, 0x7e, 0x37, 0xe0, 0xf2, 0x5c, 0xc9, 0x5d, - 0xca, 0xb6, 0x38, 0x8d, 0x74, 0xbb, 0x7f, 0x7c, 0x4a, 0x76, 0x87, 0xb5, 0x65, 0xb4, 0xda, 0x84, - 0xdb, 0xce, 0x52, 0xfb, 0x44, 0x27, 0xf8, 0x44, 0x29, 0x7a, 0x17, 0x9a, 0x11, 0x25, 0x3b, 0x79, - 0xaa, 0x15, 0x99, 0xea, 0x72, 0x96, 0xda, 0x45, 0x36, 0x2e, 0x12, 0x8e, 0x07, 0x2f, 0x19, 0xa4, - 0xe8, 0x00, 0x39, 0xb8, 0x7a, 0x62, 0x14, 0x71, 0x64, 0x9c, 0xca, 0x47, 0xc6, 0xc9, 0xb9, 0x0f, - 0xad, 0xe3, 0x8e, 0x25, 0x5a, 0x85, 0xea, 0x37, 0xc4, 0xcf, 0x8f, 0x94, 0xde, 0x92, 0x92, 0x85, - 0x5e, 0x03, 0xf3, 0x81, 0x5c, 0x14, 0xb2, 0xc2, 0x13, 0xa1, 0x66, 0x3a, 0xbf, 0x19, 0x70, 0x7e, - 0xee, 0x69, 0x42, 0x57, 0xc0, 0x7c, 0x42, 0x7b, 0x3c, 0x64, 0xba, 0xf1, 0x66, 0x6f, 0xc0, 0x03, - 0x29, 0xda, 0x2c, 0x61, 0x0d, 0x42, 0x97, 0xa1, 0xc1, 0xc8, 0xae, 0x3b, 0xe2, 0x54, 0x45, 0xbf, - 0xb8, 0x59, 0xc2, 0x13, 0x8e, 0x30, 0xe6, 0x13, 0xce, 0xbc, 0x3d, 0x7d, 0xd0, 0x66, 0x8d, 0xdd, - 0x96, 0x22, 0x61, 0x4c, 0x81, 0xdc, 0x06, 0xe8, 0x83, 0xe8, 0x7c, 0x02, 0xa6, 0x72, 0x85, 0xae, - 0x16, 0x27, 0xe1, 0xe8, 0x51, 0xd2, 0xdb, 0x51, 0xed, 0x90, 0x49, 0xab, 0xff, 0x5c, 0x06, 0x53, - 0x49, 0xfe, 0xc7, 0xa5, 0xfe, 0x01, 0x98, 0x2a, 0x1e, 0xbd, 0x05, 0x8f, 0xee, 0xf4, 0x33, 0xfb, - 0xa9, 0x6d, 0x88, 0xd3, 0x28, 0xbb, 0x01, 0x6b, 0x38, 0xba, 0x57, 0xdc, 0xa0, 0xaa, 0x70, 0xa7, - 0x2f, 0xf4, 0x57, 0xb4, 0xad, 0xa9, 0x6a, 0x71, 0xa5, 0xde, 0x01, 0x53, 0x55, 0x1b, 0xdd, 0x80, - 0x33, 0x71, 0xe1, 0xe8, 0xe5, 0x65, 0x59, 0x9d, 0xe3, 0x40, 0x21, 0x74, 0x6d, 0x67, 0xb5, 0xdc, - 0xeb, 0x4f, 0x9f, 0x5b, 0xa5, 0x67, 0xcf, 0xad, 0xd2, 0x8b, 0xe7, 0x96, 0xf1, 0xe3, 0xd8, 0x32, - 0xfe, 0x18, 0x5b, 0xc6, 0xfe, 0xd8, 0x32, 0x9e, 0x8e, 0x2d, 0xe3, 0x9f, 0xb1, 0x65, 0xfc, 0x3b, - 0xb6, 0x4a, 0x2f, 0xc6, 0x96, 0xf1, 0xcb, 0x81, 0x55, 0x7a, 0x7a, 0x60, 0x95, 0x9e, 0x1d, 0x58, - 0xa5, 0xef, 0x8a, 0xff, 0x94, 0x77, 0x4d, 0x79, 0xab, 0xaf, 0xfe, 0x17, 0x00, 0x00, 0xff, 0xff, - 0xc9, 0x8e, 0x9e, 0x8e, 0xb7, 0x0b, 0x00, 0x00, + // 1242 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x57, 0x4d, 0x6f, 0x1b, 0xc5, + 0x1b, 0xf7, 0xc6, 0xf6, 0xc6, 0x79, 0x9c, 0x26, 0xfd, 0x4f, 0xfa, 0xe2, 0xf4, 0x5f, 0x76, 0xcd, + 0x0a, 0xa4, 0x20, 0xa8, 0x23, 0x52, 0x01, 0x02, 0x44, 0x45, 0x16, 0x0a, 0x69, 0xa1, 0xb4, 0x9d, + 0x54, 0x45, 0xe2, 0x52, 0x8d, 0xed, 0xa9, 0xb3, 0xc4, 0xfb, 0xd2, 0xd9, 0xd9, 0x26, 0xe6, 0xc4, + 0x99, 0x03, 0xe2, 0xcc, 0x8d, 0x1b, 0x07, 0x3e, 0x48, 0x0e, 0x1c, 0x7a, 0xac, 0x90, 0x58, 0x51, + 0xf7, 0x82, 0xf6, 0xd4, 0x8f, 0x80, 0xe6, 0x65, 0xd7, 0xeb, 0xc4, 0x49, 0xe8, 0x89, 0x8b, 0x33, + 0xf3, 0x3c, 0xbf, 0xe7, 0x75, 0x9e, 0x97, 0x0d, 0x34, 0x1f, 0x25, 0x94, 0x8d, 0x3a, 0x11, 0x0b, + 0x79, 0x88, 0x9a, 0x9c, 0x79, 0x51, 0x44, 0xd9, 0x1e, 0x61, 0xf4, 0xd2, 0xb9, 0x41, 0x38, 0x08, + 0x25, 0x7d, 0x5d, 0x9c, 0x14, 0xe4, 0x92, 0x35, 0x08, 0xc3, 0xc1, 0x90, 0xae, 0xcb, 0x5b, 0x37, + 0x79, 0xb8, 0xde, 0x4f, 0x18, 0xe1, 0x5e, 0x18, 0x68, 0xfe, 0xea, 0x61, 0x3e, 0x09, 0xb4, 0xf6, + 0x4b, 0xef, 0x0f, 0x3c, 0xbe, 0x93, 0x74, 0x3b, 0xbd, 0xd0, 0x5f, 0xef, 0x85, 0x8c, 0xd3, 0xfd, + 0x88, 0x85, 0xdf, 0xd2, 0x1e, 0xd7, 0xb7, 0xf5, 0x68, 0x77, 0x90, 0x33, 0xba, 0xfa, 0xa0, 0x44, + 0x9d, 0x1f, 0xaa, 0x80, 0xee, 0xb0, 0xd0, 0xa7, 0x7c, 0x87, 0x26, 0x31, 0xa6, 0x71, 0x14, 0x06, + 0x31, 0x45, 0x0e, 0x98, 0xdb, 0x9c, 0xf0, 0x24, 0x6e, 0x19, 0x6d, 0x63, 0x6d, 0xc1, 0x85, 0x2c, + 0xb5, 0xcd, 0x58, 0x52, 0xb0, 0xe6, 0xa0, 0xcf, 0xa1, 0xf6, 0x29, 0xe1, 0xa4, 0x35, 0xd7, 0x36, + 0xd6, 0x9a, 0x1b, 0xff, 0xef, 0x94, 0x42, 0xec, 0x4c, 0x54, 0x0a, 0x88, 0x7b, 0xe1, 0x20, 0xb5, + 0x2b, 0x59, 0x6a, 0x2f, 0xf5, 0x09, 0x27, 0x6f, 0x85, 0xbe, 0xc7, 0xa9, 0x1f, 0xf1, 0x11, 0x96, + 0x0a, 0xd0, 0x3b, 0xb0, 0x70, 0x9d, 0xb1, 0x90, 0xdd, 0x1b, 0x45, 0xb4, 0x55, 0x95, 0xf6, 0x2e, + 0x66, 0xa9, 0xbd, 0x42, 0x73, 0x62, 0x49, 0x62, 0x82, 0x44, 0x6f, 0x40, 0x5d, 0x5e, 0x5a, 0x35, + 0x29, 0xb2, 0x92, 0xa5, 0xf6, 0xb2, 0x14, 0x29, 0xc1, 0x15, 0x02, 0x7d, 0x06, 0xf3, 0x5b, 0x94, + 0xf4, 0x29, 0x8b, 0x5b, 0xf5, 0x76, 0x75, 0xad, 0xb9, 0xf1, 0xfa, 0x31, 0xde, 0xe6, 0x09, 0x50, + 0x68, 0xb7, 0x9e, 0xa5, 0xb6, 0x71, 0x05, 0xe7, 0xc2, 0x68, 0x03, 0x1a, 0x5f, 0x13, 0x16, 0x78, + 0xc1, 0x20, 0x6e, 0x99, 0xed, 0xea, 0xda, 0x82, 0x7b, 0x21, 0x4b, 0x6d, 0xb4, 0xa7, 0x69, 0x25, + 0xc3, 0x05, 0x4e, 0xb8, 0x79, 0x23, 0x78, 0x18, 0xc6, 0xad, 0x79, 0x29, 0x20, 0xdd, 0xf4, 0x04, + 0xa1, 0xec, 0xa6, 0x44, 0x38, 0x7f, 0x1a, 0xb0, 0x34, 0x9d, 0x39, 0xd4, 0x01, 0xc0, 0x34, 0x4e, + 0x86, 0x5c, 0x26, 0x47, 0x3d, 0xc6, 0x52, 0x96, 0xda, 0xc0, 0x0a, 0x2a, 0x2e, 0x21, 0xd0, 0x4d, + 0x30, 0xd5, 0x4d, 0x3f, 0x8b, 0x73, 0x4c, 0xa0, 0x77, 0x45, 0x71, 0x2a, 0xa4, 0xbb, 0xa4, 0x5f, + 0xc7, 0x54, 0x3a, 0xb1, 0xd6, 0x80, 0x6e, 0x43, 0x5d, 0x3c, 0x79, 0x2c, 0xdf, 0xa4, 0xb9, 0xf1, + 0xda, 0x29, 0x39, 0x13, 0x65, 0x11, 0xab, 0xf8, 0xa4, 0x58, 0x39, 0x3e, 0x49, 0x70, 0x76, 0x61, + 0xe9, 0x13, 0xd2, 0xdb, 0xa1, 0xfd, 0xa2, 0xce, 0x56, 0xa1, 0xba, 0x4b, 0x47, 0x3a, 0xae, 0xf9, + 0x2c, 0xb5, 0xc5, 0x15, 0x8b, 0x1f, 0x74, 0x0d, 0xe6, 0xe9, 0x3e, 0xa7, 0x01, 0x8f, 0x5b, 0x73, + 0xf2, 0xcd, 0x56, 0xa6, 0xec, 0x5f, 0x97, 0x3c, 0x77, 0x59, 0xfb, 0x9e, 0x63, 0x71, 0x7e, 0x70, + 0x7e, 0x33, 0xc0, 0x54, 0x20, 0x64, 0xcb, 0x40, 0x18, 0x97, 0x76, 0xaa, 0xee, 0x42, 0x96, 0xda, + 0x8a, 0x80, 0xd5, 0x1f, 0xe1, 0x06, 0x0d, 0xfa, 0x32, 0x65, 0x55, 0xe5, 0x06, 0x0d, 0xfa, 0x58, + 0xfc, 0xa0, 0x36, 0x34, 0x38, 0x23, 0x3d, 0xfa, 0xc0, 0xeb, 0xeb, 0x42, 0xcb, 0x8b, 0x42, 0x92, + 0x6f, 0xf4, 0xd1, 0x35, 0x68, 0x30, 0x1d, 0x4f, 0xab, 0x2e, 0x33, 0x75, 0xae, 0xa3, 0x7a, 0xb5, + 0x93, 0xf7, 0x6a, 0x67, 0x33, 0x18, 0xb9, 0x8b, 0x59, 0x6a, 0x17, 0x48, 0x5c, 0x9c, 0x6e, 0xd6, + 0x1a, 0xd5, 0xb3, 0x35, 0xe7, 0xe7, 0x39, 0x58, 0xdc, 0x26, 0x7e, 0x34, 0xa4, 0xdb, 0x9c, 0x51, + 0xe2, 0xa3, 0x7d, 0x30, 0x87, 0xa4, 0x4b, 0x87, 0xa2, 0x05, 0x55, 0xf8, 0x79, 0x07, 0x77, 0xbe, + 0x14, 0xf4, 0x3b, 0xc4, 0x63, 0xee, 0x17, 0x22, 0xfc, 0x3f, 0x52, 0xfb, 0xa5, 0x26, 0x80, 0x92, + 0xdf, 0xec, 0x93, 0x88, 0x53, 0x26, 0xde, 0xdd, 0xa7, 0x9c, 0x79, 0x3d, 0xac, 0xed, 0xa1, 0x0f, + 0x60, 0x3e, 0x96, 0x9e, 0xe4, 0x99, 0x3f, 0x3b, 0x31, 0xad, 0x5c, 0x9c, 0x94, 0xcc, 0x63, 0x32, + 0x4c, 0x68, 0x8c, 0x73, 0x01, 0x74, 0x0f, 0x60, 0xc7, 0x8b, 0x79, 0x38, 0x60, 0xc4, 0x17, 0x85, + 0x23, 0xc4, 0xdb, 0x53, 0x0f, 0xa7, 0x34, 0x6c, 0xe5, 0x20, 0x19, 0x06, 0xd2, 0xea, 0x4a, 0xb2, + 0xb8, 0x74, 0x76, 0xbe, 0x83, 0x95, 0x19, 0x62, 0xe8, 0x55, 0x58, 0xe4, 0x9e, 0x4f, 0x63, 0x4e, + 0xfc, 0xe8, 0x81, 0xaf, 0x66, 0x55, 0x15, 0x37, 0x0b, 0xda, 0xad, 0x18, 0x7d, 0x0c, 0x0b, 0x85, + 0x1e, 0xdd, 0x12, 0x97, 0x4f, 0x72, 0xc7, 0xad, 0x09, 0x57, 0xf0, 0x44, 0xc8, 0x79, 0x04, 0xcb, + 0x87, 0x30, 0xe8, 0x1c, 0xd4, 0x7b, 0x61, 0x12, 0xa8, 0x7a, 0x32, 0xb0, 0xba, 0xa0, 0xb3, 0x50, + 0x8d, 0x13, 0x65, 0xc4, 0xc0, 0xe2, 0x88, 0xde, 0x85, 0xf9, 0x6e, 0xd2, 0xdb, 0xa5, 0x3c, 0xcf, + 0xc4, 0xb4, 0xe9, 0x89, 0x51, 0x09, 0xc2, 0x39, 0xd8, 0x89, 0x61, 0xf9, 0x10, 0x0f, 0x59, 0x00, + 0xdd, 0x30, 0x09, 0xfa, 0x84, 0x79, 0x54, 0x05, 0x5a, 0xc7, 0x25, 0x8a, 0x70, 0x69, 0x18, 0xee, + 0x51, 0xa6, 0xcd, 0xab, 0x8b, 0xa0, 0x26, 0xc2, 0x9c, 0xec, 0x60, 0x03, 0xab, 0xcb, 0xc4, 0xfd, + 0x5a, 0xc9, 0x7d, 0xc7, 0x87, 0x8b, 0xc7, 0xf4, 0x34, 0xc2, 0x93, 0x82, 0x30, 0x64, 0x0a, 0xdf, + 0x3c, 0x6d, 0x14, 0x28, 0xb4, 0x9a, 0x08, 0x4d, 0xd1, 0x9e, 0x5a, 0xbe, 0x28, 0x14, 0xe7, 0x60, + 0x0e, 0xac, 0x93, 0x05, 0xd1, 0x6d, 0x38, 0xcf, 0x43, 0x4e, 0x86, 0x72, 0x56, 0x91, 0xee, 0x30, + 0xe7, 0xea, 0x36, 0x5e, 0xcd, 0x52, 0x7b, 0x36, 0x00, 0xcf, 0x26, 0xa3, 0x5f, 0x0c, 0xb8, 0x3c, + 0x93, 0x73, 0x87, 0xb2, 0x6d, 0x4e, 0x23, 0x5d, 0xee, 0x1f, 0x9e, 0x12, 0xdd, 0x61, 0x69, 0xe9, + 0xad, 0x56, 0xe1, 0xb6, 0xb3, 0xd4, 0x3e, 0xd1, 0x08, 0x3e, 0x91, 0x8b, 0xde, 0x86, 0x66, 0x44, + 0xc9, 0x6e, 0x1e, 0x6a, 0x55, 0x86, 0xba, 0x9c, 0xa5, 0x76, 0x99, 0x8c, 0xcb, 0x17, 0xc7, 0x83, + 0x97, 0x74, 0x52, 0x54, 0x80, 0x6c, 0x5c, 0xdd, 0x31, 0xea, 0x72, 0xa4, 0x9d, 0xe6, 0x8e, 0xb4, + 0x93, 0x73, 0x0f, 0x5a, 0xc7, 0x2d, 0x4b, 0xb4, 0x0a, 0xb5, 0xaf, 0x88, 0x9f, 0x2f, 0x29, 0x3d, + 0x25, 0x25, 0x09, 0xbd, 0x02, 0xe6, 0x7d, 0x39, 0x28, 0x64, 0x86, 0x0b, 0xa6, 0x26, 0x3a, 0xbf, + 0x1b, 0x70, 0x7e, 0xe6, 0x6a, 0x42, 0x57, 0xc0, 0x7c, 0x4c, 0x7b, 0x3c, 0x64, 0xba, 0xf0, 0xa6, + 0x77, 0xc0, 0x7d, 0xc9, 0xda, 0xaa, 0x60, 0x0d, 0x42, 0x97, 0xa1, 0xc1, 0xc8, 0x9e, 0x3b, 0xe2, + 0x54, 0x79, 0xbf, 0xb8, 0x55, 0xc1, 0x05, 0x45, 0x28, 0xf3, 0x09, 0x67, 0xde, 0xbe, 0x5e, 0x68, + 0xd3, 0xca, 0x6e, 0x49, 0x96, 0x50, 0xa6, 0x40, 0x02, 0x1e, 0xf7, 0xc8, 0x90, 0xa8, 0x0f, 0x8c, + 0xc3, 0xf0, 0x6d, 0xc9, 0x12, 0x70, 0x05, 0x72, 0x1b, 0xa0, 0xf7, 0xa7, 0xf3, 0x11, 0x98, 0xca, + 0x33, 0x74, 0xb5, 0xdc, 0x38, 0x47, 0x77, 0x98, 0x1e, 0xa6, 0x6a, 0xe4, 0x14, 0x9d, 0xf1, 0xe3, + 0x1c, 0x98, 0x8a, 0xf3, 0x1f, 0xee, 0x80, 0xf7, 0xc0, 0x54, 0xfe, 0xe8, 0xa1, 0x79, 0x74, 0x05, + 0x9c, 0x39, 0x48, 0x6d, 0x43, 0x6c, 0x52, 0x59, 0x3c, 0x58, 0xc3, 0xd1, 0xdd, 0xf2, 0xc0, 0x55, + 0x79, 0x3e, 0x7d, 0xfe, 0xff, 0x4f, 0xeb, 0x9a, 0x88, 0x96, 0x27, 0xf0, 0x6d, 0x30, 0xd5, 0xe3, + 0xa0, 0xeb, 0x70, 0x26, 0x2e, 0xed, 0xc8, 0x3c, 0x2d, 0xab, 0x33, 0x0c, 0x28, 0x84, 0xce, 0xed, + 0xb4, 0x94, 0xb3, 0x09, 0xa6, 0x7a, 0xbe, 0x7f, 0xb3, 0x41, 0x8a, 0x5e, 0xd1, 0x93, 0x55, 0x5e, + 0xdc, 0xcd, 0x27, 0xcf, 0xac, 0xca, 0xd3, 0x67, 0x56, 0xe5, 0xc5, 0x33, 0xcb, 0xf8, 0x7e, 0x6c, + 0x19, 0xbf, 0x8e, 0x2d, 0xe3, 0x60, 0x6c, 0x19, 0x4f, 0xc6, 0x96, 0xf1, 0xd7, 0xd8, 0x32, 0xfe, + 0x1e, 0x5b, 0x95, 0x17, 0x63, 0xcb, 0xf8, 0xe9, 0xb9, 0x55, 0x79, 0xf2, 0xdc, 0xaa, 0x3c, 0x7d, + 0x6e, 0x55, 0xbe, 0x29, 0xff, 0x1b, 0xd0, 0x35, 0xe5, 0xd7, 0xc1, 0xd5, 0x7f, 0x02, 0x00, 0x00, + 0xff, 0xff, 0x35, 0xca, 0x86, 0x18, 0x29, 0x0c, 0x00, 0x00, } func (this *PrometheusResponse) Equal(that interface{}) bool { @@ -1585,6 +1652,30 @@ func (this *PrometheusQueryResult_Matrix) Equal(that interface{}) bool { } return true } +func (this *PrometheusQueryResult_Scalar) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*PrometheusQueryResult_Scalar) + if !ok { + that2, ok := that.(PrometheusQueryResult_Scalar) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if !this.Scalar.Equal(that1.Scalar) { + return false + } + return true +} func (this *Vector) Equal(that interface{}) bool { if that == nil { return this == nil @@ -1678,6 +1769,33 @@ func (this *Matrix) Equal(that interface{}) bool { } return true } +func (this *Scalar) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*Scalar) + if !ok { + that2, ok := that.(Scalar) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.TimestampMs != that1.TimestampMs { + return false + } + if this.Value != that1.Value { + return false + } + return true +} func (this *PrometheusResponse) GoString() string { if this == nil { return "nil" @@ -1856,7 +1974,7 @@ func (this *PrometheusQueryResult) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 7) + s := make([]string, 0, 8) s = append(s, "&tripperware.PrometheusQueryResult{") if this.Result != nil { s = append(s, "Result: "+fmt.Sprintf("%#v", this.Result)+",\n") @@ -1888,6 +2006,14 @@ func (this *PrometheusQueryResult_Matrix) GoString() string { `Matrix:` + fmt.Sprintf("%#v", this.Matrix) + `}`}, ", ") return s } +func (this *PrometheusQueryResult_Scalar) GoString() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&tripperware.PrometheusQueryResult_Scalar{` + + `Scalar:` + fmt.Sprintf("%#v", this.Scalar) + `}`}, ", ") + return s +} func (this *Vector) GoString() string { if this == nil { return "nil" @@ -1936,6 +2062,17 @@ func (this *Matrix) GoString() string { s = append(s, "}") return strings.Join(s, "") } +func (this *Scalar) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&tripperware.Scalar{") + s = append(s, "TimestampMs: "+fmt.Sprintf("%#v", this.TimestampMs)+",\n") + s = append(s, "Value: "+fmt.Sprintf("%#v", this.Value)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} func valueToGoStringQuery(v interface{}, typ string) string { rv := reflect.ValueOf(v) if rv.IsNil() { @@ -2617,6 +2754,26 @@ func (m *PrometheusQueryResult_Matrix) MarshalToSizedBuffer(dAtA []byte) (int, e } return len(dAtA) - i, nil } +func (m *PrometheusQueryResult_Scalar) MarshalTo(dAtA []byte) (int, error) { + return m.MarshalToSizedBuffer(dAtA[:m.Size()]) +} + +func (m *PrometheusQueryResult_Scalar) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.Scalar != nil { + { + size, err := m.Scalar.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintQuery(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x22 + } + return len(dAtA) - i, nil +} func (m *Vector) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -2752,6 +2909,40 @@ func (m *Matrix) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *Scalar) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Scalar) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Scalar) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Value != 0 { + i -= 8 + encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.Value)))) + i-- + dAtA[i] = 0x11 + } + if m.TimestampMs != 0 { + i = encodeVarintQuery(dAtA, i, uint64(m.TimestampMs)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + func encodeVarintQuery(dAtA []byte, offset int, v uint64) int { offset -= sovQuery(v) base := offset @@ -3064,6 +3255,18 @@ func (m *PrometheusQueryResult_Matrix) Size() (n int) { } return n } +func (m *PrometheusQueryResult_Scalar) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Scalar != nil { + l = m.Scalar.Size() + n += 1 + l + sovQuery(uint64(l)) + } + return n +} func (m *Vector) Size() (n int) { if m == nil { return 0 @@ -3117,6 +3320,21 @@ func (m *Matrix) Size() (n int) { return n } +func (m *Scalar) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.TimestampMs != 0 { + n += 1 + sovQuery(uint64(m.TimestampMs)) + } + if m.Value != 0 { + n += 9 + } + return n +} + func sovQuery(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } @@ -3337,6 +3555,16 @@ func (this *PrometheusQueryResult_Matrix) String() string { }, "") return s } +func (this *PrometheusQueryResult_Scalar) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&PrometheusQueryResult_Scalar{`, + `Scalar:` + strings.Replace(fmt.Sprintf("%v", this.Scalar), "Scalar", "Scalar", 1) + `,`, + `}`, + }, "") + return s +} func (this *Vector) String() string { if this == nil { return "nil" @@ -3379,6 +3607,17 @@ func (this *Matrix) String() string { }, "") return s } +func (this *Scalar) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&Scalar{`, + `TimestampMs:` + fmt.Sprintf("%v", this.TimestampMs) + `,`, + `Value:` + fmt.Sprintf("%v", this.Value) + `,`, + `}`, + }, "") + return s +} func valueToStringQuery(v interface{}) string { rv := reflect.ValueOf(v) if rv.IsNil() { @@ -5127,6 +5366,41 @@ func (m *PrometheusQueryResult) Unmarshal(dAtA []byte) error { } m.Result = &PrometheusQueryResult_Matrix{v} iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Scalar", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthQuery + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthQuery + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &Scalar{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Result = &PrometheusQueryResult_Scalar{v} + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipQuery(dAtA[iNdEx:]) @@ -5484,6 +5758,89 @@ func (m *Matrix) Unmarshal(dAtA []byte) error { } return nil } +func (m *Scalar) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Scalar: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Scalar: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field TimestampMs", wireType) + } + m.TimestampMs = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.TimestampMs |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 1 { + return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) + } + var v uint64 + if (iNdEx + 8) > l { + return io.ErrUnexpectedEOF + } + v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:])) + iNdEx += 8 + m.Value = float64(math.Float64frombits(v)) + default: + iNdEx = preIndex + skippy, err := skipQuery(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthQuery + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthQuery + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipQuery(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/pkg/querier/tripperware/query.proto b/pkg/querier/tripperware/query.proto index 013b73022a8..4b187e66f8c 100644 --- a/pkg/querier/tripperware/query.proto +++ b/pkg/querier/tripperware/query.proto @@ -93,6 +93,7 @@ message PrometheusQueryResult { Vector vector = 1; bytes rawBytes = 2; Matrix matrix = 3; + Scalar scalar = 4; } } @@ -109,3 +110,8 @@ message Sample { message Matrix { repeated SampleStream sampleStreams = 1 [(gogoproto.nullable) = false]; } + +message Scalar { + int64 timestamp_ms = 1; + double value = 2; +} diff --git a/pkg/querier/tripperware/query_test.go b/pkg/querier/tripperware/query_test.go index 08f149f43b0..32900f0ebc9 100644 --- a/pkg/querier/tripperware/query_test.go +++ b/pkg/querier/tripperware/query_test.go @@ -14,6 +14,41 @@ import ( "github.com/cortexproject/cortex/pkg/util" ) +func TestScalarJsonMarshalUnmarshal(t *testing.T) { + tests := []struct { + name string + scalar *Scalar + expected string + }{ + { + name: "empty scalar", + scalar: &Scalar{}, + expected: `[0,"0"]`, + }, + { + name: "normal scalar", + scalar: &Scalar{ + TimestampMs: 123, + Value: 456, + }, + expected: `[0.123,"456"]`, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + b, err := json.Marshal(test.scalar) + require.NoError(t, err) + require.Equal(t, test.expected, string(b)) + + var scalar Scalar + err = json.Unmarshal(b, &scalar) + require.NoError(t, err) + require.Equal(t, *test.scalar, scalar) + }) + } +} + // Same as https://github.com/prometheus/client_golang/blob/v1.19.1/api/prometheus/v1/api_test.go#L1577. func TestSampleHistogramPairJSONSerialization(t *testing.T) { tests := []struct { diff --git a/pkg/querier/tripperware/queryrange/marshaling_test.go b/pkg/querier/tripperware/queryrange/marshaling_test.go index 4652a0e10bd..d0efd0e8d4d 100644 --- a/pkg/querier/tripperware/queryrange/marshaling_test.go +++ b/pkg/querier/tripperware/queryrange/marshaling_test.go @@ -80,7 +80,7 @@ func BenchmarkPrometheusCodec_EncodeResponse(b *testing.B) { b.ReportAllocs() for n := 0; n < b.N; n++ { - _, err := PrometheusCodec.EncodeResponse(context.Background(), res) + _, err := PrometheusCodec.EncodeResponse(context.Background(), nil, res) require.NoError(b, err) } } diff --git a/pkg/querier/tripperware/queryrange/query_range.go b/pkg/querier/tripperware/queryrange/query_range.go index 46f1affb48a..a71dd57c1cc 100644 --- a/pkg/querier/tripperware/queryrange/query_range.go +++ b/pkg/querier/tripperware/queryrange/query_range.go @@ -229,7 +229,7 @@ func (c prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ return &resp, nil } -func (prometheusCodec) EncodeResponse(ctx context.Context, res tripperware.Response) (*http.Response, error) { +func (prometheusCodec) EncodeResponse(ctx context.Context, _ *http.Request, res tripperware.Response) (*http.Response, error) { sp, _ := opentracing.StartSpanFromContext(ctx, "APIResponse.ToHTTPResponse") defer sp.Finish() diff --git a/pkg/querier/tripperware/queryrange/query_range_test.go b/pkg/querier/tripperware/queryrange/query_range_test.go index 256d2800ff6..780077274dc 100644 --- a/pkg/querier/tripperware/queryrange/query_range_test.go +++ b/pkg/querier/tripperware/queryrange/query_range_test.go @@ -307,7 +307,7 @@ func TestResponse(t *testing.T) { Body: io.NopCloser(bytes.NewBuffer([]byte(tc.jsonBody))), ContentLength: int64(len(tc.jsonBody)), } - resp2, err := PrometheusCodec.EncodeResponse(context.Background(), resp) + resp2, err := PrometheusCodec.EncodeResponse(context.Background(), nil, resp) require.NoError(t, err) assert.Equal(t, response, resp2) cancelCtx() @@ -431,7 +431,7 @@ func TestResponseWithStats(t *testing.T) { Body: io.NopCloser(bytes.NewBuffer([]byte(tc.jsonBody))), ContentLength: int64(len(tc.jsonBody)), } - resp2, err := PrometheusCodec.EncodeResponse(context.Background(), resp) + resp2, err := PrometheusCodec.EncodeResponse(context.Background(), nil, resp) require.NoError(t, err) assert.Equal(t, response, resp2) }) diff --git a/pkg/querier/tripperware/queryrange/split_by_interval_test.go b/pkg/querier/tripperware/queryrange/split_by_interval_test.go index 4d7e4a9a68a..028a1e16d85 100644 --- a/pkg/querier/tripperware/queryrange/split_by_interval_test.go +++ b/pkg/querier/tripperware/queryrange/split_by_interval_test.go @@ -276,7 +276,7 @@ func TestSplitByDay(t *testing.T) { mergedResponse, err := PrometheusCodec.MergeResponse(context.Background(), nil, parsedResponse, parsedResponse) require.NoError(t, err) - mergedHTTPResponse, err := PrometheusCodec.EncodeResponse(context.Background(), mergedResponse) + mergedHTTPResponse, err := PrometheusCodec.EncodeResponse(context.Background(), nil, mergedResponse) require.NoError(t, err) mergedHTTPResponseBody, err := io.ReadAll(mergedHTTPResponse.Body) diff --git a/pkg/querier/tripperware/roundtrip.go b/pkg/querier/tripperware/roundtrip.go index 4edcd51cc9b..73dad2f1184 100644 --- a/pkg/querier/tripperware/roundtrip.go +++ b/pkg/querier/tripperware/roundtrip.go @@ -205,7 +205,7 @@ func (q roundTripper) RoundTrip(r *http.Request) (*http.Response, error) { return nil, err } - return q.codec.EncodeResponse(r.Context(), response) + return q.codec.EncodeResponse(r.Context(), r, response) } // Do implements Handler. diff --git a/pkg/querier/tripperware/roundtrip_test.go b/pkg/querier/tripperware/roundtrip_test.go index 7e6cf474a0d..9cdb1b8de02 100644 --- a/pkg/querier/tripperware/roundtrip_test.go +++ b/pkg/querier/tripperware/roundtrip_test.go @@ -59,7 +59,7 @@ func (c mockCodec) DecodeRequest(_ context.Context, r *http.Request, _ []string) return mockRequest{}, nil } -func (c mockCodec) EncodeResponse(_ context.Context, resp Response) (*http.Response, error) { +func (c mockCodec) EncodeResponse(_ context.Context, _ *http.Request, resp Response) (*http.Response, error) { r := resp.(*mockResponse) return &http.Response{ Header: http.Header{ diff --git a/pkg/ruler/frontend_client.go b/pkg/ruler/frontend_client.go index e75997ce1aa..a3f697f1bba 100644 --- a/pkg/ruler/frontend_client.go +++ b/pkg/ruler/frontend_client.go @@ -22,22 +22,29 @@ const ( orgIDHeader = "X-Scope-OrgID" instantQueryPath = "/api/v1/query" mimeTypeForm = "application/x-www-form-urlencoded" - contentTypeJSON = "application/json" ) +var jsonDecoder JsonDecoder +var protobufDecoder ProtobufDecoder + type FrontendClient struct { client httpgrpc.HTTPClient timeout time.Duration prometheusHTTPPrefix string - jsonDecoder JsonDecoder + queryResponseFormat string + decoders map[string]Decoder } -func NewFrontendClient(client httpgrpc.HTTPClient, timeout time.Duration, prometheusHTTPPrefix string) *FrontendClient { +func NewFrontendClient(client httpgrpc.HTTPClient, timeout time.Duration, prometheusHTTPPrefix, queryResponseFormat string) *FrontendClient { return &FrontendClient{ client: client, timeout: timeout, prometheusHTTPPrefix: prometheusHTTPPrefix, - jsonDecoder: JsonDecoder{}, + queryResponseFormat: queryResponseFormat, + decoders: map[string]Decoder{ + jsonDecoder.ContentType(): jsonDecoder, + protobufDecoder.ContentType(): protobufDecoder, + }, } } @@ -55,6 +62,14 @@ func (p *FrontendClient) makeRequest(ctx context.Context, qs string, ts time.Tim return nil, err } + acceptHeader := "" + switch p.queryResponseFormat { + case queryResponseFormatJson: + acceptHeader = jsonDecoder.ContentType() + case queryResponseFormatProtobuf: + acceptHeader = fmt.Sprintf("%s,%s", protobufDecoder.ContentType(), jsonDecoder.ContentType()) + } + req := &httpgrpc.HTTPRequest{ Method: http.MethodPost, Url: p.prometheusHTTPPrefix + instantQueryPath, @@ -63,7 +78,7 @@ func (p *FrontendClient) makeRequest(ctx context.Context, qs string, ts time.Tim {Key: textproto.CanonicalMIMEHeaderKey("User-Agent"), Values: []string{fmt.Sprintf("Cortex/%s", version.Version)}}, {Key: textproto.CanonicalMIMEHeaderKey("Content-Type"), Values: []string{mimeTypeForm}}, {Key: textproto.CanonicalMIMEHeaderKey("Content-Length"), Values: []string{strconv.Itoa(len(body))}}, - {Key: textproto.CanonicalMIMEHeaderKey("Accept"), Values: []string{contentTypeJSON}}, + {Key: textproto.CanonicalMIMEHeaderKey("Accept"), Values: []string{acceptHeader}}, {Key: textproto.CanonicalMIMEHeaderKey(orgIDHeader), Values: []string{orgID}}, }, } @@ -91,7 +106,15 @@ func (p *FrontendClient) InstantQuery(ctx context.Context, qs string, t time.Tim return nil, err } - vector, warning, err := p.jsonDecoder.Decode(resp.Body) + contentType := extractHeader(resp.Headers, "Content-Type") + decoder, ok := p.decoders[contentType] + if !ok { + err = fmt.Errorf("unknown content type: %s", contentType) + level.Error(log).Log("err", err, "query", qs) + return nil, err + } + + vector, warning, err := decoder.Decode(resp.Body) if err != nil { level.Error(log).Log("err", err, "query", qs) return nil, err @@ -103,3 +126,13 @@ func (p *FrontendClient) InstantQuery(ctx context.Context, qs string, t time.Tim return vector, nil } + +func extractHeader(headers []*httpgrpc.Header, target string) string { + for _, h := range headers { + if h.Key == target && len(h.Values) > 0 { + return h.Values[0] + } + } + + return "" +} diff --git a/pkg/ruler/frontend_client_pool.go b/pkg/ruler/frontend_client_pool.go index b9b512cfab1..7b131621aa6 100644 --- a/pkg/ruler/frontend_client_pool.go +++ b/pkg/ruler/frontend_client_pool.go @@ -20,6 +20,7 @@ import ( type frontendPool struct { timeout time.Duration + queryResponseFormat string prometheusHTTPPrefix string grpcConfig grpcclient.Config @@ -29,6 +30,7 @@ type frontendPool struct { func newFrontendPool(cfg Config, log log.Logger, reg prometheus.Registerer) *client.Pool { p := &frontendPool{ timeout: cfg.FrontendTimeout, + queryResponseFormat: cfg.QueryResponseFormat, prometheusHTTPPrefix: cfg.PrometheusHTTPPrefix, grpcConfig: cfg.GRPCClientConfig, frontendClientRequestDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ @@ -68,7 +70,7 @@ func (f *frontendPool) createFrontendClient(addr string) (client.PoolClient, err } return &frontendClient{ - FrontendClient: NewFrontendClient(httpgrpc.NewHTTPClient(conn), f.timeout, f.prometheusHTTPPrefix), + FrontendClient: NewFrontendClient(httpgrpc.NewHTTPClient(conn), f.timeout, f.prometheusHTTPPrefix, f.queryResponseFormat), HealthClient: grpc_health_v1.NewHealthClient(conn), }, nil } diff --git a/pkg/ruler/frontend_client_test.go b/pkg/ruler/frontend_client_test.go index 104780f58c5..627c81a244d 100644 --- a/pkg/ruler/frontend_client_test.go +++ b/pkg/ruler/frontend_client_test.go @@ -28,7 +28,7 @@ func TestTimeout(t *testing.T) { } ctx := context.Background() ctx = user.InjectOrgID(ctx, "userID") - frontendClient := NewFrontendClient(mockHTTPGRPCClient(mockClientFn), time.Second*5, "/prometheus") + frontendClient := NewFrontendClient(mockHTTPGRPCClient(mockClientFn), time.Second*5, "/prometheus", "json") _, err := frontendClient.InstantQuery(ctx, "query", time.Now()) require.Equal(t, context.DeadlineExceeded, err) } @@ -37,7 +37,7 @@ func TestNoOrgId(t *testing.T) { mockClientFn := func(ctx context.Context, _ *httpgrpc.HTTPRequest, _ ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) { return nil, nil } - frontendClient := NewFrontendClient(mockHTTPGRPCClient(mockClientFn), time.Second*5, "/prometheus") + frontendClient := NewFrontendClient(mockHTTPGRPCClient(mockClientFn), time.Second*5, "/prometheus", "json") _, err := frontendClient.InstantQuery(context.Background(), "query", time.Now()) require.Equal(t, user.ErrNoOrgID, err) } @@ -148,10 +148,46 @@ func TestInstantQuery(t *testing.T) { } ctx := context.Background() ctx = user.InjectOrgID(ctx, "userID") - frontendClient := NewFrontendClient(mockHTTPGRPCClient(mockClientFn), time.Second*5, "/prometheus") + frontendClient := NewFrontendClient(mockHTTPGRPCClient(mockClientFn), time.Second*5, "/prometheus", "json") vector, err := frontendClient.InstantQuery(ctx, "query", time.Now()) require.Equal(t, test.expected, vector) require.Equal(t, test.expectedErr, err) }) } } + +func Test_extractHeader(t *testing.T) { + tests := []struct { + description string + headers []*httpgrpc.Header + expectedOutput string + }{ + { + description: "protobuf", + headers: []*httpgrpc.Header{ + { + Key: "Content-Type", + Values: []string{"application/x-protobuf"}, + }, + }, + expectedOutput: "application/x-protobuf", + }, + { + description: "json", + headers: []*httpgrpc.Header{ + { + Key: "Content-Type", + Values: []string{"application/json"}, + }, + }, + expectedOutput: "application/json", + }, + } + + target := "Content-Type" + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + require.Equal(t, test.expectedOutput, extractHeader(test.headers, target)) + }) + } +} diff --git a/pkg/ruler/frontend_decoder.go b/pkg/ruler/frontend_decoder.go index c8e653a05f0..48902f47ac0 100644 --- a/pkg/ruler/frontend_decoder.go +++ b/pkg/ruler/frontend_decoder.go @@ -10,6 +10,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" + "github.com/cortexproject/cortex/pkg/querier/tripperware" "github.com/cortexproject/cortex/pkg/util/api" ) @@ -18,8 +19,18 @@ const ( ) type JsonDecoder struct{} +type ProtobufDecoder struct{} type Warning []string +type Decoder interface { + Decode(body []byte) (promql.Vector, Warning, error) + ContentType() string +} + +func (j JsonDecoder) ContentType() string { + return "application/json" +} + func (j JsonDecoder) Decode(body []byte) (promql.Vector, Warning, error) { var response api.Response @@ -86,3 +97,51 @@ func (j JsonDecoder) scalarToPromQLVector(scalar model.Scalar) promql.Vector { Metric: labels.Labels{}, }} } + +func (p ProtobufDecoder) ContentType() string { + return "application/x-protobuf" +} + +func (p ProtobufDecoder) Decode(body []byte) (promql.Vector, Warning, error) { + resp := tripperware.PrometheusResponse{} + if err := resp.Unmarshal(body); err != nil { + return nil, nil, err + } + + switch data := resp.Data.Result.Result.(type) { + case *tripperware.PrometheusQueryResult_Scalar: + return p.scalarToPromQLVector(data.Scalar), nil, nil + case *tripperware.PrometheusQueryResult_Vector: + return p.vectorToPromQLVector(data.Vector), nil, nil + default: + return nil, nil, errors.New("rule result is not a vector or scalar") + } +} + +func (p ProtobufDecoder) vectorToPromQLVector(vector *tripperware.Vector) promql.Vector { + v := make([]promql.Sample, 0, len(vector.Samples)) + for _, sample := range vector.Samples { + metric := make([]labels.Label, 0, len(sample.Labels)) + for _, lb := range sample.Labels { + metric = append(metric, labels.Label{ + Name: lb.Name, + Value: lb.Value, + }) + } + v = append(v, promql.Sample{ + T: sample.Sample.TimestampMs, + F: sample.Sample.Value, + Metric: metric, + }) + } + + return v +} + +func (p ProtobufDecoder) scalarToPromQLVector(scalar *tripperware.Scalar) promql.Vector { + return promql.Vector{promql.Sample{ + T: scalar.TimestampMs, + F: scalar.Value, + Metric: labels.Labels{}, + }} +} diff --git a/pkg/ruler/frontend_decoder_test.go b/pkg/ruler/frontend_decoder_test.go index cc24fe732ba..34ac5860f03 100644 --- a/pkg/ruler/frontend_decoder_test.go +++ b/pkg/ruler/frontend_decoder_test.go @@ -10,8 +10,6 @@ import ( ) func TestDecode(t *testing.T) { - jsonDecoder := JsonDecoder{} - tests := []struct { description string body string diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index d77b4d0a413..5094e4d305d 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -47,10 +47,13 @@ import ( var ( supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle} + supportedQueryResponseFormats = []string{queryResponseFormatJson, queryResponseFormatProtobuf} + // Validation errors. - errInvalidShardingStrategy = errors.New("invalid sharding strategy") - errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0") - errInvalidMaxConcurrentEvals = errors.New("invalid max concurrent evals, the value must be greater than 0") + errInvalidShardingStrategy = errors.New("invalid sharding strategy") + errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0") + errInvalidMaxConcurrentEvals = errors.New("invalid max concurrent evals, the value must be greater than 0") + errInvalidQueryResponseFormat = errors.New("invalid query response format") ) const ( @@ -82,6 +85,10 @@ const ( unknownHealthFilter string = "unknown" okHealthFilter string = "ok" errHealthFilter string = "err" + + // query response formats + queryResponseFormatJson = "json" + queryResponseFormatProtobuf = "protobuf" ) type DisabledRuleGroupErr struct { @@ -96,6 +103,8 @@ func (e *DisabledRuleGroupErr) Error() string { type Config struct { // This is used for query to query frontend to evaluate rules FrontendAddress string `yaml:"frontend_address"` + // Query response format of query frontend for evaluating rules + QueryResponseFormat string `yaml:"query_response_format"` // HTTP timeout duration when querying to query frontend to evaluate rules FrontendTimeout time.Duration `yaml:"-"` // Query frontend GRPC Client configuration. @@ -185,6 +194,10 @@ func (cfg *Config) Validate(limits validation.Limits, log log.Logger) error { if cfg.ConcurrentEvalsEnabled && cfg.MaxConcurrentEvals <= 0 { return errInvalidMaxConcurrentEvals } + + if !util.StringsContain(supportedQueryResponseFormats, cfg.QueryResponseFormat) { + return errInvalidQueryResponseFormat + } return nil } @@ -207,6 +220,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { flagext.DeprecatedFlag(f, "ruler.alertmanager-use-v2", "This flag is no longer functional. V1 API is deprecated and removed", util_log.Logger) f.StringVar(&cfg.FrontendAddress, "ruler.frontend-address", "", "[Experimental] GRPC listen address of the Query Frontend, in host:port format. If set, Ruler queries to Query Frontends via gRPC. If not set, ruler queries to Ingesters directly.") + f.StringVar(&cfg.QueryResponseFormat, "ruler.query-response-format", queryResponseFormatProtobuf, fmt.Sprintf("[Experimental] Query response format to get query results from Query Frontend when the rule evaluation. Supported values: %s", strings.Join(supportedQueryResponseFormats, ","))) cfg.ExternalURL.URL, _ = url.Parse("") // Must be non-nil f.Var(&cfg.ExternalURL, "ruler.external.url", "URL of alerts return path.") f.DurationVar(&cfg.EvaluationInterval, "ruler.evaluation-interval", 1*time.Minute, "How frequently to evaluate rules")