Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query Frontend: add new field for dense native histogram format #6199

Merged
merged 4 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,9 @@ func NewQuerierHandler(
)

// JSON codec is already installed. Install Protobuf codec to give the option for using either.
api.InstallCodec(codec.ProtobufCodec{})
api.InstallCodec(codec.ProtobufCodec{CortexInternal: false})
// Protobuf codec for Cortex internal requests. This should be used by Cortex Ruler only for remote evaluation.
api.InstallCodec(codec.ProtobufCodec{CortexInternal: true})

router := mux.NewRouter()

Expand Down
69 changes: 46 additions & 23 deletions pkg/querier/codec/protobuf_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,20 @@ import (
"github.com/cortexproject/cortex/pkg/querier/tripperware"
)

type ProtobufCodec struct{}
type ProtobufCodec struct {
// CortexInternal enables encoding the full native histogram data fields in the response instead of
// keeping only the limited sparse information that the default JSON/Protobuf codecs keep.
// This is used by the Cortex Ruler to get native histogram data from the Cortex Query Frontend,
// because rule evaluation requires the full native histogram data.
CortexInternal bool
}

func (p ProtobufCodec) ContentType() v1.MIMEType {
return v1.MIMEType{Type: "application", SubType: "x-protobuf"}
if !p.CortexInternal {
return v1.MIMEType{Type: "application", SubType: "x-protobuf"}
}
// TODO: switch to use constants.
return v1.MIMEType{Type: "application", SubType: "x-cortex-query+proto"}
}

