Allow ruler to retrieve proto format query response
Signed-off-by: SungJin1212 <[email protected]>
SungJin1212 committed Nov 18, 2024
1 parent 7fb98ab commit 872120e
Showing 24 changed files with 972 additions and 154 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@
* [CHANGE] Change all max async concurrency default values from `50` to `3` #6268
* [CHANGE] Change default value of `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` from `50` to `3` #6265
* [CHANGE] Enable Compactor and Alertmanager in target all. #6204
* [FEATURE] Ruler: Add an experimental flag `-ruler.query-response-format` to retrieve query responses in protobuf format. #6345
* [FEATURE] Ruler: Pagination support for List Rules API. #6299
* [FEATURE] Query Frontend/Querier: Add protobuf codec `-api.querier-default-codec` and the option to choose response compression type `-querier.response-compression`. #5527
* [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow the ruler to send queries to query frontends instead of ingesters. #6151
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -4252,6 +4252,11 @@ The `ruler_config` configures the Cortex ruler.
# CLI flag: -ruler.frontend-address
[frontend_address: <string> | default = ""]
# [Experimental] Query response format used to retrieve query results from the Query
# Frontend during rule evaluation. Supported values: json, protobuf
# CLI flag: -ruler.query-response-format
[query_response_format: <string> | default = "protobuf"]
frontend_client:
# gRPC client max receive message size (bytes).
# CLI flag: -ruler.frontendClient.grpc-max-recv-msg-size
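The ruler-side code that registers this new option is among the 24 changed files but is not shown in this excerpt. As a rough orientation only, a flag like this is typically wired up along the following lines; the struct, field, and method names here are assumptions for illustration, not taken from the commit.

package ruler

import (
	"errors"
	"flag"
)

// Hypothetical sketch of how -ruler.query-response-format might be registered
// and validated; names are assumed, not taken from the commit.
type Config struct {
	QueryResponseFormat string `yaml:"query_response_format"`
}

func (c *Config) RegisterFlags(f *flag.FlagSet) {
	f.StringVar(&c.QueryResponseFormat, "ruler.query-response-format", "protobuf",
		"[Experimental] Query response format used to retrieve query results from the Query Frontend during rule evaluation. Supported values: json, protobuf.")
}

func (c *Config) Validate() error {
	switch c.QueryResponseFormat {
	case "json", "protobuf":
		return nil
	default:
		return errors.New("unsupported ruler.query-response-format: " + c.QueryResponseFormat)
	}
}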
85 changes: 52 additions & 33 deletions integration/ruler_test.go
@@ -1670,42 +1670,61 @@ func TestRulerEvalWithQueryFrontend(t *testing.T) {
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(distributor, ingester))
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "")
require.NoError(t, s.Start(queryFrontend))

ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
"-ruler.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
}), "")
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
}), "")
require.NoError(t, s.StartAndWaitReady(ruler, querier))

c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), user)
require.NoError(t, err)
for _, format := range []string{"protobuf", "json"} {
t.Run(fmt.Sprintf("format:%s", format), func(t *testing.T) {
queryFrontendFlag := mergeFlags(flags, map[string]string{
"-ruler.query-response-format": format,
})
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", queryFrontendFlag, "")
require.NoError(t, s.Start(queryFrontend))

expression := "metric"
groupName := "rule_group"
ruleName := "rule_name"
require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, ruleName, expression), namespace))
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(queryFrontendFlag, map[string]string{
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
}), "")
require.NoError(t, s.StartAndWaitReady(querier))

rgMatcher := ruleGroupMatcher(user, namespace, groupName)
// Wait until ruler has loaded the group.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
// Wait until rule group has tried to evaluate the rule.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
rulerFlag := mergeFlags(queryFrontendFlag, map[string]string{
"-ruler.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
})
ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), rulerFlag, "")
require.NoError(t, s.StartAndWaitReady(ruler))

matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user)
// Check that cortex_ruler_query_frontend_clients went up
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ruler_query_frontend_clients"}, e2e.WaitMissingMetrics))
// Check that cortex_ruler_queries_total went up
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
// Check that cortex_ruler_queries_failed_total is zero
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
// Check that cortex_ruler_write_requests_total went up
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
// Check that cortex_ruler_write_requests_failed_total is zero
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
t.Cleanup(func() {
_ = s.Stop(ruler)
_ = s.Stop(queryFrontend)
_ = s.Stop(querier)
})

c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), user)
require.NoError(t, err)

expression := "metric" // vector
//expression := "scalar(count(up == 1)) > bool 1" // scalar
groupName := "rule_group"
ruleName := "rule_name"
require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, ruleName, expression), namespace))

rgMatcher := ruleGroupMatcher(user, namespace, groupName)
// Wait until ruler has loaded the group.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
// Wait until rule group has tried to evaluate the rule.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
// Make sure the rule evaluation does not fail
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))

matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user)
// Check that cortex_ruler_query_frontend_clients went up
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ruler_query_frontend_clients"}, e2e.WaitMissingMetrics))
// Check that cortex_ruler_queries_total went up
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
// Check that cortex_ruler_queries_failed_total is zero
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
// Check that cortex_ruler_write_requests_total went up
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
// Check that cortex_ruler_write_requests_failed_total is zero
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
})
}
}

func parseAlertFromRule(t *testing.T, rules interface{}) *alertingRule {
2 changes: 1 addition & 1 deletion pkg/cortex/modules.go
@@ -474,7 +474,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
prometheusCodec := queryrange.NewPrometheusCodec(false, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)
// ShardedPrometheusCodec is the same as PrometheusCodec but is used on sharded queries (it sums up the stats)
shardedPrometheusCodec := queryrange.NewPrometheusCodec(true, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)
instantQueryCodec := instantquery.NewInstantQueryCodec(t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)
instantQueryCodec := instantquery.NewInstantQueryCodec(t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec, t.Cfg.Ruler.QueryResponseFormat)

queryRangeMiddlewares, cache, err := queryrange.Middlewares(
t.Cfg.QueryRange,
14 changes: 14 additions & 0 deletions pkg/querier/codec/protobuf_codec.go
@@ -52,6 +52,11 @@ func createPrometheusQueryResponse(resp *v1.Response) (*tripperware.PrometheusRe
Samples: *getVectorSamples(data),
},
}
case model.ValScalar.String():
scalar := getScalar(data)
queryResult.Result = &tripperware.PrometheusQueryResult_Scalar{
Scalar: &scalar,
}
default:
json := jsoniter.ConfigCompatibleWithStandardLibrary
rawBytes, err := json.Marshal(data)
@@ -141,6 +146,15 @@ func getVectorSamples(data *v1.QueryData) *[]tripperware.Sample {
return &vectorSamples
}

func getScalar(data *v1.QueryData) tripperware.Scalar {
s := data.Result.(promql.Scalar)

return tripperware.Scalar{
Value: s.V,
TimestampMs: s.T,
}
}

func getStats(builtin *stats.BuiltinStats) *tripperware.PrometheusResponseSamplesStats {
queryableSamplesStatsPerStepLen := len(builtin.Samples.TotalQueryableSamplesPerStep)
queryableSamplesStatsPerStep := make([]*tripperware.PrometheusResponseQueryableSamplesStatsPerStep, queryableSamplesStatsPerStepLen)
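The new scalar branch mirrors the existing vector and matrix handling: a PromQL scalar carries a millisecond timestamp and a float value, and both are preserved in the proto message. A minimal sketch of the conversion, reusing the getScalar helper added above (the wrapper function itself is illustrative only, not part of the commit):

package codec

import (
	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/promql/parser"
	v1 "github.com/prometheus/prometheus/web/api/v1"

	"github.com/cortexproject/cortex/pkg/querier/tripperware"
)

// Illustrative sketch only: exercising the new scalar branch the way the
// codec test below does.
func scalarExample() *tripperware.PrometheusQueryResult_Scalar {
	data := &v1.QueryData{
		ResultType: parser.ValueTypeScalar, // "scalar"
		Result:     promql.Scalar{T: 1000, V: 2},
	}
	s := getScalar(data) // tripperware.Scalar{Value: 2, TimestampMs: 1000}
	// Re-encoded to the Prometheus JSON API shape this renders as
	// "result":[1,"2"] (timestamp in seconds, value as a string), as asserted
	// by the new codec test below.
	return &tripperware.PrometheusQueryResult_Scalar{Scalar: &s}
}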
93 changes: 93 additions & 0 deletions pkg/querier/codec/protobug_codec_test.go
@@ -0,0 +1,93 @@
package codec

import (
"testing"

jsoniter "github.com/json-iterator/go"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
v1 "github.com/prometheus/prometheus/web/api/v1"
"github.com/stretchr/testify/require"
)

var (
json = jsoniter.Config{
EscapeHTML: false, // No HTML in our responses.
SortMapKeys: true,
ValidateJsonRawMessage: false,
}.Froze()
)

func TestPrometheusResponse_JsonMarshal(t *testing.T) {
var tests = []struct {
name string
resp *v1.Response
expectedJson string
}{
{
name: "scalar",
resp: &v1.Response{
Status: "success",
Data: &v1.QueryData{
ResultType: "scalar",
Result: promql.Scalar{T: 1000, V: 2},
},
},
expectedJson: `{"status":"success","data":{"resultType":"scalar","result":[1,"2"]}}`,
},
{
name: "vector",
resp: &v1.Response{
Status: "success",
Data: &v1.QueryData{
ResultType: "vector",
Result: promql.Vector{
{
Metric: labels.FromStrings("name", "value"),
T: 1234,
F: 5.67,
},
},
},
},
expectedJson: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"name":"value"},"value":[1.234,"5.67"]}]}}`,
},
{
name: "matrix",
resp: &v1.Response{
Status: "success",
Data: &v1.QueryData{
ResultType: parser.ValueTypeMatrix,
Result: promql.Matrix{
{
Metric: labels.FromStrings("name1", "value1"),
Floats: []promql.FPoint{
{T: 12, F: 3.4},
{T: 56, F: 7.8},
},
},
{
Metric: labels.FromStrings("name2", "value2"),
Floats: []promql.FPoint{
{T: 12, F: 3.4},
{T: 56, F: 7.8},
},
},
},
},
},
expectedJson: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"name1":"value1"},"values":[[0.012,"3.4"],[0.056,"7.8"]]},{"metric":{"name2":"value2"},"values":[[0.012,"3.4"],[0.056,"7.8"]]}]}}`,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
prometheusResponse, err := createPrometheusQueryResponse(test.resp)
require.NoError(t, err)

b, err := json.Marshal(prometheusResponse)
require.NoError(t, err)
require.Equal(t, test.expectedJson, string(b))
})
}
}
52 changes: 42 additions & 10 deletions pkg/querier/tripperware/instantquery/instant_query.go
@@ -11,9 +11,11 @@ import (
"time"

jsoniter "github.com/json-iterator/go"
"github.com/munnerz/goautoneg"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/common/model"
v1 "github.com/prometheus/prometheus/web/api/v1"
"github.com/weaveworks/common/httpgrpc"
"google.golang.org/grpc/status"

@@ -28,16 +30,20 @@
SortMapKeys: true,
ValidateJsonRawMessage: false,
}.Froze()

protobufMIMEType = v1.MIMEType{Type: "application", SubType: "x-protobuf"}
jsonMIMEType = v1.MIMEType{Type: "application", SubType: "json"}
)

type instantQueryCodec struct {
tripperware.Codec
compression tripperware.Compression
defaultCodecType tripperware.CodecType
now func() time.Time
compression tripperware.Compression
defaultCodecType tripperware.CodecType
now func() time.Time
rulerQueryResponseFormat string
}

func NewInstantQueryCodec(compressionStr string, defaultCodecTypeStr string) instantQueryCodec {
func NewInstantQueryCodec(compressionStr string, defaultCodecTypeStr string, rulerQueryResponseFormat string) instantQueryCodec {
compression := tripperware.NonCompression // default
if compressionStr == string(tripperware.GzipCompression) {
compression = tripperware.GzipCompression
@@ -49,9 +55,10 @@ func NewInstantQueryCodec(compressionStr string, defaultCodecTypeStr string) ins
}

return instantQueryCodec{
compression: compression,
defaultCodecType: defaultCodecType,
now: time.Now,
compression: compression,
defaultCodecType: defaultCodecType,
now: time.Now,
rulerQueryResponseFormat: rulerQueryResponseFormat,
}
}

@@ -114,6 +121,12 @@ func (instantQueryCodec) DecodeResponse(ctx context.Context, r *http.Response, _
if resp.Data.Result.GetVector().Samples == nil {
resp.Data.Result.GetVector().Samples = []tripperware.Sample{}
}
case model.ValScalar.String():
if resp.Data.Result.GetScalar() == nil {
resp.Data.Result.Result = &tripperware.PrometheusQueryResult_Scalar{
Scalar: &tripperware.Scalar{},
}
}
}

if resp.Headers == nil {
@@ -167,7 +180,7 @@ func (c instantQueryCodec) EncodeRequest(ctx context.Context, r tripperware.Requ
return req.WithContext(ctx), nil
}

func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Response) (*http.Response, error) {
func (c instantQueryCodec) EncodeResponse(ctx context.Context, req *http.Request, res tripperware.Response) (*http.Response, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "APIResponse.ToHTTPResponse")
defer sp.Finish()

@@ -176,7 +189,7 @@ func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Res
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid response format")
}

b, err := json.Marshal(a)
contentType, b, err := marshalResponse(a, req.Header.Get("Accept"))
if err != nil {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error encoding response: %v", err)
}
Expand All @@ -185,7 +198,7 @@ func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Res

resp := http.Response{
Header: http.Header{
"Content-Type": []string{tripperware.ApplicationJson},
"Content-Type": []string{contentType},
},
Body: io.NopCloser(bytes.NewBuffer(b)),
StatusCode: http.StatusOK,
@@ -213,3 +226,22 @@ func decorateWithParamName(err error, field string) error {
}
return fmt.Errorf(errTmpl, field, err)
}

func marshalResponse(resp *tripperware.PrometheusResponse, acceptHeader string) (string, []byte, error) {
if acceptHeader == "" {
b, err := json.Marshal(resp)
return tripperware.ApplicationJson, b, err
}

for _, clause := range goautoneg.ParseAccept(acceptHeader) {
if jsonMIMEType.Satisfies(clause) {
b, err := json.Marshal(resp)
return tripperware.ApplicationJson, b, err
} else if protobufMIMEType.Satisfies(clause) {
b, err := resp.Marshal()
return tripperware.ApplicationProtobuf, b, err
}
}

return "", nil, fmt.Errorf("failed to marshal response with accept header: %s", acceptHeader)
}
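The matching ruler-side change, which sets the Accept header on requests to the query frontend based on -ruler.query-response-format, lives in other files of this commit and is not shown here. As a hedged sketch only (the function name and the JSON fallback for protobuf are assumptions, not taken from the commit), the mapping would look roughly like this:

package ruler

// Hypothetical sketch: derive the Accept header the ruler's query-frontend
// client could send for the configured -ruler.query-response-format.
// Not taken from the commit; names and fallback choice are assumptions.
func acceptHeaderForFormat(format string) string {
	switch format {
	case "protobuf":
		// Prefer protobuf, but keep JSON acceptable so frontends that only
		// speak JSON still answer.
		return "application/x-protobuf,application/json"
	case "json":
		return "application/json"
	default:
		return "application/json"
	}
}

On the frontend side, marshalResponse above serves whichever of the two MIME types wins content negotiation, and an empty Accept header keeps the previous JSON-only behaviour.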