Skip to content

Commit

Permalink
Allow ruler to retrieve proto format query response (cortexproject#6345)
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <[email protected]>
  • Loading branch information
SungJin1212 authored Dec 31, 2024
1 parent 4953086 commit e476001
Show file tree
Hide file tree
Showing 22 changed files with 791 additions and 123 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* [CHANGE] Change default value of `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` from `50` to `3` #6265
* [CHANGE] Enable Compactor and Alertmanager in target all. #6204
* [CHANGE] Update the `cortex_ingester_inflight_push_requests` metric to represent the maximum number of inflight requests recorded in the last minute. #6437
* [FEATURE] Ruler: Add an experimental flag `-ruler.query-response-format` to retrieve query responses in protobuf format. #6345
* [FEATURE] Ruler: Pagination support for List Rules API. #6299
* [FEATURE] Query Frontend/Querier: Add protobuf codec `-api.querier-default-codec` and the option to choose response compression type `-querier.response-compression`. #5527
* [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151
Expand Down
6 changes: 6 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4326,6 +4326,12 @@ The `ruler_config` configures the Cortex ruler.
# CLI flag: -ruler.frontend-address
[frontend_address: <string> | default = ""]
# [Experimental] Query response format used to retrieve query results from the
# Query Frontend during rule evaluation. It only takes effect when
# `-ruler.frontend-address` is configured. Supported values: json,protobuf
# CLI flag: -ruler.query-response-format
[query_response_format: <string> | default = "protobuf"]

frontend_client:
# gRPC client max receive message size (bytes).
# CLI flag: -ruler.frontendClient.grpc-max-recv-msg-size
Expand Down
4 changes: 3 additions & 1 deletion docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ Cortex is an actively developed project and we want to encourage the introductio

Currently experimental features are:

- Ruler: Evaluate rules to query frontend instead of ingesters (enabled via `-ruler.frontend-address` )
- Ruler
- Evaluate rules to query frontend instead of ingesters (enabled via `-ruler.frontend-address`).
- When `-ruler.frontend-address` is specified, the response format can be specified (via `-ruler.query-response-format`).
- S3 Server Side Encryption (SSE) using KMS (including per-tenant KMS config overrides).
- Azure blob storage.
- Zone awareness based replication.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ require (
github.com/cespare/xxhash/v2 v2.3.0
github.com/google/go-cmp v0.6.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
github.com/sercand/kuberesolver/v5 v5.1.1
github.com/tjhop/slog-gokit v0.1.2
go.opentelemetry.io/collector/pdata v1.22.0
Expand Down Expand Up @@ -188,7 +189,6 @@ require (
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/ncw/swift v1.0.53 // indirect
github.com/oklog/run v1.1.0 // indirect
Expand Down
88 changes: 55 additions & 33 deletions integration/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1670,42 +1670,64 @@ func TestRulerEvalWithQueryFrontend(t *testing.T) {
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(distributor, ingester))
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "")
require.NoError(t, s.Start(queryFrontend))

ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
"-ruler.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
}), "")
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
}), "")
require.NoError(t, s.StartAndWaitReady(ruler, querier))

c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), user)
require.NoError(t, err)
for _, format := range []string{"protobuf", "json"} {
t.Run(fmt.Sprintf("format:%s", format), func(t *testing.T) {
queryFrontendFlag := mergeFlags(flags, map[string]string{
"-ruler.query-response-format": format,
})
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", queryFrontendFlag, "")
require.NoError(t, s.Start(queryFrontend))

expression := "metric"
groupName := "rule_group"
ruleName := "rule_name"
require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, ruleName, expression), namespace))
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(queryFrontendFlag, map[string]string{
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
}), "")
require.NoError(t, s.StartAndWaitReady(querier))

rgMatcher := ruleGroupMatcher(user, namespace, groupName)
// Wait until ruler has loaded the group.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
// Wait until rule group has tried to evaluate the rule.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
rulerFlag := mergeFlags(queryFrontendFlag, map[string]string{
"-ruler.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
})
ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), rulerFlag, "")
require.NoError(t, s.StartAndWaitReady(ruler))

matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user)
// Check that cortex_ruler_query_frontend_clients went up
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ruler_query_frontend_clients"}, e2e.WaitMissingMetrics))
// Check that cortex_ruler_queries_total went up
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
// Check that cortex_ruler_queries_failed_total is zero
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
// Check that cortex_ruler_write_requests_total went up
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
// Check that cortex_ruler_write_requests_failed_total is zero
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
t.Cleanup(func() {
_ = s.Stop(ruler)
_ = s.Stop(queryFrontend)
_ = s.Stop(querier)
})