func (p ProtobufCodec) CanEncode(resp *v1.Response) bool {
Expand All @@ -29,15 +39,15 @@ func (p ProtobufCodec) CanEncode(resp *v1.Response) bool {

// ProtobufCodec implementation is derived from https://github.com/prometheus/prometheus/blob/main/web/api/v1/json_codec.go
func (p ProtobufCodec) Encode(resp *v1.Response) ([]byte, error) {
prometheusQueryResponse, err := createPrometheusQueryResponse(resp)
prometheusQueryResponse, err := createPrometheusQueryResponse(resp, p.CortexInternal)
if err != nil {
return []byte{}, err
}
b, err := proto.Marshal(prometheusQueryResponse)
return b, err
}

func createPrometheusQueryResponse(resp *v1.Response) (*tripperware.PrometheusResponse, error) {
func createPrometheusQueryResponse(resp *v1.Response, cortexInternal bool) (*tripperware.PrometheusResponse, error) {
var data = resp.Data.(*v1.QueryData)

var queryResult tripperware.PrometheusQueryResult
Expand All @@ -51,7 +61,10 @@ func createPrometheusQueryResponse(resp *v1.Response) (*tripperware.PrometheusRe
case model.ValVector.String():
queryResult.Result = &tripperware.PrometheusQueryResult_Vector{
Vector: &tripperware.Vector{
Samples: *getVectorSamples(data),
// When cortexInternal is set, native histograms are encoded in the dense format instead of the sparse format.
// This applies only to the vector response type, because the internal response is available only to the
// Ruler client, and the Ruler expects only vector or scalar response types.
Samples: *getVectorSamples(data, cortexInternal),
},
}
default:
Expand Down Expand Up @@ -139,7 +152,7 @@ func getMatrixSampleStreams(data *v1.QueryData) *[]tripperware.SampleStream {
return &sampleStreams
}

func getVectorSamples(data *v1.QueryData) *[]tripperware.Sample {
func getVectorSamples(data *v1.QueryData, cortexInternal bool) *[]tripperware.Sample {
vectorSamplesLen := len(data.Result.(promql.Vector))
vectorSamples := make([]tripperware.Sample, vectorSamplesLen)

Expand All @@ -158,27 +171,37 @@ func getVectorSamples(data *v1.QueryData) *[]tripperware.Sample {
}
vectorSamples[i].Labels = labels

if sample.H != nil {
bucketsLen := len(sample.H.NegativeBuckets) + len(sample.H.PositiveBuckets)
if sample.H.ZeroCount > 0 {
bucketsLen = len(sample.H.NegativeBuckets) + len(sample.H.PositiveBuckets) + 1
}
buckets := make([]*tripperware.HistogramBucket, bucketsLen)
it := sample.H.AllBucketIterator()
getBuckets(buckets, it)
vectorSamples[i].Histogram = &tripperware.SampleHistogramPair{
TimestampMs: sample.T,
Histogram: tripperware.SampleHistogram{
Count: sample.H.Count,
Sum: sample.H.Sum,
Buckets: buckets,
},
}
} else {
// Float samples only.
if sample.H == nil {
vectorSamples[i].Sample = &cortexpb.Sample{
TimestampMs: sample.T,
Value: sample.F,
}
continue
}

// Cortex Internal request. Encode dense float native histograms.
if cortexInternal {
hp := cortexpb.FloatHistogramToHistogramProto(sample.T, sample.H)
vectorSamples[i].RawHistogram = &hp
continue
}

// Encode sparse native histograms.
bucketsLen := len(sample.H.NegativeBuckets) + len(sample.H.PositiveBuckets)
if sample.H.ZeroCount > 0 {
bucketsLen = len(sample.H.NegativeBuckets) + len(sample.H.PositiveBuckets) + 1
}
buckets := make([]*tripperware.HistogramBucket, bucketsLen)
it := sample.H.AllBucketIterator()
getBuckets(buckets, it)
vectorSamples[i].Histogram = &tripperware.SampleHistogramPair{
TimestampMs: sample.T,
Histogram: tripperware.SampleHistogram{
Count: sample.H.Count,
Sum: sample.H.Sum,
Buckets: buckets,
},
}
}
return &vectorSamples
Expand Down
112 changes: 69 additions & 43 deletions pkg/querier/codec/protobuf_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,29 @@ import (
)

func TestProtobufCodec_Encode(t *testing.T) {
testFloatHistogram := &histogram.FloatHistogram{
Schema: 2,
ZeroThreshold: 0.001,
ZeroCount: 12,
Count: 10,
Sum: 20,
PositiveSpans: []histogram.Span{
{Offset: 3, Length: 2},
{Offset: 1, Length: 3},
},
NegativeSpans: []histogram.Span{
{Offset: 2, Length: 2},
},
PositiveBuckets: []float64{1, 2, 2, 1, 1},
NegativeBuckets: []float64{2, 1},
}
testProtoHistogram := cortexpb.FloatHistogramToHistogramProto(1000, testFloatHistogram)

tests := []struct {
data interface{}
expected *tripperware.PrometheusResponse
name string
data *v1.QueryData
cortexInternal bool
expected *tripperware.PrometheusResponse
}{
{
data: &v1.QueryData{
Expand Down Expand Up @@ -207,23 +227,8 @@ func TestProtobufCodec_Encode(t *testing.T) {
ResultType: parser.ValueTypeMatrix,
Result: promql.Matrix{
promql.Series{
Histograms: []promql.HPoint{{H: &histogram.FloatHistogram{
Schema: 2,
ZeroThreshold: 0.001,
ZeroCount: 12,
Count: 10,
Sum: 20,
PositiveSpans: []histogram.Span{
{Offset: 3, Length: 2},
{Offset: 1, Length: 3},
},
NegativeSpans: []histogram.Span{
{Offset: 2, Length: 2},
},
PositiveBuckets: []float64{1, 2, 2, 1, 1},
NegativeBuckets: []float64{2, 1},
}, T: 1000}},
Metric: labels.FromStrings("__name__", "foo"),
Histograms: []promql.HPoint{{H: testFloatHistogram, T: 1000}},
Metric: labels.FromStrings("__name__", "foo"),
},
},
},
Expand Down Expand Up @@ -313,22 +318,7 @@ func TestProtobufCodec_Encode(t *testing.T) {
promql.Sample{
Metric: labels.FromStrings("__name__", "foo"),
T: 1000,
H: &histogram.FloatHistogram{
Schema: 2,
ZeroThreshold: 0.001,
ZeroCount: 12,
Count: 10,
Sum: 20,
PositiveSpans: []histogram.Span{
{Offset: 3, Length: 2},
{Offset: 1, Length: 3},
},
NegativeSpans: []histogram.Span{
{Offset: 2, Length: 2},
},
PositiveBuckets: []float64{1, 2, 2, 1, 1},
NegativeBuckets: []float64{2, 1},
},
H: testFloatHistogram,
},
},
},
Expand Down Expand Up @@ -409,17 +399,53 @@ func TestProtobufCodec_Encode(t *testing.T) {
},
},
},
{
name: "cortex internal with native histogram",
cortexInternal: true,
data: &v1.QueryData{
ResultType: parser.ValueTypeVector,
Result: promql.Vector{
promql.Sample{
Metric: labels.FromStrings("__name__", "foo"),
T: 1000,
H: testFloatHistogram,
},
},
},
expected: &tripperware.PrometheusResponse{
Status: tripperware.StatusSuccess,
Data: tripperware.PrometheusData{
ResultType: model.ValVector.String(),
Result: tripperware.PrometheusQueryResult{
Result: &tripperware.PrometheusQueryResult_Vector{
Vector: &tripperware.Vector{
Samples: []tripperware.Sample{
{
Labels: []cortexpb.LabelAdapter{
{Name: "__name__", Value: "foo"},
},
RawHistogram: &testProtoHistogram,
},
},
},
},
},
},
},
},
}

codec := ProtobufCodec{}
for _, test := range tests {
body, err := codec.Encode(&v1.Response{
Status: tripperware.StatusSuccess,
Data: test.data,
t.Run(test.name, func(t *testing.T) {
codec := ProtobufCodec{CortexInternal: test.cortexInternal}
body, err := codec.Encode(&v1.Response{
Status: tripperware.StatusSuccess,
Data: test.data,
})
require.NoError(t, err)
b, err := proto.Marshal(test.expected)
require.NoError(t, err)
require.Equal(t, string(b), string(body))
})
require.NoError(t, err)
b, err := proto.Marshal(test.expected)
require.NoError(t, err)
require.Equal(t, string(b), string(body))
}
}
2 changes: 1 addition & 1 deletion pkg/querier/tripperware/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func sliceSamples(samples []cortexpb.Sample, minTs int64) []cortexpb.Sample {
return samples[searchResult:]
}

// sliceHistogram assumes given histogram are sorted by timestamp in ascending order and
// sliceHistograms assumes the given histograms are sorted by timestamp in ascending order and
// returns a sub-slice whose first element has the smallest timestamp that is strictly
// bigger than the given minTs. An empty slice is returned if minTs is bigger than all the
// timestamps in the histograms.
Expand Down
Loading
Loading