Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support ruler to retrieve proto format query response #6345

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* [CHANGE] Change default value of `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` from `50` to `3` #6265
* [CHANGE] Enable Compactor and Alertmanager in target all. #6204
* [CHANGE] Update the `cortex_ingester_inflight_push_requests` metric to represent the maximum number of inflight requests recorded in the last minute. #6437
* [FEATURE] Ruler: Add an experimental flag `-ruler.query-response-format` to retrieve query response as a proto format. #6345
* [FEATURE] Ruler: Pagination support for List Rules API. #6299
* [FEATURE] Query Frontend/Querier: Add protobuf codec `-api.querier-default-codec` and the option to choose response compression type `-querier.response-compression`. #5527
* [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151
Expand Down
6 changes: 6 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4317,6 +4317,12 @@ The `ruler_config` configures the Cortex ruler.
# CLI flag: -ruler.frontend-address
[frontend_address: <string> | default = ""]

# [Experimental] Query response format to get query results from Query Frontend
# when the rule evaluation. It will only take effect when
# `-ruler.frontend-address` is configured. Supported values: json,protobuf
# CLI flag: -ruler.query-response-format
[query_response_format: <string> | default = "protobuf"]

frontend_client:
# gRPC client max receive message size (bytes).
# CLI flag: -ruler.frontendClient.grpc-max-recv-msg-size
Expand Down
4 changes: 3 additions & 1 deletion docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ Cortex is an actively developed project and we want to encourage the introductio

Currently experimental features are:

- Ruler: Evaluate rules to query frontend instead of ingesters (enabled via `-ruler.frontend-address` )
- Ruler
- Evaluate rules to query frontend instead of ingesters (enabled via `-ruler.frontend-address`).
- When `-ruler.frontend-address` is specified, the response format can be specified (via `-ruler.query-response-format`).
- S3 Server Side Encryption (SSE) using KMS (including per-tenant KMS config overrides).
- Azure blob storage.
- Zone awareness based replication.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ require (
github.com/cespare/xxhash/v2 v2.3.0
github.com/google/go-cmp v0.6.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
github.com/sercand/kuberesolver/v5 v5.1.1
github.com/tjhop/slog-gokit v0.1.2
go.opentelemetry.io/collector/pdata v1.22.0
Expand Down Expand Up @@ -189,7 +190,6 @@ require (
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/ncw/swift v1.0.53 // indirect
github.com/oklog/run v1.1.0 // indirect
Expand Down
88 changes: 55 additions & 33 deletions integration/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1670,42 +1670,64 @@ func TestRulerEvalWithQueryFrontend(t *testing.T) {
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(distributor, ingester))
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "")
require.NoError(t, s.Start(queryFrontend))

ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
"-ruler.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
}), "")
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
}), "")
require.NoError(t, s.StartAndWaitReady(ruler, querier))

c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), user)
require.NoError(t, err)
for _, format := range []string{"protobuf", "json"} {
t.Run(fmt.Sprintf("format:%s", format), func(t *testing.T) {
queryFrontendFlag := mergeFlags(flags, map[string]string{
"-ruler.query-response-format": format,
})
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", queryFrontendFlag, "")
require.NoError(t, s.Start(queryFrontend))

expression := "metric"
groupName := "rule_group"
ruleName := "rule_name"
require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, ruleName, expression), namespace))
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(queryFrontendFlag, map[string]string{
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
}), "")
require.NoError(t, s.StartAndWaitReady(querier))

rgMatcher := ruleGroupMatcher(user, namespace, groupName)
// Wait until ruler has loaded the group.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
// Wait until rule group has tried to evaluate the rule.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
rulerFlag := mergeFlags(queryFrontendFlag, map[string]string{
"-ruler.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
})
ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), rulerFlag, "")
require.NoError(t, s.StartAndWaitReady(ruler))

matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user)
// Check that cortex_ruler_query_frontend_clients went up
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ruler_query_frontend_clients"}, e2e.WaitMissingMetrics))
// Check that cortex_ruler_queries_total went up
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
// Check that cortex_ruler_queries_failed_total is zero
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
// Check that cortex_ruler_write_requests_total went up
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
// Check that cortex_ruler_write_requests_failed_total is zero
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
t.Cleanup(func() {
_ = s.Stop(ruler)
_ = s.Stop(queryFrontend)
_ = s.Stop(querier)
})

c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), user)
require.NoError(t, err)

expression := "metric" // vector
//expression := "scalar(count(up == 1)) > bool 1" // scalar
groupName := "rule_group"
ruleName := "rule_name"
require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, ruleName, expression), namespace))

rgMatcher := ruleGroupMatcher(user, namespace, groupName)
// Wait until ruler has loaded the group.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
// Wait until rule group has tried to evaluate the rule.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
// Make sure not to fail
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))

matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user)
sourceMatcher := labels.MustNewMatcher(labels.MatchEqual, "source", "ruler")
// Check that cortex_ruler_query_frontend_clients went up
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ruler_query_frontend_clients"}, e2e.WaitMissingMetrics))
// Check that cortex_ruler_queries_total went up
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
// Check that cortex_ruler_queries_failed_total is zero
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
// Check that cortex_ruler_write_requests_total went up
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
// Check that cortex_ruler_write_requests_failed_total is zero
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
// Check that cortex_query_frontend_queries_total went up
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_query_frontend_queries_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics))
})
}
}

func parseAlertFromRule(t *testing.T, rules interface{}) *alertingRule {
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/codec/protobuf_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ func (p ProtobufCodec) ContentType() v1.MIMEType {
if !p.CortexInternal {
return v1.MIMEType{Type: "application", SubType: "x-protobuf"}
}
// TODO: switch to use constants.
return v1.MIMEType{Type: "application", SubType: "x-cortex-query+proto"}

return v1.MIMEType{Type: "application", SubType: tripperware.QueryResponseCortexMIMESubType}
}

func (p ProtobufCodec) CanEncode(resp *v1.Response) bool {
Expand Down
41 changes: 7 additions & 34 deletions pkg/querier/codec/protobuf_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func TestProtobufCodec_Encode(t *testing.T) {
expected *tripperware.PrometheusResponse
}{
{
name: "vector",
data: &v1.QueryData{
ResultType: parser.ValueTypeVector,
Result: promql.Vector{
Expand Down Expand Up @@ -89,6 +90,7 @@ func TestProtobufCodec_Encode(t *testing.T) {
},
},
{
name: "scalar",
data: &v1.QueryData{
ResultType: parser.ValueTypeScalar,
Result: promql.Scalar{T: 1000, V: 1},
Expand All @@ -106,6 +108,7 @@ func TestProtobufCodec_Encode(t *testing.T) {
},
},
{
name: "matrix",
data: &v1.QueryData{
ResultType: parser.ValueTypeMatrix,
Result: promql.Matrix{
Expand Down Expand Up @@ -151,39 +154,7 @@ func TestProtobufCodec_Encode(t *testing.T) {
},
},
{
data: &v1.QueryData{
ResultType: parser.ValueTypeMatrix,
Result: promql.Matrix{
promql.Series{
Floats: []promql.FPoint{{F: 1, T: 1000}},
Metric: labels.FromStrings("__name__", "foo"),
},
},
},
expected: &tripperware.PrometheusResponse{
Status: tripperware.StatusSuccess,
Data: tripperware.PrometheusData{
ResultType: model.ValMatrix.String(),
Result: tripperware.PrometheusQueryResult{
Result: &tripperware.PrometheusQueryResult_Matrix{
Matrix: &tripperware.Matrix{
SampleStreams: []tripperware.SampleStream{
{
Labels: []cortexpb.LabelAdapter{
{Name: "__name__", Value: "foo"},
},
Samples: []cortexpb.Sample{
{Value: 1, TimestampMs: 1000},
},
},
},
},
},
},
},
},
},
{
name: "matrix with multiple float samples",
data: &v1.QueryData{
ResultType: parser.ValueTypeMatrix,
Result: promql.Matrix{
Expand Down Expand Up @@ -227,6 +198,7 @@ func TestProtobufCodec_Encode(t *testing.T) {
},
},
{
name: "matrix with histogram and not cortex internal",
data: &v1.QueryData{
ResultType: parser.ValueTypeMatrix,
Result: promql.Matrix{
Expand Down Expand Up @@ -316,6 +288,7 @@ func TestProtobufCodec_Encode(t *testing.T) {
},
},
{
name: "vector with histogram and not cortex internal",
data: &v1.QueryData{
ResultType: parser.ValueTypeVector,
Result: promql.Vector{
Expand Down Expand Up @@ -404,7 +377,7 @@ func TestProtobufCodec_Encode(t *testing.T) {
},
},
{
name: "cortex internal with native histogram",
name: "vector with histogram and cortex internal",
cortexInternal: true,
data: &v1.QueryData{
ResultType: parser.ValueTypeVector,
Expand Down
50 changes: 40 additions & 10 deletions pkg/querier/tripperware/instantquery/instant_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (
"time"

jsoniter "github.com/json-iterator/go"
"github.com/munnerz/goautoneg"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/common/model"
v1 "github.com/prometheus/prometheus/web/api/v1"
"github.com/weaveworks/common/httpgrpc"
"google.golang.org/grpc/status"

Expand All @@ -29,6 +31,9 @@ var (
SortMapKeys: true,
ValidateJsonRawMessage: false,
}.Froze()

rulerMIMEType = v1.MIMEType{Type: "application", SubType: tripperware.QueryResponseCortexMIMESubType}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do rulerMIMEType = tripperware.QueryResponseCortexMIMEType here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tripperware.QueryResponseCortexMIMEType is a string, so it is hard to use it to MIMEType directly.

jsonMIMEType = v1.MIMEType{Type: "application", SubType: "json"}
)

type instantQueryCodec struct {
Expand Down Expand Up @@ -68,12 +73,18 @@ func (c instantQueryCodec) DecodeRequest(_ context.Context, r *http.Request, for
result.Stats = r.FormValue("stats")
result.Path = r.URL.Path

// Include the specified headers from http request in prometheusRequest.
for _, header := range forwardHeaders {
for h, hv := range r.Header {
if strings.EqualFold(h, header) {
result.Headers[h] = hv
break
isSourceRuler := strings.Contains(r.Header.Get("User-Agent"), tripperware.RulerUserAgent)
if isSourceRuler {
// When the source is the Ruler, then forward whole headers
result.Headers = r.Header
Copy link
Contributor

@rajagopalanand rajagopalanand Dec 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Consider returning here vs else block

} else {
// Include the specified headers from http request in prometheusRequest.
for _, header := range forwardHeaders {
for h, hv := range r.Header {
if strings.EqualFold(h, header) {
result.Headers[h] = hv
break
}
}
}
}
Expand Down Expand Up @@ -155,7 +166,11 @@ func (c instantQueryCodec) EncodeRequest(ctx context.Context, r tripperware.Requ
}
}

tripperware.SetRequestHeaders(h, c.defaultCodecType, c.compression)
isSourceRuler := strings.Contains(h.Get("User-Agent"), tripperware.RulerUserAgent)
if !isSourceRuler {
// When the source is the Ruler, skip set header
tripperware.SetRequestHeaders(h, c.defaultCodecType, c.compression)
}

req := &http.Request{
Method: "GET",
Expand All @@ -168,7 +183,7 @@ func (c instantQueryCodec) EncodeRequest(ctx context.Context, r tripperware.Requ
return req.WithContext(ctx), nil
}

func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Response) (*http.Response, error) {
func (c instantQueryCodec) EncodeResponse(ctx context.Context, req *http.Request, res tripperware.Response) (*http.Response, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "APIResponse.ToHTTPResponse")
defer sp.Finish()

Expand All @@ -180,7 +195,7 @@ func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Res
queryStats := stats.FromContext(ctx)
tripperware.SetQueryResponseStats(a, queryStats)

b, err := json.Marshal(a)
contentType, b, err := marshalResponse(a, req.Header.Get("Accept"))
Copy link
Contributor

@yeya24 yeya24 Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we only try to return non JSON response format if we know the request comes from Ruler?
If the request doesn't come from Ruler I feel we want to enforce JSON response even though they specify protobuf in their protocol like Accept: protobuf, json. To reduce impact as this protobuf format is Cortex specific.

But tbh Idk if we can identify ruler requests or not. We probably have to do what you are doing here.

WDYT? @alanprot @danielblando

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yeya24
One way is to define a new MIME type like application/x-protobuf-ruler (just example) and then use it to accept header.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah.. maybe we should define our internal MIME type so external calls will only get that format if they really wanna.
I would not add "ruler" in the format though. Maybe something like application/x-cortex.query.v1+proto (idk the latest best practices to define those custom Mime types :P )

Copy link
Contributor

@yeya24 yeya24 Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds worth trying. Let's hear from other maintainers, too

if err != nil {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error encoding response: %v", err)
}
Expand All @@ -189,7 +204,7 @@ func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Res

resp := http.Response{
Header: http.Header{
"Content-Type": []string{tripperware.ApplicationJson},
"Content-Type": []string{contentType},
},
Body: io.NopCloser(bytes.NewBuffer(b)),
StatusCode: http.StatusOK,
Expand Down Expand Up @@ -217,3 +232,18 @@ func decorateWithParamName(err error, field string) error {
}
return fmt.Errorf(errTmpl, field, err)
}

func marshalResponse(resp *tripperware.PrometheusResponse, acceptHeader string) (string, []byte, error) {
for _, clause := range goautoneg.ParseAccept(acceptHeader) {
if jsonMIMEType.Satisfies(clause) {
b, err := json.Marshal(resp)
return tripperware.ApplicationJson, b, err
} else if rulerMIMEType.Satisfies(clause) {
b, err := resp.Marshal()
return tripperware.QueryResponseCortexMIMEType, b, err
}
}

b, err := json.Marshal(resp)
return tripperware.ApplicationJson, b, err
}
Loading
Loading