c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), user)
require.NoError(t, err)

expression := "metric" // vector
//expression := "scalar(count(up == 1)) > bool 1" // scalar
groupName := "rule_group"
ruleName := "rule_name"
require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, ruleName, expression), namespace))

rgMatcher := ruleGroupMatcher(user, namespace, groupName)
// Wait until ruler has loaded the group.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
// Wait until rule group has tried to evaluate the rule.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
// Make sure not to fail
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))

matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user)
sourceMatcher := labels.MustNewMatcher(labels.MatchEqual, "source", "ruler")
// Check that cortex_ruler_query_frontend_clients went up
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ruler_query_frontend_clients"}, e2e.WaitMissingMetrics))
// Check that cortex_ruler_queries_total went up
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
// Check that cortex_ruler_queries_failed_total is zero
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
// Check that cortex_ruler_write_requests_total went up
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
// Check that cortex_ruler_write_requests_failed_total is zero
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
// Check that cortex_query_frontend_queries_total went up
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_query_frontend_queries_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics))
})
}
}

func parseAlertFromRule(t *testing.T, rules interface{}) *alertingRule {
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/codec/protobuf_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ func (p ProtobufCodec) ContentType() v1.MIMEType {
if !p.CortexInternal {
return v1.MIMEType{Type: "application", SubType: "x-protobuf"}
}
// TODO: switch to use constants.
return v1.MIMEType{Type: "application", SubType: "x-cortex-query+proto"}

return v1.MIMEType{Type: "application", SubType: tripperware.QueryResponseCortexMIMESubType}
}

func (p ProtobufCodec) CanEncode(resp *v1.Response) bool {
Expand Down
41 changes: 7 additions & 34 deletions pkg/querier/codec/protobuf_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func TestProtobufCodec_Encode(t *testing.T) {
expected *tripperware.PrometheusResponse
}{
{
name: "vector",
data: &v1.QueryData{
ResultType: parser.ValueTypeVector,
Result: promql.Vector{
Expand Down Expand Up @@ -89,6 +90,7 @@ func TestProtobufCodec_Encode(t *testing.T) {
},
},
{
name: "scalar",
data: &v1.QueryData{
ResultType: parser.ValueTypeScalar,
Result: promql.Scalar{T: 1000, V: 1},
Expand All @@ -106,6 +108,7 @@ func TestProtobufCodec_Encode(t *testing.T) {
},
},
{
name: "matrix",
data: &v1.QueryData{
ResultType: parser.ValueTypeMatrix,
Result: promql.Matrix{
Expand Down Expand Up @@ -151,39 +154,7 @@ func TestProtobufCodec_Encode(t *testing.T) {
},
},
{
data: &v1.QueryData{
ResultType: parser.ValueTypeMatrix,
Result: promql.Matrix{
promql.Series{
Floats: []promql.FPoint{{F: 1, T: 1000}},
Metric: labels.FromStrings("__name__", "foo"),
},
},
},
expected: &tripperware.PrometheusResponse{
Status: tripperware.StatusSuccess,
Data: tripperware.PrometheusData{
ResultType: model.ValMatrix.String(),
Result: tripperware.PrometheusQueryResult{
Result: &tripperware.PrometheusQueryResult_Matrix{
Matrix: &tripperware.Matrix{
SampleStreams: []tripperware.SampleStream{
{
Labels: []cortexpb.LabelAdapter{
{Name: "__name__", Value: "foo"},
},
Samples: []cortexpb.Sample{
{Value: 1, TimestampMs: 1000},
},
},
},
},
},
},
},
},
},
{
name: "matrix with multiple float samples",
data: &v1.QueryData{
ResultType: parser.ValueTypeMatrix,
Result: promql.Matrix{
Expand Down Expand Up @@ -227,6 +198,7 @@ func TestProtobufCodec_Encode(t *testing.T) {
},
},
{
name: "matrix with histogram and not cortex internal",
data: &v1.QueryData{
ResultType: parser.ValueTypeMatrix,
Result: promql.Matrix{
Expand Down Expand Up @@ -316,6 +288,7 @@ func TestProtobufCodec_Encode(t *testing.T) {
},
},
{
name: "vector with histogram and not cortex internal",
data: &v1.QueryData{
ResultType: parser.ValueTypeVector,
Result: promql.Vector{
Expand Down Expand Up @@ -404,7 +377,7 @@ func TestProtobufCodec_Encode(t *testing.T) {
},
},
{
name: "cortex internal with native histogram",
name: "vector with histogram and cortex internal",
cortexInternal: true,
data: &v1.QueryData{
ResultType: parser.ValueTypeVector,
Expand Down
50 changes: 40 additions & 10 deletions pkg/querier/tripperware/instantquery/instant_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (
"time"

jsoniter "github.com/json-iterator/go"
"github.com/munnerz/goautoneg"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/common/model"
v1 "github.com/prometheus/prometheus/web/api/v1"
"github.com/weaveworks/common/httpgrpc"
"google.golang.org/grpc/status"

Expand All @@ -29,6 +31,9 @@ var (
SortMapKeys: true,
ValidateJsonRawMessage: false,
}.Froze()

rulerMIMEType = v1.MIMEType{Type: "application", SubType: tripperware.QueryResponseCortexMIMESubType}
jsonMIMEType = v1.MIMEType{Type: "application", SubType: "json"}
)

type instantQueryCodec struct {
Expand Down Expand Up @@ -68,12 +73,18 @@ func (c instantQueryCodec) DecodeRequest(_ context.Context, r *http.Request, for
result.Stats = r.FormValue("stats")
result.Path = r.URL.Path

// Include the specified headers from http request in prometheusRequest.
for _, header := range forwardHeaders {
for h, hv := range r.Header {
if strings.EqualFold(h, header) {
result.Headers[h] = hv
break
isSourceRuler := strings.Contains(r.Header.Get("User-Agent"), tripperware.RulerUserAgent)
if isSourceRuler {
// When the source is the Ruler, then forward whole headers
result.Headers = r.Header
} else {
// Include the specified headers from http request in prometheusRequest.
for _, header := range forwardHeaders {
for h, hv := range r.Header {
if strings.EqualFold(h, header) {
result.Headers[h] = hv
break
}
}
}
}
Expand Down Expand Up @@ -155,7 +166,11 @@ func (c instantQueryCodec) EncodeRequest(ctx context.Context, r tripperware.Requ
}
}

tripperware.SetRequestHeaders(h, c.defaultCodecType, c.compression)
isSourceRuler := strings.Contains(h.Get("User-Agent"), tripperware.RulerUserAgent)
if !isSourceRuler {
// When the source is the Ruler, skip set header
tripperware.SetRequestHeaders(h, c.defaultCodecType, c.compression)
}

req := &http.Request{
Method: "GET",
Expand All @@ -168,7 +183,7 @@ func (c instantQueryCodec) EncodeRequest(ctx context.Context, r tripperware.Requ
return req.WithContext(ctx), nil
}

func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Response) (*http.Response, error) {
func (c instantQueryCodec) EncodeResponse(ctx context.Context, req *http.Request, res tripperware.Response) (*http.Response, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "APIResponse.ToHTTPResponse")
defer sp.Finish()

Expand All @@ -180,7 +195,7 @@ func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Res
queryStats := stats.FromContext(ctx)
tripperware.SetQueryResponseStats(a, queryStats)

b, err := json.Marshal(a)
contentType, b, err := marshalResponse(a, req.Header.Get("Accept"))
if err != nil {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error encoding response: %v", err)
}
Expand All @@ -189,7 +204,7 @@ func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Res

resp := http.Response{
Header: http.Header{
"Content-Type": []string{tripperware.ApplicationJson},
"Content-Type": []string{contentType},
},
Body: io.NopCloser(bytes.NewBuffer(b)),
StatusCode: http.StatusOK,
Expand Down Expand Up @@ -217,3 +232,18 @@ func decorateWithParamName(err error, field string) error {
}
return fmt.Errorf(errTmpl, field, err)
}

// marshalResponse encodes resp in a format chosen from acceptHeader and
// returns the Content-Type value to advertise, the encoded payload, and any
// marshalling error.
//
// The Accept clauses are visited in the order goautoneg.ParseAccept returns
// them. For each clause, jsonMIMEType is tried first and rulerMIMEType second;
// the first match decides the encoding. If no clause matches either type, the
// response is encoded as JSON.
func marshalResponse(resp *tripperware.PrometheusResponse, acceptHeader string) (string, []byte, error) {
	for _, accepted := range goautoneg.ParseAccept(acceptHeader) {
		switch {
		case jsonMIMEType.Satisfies(accepted):
			payload, err := json.Marshal(resp)
			return tripperware.ApplicationJson, payload, err
		case rulerMIMEType.Satisfies(accepted):
			// resp.Marshal is presumably the generated protobuf encoder — confirm
			// against the tripperware package.
			payload, err := resp.Marshal()
			return tripperware.QueryResponseCortexMIMEType, payload, err
		}
	}

	// No clause matched a supported type: fall back to JSON.
	payload, err := json.Marshal(resp)
	return tripperware.ApplicationJson, payload, err
}
Loading

0 comments on commit e476001

Please sign in to comment.