Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query Frontend: add new field for dense native histogram format #6199

Merged
merged 4 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,9 @@ func NewQuerierHandler(
)

// JSON codec is already installed. Install Protobuf codec to give the option for using either.
api.InstallCodec(codec.ProtobufCodec{})
api.InstallCodec(codec.ProtobufCodec{CortexInternal: false})
// Protobuf codec for Cortex internal requests. This should be used by Cortex Ruler only for remote evaluation.
api.InstallCodec(codec.ProtobufCodec{CortexInternal: true})

router := mux.NewRouter()

Expand Down
75 changes: 49 additions & 26 deletions pkg/querier/codec/protobuf_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,20 @@ import (
"github.com/cortexproject/cortex/pkg/querier/tripperware"
)

type ProtobufCodec struct{}
type ProtobufCodec struct {
// cortexInternal enables encoding the whole native histogram data fields in response instead of keeping
// only few sparse information like the default JSON/Protobuf codec does.
// This will be used by Cortex Ruler to get native histograms data from Cortex Query Frontend because
// rule evaluation requires the full native histogram data.
CortexInternal bool
}

func (p ProtobufCodec) ContentType() v1.MIMEType {
return v1.MIMEType{Type: "application", SubType: "x-protobuf"}
if !p.CortexInternal {
return v1.MIMEType{Type: "application", SubType: "x-protobuf"}
}
// TODO: switch to use constants.
return v1.MIMEType{Type: "application", SubType: "application/x-cortex-query+proto"}
yeya24 marked this conversation as resolved.
Show resolved Hide resolved
}

