Skip to content

Commit

Permalink
add new histogram field for full native histogram representation
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed Dec 3, 2024
1 parent bc1b5be commit a153624
Show file tree
Hide file tree
Showing 6 changed files with 309 additions and 126 deletions.
32 changes: 31 additions & 1 deletion pkg/querier/tripperware/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,18 @@ func mergeSampleStreams(output map[string]SampleStream, sampleStreams []SampleSt
stream.Histograms = sliceHistograms(stream.Histograms, existingEndTs)
}
}
// Same for above.
if len(existing.RawHistograms) > 0 && len(stream.RawHistograms) > 0 {
existingEndTs := existing.RawHistograms[len(existing.RawHistograms)-1].GetTimestampMs()
if existingEndTs == stream.RawHistograms[0].GetTimestampMs() {
stream.RawHistograms = stream.RawHistograms[1:]
} else if existingEndTs > stream.RawHistograms[0].GetTimestampMs() {
stream.RawHistograms = sliceRawHistograms(stream.RawHistograms, existingEndTs)
}
}
existing.Samples = append(existing.Samples, stream.Samples...)
existing.Histograms = append(existing.Histograms, stream.Histograms...)
existing.RawHistograms = append(existing.RawHistograms, stream.RawHistograms...)

output[metric] = existing
}
Expand Down Expand Up @@ -404,7 +414,27 @@ func sliceSamples(samples []cortexpb.Sample, minTs int64) []cortexpb.Sample {
// return a sub slice whose first element's is the smallest timestamp that is strictly
// bigger than the given minTs. Empty slice is returned if minTs is bigger than all the
// timestamps in histogram.
func sliceHistograms(histograms []SampleHistogramPair, minTs int64) []SampleHistogramPair {
func sliceHistograms(histograms []*SampleHistogramPair, minTs int64) []*SampleHistogramPair {
if len(histograms) <= 0 || minTs < histograms[0].GetTimestampMs() {
return histograms
}

if len(histograms) > 0 && minTs > histograms[len(histograms)-1].GetTimestampMs() {
return histograms[len(histograms):]
}

searchResult := sort.Search(len(histograms), func(i int) bool {
return histograms[i].GetTimestampMs() > minTs
})

return histograms[searchResult:]
}

// sliceRawHistograms assumes given histogram are sorted by timestamp in ascending order and
// return a sub slice whose first element's is the smallest timestamp that is strictly
// bigger than the given minTs. Empty slice is returned if minTs is bigger than all the
// timestamps in histogram.
func sliceRawHistograms(histograms []*cortexpb.Histogram, minTs int64) []*cortexpb.Histogram {
if len(histograms) <= 0 || minTs < histograms[0].GetTimestampMs() {
return histograms
}
Expand Down
60 changes: 30 additions & 30 deletions pkg/querier/tripperware/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,15 @@ func TestMergeSampleStreams(t *testing.T) {
sampleStreams: []SampleStream{
{
Labels: cortexpb.FromLabelsToLabelAdapters(lbls),
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
},
},
},
expectedOutput: map[string]SampleStream{
ingester_client.LabelsToKeyString(lbls): {
Labels: cortexpb.FromLabelsToLabelAdapters(lbls),
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
},
},
Expand All @@ -122,7 +122,7 @@ func TestMergeSampleStreams(t *testing.T) {
Samples: []cortexpb.Sample{
{Value: 0, TimestampMs: 0},
},
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
},
},
Expand All @@ -133,7 +133,7 @@ func TestMergeSampleStreams(t *testing.T) {
Samples: []cortexpb.Sample{
{Value: 0, TimestampMs: 0},
},
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
},
},
Expand Down Expand Up @@ -176,15 +176,15 @@ func TestMergeSampleStreams(t *testing.T) {
sampleStreams: []SampleStream{
{
Labels: cortexpb.FromLabelsToLabelAdapters(lbls),
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 2},
{Histogram: testHistogram1, TimestampMs: 3},
},
},
{
Labels: cortexpb.FromLabelsToLabelAdapters(lbls),
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 1},
{Histogram: testHistogram1, TimestampMs: 4},
Expand All @@ -194,7 +194,7 @@ func TestMergeSampleStreams(t *testing.T) {
expectedOutput: map[string]SampleStream{
ingester_client.LabelsToKeyString(lbls): {
Labels: cortexpb.FromLabelsToLabelAdapters(lbls),
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 2},
{Histogram: testHistogram1, TimestampMs: 3},
Expand All @@ -213,7 +213,7 @@ func TestMergeSampleStreams(t *testing.T) {
{Value: 2, TimestampMs: 2},
{Value: 3, TimestampMs: 3},
},
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 2},
{Histogram: testHistogram1, TimestampMs: 3},
Expand All @@ -226,7 +226,7 @@ func TestMergeSampleStreams(t *testing.T) {
{Value: 1, TimestampMs: 1},
{Value: 4, TimestampMs: 4},
},
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 1},
{Histogram: testHistogram1, TimestampMs: 4},
Expand All @@ -242,7 +242,7 @@ func TestMergeSampleStreams(t *testing.T) {
{Value: 3, TimestampMs: 3},
{Value: 4, TimestampMs: 4},
},
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 2},
{Histogram: testHistogram1, TimestampMs: 3},
Expand Down Expand Up @@ -310,30 +310,30 @@ func TestMergeSampleStreams(t *testing.T) {
sampleStreams: []SampleStream{
{
Labels: cortexpb.FromLabelsToLabelAdapters(lbls),
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 2},
{Histogram: testHistogram1, TimestampMs: 3},
},
},
{
Labels: cortexpb.FromLabelsToLabelAdapters(lbls),
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram2, TimestampMs: 1},
{Histogram: testHistogram2, TimestampMs: 4},
},
},
{
Labels: cortexpb.FromLabelsToLabelAdapters(lbls1),
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 1},
{Histogram: testHistogram1, TimestampMs: 4},
},
},
{
Labels: cortexpb.FromLabelsToLabelAdapters(lbls1),
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram2, TimestampMs: 2},
{Histogram: testHistogram2, TimestampMs: 3},
},
Expand All @@ -342,7 +342,7 @@ func TestMergeSampleStreams(t *testing.T) {
expectedOutput: map[string]SampleStream{
ingester_client.LabelsToKeyString(lbls): {
Labels: cortexpb.FromLabelsToLabelAdapters(lbls),
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 2},
{Histogram: testHistogram1, TimestampMs: 3},
Expand All @@ -351,7 +351,7 @@ func TestMergeSampleStreams(t *testing.T) {
},
ingester_client.LabelsToKeyString(lbls1): {
Labels: cortexpb.FromLabelsToLabelAdapters(lbls1),
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 1},
{Histogram: testHistogram1, TimestampMs: 4},
Expand Down Expand Up @@ -453,21 +453,21 @@ func TestSliceHistograms(t *testing.T) {
t.Parallel()
for _, tc := range []struct {
name string
histograms []SampleHistogramPair
histograms []*SampleHistogramPair
minTs int64
expectedHistograms []SampleHistogramPair
expectedHistograms []*SampleHistogramPair
}{
{name: "empty histograms"},
{
name: "minTs smaller than first histogram's timestamp",
histograms: []SampleHistogramPair{
histograms: []*SampleHistogramPair{
{
TimestampMs: 1,
Histogram: testHistogram1,
},
},
minTs: 0,
expectedHistograms: []SampleHistogramPair{
expectedHistograms: []*SampleHistogramPair{
{
TimestampMs: 1,
Histogram: testHistogram1,
Expand All @@ -476,7 +476,7 @@ func TestSliceHistograms(t *testing.T) {
},
{
name: "input histograms are not sorted, return all histograms",
histograms: []SampleHistogramPair{
histograms: []*SampleHistogramPair{
{
TimestampMs: 3,
Histogram: testHistogram1,
Expand All @@ -487,7 +487,7 @@ func TestSliceHistograms(t *testing.T) {
},
},
minTs: 2,
expectedHistograms: []SampleHistogramPair{
expectedHistograms: []*SampleHistogramPair{
{
TimestampMs: 3,
Histogram: testHistogram1,
Expand All @@ -500,7 +500,7 @@ func TestSliceHistograms(t *testing.T) {
},
{
name: "minTs greater than the last histogram's timestamp",
histograms: []SampleHistogramPair{
histograms: []*SampleHistogramPair{
{
TimestampMs: 1,
Histogram: testHistogram1,
Expand All @@ -511,11 +511,11 @@ func TestSliceHistograms(t *testing.T) {
},
},
minTs: 3,
expectedHistograms: []SampleHistogramPair{},
expectedHistograms: []*SampleHistogramPair{},
},
{
name: "input histograms not sorted, minTs greater than the last histogram's timestamp",
histograms: []SampleHistogramPair{
histograms: []*SampleHistogramPair{
{
TimestampMs: 0,
Histogram: testHistogram1,
Expand All @@ -530,11 +530,11 @@ func TestSliceHistograms(t *testing.T) {
},
},
minTs: 2,
expectedHistograms: []SampleHistogramPair{},
expectedHistograms: []*SampleHistogramPair{},
},
{
name: "input histograms are sorted",
histograms: []SampleHistogramPair{
histograms: []*SampleHistogramPair{
{
TimestampMs: 2,
Histogram: testHistogram1,
Expand All @@ -549,7 +549,7 @@ func TestSliceHistograms(t *testing.T) {
},
},
minTs: 1,
expectedHistograms: []SampleHistogramPair{
expectedHistograms: []*SampleHistogramPair{
{
TimestampMs: 2,
Histogram: testHistogram1,
Expand All @@ -566,7 +566,7 @@ func TestSliceHistograms(t *testing.T) {
},
{
name: "input histograms are sorted, get sliced histograms",
histograms: []SampleHistogramPair{
histograms: []*SampleHistogramPair{
{
TimestampMs: 1,
Histogram: testHistogram1,
Expand All @@ -581,7 +581,7 @@ func TestSliceHistograms(t *testing.T) {
},
},
minTs: 2,
expectedHistograms: []SampleHistogramPair{
expectedHistograms: []*SampleHistogramPair{
{
TimestampMs: 3,
Histogram: testHistogram1,
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/tripperware/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func decodeSampleStream(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
for iter.ReadArray() {
h := SampleHistogramPair{}
UnmarshalSampleHistogramPairJSON(unsafe.Pointer(&h), iter)
ss.Histograms = append(ss.Histograms, h)
ss.Histograms = append(ss.Histograms, &h)
}
default:
iter.ReportError("unmarshal SampleStream", fmt.Sprint("unexpected key:", field))
Expand Down Expand Up @@ -323,7 +323,7 @@ func encodeSampleStream(ptr unsafe.Pointer, stream *jsoniter.Stream) {
if i > 0 {
stream.WriteMore()
}
MarshalSampleHistogramPairJSON(unsafe.Pointer(&h), stream)
MarshalSampleHistogramPairJSON(unsafe.Pointer(h), stream)
}
stream.WriteArrayEnd()
}
Expand Down
Loading

0 comments on commit a153624

Please sign in to comment.