func (p ProtobufCodec) CanEncode(resp *v1.Response) bool {
Expand All @@ -29,15 +39,15 @@ func (p ProtobufCodec) CanEncode(resp *v1.Response) bool {

// ProtobufCodec implementation is derived from https://github.com/prometheus/prometheus/blob/main/web/api/v1/json_codec.go
func (p ProtobufCodec) Encode(resp *v1.Response) ([]byte, error) {
prometheusQueryResponse, err := createPrometheusQueryResponse(resp)
prometheusQueryResponse, err := createPrometheusQueryResponse(resp, p.CortexInternal)
if err != nil {
return []byte{}, err
}
b, err := proto.Marshal(prometheusQueryResponse)
return b, err
}

func createPrometheusQueryResponse(resp *v1.Response) (*tripperware.PrometheusResponse, error) {
func createPrometheusQueryResponse(resp *v1.Response, cortexInternal bool) (*tripperware.PrometheusResponse, error) {
var data = resp.Data.(*v1.QueryData)

var queryResult tripperware.PrometheusQueryResult
Expand All @@ -51,7 +61,10 @@ func createPrometheusQueryResponse(resp *v1.Response) (*tripperware.PrometheusRe
case model.ValVector.String():
queryResult.Result = &tripperware.PrometheusQueryResult_Vector{
Vector: &tripperware.Vector{
Samples: *getVectorSamples(data),
// cortexInternal tries to encode native histogram as dense format instead of sparse format.
// This is only used for vector response type since internal response is only available for Ruler
// client and Ruler only expects vector or scalar response type.
Samples: *getVectorSamples(data, cortexInternal),
},
}
default:
Expand Down Expand Up @@ -113,9 +126,9 @@ func getMatrixSampleStreams(data *v1.QueryData) *[]tripperware.SampleStream {
}

histogramsLen := len(sampleStream.Histograms)
var histograms []tripperware.SampleHistogramPair
var histograms []*tripperware.SampleHistogramPair
if histogramsLen > 0 {
histograms = make([]tripperware.SampleHistogramPair, histogramsLen)
histograms = make([]*tripperware.SampleHistogramPair, histogramsLen)
for j := 0; j < histogramsLen; j++ {
bucketsLen := len(sampleStream.Histograms[j].H.NegativeBuckets) + len(sampleStream.Histograms[j].H.PositiveBuckets)
if sampleStream.Histograms[j].H.ZeroCount > 0 {
Expand All @@ -124,7 +137,7 @@ func getMatrixSampleStreams(data *v1.QueryData) *[]tripperware.SampleStream {
buckets := make([]*tripperware.HistogramBucket, bucketsLen)
it := sampleStream.Histograms[j].H.AllBucketIterator()
getBuckets(buckets, it)
histograms[j] = tripperware.SampleHistogramPair{
histograms[j] = &tripperware.SampleHistogramPair{
TimestampMs: sampleStream.Histograms[j].T,
Histogram: tripperware.SampleHistogram{
Count: sampleStream.Histograms[j].H.Count,
Expand All @@ -139,7 +152,7 @@ func getMatrixSampleStreams(data *v1.QueryData) *[]tripperware.SampleStream {
return &sampleStreams
}

func getVectorSamples(data *v1.QueryData) *[]tripperware.Sample {
func getVectorSamples(data *v1.QueryData, cortexInternal bool) *[]tripperware.Sample {
vectorSamplesLen := len(data.Result.(promql.Vector))
vectorSamples := make([]tripperware.Sample, vectorSamplesLen)

Expand All @@ -158,27 +171,37 @@ func getVectorSamples(data *v1.QueryData) *[]tripperware.Sample {
}
vectorSamples[i].Labels = labels

if sample.H != nil {
bucketsLen := len(sample.H.NegativeBuckets) + len(sample.H.PositiveBuckets)
if sample.H.ZeroCount > 0 {
bucketsLen = len(sample.H.NegativeBuckets) + len(sample.H.PositiveBuckets) + 1
}
buckets := make([]*tripperware.HistogramBucket, bucketsLen)
it := sample.H.AllBucketIterator()
getBuckets(buckets, it)
vectorSamples[i].Histogram = &tripperware.SampleHistogramPair{
TimestampMs: sample.T,
Histogram: tripperware.SampleHistogram{
Count: sample.H.Count,
Sum: sample.H.Sum,
Buckets: buckets,
},
}
} else {
// Float samples only.
if sample.H == nil {
vectorSamples[i].Sample = &cortexpb.Sample{
TimestampMs: sample.T,
Value: sample.F,
}
continue
}

// Cortex Internal request. Encode dense float native histograms.
if cortexInternal {
hp := cortexpb.FloatHistogramToHistogramProto(sample.T, sample.H)
vectorSamples[i].RawHistogram = &hp
continue
}

// Encode sparse native histograms.
bucketsLen := len(sample.H.NegativeBuckets) + len(sample.H.PositiveBuckets)
if sample.H.ZeroCount > 0 {
bucketsLen = len(sample.H.NegativeBuckets) + len(sample.H.PositiveBuckets) + 1
}
buckets := make([]*tripperware.HistogramBucket, bucketsLen)
it := sample.H.AllBucketIterator()
getBuckets(buckets, it)
vectorSamples[i].Histogram = &tripperware.SampleHistogramPair{
TimestampMs: sample.T,
Histogram: tripperware.SampleHistogram{
Count: sample.H.Count,
Sum: sample.H.Sum,
Buckets: buckets,
},
}
}
return &vectorSamples
Expand Down
114 changes: 70 additions & 44 deletions pkg/querier/codec/protobuf_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,29 @@ import (
)

func TestProtobufCodec_Encode(t *testing.T) {
testFloatHistogram := &histogram.FloatHistogram{
Schema: 2,
ZeroThreshold: 0.001,
ZeroCount: 12,
Count: 10,
Sum: 20,
PositiveSpans: []histogram.Span{
{Offset: 3, Length: 2},
{Offset: 1, Length: 3},
},
NegativeSpans: []histogram.Span{
{Offset: 2, Length: 2},
},
PositiveBuckets: []float64{1, 2, 2, 1, 1},
NegativeBuckets: []float64{2, 1},
}
testProtoHistogram := cortexpb.FloatHistogramToHistogramProto(1000, testFloatHistogram)

tests := []struct {
data interface{}
expected *tripperware.PrometheusResponse
name string
data *v1.QueryData
cortexInternal bool
expected *tripperware.PrometheusResponse
}{
{
data: &v1.QueryData{
Expand Down Expand Up @@ -207,23 +227,8 @@ func TestProtobufCodec_Encode(t *testing.T) {
ResultType: parser.ValueTypeMatrix,
Result: promql.Matrix{
promql.Series{
Histograms: []promql.HPoint{{H: &histogram.FloatHistogram{
Schema: 2,
ZeroThreshold: 0.001,
ZeroCount: 12,
Count: 10,
Sum: 20,
PositiveSpans: []histogram.Span{
{Offset: 3, Length: 2},
{Offset: 1, Length: 3},
},
NegativeSpans: []histogram.Span{
{Offset: 2, Length: 2},
},
PositiveBuckets: []float64{1, 2, 2, 1, 1},
NegativeBuckets: []float64{2, 1},
}, T: 1000}},
Metric: labels.FromStrings("__name__", "foo"),
Histograms: []promql.HPoint{{H: testFloatHistogram, T: 1000}},
Metric: labels.FromStrings("__name__", "foo"),
},
},
},
Expand All @@ -239,7 +244,7 @@ func TestProtobufCodec_Encode(t *testing.T) {
Labels: []cortexpb.LabelAdapter{
{Name: "__name__", Value: "foo"},
},
Histograms: []tripperware.SampleHistogramPair{
Histograms: []*tripperware.SampleHistogramPair{
{
TimestampMs: 1000,
Histogram: tripperware.SampleHistogram{
Expand Down Expand Up @@ -313,22 +318,7 @@ func TestProtobufCodec_Encode(t *testing.T) {
promql.Sample{
Metric: labels.FromStrings("__name__", "foo"),
T: 1000,
H: &histogram.FloatHistogram{
Schema: 2,
ZeroThreshold: 0.001,
ZeroCount: 12,
Count: 10,
Sum: 20,
PositiveSpans: []histogram.Span{
{Offset: 3, Length: 2},
{Offset: 1, Length: 3},
},
NegativeSpans: []histogram.Span{
{Offset: 2, Length: 2},
},
PositiveBuckets: []float64{1, 2, 2, 1, 1},
NegativeBuckets: []float64{2, 1},
},
H: testFloatHistogram,
},
},
},
Expand Down Expand Up @@ -409,17 +399,53 @@ func TestProtobufCodec_Encode(t *testing.T) {
},
},
},
{
name: "cortex internal with native histogram",
cortexInternal: true,
data: &v1.QueryData{
ResultType: parser.ValueTypeVector,
Result: promql.Vector{
promql.Sample{
Metric: labels.FromStrings("__name__", "foo"),
T: 1000,
H: testFloatHistogram,
},
},
},
expected: &tripperware.PrometheusResponse{
Status: tripperware.StatusSuccess,
Data: tripperware.PrometheusData{
ResultType: model.ValVector.String(),
Result: tripperware.PrometheusQueryResult{
Result: &tripperware.PrometheusQueryResult_Vector{
Vector: &tripperware.Vector{
Samples: []tripperware.Sample{
{
Labels: []cortexpb.LabelAdapter{
{Name: "__name__", Value: "foo"},
},
RawHistogram: &testProtoHistogram,
},
},
},
},
},
},
},
},
}

codec := ProtobufCodec{}
for _, test := range tests {
body, err := codec.Encode(&v1.Response{
Status: tripperware.StatusSuccess,
Data: test.data,
t.Run(test.name, func(t *testing.T) {
codec := ProtobufCodec{CortexInternal: test.cortexInternal}
body, err := codec.Encode(&v1.Response{
Status: tripperware.StatusSuccess,
Data: test.data,
})
require.NoError(t, err)
b, err := proto.Marshal(test.expected)
require.NoError(t, err)
require.Equal(t, string(b), string(body))
})
require.NoError(t, err)
b, err := proto.Marshal(test.expected)
require.NoError(t, err)
require.Equal(t, string(b), string(body))
}
}
32 changes: 31 additions & 1 deletion pkg/querier/tripperware/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,18 @@ func mergeSampleStreams(output map[string]SampleStream, sampleStreams []SampleSt
stream.Histograms = sliceHistograms(stream.Histograms, existingEndTs)
}
}
// Same for above.
if len(existing.RawHistograms) > 0 && len(stream.RawHistograms) > 0 {
existingEndTs := existing.RawHistograms[len(existing.RawHistograms)-1].GetTimestampMs()
if existingEndTs == stream.RawHistograms[0].GetTimestampMs() {
stream.RawHistograms = stream.RawHistograms[1:]
} else if existingEndTs > stream.RawHistograms[0].GetTimestampMs() {
stream.RawHistograms = sliceRawHistograms(stream.RawHistograms, existingEndTs)
}
}
existing.Samples = append(existing.Samples, stream.Samples...)
existing.Histograms = append(existing.Histograms, stream.Histograms...)
existing.RawHistograms = append(existing.RawHistograms, stream.RawHistograms...)

output[metric] = existing
}
Expand Down Expand Up @@ -404,7 +414,27 @@ func sliceSamples(samples []cortexpb.Sample, minTs int64) []cortexpb.Sample {
// return a sub slice whose first element's is the smallest timestamp that is strictly
// bigger than the given minTs. Empty slice is returned if minTs is bigger than all the
// timestamps in histogram.
func sliceHistograms(histograms []SampleHistogramPair, minTs int64) []SampleHistogramPair {
func sliceHistograms(histograms []*SampleHistogramPair, minTs int64) []*SampleHistogramPair {
if len(histograms) <= 0 || minTs < histograms[0].GetTimestampMs() {
return histograms
}

if len(histograms) > 0 && minTs > histograms[len(histograms)-1].GetTimestampMs() {
return histograms[len(histograms):]
}

searchResult := sort.Search(len(histograms), func(i int) bool {
return histograms[i].GetTimestampMs() > minTs
})

return histograms[searchResult:]
}

// sliceRawHistograms assumes given histogram are sorted by timestamp in ascending order and
// return a sub slice whose first element's is the smallest timestamp that is strictly
// bigger than the given minTs. Empty slice is returned if minTs is bigger than all the
// timestamps in histogram.
func sliceRawHistograms(histograms []*cortexpb.Histogram, minTs int64) []*cortexpb.Histogram {
if len(histograms) <= 0 || minTs < histograms[0].GetTimestampMs() {
return histograms
}
Expand Down
Loading
Loading